Rick


Monday, April 4, 2016

Introducing Reakt: Reactive interfaces for Java Promises, Streams, Callbacks, and Async results

Introducing Reakt: reactive interfaces for Java Promises, Streams, Callbacks, and Async results. Reakt is to be used by QBit 2 and conekt.


Reakt provides reactive interfaces for Java:
  • Promises,
  • Streams,
  • Callbacks,
  • Async results
The emphasis is on defining interfaces that enable lambda expressions and fluent APIs for asynchronous programming in Java.

Note: This mostly just provides the interfaces, not the implementations. There are some starter implementations, but the idea is that anyone can implement this. It is all about interfaces. There will be adapters for Vert.x, RxJava, Reactive Streams, Guava Async Futures, etc.

Fluent Promise API

  Promise<Employee> promise = promise()
                .then(e -> saveEmployee(e))
                .catchError(error -> 
                     logger.error("Unable to lookup employee", error));

  employeeService.lookupEmployee(33, promise);
Or you can handle it in a single statement.

Fluent Promise API example 2

  employeeService.lookupEmployee(33, 
        promise().then(e -> saveEmployee(e))
                 .catchError(error -> logger.error(
                                           "Unable to lookup ", error))
        );
Promises are both a callback and a Result; however, you can work with Callbacks directly.

Using Result and callback directly

        employeeService.lookupEmployee(33, result -> {
            result.then(e -> saveEmployee(e))
                  .catchError(error -> {
                    logger.error("Unable to lookup", error);
            });
        });
In all of these examples, the lookupEmployee method of employeeService would look like:

Callback-based service method

   public void lookupEmployee(long employeeId, Callback<Employee> callback){...}
QBit version 2 is going to use Reakt. Communikate, a slimmed down fork of Vert.x, will also use Reakt.
(See the QBit microservices lib for more details. See our wiki for more details on Reakt.)

Ref is a similar concept to Optional in the Java JDK and Option in Scala.
We added a new concept because this one is expected to come through callbacks and is used in places where Optional does not make sense. Also, Reakt wants to use interfaces for all core concepts so others can provide their own implementations. In addition, we wanted consumers for ifPresent and ifEmpty.
Ref contains a value object which may or may not be set. This is like Optional, but the value could come from an async operation. If a value is present, isPresent() will return true and get() will return the value. Ref is heavily modeled after Optional (java.util.Optional).

Sample Usage

final Ref<Employee> empty = Ref.empty();
... //OR

final Ref<Employee> empty = Ref.ofNullable(null);
...


//isEmpty isPresent
assertTrue(   empty.isEmpty()    );
assertFalse(  empty.isPresent()  );

//ifEmpty ifPresent
empty.ifEmpty(() -> logger.info("Empty"));
empty.ifPresent(employee -> logger.info("Employee", employee));

//empty get throws a NoSuchElementException for empty ref
try {
            empty.get();
            fail();
} catch (NoSuchElementException nsee) {
}

Sample Usage non-empty

        final Ref<Employee> rick = Ref.ofNullable(new Employee("Rick"));
        ... //OR
        final Ref<Employee> rick = Ref.of(new Employee("Rick"));

        ...


       //ifEmpty ifPresent
       rick.ifEmpty(() -> logger.info("Empty"));
       rick.ifPresent(employee -> logger.info("Employee", employee));


       //Convert employee into a sheep using map
       final Sheep sheep = rick.map(employee -> new Sheep(employee.id)).get();
       assertEquals("Rick", sheep.id);


       //Use Filter
       assertTrue(
          rick.filter(employee -> employee.id.equals("Rick")).isPresent()
       ); //Rick is Rick
       assertFalse(
          rick.filter(employee -> employee.id.equals("Bob")).isPresent()
       ); //Rick is not Bob

Result is the result of an async operation. This was modeled after Vert.x AsyncResult and after the types of results one would deal with in JavaScript.

Example usage using then and catchError

        employeeService.lookupEmployee(33, result -> {
            result.then(e -> saveEmployee(e))
                  .catchError(error -> {
                    logger.error("Unable to lookup", error);
            });
        });

Example usage using Ref from thenRef and catchError

        employeeService.lookupEmployee(33, result -> {
            result.thenRef(ref -> ref.ifPresent(e -> saveEmployee(e)))
                  .catchError(error -> {
                    logger.error("Unable to lookup", error);
            });
        });

Callback is a generic event handler which can be thought of as a callback handler. This is like an async future or callback. It was modeled after QBit's callback, Guava's Callback, and JavaScript's callbacks. A Result represents the result or error from an async operation and is passed to onResult.
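
To make the relationship between a callback and a result concrete, here is a minimal, self-contained sketch. It does not use the Reakt types: SimpleCallback, SimpleResult, and the toy lookupEmployee below are hypothetical stand-ins written for this illustration, and the real interfaces may differ in their details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CallbackSketch {

    /** Hypothetical stand-in for a Result: holds either a value or an error. */
    static final class SimpleResult<T> {
        private final T value;
        private final Throwable error;

        SimpleResult(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        /** Runs the consumer only if the operation succeeded. */
        SimpleResult<T> then(Consumer<T> consumer) {
            if (error == null) consumer.accept(value);
            return this;
        }

        /** Runs the consumer only if the operation failed. */
        SimpleResult<T> catchError(Consumer<Throwable> consumer) {
            if (error != null) consumer.accept(error);
            return this;
        }
    }

    /** Hypothetical stand-in for a Callback: one method, so a lambda can implement it. */
    @FunctionalInterface
    interface SimpleCallback<T> {
        void onResult(SimpleResult<T> result);
    }

    /** Toy service: ids above zero succeed, anything else fails. */
    static void lookupEmployee(long id, SimpleCallback<String> callback) {
        if (id > 0) {
            callback.onResult(new SimpleResult<>("employee-" + id, null));
        } else {
            callback.onResult(new SimpleResult<>(null,
                    new IllegalArgumentException("bad id " + id)));
        }
    }

    /** Runs one lookup and records what the handlers saw. */
    public static List<String> run(long id) {
        List<String> log = new ArrayList<>();
        lookupEmployee(id, result -> result
                .then(e -> log.add("saved " + e))
                .catchError(err -> log.add("error " + err.getMessage())));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(33)); // prints [saved employee-33]
        System.out.println(run(-1)); // prints [error bad id -1]
    }
}
```

Because the callback interface has a single method, the caller can pass a lambda, and because then and catchError return the result itself, the two handlers chain fluently.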

Promise is like a non-blocking Future (java.util.concurrent.Future). With a promise you can get notified of changes instead of having to call get.
A promise is both a Callback (io.advantageous.reakt.Callback) and a Result (io.advantageous.reakt.Result). A promise is a sort of deferred value.
There are three types of promises: a blocking promise, a callback promise, and a replay promise.
A blocking promise is for legacy integration and for testing. A callback promise will get called back (its then, thenRef, and catchError handlers), but usually on a foreign thread. A replay promise gets called back on the caller's thread, not the callee's. The replay promise usually works with a Reactor (a concept from QBit).
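
The difference between a blocking promise and a replay promise is mostly about which thread does the waiting and which thread runs the handler. The sketch below illustrates that idea only. BlockingPromise, ReplayPromise, resolve, and replay are hypothetical names made up for this example, not the Reakt API, and the explicit replay() call stands in for the role a reactor would play.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class PromiseSketch {

    /** Hypothetical blocking promise: the caller waits for the value (tests, legacy code). */
    static final class BlockingPromise<T> {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile T value;

        /** Called by the service, possibly on another thread. */
        void resolve(T v) {
            value = v;
            done.countDown();
        }

        /** Blocks until resolved or until the timeout elapses. */
        T get(long timeoutMs) throws InterruptedException {
            if (!done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return value;
        }
    }

    /** Hypothetical replay promise: stores the value, runs the handler on the caller's thread. */
    static final class ReplayPromise<T> {
        private final AtomicReference<T> pending = new AtomicReference<>();
        private final Consumer<T> handler;

        ReplayPromise(Consumer<T> handler) {
            this.handler = handler;
        }

        /** Called by the service on a foreign thread; it only stores the value. */
        void resolve(T v) {
            pending.set(v);
        }

        /** Called by the owning thread; runs the handler if a value has arrived. */
        boolean replay() {
            T v = pending.getAndSet(null);
            if (v == null) return false;
            handler.accept(v);
            return true;
        }
    }

    /** Resolves both promises from a worker thread; returns the value and the handler's thread name. */
    public static String[] demo() {
        BlockingPromise<String> blocking = new BlockingPromise<>();
        String[] handlerThread = new String[1];
        ReplayPromise<String> replay = new ReplayPromise<String>(
                v -> handlerThread[0] = Thread.currentThread().getName());

        Thread worker = new Thread(() -> {
            blocking.resolve("Rick");
            replay.resolve("Rick");
        }, "worker");
        worker.start();

        try {
            worker.join();
            String value = blocking.get(1000); // caller blocks here
            replay.replay();                   // handler runs on the caller's thread
            return new String[] { value, handlerThread[0] };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] out = demo();
        System.out.println(out[0] + " handled on " + out[1]);
    }
}
```

A callback promise would differ from the replay variant only in that resolve would invoke the handler immediately, on whatever thread the service happened to use.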

Stream is a generic event handler for N results, i.e., a stream of results. This is like a type of Callback for streaming results. While Callback can be considered for scalar results, a Stream is more appropriate for non-scalar results, i.e., Stream.onNext will get called many times.
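
The scalar versus non-scalar distinction can be sketched in a few lines. The interfaces below are hypothetical stand-ins invented for this illustration (the real onNext may take a different argument type); the point is only that a callback fires once while a stream handler fires once per item.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamSketch {

    /** Hypothetical scalar handler: called exactly once with the single result. */
    @FunctionalInterface
    interface SimpleCallback<T> {
        void onResult(T value);
    }

    /** Hypothetical stream handler: called once per item, so N times for N results. */
    @FunctionalInterface
    interface SimpleStream<T> {
        void onNext(T item);
    }

    /** Toy scalar operation: produces one employee. */
    static void lookupEmployee(long id, SimpleCallback<String> callback) {
        callback.onResult("employee-" + id);
    }

    /** Toy streaming operation: produces count employees, one onNext call each. */
    static void listEmployees(int count, SimpleStream<String> stream) {
        for (int i = 1; i <= count; i++) {
            stream.onNext("employee-" + i);
        }
    }

    /** Collects everything the stream handler receives. */
    public static List<String> collect(int count) {
        List<String> seen = new ArrayList<>();
        listEmployees(count, seen::add);
        return seen;
    }

    public static void main(String[] args) {
        lookupEmployee(33, e -> System.out.println("one result: " + e));
        System.out.println("streamed: " + collect(3));
    }
}
```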
This is a very small API and you can find examples via unit tests at Reakt. Please check it out and give feedback.



