Rick


Monday, July 27, 2015

Working with Service Pools - working with SOLRJ from a service pool (microservices)




In a truly reactive world, you can expect all APIs to be async. At times, however, we have to integrate with legacy services and legacy APIs like JDBC.
There are times when you will need worker pools. If you are dealing with IO and the API is not async, then you will want to wrap the API in a service that you can access from a service pool.
In this example, we will use the SOLRJ API to access SOLR.
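Before getting into the QBit specifics, the core idea can be sketched in plain Java: wrap the blocking call behind an async facade backed by a fixed pool of worker threads. This is only a minimal sketch of the pattern (the class and method names here are made up for illustration, not part of QBit or SOLRJ):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Wraps a blocking legacy call behind an async facade backed by a worker pool. */
public class BlockingServiceWrapper {

    private final ExecutorService workers;

    public BlockingServiceWrapper(int workerCount) {
        this.workers = Executors.newFixedThreadPool(workerCount);
    }

    /** Stand-in for a blocking legacy call (e.g., a synchronous SOLR or JDBC call). */
    private String blockingQuery(String q) {
        return "result-for-" + q;
    }

    /** Async facade: the blocking work runs on one of the pool's worker threads. */
    public CompletableFuture<String> query(String q) {
        return CompletableFuture.supplyAsync(() -> blockingQuery(q), workers);
    }

    public void shutdown() {
        workers.shutdown();
    }
}
```

QBit's service pools do essentially this, but dispatch typed method calls over queues instead of raw futures.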

Example SOLR service

public class SolrServiceImpl implements SolrService {


    /**
     * Create SolrCalypsoDataStore with config file.
     *
     * @param solrConfig solrConfig
     */
    public SolrServiceImpl(final SolrConfig solrConfig, ...) {

        logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
        healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
        this.solrConfig = solrConfig;
        connect();
    }

    ...

    /**
     * Connect to solr.
     */
    private void connect() {

        ...
    }


    @Override
    public void storeEvent(Event event) {
        store(event);
    }

    @Override
    public void storeTimeSeries(TimeSeries timeSeries) {
        store(timeSeries);
    }


    @Override
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
         callback.accept(doGet(queryParams));
    }

    private boolean store(final Object data) {

        logger.info("store():: importing calypso data event into solr {}",
                data);

        if (connectedToSolr) {

            SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);

            try {
                UpdateResponse ur = client.add(doc);
                if (solrConfig.isForceCommit()) {
                    client.commit();
                }

            } catch (Exception e) {
                ...
            }

            return true;
        } else {
                            ...
            return false;
        }
    }

    /**
     * Proxy the request to solr.
     *
     * @param queryParams query params
     * @return the solr response body as a string
     */
    public String doGet(@RequestParam(value = "q", required = true) String queryParams) {

        queryParams = queryParams.replaceAll("\\n", "");

        logger.debug("Processing query params: {} ", queryParams);
        String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;

        logger.info("solr request Built {} ", solrQueryUrl);

        String result = null;
        try {
            result = IOUtils.toString(new URI(solrQueryUrl));

        } catch (IOException | URISyntaxException e) {
            logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
        }

        return result;
    }



    @QueueCallback(QueueCallbackType.SHUTDOWN)
    public void stop() {

        logger.info("Solr Client stopped");
        try {

            this.client.close();
            this.connectedToSolr = false;
        } catch (IOException e) {
            logger.warn("Exception while closing the solr client ", e);
        }

    }
}
Pretty simple; it is mainly here as an example. Now we want to access this service from multiple threads, since SOLR calls can block.
To do this we will use a RoundRobinServiceWorkerBuilder, which creates a RoundRobinServiceWorker. For more background on workers in QBit, read up on sharded service workers and service workers.
RoundRobinServiceWorker is a start-able service dispatcher (Startable, ServiceMethodDispatcher) that can be registered with a ServiceBundle. A ServiceMethodDispatcher is an object that can dispatch method calls to a service.
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

final CassandraService cassandraService = new CassandraService(config.cassandra);


/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);

/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
        -> new SolrServiceImpl(config.solr));

/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();

/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();

/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
final EventStorageService eventStorageService = new EventStorageService(cassandraService);

//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);



managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
                .initServices( cassandraService,
                               eventStorageService,
                               ingestionService,
                               solrServiceEndpoint
                             )
                .startServer();
Notice the code that creates a RoundRobinServiceWorkerBuilder.

Working with RoundRobinServiceWorkerBuilder

        /* Create the round robin dispatcher with 16 threads. */
        final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);
Above we are creating the builder and setting the number of workers for the round robin dispatcher. The default worker count is the number of available CPUs. Next, we need to tell the builder how to create the service implementation objects, as follows:
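The round robin dispatch itself is simple to picture: keep a list of workers and hand each incoming call to the next one in turn. A minimal self-contained sketch of that selection logic (this is illustrative, not QBit's internal implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Minimal round-robin dispatcher: each call goes to the next worker in turn. */
public class RoundRobin<T> {

    private final List<T> workers = new ArrayList<>();
    private final AtomicInteger index = new AtomicInteger();

    /** The supplier plays the role of the serviceObjectSupplier callback. */
    public RoundRobin(int workerCount, Supplier<T> supplier) {
        for (int i = 0; i < workerCount; i++) {
            workers.add(supplier.get());
        }
    }

    /** Returns the next worker, cycling through the pool. */
    public T next() {
        int i = Math.floorMod(index.getAndIncrement(), workers.size());
        return workers.get(i);
    }
}
```

QBit layers queues and method dispatch on top of this idea, so callers never touch the workers directly.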

Registering a callback to create instance of the service.

        /* Register a callback to create instances. */
        roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
              -> new SolrServiceImpl(config.solr));
NOTE: Use RoundRobinServiceWorkerBuilder when the services are stateless (other than connection state), and use ShardedServiceWorkerBuilder if you must maintain sharded state (caches or some such).
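The difference matters because sharding routes every call for a given key to the same worker, so per-key state such as a cache stays consistent. A minimal sketch of shard selection by key hash (the class name here is made up for illustration):

```java
/**
 * Shard selection sketch: the same key always maps to the same worker,
 * so per-key state (e.g., a cache) lives on exactly one worker.
 */
public class ShardPicker {

    private final int shardCount;

    public ShardPicker(int shardCount) {
        this.shardCount = shardCount;
    }

    /** floorMod keeps the result in [0, shardCount) even for negative hash codes. */
    public int shardFor(Object key) {
        return Math.floorMod(key.hashCode(), shardCount);
    }
}
```

Round robin, by contrast, deliberately ignores the arguments and just balances load.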
A ServiceBundle knows how to deal with a collection of addressable ServiceMethodDispatchers. Thus, to use the RoundRobinServiceWorker, we need a service bundle. Therefore, we create a service bundle and register the service worker with it.

Registering the roundRobinServiceWorker with a service bundle

        /* Build and start the dispatcher. */
        final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
        serviceMethodDispatcher.start();

        /* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
        final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
        bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
        final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
        bundle.start();
Service bundles do not auto-flush, and we are using an interface from a service bundle in our SolrServiceEndpoint instance. Therefore, we should use a Reactor. A QBit Reactor is owned by a service that is sitting behind a service queue (ServiceQueue). You can register services to be flushed with a reactor, you can register repeating jobs with the reactor, and you can coordinate callbacks with the reactor. The reactor has a process method that needs to be called periodically: during idle times, when batch limits are met (the queue is full), and when the queue is empty. We do that by calling the process method as follows:
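To make the flush mechanics concrete, here is a minimal stand-alone sketch of what a reactor's process loop does for registered services. This is only an illustration of the idea; the class and interface names are invented and this is not QBit's Reactor implementation:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal reactor sketch: services registered for flushing get
 * flushed each time process() runs (on EMPTY, IDLE, or LIMIT).
 */
public class MiniReactor {

    /** Stand-in for a flushable client proxy. */
    public interface Flushable {
        void flush();
    }

    private final List<Flushable> toFlush = new ArrayList<>();

    public void addServiceToFlush(Flushable service) {
        toFlush.add(service);
    }

    /** Called periodically by the owning service queue's callbacks. */
    public void process() {
        for (Flushable f : toFlush) {
            f.flush();
        }
    }
}
```

Flushing matters because QBit batches method calls on queues for throughput; without a periodic flush, small batches could sit unsent.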

SolrServiceEndpoint using a reactor object to manage callbacks and flushes

@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {


    private final SolrService solrService;
    private final Reactor reactor;

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }

    @OnEvent(IngestionService.NEW_EVENT_CHANNEL)
    public void storeEvent(final Event event) {
        solrService.storeEvent(event);
    }

    @OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
    public void storeTimeSeries(final TimeSeries timeSeries) {
        solrService.storeTimeSeries(timeSeries);
    }


    /**
     * Proxy the request to solr.
     *
     * @param callback    callback that receives the solr response
     * @param queryParams query params
     */
    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        solrService.get(callback, queryParams);
    }


    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}
Notice that the process method of SolrServiceEndpoint uses the @QueueCallback annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})), and all it does is call reactor.process. In the constructor, we registered the solrService service proxy with the reactor.

Registering the solrService with the reactor

 public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }