Working with Service Pools - working with SOLRJ from a service pool
In a truly reactive world, one can expect that all APIs are async. However, at times we have to integrate with legacy services and legacy APIs like JDBC.
There are times when you will need worker pools. If you are dealing with IO and the API is not async, you will want to wrap the API in a service that you access through a service pool.
In this example, we will use the SOLRJ API to access SOLR.
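Before the SOLR code, here is a minimal JDK-only sketch of the general idea: a blocking call is pushed onto a fixed pool of threads so that the caller gets a future back instead of waiting. It does not use QBit or SOLRJ at all. The names BlockingPoolSketch, blockingLookup, lookupAsync and demo are hypothetical, and the 50 ms sleep only stands in for real blocking IO.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingPoolSketch {

    /** Hypothetical stand-in for a blocking legacy call such as a JDBC or SOLRJ request. */
    static String blockingLookup(final String key) {
        try {
            Thread.sleep(50); // simulate blocking IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-for-" + key;
    }

    /** Runs the blocking call on the pool so the caller's thread is not blocked. */
    static CompletableFuture<String> lookupAsync(final ExecutorService pool, final String key) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(key), pool);
    }

    /** Demo helper: issue one async lookup, wait for it, then shut the pool down. */
    static String demo(final String key) {
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return lookupAsync(pool, key).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("q1"));
    }
}
```

A QBit service pool plays a similar role, but dispatches method calls to service objects through queues instead of handing out futures.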
Example SOLR service
public class SolrServiceImpl implements SolrService {

    /**
     * Create SolrCalypsoDataStore with config file.
     *
     * @param solrConfig solrConfig
     */
    public SolrServiceImpl(final SolrConfig solrConfig, ...) {
        logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
        healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
        this.solrConfig = solrConfig;
        connect();
    }

    ...

    /**
     * Connect to solr.
     */
    private void connect() {
        ...
    }

    @Override
    public void storeEvent(Event event) {
        store(event);
    }

    @Override
    public void storeTimeSeries(TimeSeries timeSeries) {
        store(timeSeries);
    }

    @Override
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        callback.accept(doGet(queryParams));
    }

    private boolean store(final Object data) {
        logger.info("store():: importing calypso data event into solr {}",
                data);
        if (connectedToSolr) {
            SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);
            try {
                UpdateResponse ur = client.add(doc);
                if (solrConfig.isForceCommit()) {
                    client.commit();
                }
            } catch (Exception e) {
                ...
            }
            return true;
        } else {
            ...
            return false;
        }
    }

    /**
     * Proxy the request to solr
     * @param queryParams query params
     * @return
     */
    public String doGet(@RequestParam(value = "q", required = true) String queryParams) {
        queryParams = queryParams.replaceAll("\\n", "");
        logger.debug("Processing query params: {} ", queryParams);
        String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;
        logger.info("solr request Built {} ", solrQueryUrl);
        String result = null;
        try {
            result = IOUtils.toString(new URI(solrQueryUrl));
        } catch (IOException | URISyntaxException e) {
            logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
        }
        return result;
    }

    @QueueCallback(QueueCallbackType.SHUTDOWN)
    public void stop() {
        logger.info("Solr Client stopped");
        try {
            this.client.close();
            this.connectedToSolr = false;
        } catch (IOException e) {
            logger.warn("Exception while closing the solr client ", e);
        }
    }
}
The service is simple and serves mainly as an example. Because calls to SOLR can block, we want to access it from multiple threads.
To do this we will use a RoundRobinServiceWorkerBuilder, which creates a RoundRobinServiceWorker. For more background on workers in QBit, read sharded service workers and service workers.
A RoundRobinServiceWorker is a startable service dispatcher (Startable, ServiceMethodDispatcher) which can be registered with a ServiceBundle. A ServiceMethodDispatcher is an object that can dispatch method calls to a service.
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();
final CassandraService cassandraService = new CassandraService(config.cassandra);
/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
        .roundRobinServiceWorkerBuilder().setWorkerCount(16);
/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(()
        -> new SolrServiceImpl(config.solr));
/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();
/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();
/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
final EventStorageService eventStorageService = new EventStorageService(cassandraService);
//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);
managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
        .initServices(cassandraService,
                eventStorageService,
                ingestionService,
                solrServiceEndpoint)
        .startServer();
Notice the code that creates a RoundRobinServiceWorkerBuilder.
Working with RoundRobinServiceWorkerBuilder
/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
        .roundRobinServiceWorkerBuilder().setWorkerCount(16);
Above, we create the builder and set the number of workers for the round robin dispatcher. By default, the worker count equals the number of available CPUs. Next, we need to tell the builder how to create the service implementation objects, as follows:
Registering a callback to create instances of the service
/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(()
        -> new SolrServiceImpl(config.solr));
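To make the roles of the worker count and the supplier concrete, here is a JDK-only sketch of round robin dispatch. It is not QBit's implementation: RoundRobinSketch, RoundRobinPool, CountingService and distribute are hypothetical names, and the sketch runs on a single thread, whereas a real worker pool gives each worker its own thread and queue. The assumption illustrated is that the supplier is invoked once per worker, so every worker owns a separate service instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RoundRobinSketch {

    /** Hypothetical service that records how many calls it has handled. */
    static class CountingService {
        int calls;

        void handle() {
            calls++;
        }
    }

    /** Creates workerCount instances up front and hands out workers in turn. */
    static class RoundRobinPool<T> {
        private final List<T> workers = new ArrayList<>();
        private final AtomicInteger next = new AtomicInteger();

        RoundRobinPool(final int workerCount, final Supplier<T> supplier) {
            for (int i = 0; i < workerCount; i++) {
                workers.add(supplier.get()); // one fresh instance per worker
            }
        }

        /** Returns the worker that should receive the next call. */
        T nextWorker() {
            final int index = Math.floorMod(next.getAndIncrement(), workers.size());
            return workers.get(index);
        }

        List<T> workers() {
            return workers;
        }
    }

    /** Sends `calls` calls through `workerCount` workers and returns the per-worker counts. */
    static int[] distribute(final int workerCount, final int calls) {
        final RoundRobinPool<CountingService> pool =
                new RoundRobinPool<>(workerCount, CountingService::new);
        for (int i = 0; i < calls; i++) {
            pool.nextWorker().handle();
        }
        return pool.workers().stream().mapToInt(s -> s.calls).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(4, 10))); // prints [3, 3, 2, 2]
    }
}
```

Because any worker may receive any call, this style of dispatch only suits services whose instances are interchangeable.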
NOTE: Use RoundRobinServiceWorkerBuilder when the services are stateless (other than connection state), and use ShardedServiceWorkerBuilder if you must maintain sharded state (caches or some such).
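The difference matters because sharded state only stays consistent if calls for the same key always reach the same worker. The JDK-only sketch below shows one common way to get that property, by hashing a key to a worker index. This is an illustration under that assumption and not necessarily the rule ShardedServiceWorkerBuilder applies; ShardSelectionSketch and shardFor are hypothetical names.

```java
class ShardSelectionSketch {

    /** Sharded routing: the same key always maps to the same worker index. */
    static int shardFor(final String key, final int workerCount) {
        return Math.floorMod(key.hashCode(), workerCount);
    }

    public static void main(String[] args) {
        final int workers = 4;
        // Repeated calls for one key land on one worker, so that worker's
        // private cache entry for the key is the only copy.
        final int first = shardFor("user-42", workers);
        final int second = shardFor("user-42", workers);
        System.out.println(first == second); // prints true
    }
}
```

With round robin dispatch there is no such guarantee, which is why it is reserved for services that keep no per-key state.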
A ServiceBundle knows how to deal with a collection of addressable ServiceMethodDispatchers. Thus, to use the RoundRobinServiceWorker we need a service bundle. Therefore, we create a service bundle and register the service worker with it.
Registering the roundRobinServiceWorker with a service bundle
/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();
/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();
Service bundles do not auto flush, and our SolrServiceEndpoint instance uses an interface obtained from a service bundle. Therefore, we should use a Reactor. A QBit Reactor is owned by a service that is sitting behind a service queue (ServiceQueue). You can register services to be flushed with a reactor, you can register repeating jobs with the reactor, and you can coordinate callbacks with the reactor. The reactor has a process method that needs to be called periodically: during idle times, when batch limits are met (the queue is full), and when the queue is empty. We do that by calling the process method as follows:
SolrServiceEndpoint using a reactor object to manage callbacks and flushes
@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {

    private final SolrService solrService;
    private final Reactor reactor;

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);
    }

    @OnEvent(IngestionService.NEW_EVENT_CHANNEL)
    public void storeEvent(final Event event) {
        solrService.storeEvent(event);
    }

    @OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
    public void storeTimeSeries(final TimeSeries timeSeries) {
        solrService.storeTimeSeries(timeSeries);
    }

    /**
     * Proxy the request to solr
     *
     * @param queryParams
     * @return
     */
    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        solrService.get(callback, queryParams);
    }

    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}
Notice that the process method of SolrServiceEndpoint uses the QueueCallback annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})), and all it does is call reactor.process. In the constructor, we registered the solrService service proxy with the reactor.
Registering the solrService with the reactor
public SolrServiceEndpoint(final SolrService solrService) {
    this.solrService = solrService;
    reactor = ReactorBuilder.reactorBuilder().build();
    reactor.addServiceToFlush(solrService);
}
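To see why the flush registration matters, consider this JDK-only sketch of a proxy that buffers calls until it is flushed. It is a simplified model and not QBit's Reactor: FlushSketch, BufferingProxy and MiniReactor are hypothetical classes, and the method names addServiceToFlush and process merely mirror the calls used above. The point it illustrates is that calls made through a buffering proxy are not delivered until something calls process.

```java
import java.util.ArrayList;
import java.util.List;

class FlushSketch {

    /** Hypothetical client-side proxy that buffers calls until flush() is invoked. */
    static class BufferingProxy {
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        void send(final String message) {
            pending.add(message);
        }

        void flush() {
            delivered.addAll(pending);
            pending.clear();
        }

        int deliveredCount() {
            return delivered.size();
        }
    }

    /** Hypothetical miniature reactor: remembers proxies and flushes them on process(). */
    static class MiniReactor {
        private final List<BufferingProxy> toFlush = new ArrayList<>();

        void addServiceToFlush(final BufferingProxy proxy) {
            toFlush.add(proxy);
        }

        void process() {
            toFlush.forEach(BufferingProxy::flush);
        }
    }

    /** Returns {delivered before process(), delivered after process()}. */
    static int[] demo() {
        final BufferingProxy proxy = new BufferingProxy();
        final MiniReactor reactor = new MiniReactor();
        reactor.addServiceToFlush(proxy);

        proxy.send("event-1");
        proxy.send("event-2");
        final int before = proxy.deliveredCount();
        reactor.process(); // stands in for the EMPTY, IDLE and LIMIT queue callbacks
        return new int[] {before, proxy.deliveredCount()};
    }

    public static void main(String[] args) {
        final int[] counts = demo();
        System.out.println(counts[0] + " -> " + counts[1]); // prints 0 -> 2
    }
}
```

In the endpoint above, the queue callbacks play the role of the explicit process call in demo, so buffered calls to the SOLR workers go out whenever the endpoint's queue is idle, empty, or at its batch limit.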