Rick

Rick
Rick

Monday, August 24, 2015

Reactor tutorial | reactively handling async calls with QBit Reactive Microservices

QBit reactive programming with the Reactor

The problem with microservices

Distributed system are complex.
"Microservices imply a distributed system." (Microservices - Not A Free Lunch!)
And this means to scale we need asynchronous calls.
"Where before we might have had a method call acting as a subsystem boundary, we now introduce lots of remote procedure calls, REST APIs or messaging to glue components together across different processes and servers." (Microservices - Not A Free Lunch!)
This introduces all sorts of coordination issues.
"Once we have distributed a system, we have to consider a whole host of concerns that we didn't before. Network latency, fault tolerance, message serialisation, unreliable networks, asynchronicity, versioning, varying loads within our application tiers etc." (Microservices - Not A Free Lunch!.)
Coordinating asynchronous calls is difficult.
"However, when things have to happen synchronously or transactionally in an inherently Asynchronous architecture, things get complex with us needing to manage correlation IDs and distributed transactions to tie various actions together." again according to according to Microservices - Not A Free Lunch!.
You can't really escape asynchronous calls as synchronous calls considered harmful according to the Microservices paper by Martin Fowler et al, which we discuss in some more detail below.
"Any time you have a number of synchronous calls between services you will encounter the multiplicative effect of downtime. Simply, this is when the downtime of your system becomes the product of the downtimes of the individual components. You face a choice, making your calls asynchronous or managing the downtime." (Microservices paper by Martin Fowler et al).
QBit provides support for coordinating asynchronous calls and avoiding cascading failures.
If you see a tutorial that makes a lot of synchronous calls and does not discuss eliminating cascading failures and the need for asynchronous programming, realize you are not really reading a microserivce tutorial, you are reading a tutorial most likely on how to expose your classes as REST. Microservices is more than just exposing an API via REST. This is a microservices tutorial. 

Reactive Microservices Background

One of the key tenets of a microservices architecture is the ability to be asynchronous. This is important because you want to make the best use of your hardware. There is little point in starting up thousands of threads that are waiting on IO. Instead you can have fewer CPU threads and use a async model.
An asynchronous programming model is not in and of itself a reactive programming model. You need asynchronous model before you can have a truly reactive model. In order to have a reactive model, you need to be able to coordinate asynchronous calls.
Imagine you have three services. One of the services is client facing. By client facing, we mean public web or for internal app, it is the end point that the client app talks to. Let's call this client facing service Service A.
For example, let’s say Service A performs an operation on behalf of the client, and this operation needs to call Service B, and after it calls Service B, it needs to take the result of Service B and call Service C. And then Service A takes the combined results a Service B and Service C and returns those back to the client. These of course are all nonblocking asynchronous calls.
Let’s summarize the Client calls a method on Service AService A calls a method on Service B,. Then when result from Service B method invocation comes back,Service A then calls a method on Service C, passing results from Service B as a parameter to the call to Service C. The combined results from Service B and Service C are then processed by Service A and then finally Service A passes the response that depended on calls to Service B and Service C to the Client.
The reactive nature comes into play in that we need to coordinate the call to Service Cto happen after the call to Service B. And we need to maintain enough with the context from the original call to return results to the original client. At this point we are still mostly talking about an asynchronous system not really a reactive system per se. There are language constructs and Java to capture the context of the call either lambda expression or an anonymous class.
Where a reactive system start to come into play is what happens if Service B orService C takes too long to respond. Or if the total operation of Service A takes too long to respond. You need to have a way to detect when asynchronous call do not come back in allotted period of time. If you do not have this, the client can continue to hold onto the connection that is not responding and there is hardware limitations to how many open connections you can. Now let's say if the Client is not a client app but rather another service that is calling Service A. You do not want a rolling back up of waiting connections if a downstream service like Service B stopped responding. The ability to handle a non-responsive system is what makes a reactive system reactive. The system has to be able to react to things like timeouts or downstream failures.
Now the call sequence that was just described is a fairly simple one. A more complicated call sequence might involved many downstream services and perhaps calls that rely on calls that rely on calls that then decide which other calls to make. It might make sense to have some service internal cache that can cache results of the calls and coordinate a filtered response based on N number of calls. However complex the call sequences the basic principle that you can't leave the client hanging still applies. At some point one has to determine that the call sequence is not going to be successful and at that point a response even if it's an error response must return to client. The main mission of reactive system is to not have a cascading failure.
Again, the main mission of reactive system is to not have a cascading failure.
In the case of the cache, which doesn't have to be a real cache at all one may need a mechanism to purge this cache and/or keep the cache warmed up. Perhaps instead of making frequent calls to Service BService A can be notified by Service B via an event that item of interest has been changed and Service A can ask ahead a time for the things it needs from serves be before the client asked for them.
Things like async call coordinationhandling async call timeoutscoordinating complex async calls, and populating caches based on events, and having periodic jobs to manage real-time stats, cache eviction, and complex call coordination is needed. A system that provides these things is a reactive system. In QBit (Java Microservice Lib) the main interface to this reactive system is the Reactor.

QBit Background

QBit is a service-orientedreactivemicro-service library. QBit revolves around having aJava idiomatic service architecture. Your services are Java classes. These services are guaranteed to only be called by one thread at a time. Since there are strong guarantees for thread safety your services can maintain state. Since your services can maintain state then they can do things like keeping internal cache where the internal cash might just be a tree map or a hash map or your own data structure. Stateful services can also do things like reports statistics since they can easily have counters.
If you have a CPU intensive service that needs to maintain state, QBit allows you to shard services in the same JVM. There are built-in shard rules to shard based on method call arguments and you can create your own shard rules easily.
Data safety can be accomplished through using tools like CassandraKafka or by simply having services that replicate to another service peer. You can set the service up so it does not mutate its internal state, until a call to Kaka, or an update to Cassandra or call to a replica succeeds. The calls to the replica or async store or transactional message bus will be asynchronous calls.
QBit enables the development of in-memory services, IO bound services, or both running in the same JVM, etc. QBit provides a cluster event bus (using idiomatic Java, i.e., interfaces, classes), a fast batched queuing system based on streams of calls, shared services, round-robin services, as well as exposing services via REST/JSON or WebSocket (pluggable remoting) not to mention a ServiceDiscovery mechanism so services can find peers for replication. QBit services can automatically be enrolled in the QBit health system or the QBit stats system. QBit provides 
HealthService,ServiceDiscoveryEventService and a StatService. The StatService can be integrated with StatsD to publish passive stats. Or you can query the stats engine and react to the stats (counts, timings and levels). The StatsService is a reactive stats system that can be clustered. The StatService is reactive in that your services can publish to it and query it and react based on the results. You can implement things like rate limiting and react to an increased rate of something. The ServiceDiscovery system integrates with the HealthSystem and Consul to roll up each of your internal services that make up you micro service and publish the composite availably of your micro service to a single HTTP endpoint or a dead mans switch in Consul (TTL). In short without going into a ton of detail, QBit fully embraces microservices. Down to even publishing the REST interfaces as swagger meta-data to enable API-gateways.
Whether QBIt is calling another async service or calling another QBit async service (remote or local) or is using a pool of services to call a blocking IO service one thing is clear, you need async call coordination.

QBit Reactor to reactively manage async microserivce calls

First and foremost, the Reactor ensures that async calls come in on the same thread as the method calls and event publication that the ServiceQueue already handles not a foreign thread so the callback handlers are thread safe. The Reactor is more or less a utility class to manage async calls and periodic jobs.
The Reactor works in concert with a ServiceQueue to manage async calls and schedule periodic jobs. Recall that events and method calls that come through aServiceQueue are guaranteed to come in on the same thread. The ServiceQueue based service is inherently thread safe. This is not a new idea DCOM supported this with active objects and apartment model threading, Akka supports this same concept with typed Actors and the LMAX architecture for trading uses the same principle (although souped up and highly optimized for high-speed trading). As it turns out, CPUs are fairly fast, and you can do a lot of operations per second on a single thread, quite a bit more than often the IO hardware card can handle.
Thus if both events and method calls come in on the same thread, what happens when we call into another service or use a library that has a callback or some sort of async future. The callback or async future will come back on a foreign thread. We need a way to get that callback to come back on the same thread as the ServiceQueue. This is where the Reactor comes into play. The Reactor ensures that callbacks happen on the same thread as the Service running in a ServiceQueue.
If you adopt the QBit model, you embrace the fact that services can be stateful, even if the state is only counters and caches. You are in effect embracing in-memory services. This does not force you to manage state in a Java class, but it allows you to manage state and makes things like counters and stats collection chlid’s play.
The missing link is managing callbacks so that they also come back on the same thread as the ServiceQueue. The Reactor allows callbacks to be handled like events and method calls.

Reactor to manage async calls

package io.advantageous.qbit.reactive;

…
public class Reactor {


    /** Add an object that is auto flushed.
     *
     * @param serviceObject as service object that will be auto-flushed.
     */
    public void addServiceToFlush(final Object serviceObject) {
       ….
    }

    /** Add a task that gets repeated.
     *
     * @param repeatEvery repeat Every time period
     * @param timeUnit unit for repeatEvery
     * @param task task to perform
     */
    public void addRepeatingTask(final long repeatEvery, final TimeUnit timeUnit, 
                                                     final Runnable task) {

       …
   }

   public CallbackBuilder callbackBuilder() {
        return CallbackBuilder.callbackBuilder(this);
    }

    public CoordinatorBuilder coordinatorBuilder() {
        return CoordinatorBuilder.coordinatorBuilder(this);
    }

…

   public process() {
       …
   }

}
You do not always need to create a callback via the Reactor. However, if you want to mutate the state of the Service based on a ServiceQueue, you will want to use aReactor. Also the Reactor makes it convenient to have callbacks with timeouts. Those are the two use cases for the Reactor. You want to enforce a timeout or you want to ensure that the callback executes on the same thread as the method calls and events so that the access to member variables of the service are thread safe.

HRService and DepartmentRepo example using Reactor

Let’s create a small example to show how it all ties in.
We have the following components and classes and interfaces:
  • HRService (Human resources service) that is exposed via REST
  • DepartmentRepo which stores departments in a long term storage Department a department object
  • DepartmentRepoAsync which is the async interface to DepartmentRepo
  • Reactor which coordinates calls to DepartmentRepo
  • HRServiceMain which constructs the servers and services queues (wiring)
Let’s look at HRServiceHRService (Human Resource Service) is a s Service that is running on a ServiceQueue thread.

HRService

/** This is the public REST interface to the Human Resources services.
 *
 */
@RequestMapping("/hr")
public class HRService {

    private final Map<Integer, Department> departmentMap 
                               = new HashMap<>();

    private final Reactor reactor;
    private final DepartmentRepoAsync departmentRepoAsync;

    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param departmentRepoAsync async interface to DepartmentStore
     */
    public HRService(final Reactor reactor, 
                                   final DepartmentRepoAsync departmentRepoAsync) {
        this.reactor = reactor;
        this.reactor.addServiceToFlush(departmentRepoAsync);
        this.departmentRepoAsync = departmentRepoAsync;
    }

    /**
     * Add a new department
     * @param callback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", 
                                    method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> callback, 
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department) {

        final Callback<Boolean> repoCallback = reactor.callbackBuilder()
                .setCallback(Boolean.class, succeeded -> {
                    departmentMap.put(departmentId, department);
                    callback.accept(succeeded);
                }).build();

        //TODO improve this to handle timeout and error handling.
        departmentRepoAsync.addDepartment(repoCallback, department);

    }

    /** Register to be notified when the service queue is idle, empty, 
          or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, 
               QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }
To use the Reactor, you must do the following, 1) register collaborating services with addServiceToFlush, call the reactor’s process method from a @QueueCallback method of the service that registers for idle, empty and limit notification. The Reactor's process method will handle registered coordinators, repeating jobs, collaborating service queue flushes, and callback timeouts & callbacks running on the same thread as the service queue. Now every time we make a call to our collaborating service we will use the callback builder from the reactor (reactor.callbackBuilder) so the reactor can manage the callback and if it times out. Let's break this down.
First we register the collaborating services with addServiceToFlush.

register collaborating services with addServiceToFlush

public HRService(final Reactor reactor, 
                                final DepartmentRepoAsync departmentRepoAsync) {
       ...
        this.reactor.addServiceToFlush(departmentRepoAsync);
Next we call the reactor’s process method from a @QueueCallback method that registers for idle, empty and limit notification.

call the reactor’s process from a @QueueCallback method

    /** Register to be notified when the service queue is 
      idle, empty, or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, 
               QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }


This literally means if the queue is idle or empty or we reached the batch size limit, then run the reactor process method. This works for most use cases, but you could opt to call reactor.process after some other important event or after X number of calls to a certain method. The reactor process method is where it manages the service flushes, callbacks, periodic jobs, etc.
DepartmentRepo which stores departments in a long term storage for now is just a simple class to keep the discussion moving forward.

DepartmentRepo which stores departments

package com.mammatustech.hr;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a storage repo. Imagine this is talking to MongoDB or
 * Cassandra. Perhaps it is also indexing the department name via
 * SOLR. It does all of this and then returns when it is finished.
 * If this in turn called other services, it would use a Callback instead of
 * returning a boolean.
 */
public class DepartmentRepo {

    private final Map<Long, Department> departmentMap = new HashMap<>();


    /**
     * Add a department.
     * @param department department we are adding.
     * @return true if successfully stored the department
     */
    public boolean addDepartment(final Department department) {

        departmentMap.put(department.getId(), department);
        return true;
    }
}
For now imagine it writing to database or Cassandra or LevelDB or something.
Since this is such a simple version, we don’t even need a Callback, but we do need one when we call it. (Later we will coordinate multiple calls).
DepartmentRepoAsync which is the async interface to DepartmentRepo so it allows async access even though, it does not technically need it yet.

DepartmentRepoAsync which is the async interface to DepartmentRepo

package com.mammatustech.hr;


import io.advantageous.qbit.reactive.Callback;

/**
 * Async interface to DepartmentRepo internal service.
 *
 */
public interface DepartmentRepoAsync {

    /**
     * Add a department to the repo.
     * @param callback callback which returns the success code async.
     * @param department department to add
     */
     void addDepartment(final Callback<Boolean> callback,
                        final Department department);

}
There is nothing special about the Department object.

Department Object

package com.mammatustech.hr;

import java.util.ArrayList;
import java.util.List;

public class Department {

    private final long id;
    private final String name;
    private final List<Employee> employeeList;

    public Department(long id, String name, List<Employee> employeeList) {
        this.id = id;
        this.name = name;
        this.employeeList = employeeList;
    }

    public void addEmployee(Employee employee) {
        employeeList.add(employee);
    }

    public List<Employee> getEmployeeList() {
        return new ArrayList<>(employeeList);
    }

    public long getId() {
        return id;
    }
}
HRServiceMain constructs the servers and services queues and starts them up. It is the bootstrap class.

HRServiceMain wires up DepartmentRepo and HRService

/**
 * Default port for admin is 7777.
 * Default port for main endpoint is 8080.
 *
 * <pre>
 * <code>
 *
 *     Access the service:
 *
 *    $ curl http://localhost:8888/v1/...
 *
 *
 *     To see swagger file for this service:
 *
 *    $ curl http://localhost:7777/__admin/meta/
 *
 *     To see health for this service:
 *
 *    $ curl http://localhost:8888/__health
 *     Returns "ok" if all registered health systems are healthy.
 *
 *     OR if same port endpoint health is disabled then:
 *
 *    $ curl http://localhost:7777/__admin/ok
 *     Returns "true" if all registered health systems are healthy.
 *
 *
 *     A node is a service, service bundle, queue, or server endpoint that is being monitored.
 *
 *     List all service nodes or endpoints
 *
 *    $ curl http://localhost:7777/__admin/all-nodes/
 *
 *
 *      List healthy nodes by name:
 *
 *    $ curl http://localhost:7777/__admin/healthy-nodes/
 *
 *      List complete node information:
 *
 *    $ curl http://localhost:7777/__admin/load-nodes/
 *
 *
 *      Show service stats and metrics
 *
 *    $ curl http://localhost:8888/__stats/instance
 * </code>
 * </pre>
 */
public class HRServiceMain {

    public static void main(final String... args) throws Exception {

        /* Create the ManagedServiceBuilder which 
               manages a clean shutdown, health, stats, etc. */
        final ManagedServiceBuilder managedServiceBuilder =
                ManagedServiceBuilder.managedServiceBuilder()
                        .setRootURI("/v1") //Defaults to services
                        .setPort(8888); //Defaults to 8080 or environment variable PORT


        /* Build the reactor. */
        final Reactor reactor = ReactorBuilder.reactorBuilder()
                                .setDefaultTimeOut(1)
                                .setTimeUnit(TimeUnit.SECONDS)
                                .build();


        /* Build the service queue for DepartmentRepo. */
        final ServiceQueue departmentRepoServiceQueue = 
                            managedServiceBuilder
                                 .createServiceBuilderForServiceObject(
                                         new DepartmentRepo()).build();

        departmentRepoServiceQueue
                 .startServiceQueue()
                 .startCallBackHandler();

        /* Build the remote interface for department repo. */
        final DepartmentRepoAsync departmentRepoAsync =
                          departmentRepoServiceQueue
                              .createProxy(DepartmentRepoAsync.class);



        /* Start the service. */
        managedServiceBuilder.addEndpointService(
               new HRService(reactor, departmentRepoAsync)) //Register HRService
                .getEndpointServerBuilder()
                .build().startServer();

        /* Start the admin builder which exposes health 
                     end-points and swagger meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("HR Server and Admin Server started");

    }
}
You can run this example by going to Reactor Example on github. There is even a REST client generated with swagger to exercise this example HRService client generated with Swagger.
Thus far we have only handled making the callback from DepartmentRepo happen on the same thread as the ServiceQueue of HRService. We have not really handled the timeout case.
To handle the timeout case, we need to handle the onTimeOut handler. Essentially we need to register an onTimeOut with the callbackBuilder as follows.

Registering an onTimeOut to handle timeouts

    @RequestMapping(value = "/department/{departmentId}/", 
             method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> callback, 
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department) {


        final Callback<Boolean> repoCallback = reactor.callbackBuilder()
                .setCallback(Boolean.class, succeeded -> {
                    departmentMap.put(departmentId, department);
                    callback.accept(succeeded);
                }).setOnTimeout(() -> { //handle onTimeout
                    //callback.accept(false); // one way

                    // callback.onTimeout();  //another way
                    /* The best way. */
                    callback.onError(
                            new TimeoutException("Timeout can't add department " + departmentId));
                }).setOnError(error -> { //handle error handler
                    callback.onError(error);
                }).build();

        departmentRepoAsync.addDepartment(repoCallback, department);
Notice that now we handle not only the callback, but we handle if there was a timeout. You could just return false by calling callback.accept(false) but since a timeout is an exceptional case, we opted to create an Exception and pass it to the callback.onError(…). The other option is call the default onTimeout handler, but by using onError to report the timeout, we are able to pass some additional context information about the timeout.
In addition to handling the timeout, we handle the error handler case. If we don’t handle the timeout and the error handler if their is a timeout or an error then the REST client will hold on to the connection until the HTTP connection times out. We don’t want the client to hold on to the connection for a long time as that could lead to a cascading failure if a downstream service fails while upstream services or clients hold on to connections waiting for their HTTP connections to timeout. Bottom line, handle timeouts and errors by sending a response to the client (even if the client is only an upstream service). Don’t let the client hang. Prevent cascading failures.

Coordinating multiple calls

Let's take this a step further. Let's say that instead of calling one service whenaddDepartment gets called, that we call three services: AuthService,DepartmentCassandraRepo and DepartmentSolrIndexer. First we want the HRService to call the AuthService to see if the user identified by userName is authorized to add a department. The doAddDepartment gets called if auth succeeds. Remember this is merely an example to show what async call coordination looks like. Then the doAddDepartment calls the DepartmentCassandraRepo repo to store the department and if it successful it stores the department in the department cache (departmentMap), notifies the clientCallback, and then call DepartmentSolrIndexer to index the department so that it is searchable.

AuthServiceImpl

 package com.mammatustech.hr;

import io.advantageous.qbit.reactive.Callback;

public interface AuthService {

    void allowedToAddDepartment(Callback<Boolean> callback,
                                String username,
                                int departmentId);

}
...
package com.mammatustech.hr;

import io.advantageous.qbit.reactive.Callback;

public class AuthServiceImpl implements AuthService {

    public void allowedToAddDepartment(final Callback<Boolean> callback,
                                       final String username,
                                       final int departmentId) {

...
    }

}

DepartmentCassandraRepo to store departments

package com.mammatustech.hr;

import io.advantageous.boon.core.Sys;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a storage repo. Imagine this is talking to 
 * Cassandra. 
 */
public class DepartmentCassandraRepo {
...


    /**
     * Add a department.
     * @param department department we are adding.
     * @return true if successfully stored the department
     */
    public void addDepartment(final Callback<Boolean> callback, 
                        final Department department) {
         ...
    }
}

DepartmentSolrIndexer to index departments

package com.mammatustech.hr;

import io.advantageous.boon.core.Sys;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents a SOLR indexer. Imagine this is talking to 
 * SOLR. 
 */
public class DepartmentSolrIndexer {
...


    /**
     * Add a department.
     * @param department department we are adding.
     * @return true if successfully stored the department
     */
    public void addDepartment(final Callback<Boolean> callback, 
                        final Department department) {
         ...
    }
}

HRService REST interface

/** This is the public REST interface to the Human Resources services.
 *
 */
@RequestMapping("/hr")
public class HRService {

    private final Map<Integer, Department> departmentMap = 
                                                  new HashMap<>();

    private final Reactor reactor;
    private final DepartmentRepoAsync solrIndexer;
    private final DepartmentRepoAsync cassandraStore;
    private final AuthService authService;

    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param cassandraStore async interface to DepartmentStore
     * @param solrIndexer async interface to SOLR Service
     */
    public HRService(final Reactor reactor,
                     final DepartmentRepoAsync cassandraStore,
                     final DepartmentRepoAsync solrIndexer,
                     final AuthService authService) {
        this.reactor = reactor;
        this.reactor.addServiceToFlush(cassandraStore);
        this.reactor.addServiceToFlush(solrIndexer);
        this.reactor.addServiceToFlush(authService);
        this.cassandraStore = cassandraStore;
        this.solrIndexer = solrIndexer;
        this.authService = authService;
    }

    /**
     * Add a new department
     * @param clientCallback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> clientCallback,
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department,
                              @HeaderParam(value="username", defaultValue = "noAuth")
                                  final String userName) {

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " 
                                          + departmentId));
                }).setOnError(clientCallback::onError);


        authService.allowedToAddDepartment(callbackBuilder.setCallback(Boolean.class, allowed -> {
            if (allowed) {
                doAddDepartment(clientCallback, callbackBuilder, department);
            } else {
                clientCallback.onError(new SecurityException("Go away!"));
            }
        }).build(), userName,  departmentId);


    }

    private void doAddDepartment(final Callback<Boolean> clientCallback,
                                 final CallbackBuilder callbackBuilder,
                                 final Department department) {

        final Callback<Boolean> callbackDeptRepo = callbackBuilder.setCallback(Boolean.class, addedDepartment -> {

            departmentMap.put((int)department.getId(), department);
            clientCallback.accept(addedDepartment);

            solrIndexer.addDepartment(indexedOk -> {
            }, department);
        }).build();

        cassandraStore.addDepartment(callbackDeptRepo, department);

    }

    /** Register to be notified when the service queue is idle, empty, or has hit its batch limit.
     */
    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    private void process () {

        /** Call the reactor to process callbacks. */
        reactor.process();
    }
The key to this is the shared callback builder.
    /**
     * Add a new department
     * @param clientCallback callback
     * @param departmentId department id
     * @param department department
     */
    @RequestMapping(value = "/department/{departmentId}/", method = RequestMethod.POST)
    public void addDepartment(final Callback<Boolean> clientCallback,
                              @PathVariable("departmentId") Integer departmentId,
                              final Department department,
                              @HeaderParam(value="username", defaultValue = "noAuth")
                                  final String userName) {

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " 
                                          + departmentId));
                }).setOnError(clientCallback::onError);
Notice how we break the methods down and functional decompose them so that things are easier to read, witness doAddDepartment and how it is called.

Breaking down callback handling

        authService.allowedToAddDepartment(callbackBuilder.setCallback(Boolean.class, allowed -> {
            if (allowed) {
                doAddDepartment(clientCallback, callbackBuilder, department);
            } else {
                clientCallback.onError(new SecurityException("Go away!"));
            }
        }).build(), userName,  departmentId);
...

    private void doAddDepartment(final Callback<Boolean> clientCallback,
                                 final CallbackBuilder callbackBuilder,
                                 final Department department) {

        final Callback<Boolean> callbackDeptRepo = callbackBuilder.setCallback(Boolean.class, addedDepartment -> {

            departmentMap.put((int)department.getId(), department);
            clientCallback.accept(addedDepartment);

            solrIndexer.addDepartment(indexedOk -> {
            }, department);
        }).build();

        cassandraStore.addDepartment(callbackDeptRepo, department);

    }

Callback builder specifying timeouts

The CallbackBuilder allows you to specify timeouts for calls.

Specifying timeouts per CallbackBuilder

        final CallbackBuilder callbackBuilder = reactor.callbackBuilder()
                .setOnTimeout(() -> {
                    clientCallback.onError(
                            new TimeoutException("Timeout can't add department " + departmentId));
                }).setOnError(clientCallback::onError)
                .setTimeoutDuration(200)
                .setTimeoutTimeUnit(TimeUnit.MILLISECONDS);

Working with repeating tasks

@RequestMapping("/hr")
public class HRService {
    ...
    /**
     * Construct a new HR REST Service.
     * @param reactor reactor
     * @param cassandraStore async interface to DepartmentStore
     * @param solrIndexer async interface to SOLR Service
     */
    public HRService(final Reactor reactor,
                     final DepartmentRepoAsync cassandraStore,
                     final DepartmentRepoAsync solrIndexer,
                     final AuthService authService) {
        ...
        this.reactor.addRepeatingTask(1, TimeUnit.SECONDS, () -> {
            manageCache();
        });
    }

No comments:

Post a Comment

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training