Rick

Rick
Rick

Thursday, July 23, 2015

Calling Cassandra async from QBit using Reactor and CallBackBuilder, and Callbacks

Cassandra offers an async API as does QBit. Cassandra uses Google Guava. QBit uses QBit. :)
How do you combine them so you do not have to create a worker pool in QBit to make async calls to Cassandra?
Let's say you have a Cassandra service like so...

Example Cassandra service

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.advantageous.qbit.annotation.*;

import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import io.advantageous.qbit.reactive.Callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
...

public class CassandraService {


    private final Logger logger = LoggerFactory.getLogger(CassandraService.class);
    private final CassandraCluster cluster ;
    private final CassandraConfig config;
    private final Session session; //only one per keyspace,
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    /**
     * Configure the client to connect to cluster
     * @param config
     */
    public CassandraService (final CassandraConfig config) {

            ...
    }




    public void executeAsync(final Callback<ResultSet> callback, final Statement stmt) {
        final ResultSetFuture future = this.session.executeAsync(stmt);

        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                callback.accept(result);
            }

            @Override
            public void onFailure(Throwable t) {
                callback.onError(t);
            }
        });

    }
Note that Futures from Cassandra driver support comes from the Guava library from google. DataStax has a nice tutorial on using Cassandra async API with Guava.
In this example we have a service called EventStorageService which endeavors to store an event into Cassandra. Most of the plumbing and tables DDL for the Event have been omitted. This is not a Cassandra tutorial by any means.
Note that in the onSuccess of the FutureCallback that we call the QBit callback akaCallback accept method. A QBit callback is a Java 8 consumer interface Callback<T> extends Consumer<T> which is probably what FutureCallback would have been if it were created post Java 8. You can also see that if theFutureCallback.onFailure gets called and that the code delegates to onError. Fairly simple.
Now we have another service call this service. As in this example CassandraService is a thin wrapper over the Cassandra API.

Example service that uses the cassandra service

public class EventStorageService {
    private final Logger logger = LoggerFactory.getLogger(EventStorageService.class);

    private final CassandraService cassandraService;


    private final Reactor reactor;

    public EventStorageService (final CassandraService cassandraService,
                                final Reactor reactor) {
        this.cassandraService = cassandraService;
        logger.info(" Event Storage Service is up ");

        if (reactor!=null) {
            this.reactor = reactor;
        } else {
            this.reactor = ReactorBuilder.reactorBuilder().build();
        }

    }


    @RequestMapping(value = "/event", method = RequestMethod.POST)
    public void addEventAsync (final Callback<Boolean> statusCallback, final Event event) {
        logger.debug("Storing Event  async {} " , event);
        final EventStorageRecord storageRec = EventConverter.toStorageRec(event);

        final Callback<ResultSet> callback = reactor.callbackBuilder()
                .setCallback(ResultSet.class, resultSet -> 
                                             statusCallback.accept(resultSet!=null))
                .setOnTimeout(() -> statusCallback.accept(false))
                .setOnError(error -> statusCallback.onError(error))
                .build(ResultSet.class);

        this.addEventStorageRecordAsync(callback, storageRec);


    }




    public void addEventStorageRecordAsync (final Callback<ResultSet> callback, 
                                            final EventStorageRecord storageRec) {
        logger.info("Storing  the record with storage-key {} async  ", storageRec.getStorageKey());

        if(storageRec != null) {

            SimpleStatement simpleStatement = ...;
            cassandraService.executeAsync(callback, simpleStatement);

        }


    }

Note that QBit uses a callbackBuilder so the constituent parts of a callback can be lambda expressions.
Callback is a rather simple interface that builds on Java 8 Consumer and adds timeout and error handling.

Callback

public interface Callback<T> extends Consumer<T> {

    default void onError(Throwable error) {

        LoggerFactory.getLogger(Callback.class)
                .error(error.getMessage(), error);
    }


    default void onTimeout() {

    }

}
The Reactor is class to manage timeouts, schedule periodic tasks, and other service call coordination. We initialize the Reactor in the constructor of theEventStorageService as seen in the previous code listing. We use thecallbackBuilder created from the Reactor as it will register the callbacks with thereactor for timeouts and such.
To enable the reactor, we must call it from service queue callback method of idle, limit and empty. One merely needs to call reactor.process from the callback, and it will periodically check for timeouts and such.

Calling reactor process to process callbacks and handle timeouts

    @QueueCallback({
            QueueCallbackType.LIMIT, 
            QueueCallbackType.IDLE,
            QueueCallbackType.EMPTY})
    public void process() {
        reactor.process();
    }

Underneath the covers.

The Reactor uses AsyncFutureCallback which is both a FutureRunnable and aCallback so therefore a Consumer. Rather then invent our own async API or functional API we decided to lean on Java 8, and build on the shoulders of giants.

Reactor uses AsyncFutureCallback internally. And CallBack builder really builds AsyncFutureCallback

public interface AsyncFutureCallback<T> extends Runnable, Callback<T>, Future<T> {
    Exception CANCEL = new Exception("Cancelled RunnableCallback");

    boolean checkTimeOut(long now);

    void accept(T t);

    void onError(Throwable error);

    void run();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);

    @Override
    boolean isCancelled();

    @Override
    boolean isDone();

    @Override
    T get();

    @SuppressWarnings("NullableProblems")
    @Override
    T get(long timeout, TimeUnit unit);


    default boolean timedOut(long now) {

        return !(startTime() == -1 || timeOutDuration() == -1) && (now - startTime()) > timeOutDuration();
    }

    default long timeOutDuration() {
        return -1;
    }


    default long startTime() {
        return -1;
    }

    default void finished() {

    }


    default boolean isTimedOut() {
        return false;
    }
}

QBit: Intercepting method calls, grabbing http request, using BeforeMethodCall, AOP like features with qbit

Recently someone asked me if you could capture the request parameters from a request with QBit REST support. You can.
QBit has this interface.

BeforeMethodCall

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;

/**
 * Use this to register for before method calls for services.
 * <p>
 * created by Richard on 8/26/14.
 *
 * @author rhightower
 */
public interface BeforeMethodCall {

    boolean before(MethodCall call);
}
With this BeforeMethodCall interface you can intercept a method call. If you register it with a ServiceQueue via the ServiceBuilder then the method interception happens in the same thread as the service queue method calls.
If you return false from the before method then the call will not be made. You can also intercept calls at the ServiceBundle and the ServiceEndpointServer levels using theEndpointServerBuilder and the ServiceBundleBuilder. When you register aBeforeMethodCall with a service bundle or and server end point, it gets called before the method is enqueued to the actual service queue. When you register aBeforeMethodCall with a service queue, it gets called right before the method gets invoked in the same thread as the service queue, i.e., in the service thread, which is most useful for capturing the HttpRequest.
But let's say that you want to access the HttpRequest object to do something special with it. Perhaps read the request params.
This is possible. One merely has to intercept the call. Every Request object has a property called originatingRequest. A MethodCall is a Request object as is anHttpRequest. This means that you just have to intercept the call withBeforeMethodCall grab the methodCall, and then use it to get the HttpRequest.

Service example

/**
 * Created by rhightower.
 */
@RequestMapping("/api")
public class PushService {

    private final ThreadLocal<HttpRequest> currentRequest;

    public PushService() {
        this.currentRequest = new ThreadLocal<>();
    }


    @RequestMapping(value = "/event", method = RequestMethod.POST)
    public void event(final Callback<Boolean> callback, final Event event) {

        final HttpRequest httpRequest = currentRequest.get();
        System.out.println(httpRequest.address());
        System.out.println(httpRequest.params().size());
        ...
    }
Now in the main method, we will need to construct the service and then register the service with the endpoint.
Notice the private final ThreadLocal<HttpRequest> currentRequest; because we will use that to store the current http request.

Register a ServiceQueue with an end point server

final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

        final PushService pushService = new PushService();


        final ServiceEndpointServer serviceEndpointServer =  
                     managedServiceBuilder.getEndpointServerBuilder()
                                                            .setUri("/")
                                                            .build();

      ...


        final ServiceQueue pushServiceQueue = ...
        serviceEndpointServer.addServiceQueue("/api/pushservice", pushServiceQueue);
Notice when we create the service queue separately we have to register the address it is bound under.
When we create the service queue (pushServiceQueue) for pushService, we want to tell it to use the same response queue as our endpoint server and register the beforeCall lambda to capture the HttpRequest from the MethodCall.

Creating a lambda expression to populate the currentRequest from the originatingRequest of the MethodCall (call)

        final ServiceQueue pushServiceQueue = managedServiceBuilder
                .createServiceBuilderForServiceObject(pushService) 
                .setResponseQueue(serviceEndpointServer.serviceBundle().responses())
                .setBeforeMethodCall(call -> {

                    pushService.currentRequest.set((HttpRequest) call.originatingRequest());
                    return true;
                })
                .buildAndStart();
The full example is a bit longer as it has some other things not mentioned in this article.
public class Event {

    private final String name;

    public Event(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

....


import io.advantageous.qbit.annotation.*;
import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.server.ServiceEndpointServer;
import io.advantageous.qbit.service.ServiceQueue;

import java.util.concurrent.TimeUnit;



/**
 * Created by rhightower.
 */
@RequestMapping("/api")
public class PushService {


    private final Reactor reactor;
    private final StoreServiceClient storeServiceClient;

    private final ThreadLocal<HttpRequest> currentRequest;

    public PushService(final Reactor reactor,
                       final StoreServiceClient storeServiceClient) {
        this.reactor = reactor;
        this.storeServiceClient = storeServiceClient;
        this.currentRequest = new ThreadLocal<>();
    }

    @RequestMapping("/hi")
    public String sayHi() {
        return "hi";
    }

    @RequestMapping(value = "/event", method = RequestMethod.POST)
    public void event(final Callback<Boolean> callback, final Event event) {

        final HttpRequest httpRequest = currentRequest.get();

        System.out.println(httpRequest.address());

        System.out.println(httpRequest.params().baseMap());
        storeServiceClient.addEvent(callback, event);

    }

    @QueueCallback({QueueCallbackType.LIMIT, QueueCallbackType.EMPTY, QueueCallbackType.IDLE})
    public void load() {

        reactor.process();
    }


    public static void main(String... args) {


        /* Using new snapshot 2. */
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

        final StoreService storeService = new StoreService();


        final ServiceQueue serviceQueue = managedServiceBuilder.createServiceBuilderForServiceObject(storeService)
                .buildAndStartAll();

        final StoreServiceClient storeServiceClient = serviceQueue.createProxyWithAutoFlush(StoreServiceClient.class,
                100, TimeUnit.MILLISECONDS);


        final PushService pushService = new PushService(ReactorBuilder.reactorBuilder().build(),
                storeServiceClient);

        final ServiceEndpointServer serviceEndpointServer = managedServiceBuilder.getEndpointServerBuilder()
                .setUri("/")
                .build();

        final ServiceQueue pushServiceQueue = managedServiceBuilder
                .createServiceBuilderForServiceObject(pushService)
                .setResponseQueue(serviceEndpointServer.serviceBundle().responses())
                .setBeforeMethodCall(call -> {

                    pushService.currentRequest.set((HttpRequest) call.originatingRequest());
                    return true;
                })
                .buildAndStart();

        serviceEndpointServer.addServiceQueue("/api/pushservice", pushServiceQueue);


        serviceEndpointServer.startServer();

        /* Wait for the service to shutdown. */
        managedServiceBuilder.getSystemManager().waitForShutdown();

    }

}
...

public class StoreService {

    public boolean addEvent(final Event event) {

        return true;
    }

}
...

import io.advantageous.qbit.reactive.Callback;

public interface StoreServiceClient {

    void addEvent(final Callback<Boolean> callback, final Event event);
}
...
import io.advantageous.boon.json.JsonFactory;
import io.advantageous.qbit.http.HTTP;

import static io.advantageous.boon.core.IO.puts;

public class TestMain {

    public static void main(final String... args) throws Exception {


        HTTP.Response hello = HTTP.jsonRestCallViaPOST("http://localhost:9090/api/event", JsonFactory.toJson(new Event("hello")));

        puts(hello.body(), hello.status());
    }
}
Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training