Rick

Monday, July 27, 2015

Working with Service Pools - working with SOLRJ from a service pool (microservices)


In a truly reactive world, one can expect all APIs to be async. At times, however, we have to integrate with legacy services and blocking legacy APIs such as JDBC.
There are times when you will need worker pools. If you are dealing with IO and the API is not async, you will want to wrap the API in a service that you can access through a service pool.
In this example, we will use the SOLRJ API to access SOLR.
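To make the idea concrete before bringing in QBit, here is a minimal sketch that uses only java.util.concurrent. It is not QBit code: BlockingCallPool, its call method, and the demo helper are hypothetical names made up for this illustration, and the lambda passed in stands in for a blocking client such as SOLRJ or JDBC.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical wrapper: runs a blocking function on a fixed pool and reports back through callbacks. */
final class BlockingCallPool<I, O> implements AutoCloseable {

    private final ExecutorService pool;
    private final Function<I, O> blockingCall;

    BlockingCallPool(final int workerCount, final Function<I, O> blockingCall) {
        this.pool = Executors.newFixedThreadPool(workerCount);
        this.blockingCall = blockingCall;
    }

    /** Returns immediately; the blocking call runs on one of the pool threads. */
    void call(final I input, final Consumer<O> onResult, final Consumer<Throwable> onError) {
        pool.execute(() -> {
            final O output;
            try {
                output = blockingCall.apply(input);
            } catch (RuntimeException ex) {
                onError.accept(ex);
                return;
            }
            onResult.accept(output);
        });
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    /** Demo helper: issues one call and waits up to five seconds for its callback. */
    static String demo(final String query) {
        final String[] result = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        // The lambda below is a stand-in for a blocking client call.
        try (BlockingCallPool<String, String> workers =
                     new BlockingCallPool<>(4, q -> "result for " + q)) {
            workers.call(query,
                    r -> { result[0] = r; done.countDown(); },
                    error -> done.countDown());
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

The caller never blocks on the slow API itself; it hands over a callback and moves on. A service pool in QBit gives you the same shape, with queues and proxies in place of the raw executor.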

Example SOLR service

public class SolrServiceImpl implements SolrService {


    /**
     * Create SolrCalypsoDataStore with config file.
     *
     * @param solrConfig solrConfig
     */
    public SolrServiceImpl(final SolrConfig solrConfig, ...) {

        logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
        healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
        this.solrConfig = solrConfig;
        connect();
    }

    ...

    /**
     * Connect to solr.
     */
    private void connect() {

          ...
    }


    @Override
    public void storeEvent(Event event) {
        store(event);
    }

    @Override
    public void storeTimeSeries(TimeSeries timeSeries) { store(timeSeries);}


    @Override
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
         callback.accept(doGet(queryParams));
    }

    private boolean store(final Object data) {

        logger.info("store():: importing calypso data event into solr {}",
                data);

        if (connectedToSolr) {

            SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);

            try {
                UpdateResponse ur = client.add(doc);
                if (solrConfig.isForceCommit()) {
                    client.commit();
                }

            } catch (Exception e) {
                ...
            }

            return true;
        } else {
            ...
            return false;
        }
    }

    /**
     * Proxy the request to solr
     * @param queryParams query params
     * @return
     */
    public String doGet(@RequestParam(value = "q", required = true) String queryParams) {

        queryParams = queryParams.replaceAll("\\n", "");

        logger.debug("Processing query params: {} ", queryParams);
        String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;

        logger.info("solr request Built {} ", solrQueryUrl);

        String result = null;
        try {
            result = IOUtils.toString(new URI(solrQueryUrl));

        } catch (IOException | URISyntaxException e) {
            logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
        }

        return result;
    }



    @QueueCallback(QueueCallbackType.SHUTDOWN)
    public void stop() {

        logger.info("Solr Client stopped");
        try {

            this.client.close();
            this.connectedToSolr = false;
        } catch (IOException e) {
            logger.warn("Exception while closing the solr client ", e);
        }

    }
}
This service is simple and serves mainly as an example. Because calls to SOLR can block, we want to access it from multiple threads.
To do this we will use a RoundRobinServiceWorkerBuilder, which creates a RoundRobinServiceWorker. For more background on workers in QBit, read about sharded service workers and service workers.
RoundRobinServiceWorker is a startable service dispatcher (Startable, ServiceMethodDispatcher) that can be registered with a ServiceBundle. A ServiceMethodDispatcher is an object that can dispatch method calls to a service.
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

final CassandraService cassandraService = new CassandraService(config.cassandra);


/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);

/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
        -> new SolrServiceImpl(config.solr));

/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();

/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();

/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
final EventStorageService eventStorageService = new EventStorageService(cassandraService);

//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);



managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
                .initServices( cassandraService,
                               eventStorageService,
                               ingestionService,
                               solrServiceEndpoint
                             )
                .startServer();
Notice this code that creates a RoundRobinServiceWorkerBuilder.

Working with RoundRobinServiceWorkerBuilder

        /* Create the round robin dispatcher with 16 threads. */
        final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);
Above we are creating the builder and setting the number of workers for the round robin dispatcher. The default is to set the number equal to the number of available CPUs. Next we need to tell the builder how to create the service impl objects as follows:

Registering a callback to create instance of the service.

        /* Register a callback to create instances. */
        roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
              -> new SolrServiceImpl(config.solr));
NOTE: Use RoundRobinServiceWorkerBuilder when the services are stateless (other than connection state), and use ShardedServiceWorkerBuilder if you must maintain sharded state (caches or some such).
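The difference between the two strategies comes down to how a worker is picked for each call. The sketch below is an assumption-laden illustration in plain Java, not QBit's implementation: WorkerSelection, nextRoundRobin, and forShardKey are hypothetical names, and the class is meant to be driven from a single dispatching thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration of round-robin versus shard-key worker selection. */
final class WorkerSelection<S> {

    private final List<S> workers = new ArrayList<>();
    private int next = 0;

    /** Creates one service instance per worker, much like a service object supplier would. */
    WorkerSelection(final int workerCount, final Supplier<S> serviceSupplier) {
        for (int index = 0; index < workerCount; index++) {
            workers.add(serviceSupplier.get());
        }
    }

    /** Round robin: each call goes to the next worker in turn, so workers must be interchangeable. */
    S nextRoundRobin() {
        final S worker = workers.get(next);
        next = (next + 1) % workers.size();
        return worker;
    }

    /** Sharded: the same key always lands on the same worker, so per-key state such as a cache stays valid. */
    S forShardKey(final Object shardKey) {
        return workers.get(Math.floorMod(shardKey.hashCode(), workers.size()));
    }
}
```

With round robin, two calls for the same user may hit different workers, which is fine only if the workers hold no per-user state. With a shard key, the calls for one key always reach the same worker.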
ServiceBundle knows how to deal with a collection of addressable ServiceMethodDispatchers. Thus, to use the RoundRobinServiceWorker we need a service bundle, so we create one and register the service worker with it.

Registering the roundRobinServiceWorker with a service bundle

        /* Build and start the dispatcher. */
        final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
        serviceMethodDispatcher.start();

        /* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
        final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
        bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
        final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
        bundle.start();
Service bundles do not auto flush, and our SolrServiceEndpoint instance uses a proxy interface created from a service bundle. Therefore, we should use a Reactor. A QBit Reactor is owned by a service that is sitting behind a service queue (ServiceQueue). You can register services to be flushed with a reactor, register repeating jobs with it, and coordinate callbacks with it. The reactor has a process method that needs to be called periodically: during idle times, when batch limits are met (the queue is full), and when the queue is empty. We do that by calling the process method as follows:

SolrServiceEndpoint using a reactor object to manage callbacks and flushes

@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {


    private final SolrService solrService;
    private final Reactor reactor;

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }

    @OnEvent(IngestionService.NEW_EVENT_CHANNEL)
    public void storeEvent(final Event event) {
        solrService.storeEvent(event);
    }

    @OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
    public void storeTimeSeries(final TimeSeries timeSeries) {
        solrService.storeTimeSeries(timeSeries);
    }


    /**
     * Proxy the request to solr
     *
     * @param queryParams
     * @return
     */
    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        solrService.get(callback, queryParams);
    }


    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}
Notice that the process method of SolrServiceEndpoint uses the QueueCallback annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})), and all it does is call reactor.process. In the constructor, we registered the solrService service proxy with the reactor.

Registering the solrService with the reactor

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);
    }
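If it is unclear why a proxy needs flushing at all, the following sketch may help. It is a simplified model under stated assumptions, not QBit's internals: BufferingProxy and FlushingReactor are hypothetical classes, and the "transport" is just a consumer that receives each batch. The point is only that calls sit in a buffer until something calls flush, and that the reactor's process method is that something.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical client proxy that buffers outgoing calls until flush() is invoked. */
final class BufferingProxy {

    private final List<String> pending = new ArrayList<>();
    private final Consumer<List<String>> transport;

    BufferingProxy(final Consumer<List<String>> transport) {
        this.transport = transport;
    }

    /** Queues a call; nothing is delivered yet. */
    void send(final String methodCall) {
        pending.add(methodCall);
    }

    /** Delivers all queued calls as one batch, if there are any. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        transport.accept(new ArrayList<>(pending));
        pending.clear();
    }
}

/** Hypothetical reactor that flushes every registered proxy each time process() runs. */
final class FlushingReactor {

    private final List<Runnable> flushers = new ArrayList<>();

    void addServiceToFlush(final BufferingProxy proxy) {
        flushers.add(proxy::flush);
    }

    void process() {
        flushers.forEach(Runnable::run);
    }
}
```

In this model, forgetting to call process means the buffered calls are never delivered, which is why the endpoint wires process to the empty, idle, and limit queue callbacks.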

Thursday, July 23, 2015

Calling Cassandra async from QBit using Reactor, CallbackBuilder, and Callbacks

Cassandra offers an async API as does QBit. Cassandra uses Google Guava. QBit uses QBit. :)
How do you combine them so you do not have to create a worker pool in QBit to make async calls to Cassandra?
Let's say you have a Cassandra service like so...

Example Cassandra service

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.advantageous.qbit.annotation.*;

import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import io.advantageous.qbit.reactive.Callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
...

public class CassandraService {


    private final Logger logger = LoggerFactory.getLogger(CassandraService.class);
    private final CassandraCluster cluster ;
    private final CassandraConfig config;
    private final Session session; //only one per keyspace,
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    /**
     * Configure the client to connect to cluster
     * @param config
     */
    public CassandraService (final CassandraConfig config) {

            ...
    }




    public void executeAsync(final Callback<ResultSet> callback, final Statement stmt) {
        final ResultSetFuture future = this.session.executeAsync(stmt);

        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                callback.accept(result);
            }

            @Override
            public void onFailure(Throwable t) {
                callback.onError(t);
            }
        });

    }
Note that the futures returned by the Cassandra driver come from Google's Guava library. DataStax has a nice tutorial on using the Cassandra async API with Guava.
In this example we have a service called EventStorageService, which endeavors to store an event in Cassandra. Most of the plumbing and the table DDL for the Event have been omitted. This is not a Cassandra tutorial by any means.
Note that in the onSuccess method of the FutureCallback we call the accept method of the QBit callback (Callback). A QBit callback is a Java 8 Consumer (interface Callback<T> extends Consumer<T>), which is probably what FutureCallback would have been had it been created after Java 8. You can also see that if FutureCallback.onFailure gets called, the code delegates to onError. Fairly simple.
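The same bridging pattern can be tried without Cassandra or Guava on the classpath. The sketch below swaps in the JDK's CompletableFuture for Guava's ListenableFuture, so it is an analogy rather than the code above: SimpleCallback is a hypothetical stand-in for the QBit Callback, and FutureBridge.bridge is a made-up helper name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a QBit-style callback: a Consumer with an error hook. */
interface SimpleCallback<T> extends Consumer<T> {
    default void onError(final Throwable error) {
    }
}

/** Bridges a future-based async API to a callback-based one. */
final class FutureBridge {

    private FutureBridge() {
    }

    /** Success goes to accept, failure goes to onError, mirroring onSuccess and onFailure. */
    static <T> void bridge(final CompletableFuture<T> future, final SimpleCallback<T> callback) {
        future.whenComplete((result, error) -> {
            if (error != null) {
                callback.onError(error);
            } else {
                callback.accept(result);
            }
        });
    }
}
```

Either way, the service method returns right away and the callback fires later on whichever thread completes the future.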
Now we have another service call this service. In this example, CassandraService is a thin wrapper over the Cassandra API.

Example service that uses the cassandra service

public class EventStorageService {
    private final Logger logger = LoggerFactory.getLogger(EventStorageService.class);

    private final CassandraService cassandraService;


    private final Reactor reactor;

    public EventStorageService (final CassandraService cassandraService,
                                final Reactor reactor) {
        this.cassandraService = cassandraService;
        logger.info(" Event Storage Service is up ");

        if (reactor!=null) {
            this.reactor = reactor;
        } else {
            this.reactor = ReactorBuilder.reactorBuilder().build();
        }

    }


    @RequestMapping(value = "/event", method = RequestMethod.POST)
    public void addEventAsync (final Callback<Boolean> statusCallback, final Event event) {
        logger.debug("Storing Event  async {} " , event);
        final EventStorageRecord storageRec = EventConverter.toStorageRec(event);

        final Callback<ResultSet> callback = reactor.callbackBuilder()
                .setCallback(ResultSet.class, resultSet -> 
                                             statusCallback.accept(resultSet!=null))
                .setOnTimeout(() -> statusCallback.accept(false))
                .setOnError(error -> statusCallback.onError(error))
                .build(ResultSet.class);

        this.addEventStorageRecordAsync(callback, storageRec);


    }




    public void addEventStorageRecordAsync (final Callback<ResultSet> callback, 
                                            final EventStorageRecord storageRec) {
        /* Check for null before touching the record; the log call below dereferences it. */
        if (storageRec != null) {

            logger.info("Storing the record with storage-key {} async", storageRec.getStorageKey());

            SimpleStatement simpleStatement = ...;
            cassandraService.executeAsync(callback, simpleStatement);

        }


    }

Note that QBit uses a callbackBuilder so the constituent parts of a callback can be lambda expressions.
Callback is a rather simple interface that builds on Java 8 Consumer and adds timeout and error handling.

Callback

public interface Callback<T> extends Consumer<T> {

    default void onError(Throwable error) {

        LoggerFactory.getLogger(Callback.class)
                .error(error.getMessage(), error);
    }


    default void onTimeout() {

    }

}
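To see how a builder can assemble such a callback from lambdas, here is a small sketch in plain Java. It is not QBit's CallbackBuilder: MiniCallback and MiniCallbackBuilder are hypothetical names, and the setter names are only loosely modeled on the ones used in the listing above.

```java
import java.util.function.Consumer;

/** Hypothetical callback shaped like the interface above: a Consumer plus error and timeout hooks. */
interface MiniCallback<T> extends Consumer<T> {
    default void onError(final Throwable error) {
    }

    default void onTimeout() {
    }
}

/** Hypothetical builder that assembles a MiniCallback from three lambdas. */
final class MiniCallbackBuilder<T> {

    private Consumer<T> resultHandler = result -> { };
    private Consumer<Throwable> errorHandler = error -> { };
    private Runnable timeoutHandler = () -> { };

    MiniCallbackBuilder<T> setCallback(final Consumer<T> handler) {
        this.resultHandler = handler;
        return this;
    }

    MiniCallbackBuilder<T> setOnError(final Consumer<Throwable> handler) {
        this.errorHandler = handler;
        return this;
    }

    MiniCallbackBuilder<T> setOnTimeout(final Runnable handler) {
        this.timeoutHandler = handler;
        return this;
    }

    MiniCallback<T> build() {
        // Copy the handlers so later changes to the builder do not affect this callback.
        final Consumer<T> onResult = resultHandler;
        final Consumer<Throwable> onFailure = errorHandler;
        final Runnable onExpired = timeoutHandler;
        return new MiniCallback<T>() {
            @Override
            public void accept(final T result) {
                onResult.accept(result);
            }

            @Override
            public void onError(final Throwable error) {
                onFailure.accept(error);
            }

            @Override
            public void onTimeout() {
                onExpired.run();
            }
        };
    }
}
```

Because each part is a lambda, the success, error, and timeout paths can be written inline at the call site instead of in an anonymous class.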
The Reactor is a class that manages timeouts, schedules periodic tasks, and handles other service call coordination. We initialize the Reactor in the constructor of the EventStorageService, as seen in the previous code listing. We use the callbackBuilder created from the Reactor because it registers the callbacks with the reactor for timeouts and such.
To enable the reactor, we must call it from a service queue callback method for idle, limit, and empty. One merely needs to call reactor.process from the callback, and it will periodically check for timeouts and such.

Calling reactor process to process callbacks and handle timeouts

    @QueueCallback({
            QueueCallbackType.LIMIT, 
            QueueCallbackType.IDLE,
            QueueCallbackType.EMPTY})
    public void process() {
        reactor.process();
    }
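What "check for timeouts" might look like can be sketched in a few lines. This is a toy model, not the QBit Reactor: TimeoutReactor, track, and Tracked are hypothetical names, the clock is passed in explicitly to keep the example deterministic, and everything is assumed to run on the single thread that owns the service.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical reactor that times out tracked callbacks whenever process() is called. */
final class TimeoutReactor {

    /** A callback plus the bookkeeping needed to time it out. */
    static final class Tracked<T> implements Consumer<T> {
        private final Consumer<T> onResult;
        private final Runnable onTimeout;
        private final long startTime;
        private final long timeoutMillis;
        private boolean done;

        Tracked(final Consumer<T> onResult, final Runnable onTimeout,
                final long startTime, final long timeoutMillis) {
            this.onResult = onResult;
            this.onTimeout = onTimeout;
            this.startTime = startTime;
            this.timeoutMillis = timeoutMillis;
        }

        /** Delivers the result unless the callback already completed or timed out. */
        @Override
        public void accept(final T value) {
            if (done) {
                return;
            }
            done = true;
            onResult.accept(value);
        }
    }

    private final List<Tracked<?>> pending = new ArrayList<>();

    /** Wraps the handlers in a tracked callback and remembers it for later timeout checks. */
    <T> Consumer<T> track(final Consumer<T> onResult, final Runnable onTimeout,
                          final long now, final long timeoutMillis) {
        final Tracked<T> tracked = new Tracked<>(onResult, onTimeout, now, timeoutMillis);
        pending.add(tracked);
        return tracked;
    }

    /** Drops finished callbacks and fires onTimeout for the ones that waited too long. */
    void process(final long now) {
        for (final Iterator<Tracked<?>> iterator = pending.iterator(); iterator.hasNext(); ) {
            final Tracked<?> tracked = iterator.next();
            if (tracked.done) {
                iterator.remove();
            } else if (now - tracked.startTime > tracked.timeoutMillis) {
                tracked.done = true;
                iterator.remove();
                tracked.onTimeout.run();
            }
        }
    }
}
```

Nothing in this model runs on a timer thread. Timeouts fire only when process is called, which is why the queue callbacks have to call it regularly.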

Under the covers

The Reactor uses AsyncFutureCallback, which is a Future, a Runnable, and a Callback, and therefore also a Consumer. Rather than invent our own async API or functional API, we decided to lean on Java 8 and build on the shoulders of giants.

Reactor uses AsyncFutureCallback internally, and CallbackBuilder really builds an AsyncFutureCallback

public interface AsyncFutureCallback<T> extends Runnable, Callback<T>, Future<T> {
    Exception CANCEL = new Exception("Cancelled RunnableCallback");

    boolean checkTimeOut(long now);

    void accept(T t);

    void onError(Throwable error);

    void run();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);

    @Override
    boolean isCancelled();

    @Override
    boolean isDone();

    @Override
    T get();

    @SuppressWarnings("NullableProblems")
    @Override
    T get(long timeout, TimeUnit unit);


    default boolean timedOut(long now) {

        return !(startTime() == -1 || timeOutDuration() == -1) && (now - startTime()) > timeOutDuration();
    }

    default long timeOutDuration() {
        return -1;
    }


    default long startTime() {
        return -1;
    }

    default void finished() {

    }


    default boolean isTimedOut() {
        return false;
    }
}
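The timedOut default method is worth a closer look, because the -1 sentinel means "no timeout configured". The sketch below copies just that expression into a tiny hypothetical interface (TimeoutAware, with a TimeoutAwareDemo helper, neither of which is part of QBit) so the edge cases can be tried in isolation.

```java
/** Hypothetical interface carrying only the timeout-related defaults shown above. */
interface TimeoutAware {

    default long startTime() {
        return -1;
    }

    default long timeOutDuration() {
        return -1;
    }

    /** Same expression as in the listing: -1 for either value disables the timeout. */
    default boolean timedOut(final long now) {
        return !(startTime() == -1 || timeOutDuration() == -1)
                && (now - startTime()) > timeOutDuration();
    }
}

final class TimeoutAwareDemo {

    private TimeoutAwareDemo() {
    }

    /** A callback that started at the given time and allows the given duration. */
    static TimeoutAware withTimeout(final long start, final long duration) {
        return new TimeoutAware() {
            @Override
            public long startTime() {
                return start;
            }

            @Override
            public long timeOutDuration() {
                return duration;
            }
        };
    }

    /** A callback that keeps the defaults and therefore never times out. */
    static TimeoutAware withoutTimeout() {
        return new TimeoutAware() {
        };
    }
}
```

Note that the comparison is strict: a callback whose elapsed time equals its duration has not timed out yet, and one that leaves the defaults in place never does.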