Rick

Rick
Rick

Saturday, March 21, 2015

UPDATE 4: Preview Reactive programming for QBit Java microservices library (QBit and RxJava)


Summary of RxJava and QBit:
The best description RxJava was what you use it in this scenario: call ServiceA, call ServiceB, take results of B, call ServiceC, return results of A and C. There was this blog that explained it well.

On hearing this explanation, RxJava's value made sense.  We are in the process of adding some reactive programming features to QBit so you can easily do things like call A, call B, take the results of B, call C with results of B, and then return the results. Check out this preview:


Details RxJava and reactive programming in QBit:
I get a lot of questions about RxJava and QBit. The first thing that I always say is that you can use RxJava and QBit.
I feel that RxJava is good at handling many calls to services, and coordinating the results.
The best description I heard about RxJava was what if you wanted to call ServiceA, ServiceB and you wanted to take what ServiceB returns and call ServiceC, but you need to return the results from ServiceA and ServiceC to a client. This is a great example of the power of RxJava.
QBit is async from the ground up. This is there from the get go. QBit does not have a lot of utilities to coordinate complex calls to/from local and out of process services. At the moment, QBit has a callback. The rest is up to you, rolling your own stuff or using a lib like RxJava. Until now....
Before we begin: QBit is a microservices Java lib. It has a fast queue (100M to 200M messages a second), and it has a fast event bus that can be replicated to other nodes and can integrate with Consul to be clustered. It also has a fast JSON parser and support for HTTP calls over JSON (REST) using Spring MVC style annotations and WebSockets.
I have been on projects where I had to coordinate calls to more than one service. I wanted a good way to do this in QBit. I have always rolled one-off solutions. RxJava is a great library, but I also wanted a QBit way to do things.
Since QBit relies on Java 8, I can use what comes with Java 8 and backwards compatibility to Java 6 and Java 7 is not an issue.
Let's show an example, please note that this is from working code, but it is still in early phases.
It will become part of QBit in short order. You can roll your own like I have been doing with Runnables and such and you can see examples of this in the wiki. Or you can use RxJava. Or you can use this new feature (once I finish adding it to QBit, this is a preview).

My pretend service

    public static class PretendService {


        private final String name;

        PretendService(final String name) {

            this.name = name;
        }

        public void serviceCall(final Callback<String> callback, final int seconds, String message) {

            Thread thread = new Thread(() -> {
                Sys.sleep(seconds * 1000);
                callback.accept(name + "::" + message);
            });
            thread.start();
        }
    }

The pretend service is nice in that it just waits as long as I tell it, and then returns its name as a return value. It is a great service for demonstrating the concept of Callback coordination.

Callback coordinator for a Java 8 Lambda reactor system

        final PretendService serviceA = new PretendService("SERVICE A");
        final PretendService serviceB = new PretendService("SERVICE B");
        final PretendService serviceC = new PretendService("SERVICE C");


        //Inside of Z Service
        ....
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();

        final long startTime = Timer.timer().now();

        /* Create a callback for service A to demonstrate
            a callback to show that it can be cancelled. */
        final AsyncFutureCallback<String> serviceACallback =
                reactor.callback(String.class, serviceAReturn::set);

        /* Call service A using the callback. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Call Service B, register a callback
             which call service C on service b completion. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                        serviceC.serviceCall(reactor.callback(serviceCReturn::set), 1, " from " + returnValue)
                ), 1, " from main");


        /* Register a coordinator that checks for return values form service A and C */
        reactor.coordinate(() -> {

            /* If service A and service C are done, then we are done. 
            * Let the client know.
            */
            if (serviceAReturn.get()!=null && serviceCReturn.get()!=null) {
                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                return true;  //true means we are done
            }

            /* We are not done, so check to see if 4s elapsed, then signal we are done.*/
            if ( Timer.timer().now() - startTime > 4000) {
                sendTimeoutBackToClient();
                serviceACallback.cancel(true);
                return true; //true means we are done
            }
            return false; //keep going
        });
By letting the coordinator decide how to handle things and making the state of the coordination be local variables that are captured by the lambda, we can handle the coordination in the context of one method call. This simplifies the code as we use regular imperative Java to do the coordination instead of tons of small callbacks. We are still using functional programming via lambda expressions and capturing local variables as part of the state of the callback. It is just much easier to follow.

Coordinators will have a timeout as this seems to be a common case, and there is no use having this cross cutting concern in every coordinator. Coordinators and AysncFutureCallback will have builders so you can specify error handlers, timeout handlers, timeout values, etc. The ServiceQueue will implement the reactor so it can manage  Coordinators and AysncFutureCallback. Right now the reactor is experimental, and is not part of the ServiceQueue as of yet. ServiceQueue will have default timeouts for Coordinators so you don't have to specify it each time. There will be a reactor interface.

The aspect I like about the above approach is that the logic for determining when the request can be very complex. Perhaps if there is a timeout, you send partial results. Or you only send results if important calls are made, but less important calls only get mixed into the final results if they return on time (a deprecated mode). I can imagine complex rules and fallbacks that are easy to express in Java and hard to configure in a programmatic handler. Java lambdas are the key here.

Also I want the ability for the C callback to say hey.. tell the coordinator that I am done so he does not have to get polled. This would improve the latency. coordinator.finished(). Perhaps the coordinator has a latch like mechanism where finished just increments a counter and when the last callback calls finished then, the magic happens. Expect more prototypes before I roll this into QBit proper. (Coordinator becomes like an async countdown latch.)

This is the start of the journey for QBit and reactive programming. In the meantime and after, please use QBit with RxJava.

I took another swing at it...

        final PretendService serviceA = new PretendService("SERVICE A");
        final PretendService serviceB = new PretendService("SERVICE B");
        final PretendService serviceC = new PretendService("SERVICE C");


        //Inside Z Service method
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        /* Create a callbackWithTimeout for service A to demonstrate
            a callbackWithTimeout to show that it can be cancelled. */
        final AsyncFutureCallback<String> serviceACallback =
                reactor.callback(String.class, serviceAReturn::set);




        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {

            /* If service A and service C are done, then we are done.
            * Let the client know.
            */
            if (serviceACallback.isDone() && serviceCReturn.get() != null) {
                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                return true;  //true means we are done
            }

            return false;

        }, Timer.timer().now(), 5, TimeUnit.SECONDS, RunnableCallbackTest::sendTimeoutBackToClient);




        /* Call service A using the A callbackWithTimeout. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callbackWithTimeout. */
        final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
                returnValueFromC -> {
                    serviceCReturn.set(returnValueFromC);
                    handleReturnFromC(serviceAReturn, coordinator);
                }
        );


        /* Call Service B, register a callback
             which call service C on service b completion. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                        serviceC.serviceCall(serviceCCallback, 1, " from " + returnValue)
                ), 1, " from main");



...
...


    private void sendTimeoutBackToClient() {

        System.out.println("You have timed out");
    }

    private void sendResponseBackToClient(String a, String ab) {

        System.out.println(a + "::" + ab);

    }


    public void handleReturnFromC(AtomicReference<String> serviceAReturn,
                                         CallbackCoordinator coordinator) {

        if (serviceAReturn.get()!=null) {
            coordinator.finished();
        }

    }

The coordinator can now be finished and the coordinators and the callbacks can be given timeouts. In the last example, we now added the handler for C to mark the coordinator as finished.


One more version.... This one has an async latch. This seems to be the cleanest yet. That last countdown triggers the runnable, which completes the job.
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();





        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {

            /* If service A and service C are done, then we are done.
            * Let the client know.
            */
            if (serviceAReturn.get()!=null && serviceCReturn.get() != null) {
                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                return true;  //true means we are done
            }

            return false;

        }, Timer.timer().now(), 5, TimeUnit.SECONDS, RunnableCallbackTest::sendTimeoutBackToClient);




        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {

                    System.out.println("From Latch");
                    coordinator.finished();

                }

        );


          /* Create a callbackWithTimeout for service A to demonstrate
            a callbackWithTimeout to show that it can be cancelled. */
        final AsyncFutureCallback<String> serviceACallback =
                reactor.callback(String.class, (t) -> {
                    serviceAReturn.set(t);
                    latch.countDown();
                });


        /* Call service A using the A callbackWithTimeout. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callbackWithTimeout. */
        final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
                returnValueFromC -> {
                    serviceCReturn.set(returnValueFromC);
                    latch.countDown();
                }
        );


        /* Call Service B, register a callback
             which call service C on service b completion. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                        serviceC.serviceCall(serviceCCallback, 1,
                                " from " + returnValue)
                ),
                1, " from main");


Tried a few more things. Added builders so it was a little more clear what we were working with.
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        final CallbackBuilder callbackBuilder = reactor.callbackBuilder();

        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
                .setCoordinator(
                        () -> {

                                /* If service A and service C are done, then we are done.
                                * Let the client know.
                                */
                            if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
                                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                                return true;  //true means we are done
                            }

                            return false;
                        })
                .setTimeoutDuration(5)
                .setTimeoutTimeUnit(TimeUnit.SECONDS)
                .setTimeOutHandler(() -> {
                    System.out.println("Coordinator timed out");
                    sendTimeoutBackToClient();
                })
                .build();

        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {
                    System.out.println("From Latch");
                    coordinator.finished();
                }
        );


          /* Create a callbackWithTimeout for service A to demonstrate
            a callbackWithTimeout to show that it can be cancelled. */
        final Callback<String> serviceACallback =
                callbackBuilder
                        .setCallback(String.class, returnValueFromA -> {

                            serviceAReturn.set(returnValueFromA);
                            latch.countDown();
                        })
                        .setOnTimeout(() -> {
                                System.out.println("Service A timed out");
                                sendTimeoutBackToClient();
                                coordinator.cancel();
                            })
                        .setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
                        .build();

        /* Call service A using the serviceACallback. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callback. */
        final Callback<String> serviceCCallback =
                callbackBuilder
                    .setCallback(String.class, returnValueFromC -> {

                        serviceCReturn.set(returnValueFromC);
                        latch.countDown();
                    })
                    .setOnTimeout(() -> {
                        System.out.println("Service C timed out");
                        sendTimeoutBackToClient();
                        coordinator.cancel();
                    })
                     .setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
                     .build();



        /* Call Service B, register a callback
             which call service C on service b completion. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                                serviceC.serviceCall(serviceCCallback, 1,
                                        " from " + returnValue)
                ),
                1, " from main");
The builder has reasonable defaults.
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        final CallbackBuilder callbackBuilder = reactor.callbackBuilder();

        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
                .setCoordinator(
                        () -> {

                                /* If service A and service C are done, then we are done.
                                * Let the client know.
                                */
                            if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
                                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                                return true;  //true means we are done
                            }

                            return false;
                        })
                .setTimeOutHandler(() -> {
                    System.out.println("Coordinator timed out");
                    sendTimeoutBackToClient();
                })
                .build();

        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {
                    System.out.println("From Latch");
                    coordinator.finished();
                }
        );


          /* Create a callbackWithTimeout for service A to demonstrate
            a callbackWithTimeout to show that it can be cancelled. */
        final Callback<String> serviceACallback =
                callbackBuilder
                        .setCallback(String.class, returnValueFromA -> {

                            serviceAReturn.set(returnValueFromA);
                            latch.countDown();
                        })
                        .setOnTimeout(() -> {
                                System.out.println("Service A timed out");
                                sendTimeoutBackToClient();
                                coordinator.cancel();
                            })
                        .setTimeoutDuration(4)
                        .build();

        /* Call service A using the serviceACallback. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callback. */
        final Callback<String> serviceCCallback =
                callbackBuilder
                    .setCallback(String.class, returnValueFromC -> {

                        serviceCReturn.set(returnValueFromC);
                        latch.countDown();
                    })
                    .setOnTimeout(() -> {
                        System.out.println("Service C timed out");
                        sendTimeoutBackToClient();
                        coordinator.cancel();
                    })
                   .setTimeoutDuration(4)
                   .build();



        /* Call Service B, register a callback
             which call service C on service b completion. */
        serviceB.serviceCall(
                reactor.callbackBuilder()
                        .setCallback(
                            returnValue ->
                                serviceC.serviceCall(serviceCCallback, 1,
                                        " from " + returnValue)
                        ).build(),
                1, " from main");

More will be done. The early work seems to work really well.

In the mean time.. feel free to read about QBit the Java Microservice Lib that focuses on Microservices, WebSocket, JSON and HTTP using Active Objects and high-speed queueing, messaging and event bus for modern cloud and mobile applications back-ends.


References and reading materials:

1. Microservices by Martin Fowler and James Lewis
2. Microservices Architecture by Chris Richardson
5. Micro service architecure by Fred George
6. Microservices are not a free lunch by Benjamin Wootton
11. Migrating to microservices by Adrian Cockroft
15. Microservices and DevOps by Adrian Cockcroft
16. Building and Deploying Microservices - Bart Blommaerts
17. Microservices on the JVM - Alexander Heusingfeld
18, Microservices Shaun Abrams
Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training