Rick


Saturday, August 1, 2015

Understanding ManagedServiceBuilder to create Microservices in QBit that support Docker, Heroku, Swagger, Consul, and StatsD with near 0 config

QBit integrates easily with Consul, StatsD, and Swagger. In addition, QBit has its own health system, stats engine, and meta-data engine.

Swagger is for code generation of REST clients in Python, Java, Ruby, Scala, etc.
Consul is for health monitoring and service discovery (among other things).
StatsD is for collecting microservice stats (counters, timers, gauges) for monitoring.
To make configuring QBit easier we did two things: 1) we added support for Spring Boot (which we have not released yet), and 2) we created the ManagedServiceBuilder, which we will show below.
The ManagedServiceBuilder simplifies construction of QBit endpoints by registering all services and endpoints with the system service so that they are shut down correctly when you CTRL-C or kill an app gracefully.
In addition, ManagedServiceBuilder allows enabling StatsD, Swagger, and Consul in one simple step. By default, ManagedServiceBuilder is configured to run in an environment like Docker or Heroku.
Let's show a simple example:

Hello World REST service in QBit

package com.mammatustech;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.RequestMapping;

@RequestMapping("/hello")
public class HelloWorldService {


    @RequestMapping("/hello")
    public String hello() {
        return "hello " + System.currentTimeMillis();
    }

    public static void main(final String... args) {
        final ManagedServiceBuilder managedServiceBuilder =
                ManagedServiceBuilder.managedServiceBuilder().setRootURI("/root");

        /* Start the service. */
        managedServiceBuilder.addEndpointService(new HelloWorldService())
                .getEndpointServerBuilder()
                .build().startServer();

        /* Start the admin builder which exposes health end-points and meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("Servers started");


    }
}
The above is a simple REST service. It has one REST method.

Hello REST method

@RequestMapping("/hello")
public class HelloWorldService {


    @RequestMapping("/hello")
    public String hello() {
        return "hello " + System.currentTimeMillis();
    }
QBit uses the same style REST methods as Spring MVC REST support. QBit only supports JSON as body params and return types.
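The full path of the REST method is built from three pieces that appear in the listing: the root URI set in main ("/root"), the class-level mapping ("/hello"), and the method-level mapping ("/hello"), which gives /root/hello/hello. The following is a minimal sketch of that composition, not QBit's own routing code; the class PathSketch and its join method are hypothetical names used only for illustration.

```java
/** Hypothetical helper that joins URI segments; a sketch, not part of QBit. */
final class PathSketch {

    /** Joins segments with single slashes, ignoring empty pieces. */
    static String join(final String... segments) {
        final StringBuilder path = new StringBuilder();
        for (final String segment : segments) {
            /* Strip leading and trailing slashes so "/a/" and "a" behave alike. */
            final String trimmed = segment.replaceAll("^/+|/+$", "");
            if (!trimmed.isEmpty()) {
                path.append('/').append(trimmed);
            }
        }
        return path.length() == 0 ? "/" : path.toString();
    }

    public static void main(final String... args) {
        /* Root URI, class mapping, method mapping from the example service. */
        System.out.println(join("/root", "/hello", "/hello"));
    }
}
```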
The gradle file to compile this is as follows:

build.gradle

group 'qbit-ex'
version '1.0-SNAPSHOT'

apply plugin: 'java'


compileJava {
    sourceCompatibility = 1.8
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile group: 'io.advantageous.qbit', name: 'qbit-admin', version: '0.8.16-RC2-SNAPSHOT'
    compile group: 'io.advantageous.qbit', name: 'qbit-vertx', version: '0.8.16-RC2-SNAPSHOT'

}
Change the version to a release after 0.8.16 or build the snapshot.
The main method starts up the endpoint on port 8080 and then starts up an admin server on port 7777.

Main method starts up endpoint and admin server

...

    public static void main(final String... args) {
        final ManagedServiceBuilder managedServiceBuilder =
                ManagedServiceBuilder.managedServiceBuilder().setRootURI("/root");

        /* Start the service. */
        managedServiceBuilder.addEndpointService(new HelloWorldService())
                .getEndpointServerBuilder()
                .build().startServer();

        /* Start the admin builder which exposes health end-points and meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("Servers started");


    }
Since this class has a main method, you should be able to run it from your IDE. The admin server exposes health end-points and meta-data. The default port for an end point is 8080, but you can override it by setting the environment variable PORT or WEB_PORT.
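The port override described above can be pictured without QBit at all. The following is a minimal sketch, not QBit's actual lookup code: the class PortResolver and the method resolvePort are hypothetical, and the order in which PORT and WEB_PORT are consulted is an assumption, since the text only says that either variable can override the default.

```java
import java.util.Map;

/** Hypothetical sketch: pick the HTTP port from the environment, else 8080. */
final class PortResolver {

    static final int DEFAULT_PORT = 8080;

    /**
     * Assumption: PORT is consulted before WEB_PORT. The article does not
     * say which one wins when both are set.
     */
    static int resolvePort(final Map<String, String> env) {
        for (final String name : new String[]{"PORT", "WEB_PORT"}) {
            final String value = env.get(name);
            if (value != null && !value.trim().isEmpty()) {
                try {
                    return Integer.parseInt(value.trim());
                } catch (NumberFormatException e) {
                    /* Ignore a malformed value and try the next variable. */
                }
            }
        }
        return DEFAULT_PORT;
    }

    public static void main(final String... args) {
        System.out.println("port = " + resolvePort(System.getenv()));
    }
}
```

Taking the environment as a Map keeps the lookup easy to exercise in a test; in a real process you would pass System.getenv().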
By default, we enable endpoints for managing the server (health and stats) that work in a Heroku or Docker environment. There are methods on ManagedServiceBuilder to disable health checks, etc. There are also methods on ManagedServiceBuilder to turn on StatsD and Consul support.
Let's look at the endpoints we have exposed so far.

Actual Service end point

$ curl http://localhost:8080/root/hello/hello
"hello 1438470713526"

Swagger meta endpoint

$ curl http://localhost:7777/__admin/meta/
{
    "swagger": "2.0",
    "info": {
        "title": "application title goes here",
        "description": "Description not set",
        "contact": {
            "name": "ContactName not set",
            "url": "Contact URL not set",
            "email": "no.contact.email@set.me.please.com"
        },
        "version": "0.1-NOT-SET",
        "license": {
            "name": "licenseName not set",
            "url": "http://www.license.url.com/not/set/"
        }
    },
    "host": "localhost:8888",
    "basePath": "/root",
    "schemes": [
        "http",
        "https",
        "wss",
        "ws"
    ],
    "consumes": [
        "application/json"
    ],
    "produces": [
        "application/json"
    ],
    "paths": {
        "/hello/hello": {
            "get": {
                "operationId": "hello",
                "produces": [
                    "application/json"
                ],
                "responses": {
                    "200": {
                        "description": "returns",
                        "schema": {
                            "type": "string"
                        }
                    }
                }
            }
        }
    }
}
You can import the above into the Swagger editor and generate clients in Python, Perl, PHP, Ruby, Java, C# and more. By the way, there are ways to configure all the parameters that say "set me" or some variation of the above.
A health endpoint makes it easy to work in Docker, Heroku, and similar cloud environments (EC2, VMware cloud, OpenStack).

Health system endpoint

$ curl http://localhost:8080/__health
"ok"
This deceptively easy end-point will check every endpoint server, service, service queue, etc. to see if they are healthy and you can register your own health checks. This is not just your REST services but all of the IO services, nano services, etc. that they depend on. We could write a whole article on just the HealthService, which is preconfigured with ManagedServiceBuilder.

Other end points of note

Admin Endpoint ok

 $ curl http://localhost:7777/__admin/ok
The above returns true if all registered health systems are healthy.
A node is a service, service bundle, queue, or server endpoint that is being monitored.
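To make the idea of monitored nodes concrete, here is a minimal sketch of a health registry in which every node registers with a time-to-live and must check in before it expires. This is an illustration only, not QBit's HealthService: the class HealthRegistrySketch and all of its methods are hypothetical, and time is passed in explicitly so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical TTL based health registry; a sketch, not QBit's HealthService. */
final class HealthRegistrySketch {

    /** State tracked for one monitored node. */
    private static final class Node {
        final long ttlMillis;
        long lastCheckInMillis;
        boolean failed;

        Node(final long ttlMillis, final long nowMillis) {
            this.ttlMillis = ttlMillis;
            this.lastCheckInMillis = nowMillis;
        }
    }

    /* TreeMap keeps node names sorted so listings are stable. */
    private final Map<String, Node> nodes = new TreeMap<>();

    /** Registers a node that must check in at least once per ttlMillis. */
    void register(final String name, final long ttlMillis, final long nowMillis) {
        nodes.put(name, new Node(ttlMillis, nowMillis));
    }

    /** Records that the node reported itself healthy at nowMillis. */
    void checkInOk(final String name, final long nowMillis) {
        final Node node = nodes.get(name);
        if (node != null) {
            node.lastCheckInMillis = nowMillis;
            node.failed = false;
        }
    }

    /** Records an explicit failure reported by the node. */
    void fail(final String name) {
        final Node node = nodes.get(name);
        if (node != null) {
            node.failed = true;
        }
    }

    /** Names of all nodes, healthy or not. */
    List<String> allNodes() {
        return new ArrayList<>(nodes.keySet());
    }

    /** Names of nodes that have not failed and checked in within their TTL. */
    List<String> healthyNodes(final long nowMillis) {
        final List<String> healthy = new ArrayList<>();
        for (final Map.Entry<String, Node> entry : nodes.entrySet()) {
            final Node node = entry.getValue();
            final boolean fresh = nowMillis - node.lastCheckInMillis <= node.ttlMillis;
            if (fresh && !node.failed) {
                healthy.add(entry.getKey());
            }
        }
        return healthy;
    }

    /** True only if every registered node is healthy. */
    boolean ok(final long nowMillis) {
        return healthyNodes(nowMillis).size() == nodes.size();
    }
}
```

In this sketch, ok plays the role of the single yes/no answer, while allNodes and healthyNodes correspond to the per-node listings.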

All nodes by name (healthy and unhealthy)

    $ curl http://localhost:7777/__admin/all-nodes/

List healthy nodes by name

 $ curl http://localhost:7777/__admin/healthy-nodes/

List complete node information

$ curl http://localhost:7777/__admin/load-nodes/

List stats information for Heroku and Docker style environments

 $ curl http://localhost:8080/__stats/instance

Consul

Let's say you want to use Consul. There is one method to enable it.

Enabling Consul

        managedServiceBuilder.enableConsulServiceDiscovery("dc1", "localhost", 8500);
If you want to use the default host and port:

Enabling Consul with one argument

        managedServiceBuilder.enableConsulServiceDiscovery("dc1");
Just do the above before you create your first endpoint server or service queue. The main endpoint will automatically register with Consul and periodically check in its health. It will even check with the internal health system to see if all of the nodes (service queues, endpoints, etc.) are healthy and pass that information to Consul.
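For readers curious what registration and periodic check-in can look like on the wire, here is a rough sketch that builds the request pieces for Consul's agent HTTP API (service registration with a TTL check, and the call that marks that check as passing). Whether QBit issues exactly these requests is an assumption; the class ConsulPayloadSketch, its methods, the service name, and the 10 second TTL are all hypothetical.

```java
/** Hypothetical sketch of Consul agent API requests; not QBit's Consul client. */
final class ConsulPayloadSketch {

    /** Agent endpoint used to register a service (HTTP PUT). */
    static final String REGISTER_PATH = "/v1/agent/service/register";

    /**
     * Builds the JSON body for registering a service with a TTL check.
     * Assumes id and name are plain identifiers that need no JSON escaping.
     */
    static String registrationJson(final String id, final String name,
                                   final int port, final int ttlSeconds) {
        return "{\"ID\":\"" + id + "\","
                + "\"Name\":\"" + name + "\","
                + "\"Port\":" + port + ","
                + "\"Check\":{\"TTL\":\"" + ttlSeconds + "s\"}}";
    }

    /**
     * Path to PUT periodically so the TTL check keeps passing. A check that is
     * registered together with a service gets the ID "service:" + service ID.
     */
    static String passCheckPath(final String serviceId) {
        return "/v1/agent/check/pass/service:" + serviceId;
    }

    public static void main(final String... args) {
        System.out.println("PUT " + REGISTER_PATH);
        System.out.println(registrationJson("hello-8080", "hello", 8080, 10));
        System.out.println("PUT " + passCheckPath("hello-8080"));
    }
}
```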

Enabling StatsD

Enabling StatsD is also easy.

Enabling StatsD

        managedServiceBuilder.getStatsDReplicatorBuilder()
                      .setHost("somehost").setPort(9000);
        managedServiceBuilder.setEnableStats(true);
Just do the above before you create your first endpoint server or service queue. There are default stats gathered for all Service Queues and Endpoint servers.
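It can help to see what ends up on the wire once stats are enabled. StatsD accepts plain-text lines of the form name:value|type over UDP, where the type is c for a counter, g for a gauge, and ms for a timing. The sketch below formats such lines and sends one datagram. It is not QBit's replicator: the class StatsDLineSketch is hypothetical, and the metric names are made up for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the StatsD line format; not QBit's StatsD replicator. */
final class StatsDLineSketch {

    /** Counter: increments the named count by delta. */
    static String counter(final String name, final long delta) {
        return name + ":" + delta + "|c";
    }

    /** Gauge: records the current level of something. */
    static String gauge(final String name, final long value) {
        return name + ":" + value + "|g";
    }

    /** Timing: records a duration in milliseconds. */
    static String timing(final String name, final long millis) {
        return name + ":" + millis + "|ms";
    }

    /** Sends one metric line as a single UDP datagram (fire and forget). */
    static void send(final String host, final int port, final String line) throws IOException {
        final byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }

    public static void main(final String... args) throws IOException {
        /* The metric name is made up; host and port mirror the example above. */
        send("somehost", 9000, counter("hello.requests", 1));
    }
}
```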
ManagedServiceBuilder is one-stop shopping for writing cloud-friendly microservices in QBit.

Monday, July 27, 2015

Working with Service Pools - working with SOLRJ from a service pool (microservices)




In a truly reactive world, one can expect that all APIs are async. However, at times we have to integrate with legacy services and legacy APIs like JDBC.
There are times when you will need worker pools. If you are dealing with IO and the API is not async, then you will want to wrap the API in a service that you can access from a Service pool.
In this example, we will use SOLRJ API to access SOLR.

Example SOLR service

public class SolrServiceImpl implements SolrService {


    /**
     * Create SolrCalypsoDataStore with config file.
     *
     * @param solrConfig solrConfig
     */
    public SolrServiceImpl(final SolrConfig solrConfig, ...) {

        logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
        healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
        this.solrConfig = solrConfig;
        connect();
    }

    ...

    /**
     * Connect to solr.
     */
    private void connect() {

          ...
    }


    @Override
    public void storeEvent(Event event) {
        store(event);
    }

    @Override
    public void storeTimeSeries(TimeSeries timeSeries) { store(timeSeries);}


    @Override
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
         callback.accept(doGet(queryParams));
    }

    private boolean store(final Object data) {

        logger.info("store():: importing calypso data event into solr {}",
                data);

        if (connectedToSolr) {

            SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);

            try {
                UpdateResponse ur = client.add(doc);
                if (solrConfig.isForceCommit()) {
                    client.commit();
                }

            } catch (Exception e) {
                ...
            }

            return true;
        } else {
                            ...
            return false;
        }
    }

    /**
     * Proxy the request to solr
     * @param queryParams query params
     * @return
     */
    public String doGet(@RequestParam(value = "q", required = true) String queryParams) {

        queryParams = queryParams.replaceAll("\\n", "");

        logger.debug("Processing query params: {} ", queryParams);
        String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;

        logger.info("solr request Built {} ", solrQueryUrl);

        String result = null;
        try {
            result = IOUtils.toString(new URI(solrQueryUrl));

        } catch (IOException | URISyntaxException e) {
            logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
        }

        return result;
    }



    @QueueCallback(QueueCallbackType.SHUTDOWN)
    public void stop() {

        logger.info("Solr Client stopped");
        try {

            this.client.close();
            this.connectedToSolr = false;
        } catch (IOException e) {
            logger.warn("Exception while closing the solr client ", e);
        }

    }
}
Pretty simple. Mainly for an example. Now we want to access this from multiple threads since SOLR can block.
To do this we will use a RoundRobinServiceWorkerBuilder, which creates a RoundRobinServiceWorker. To get more background on workers in QBit, read sharded service workers and service workers.
RoundRobinServiceWorker is a start-able service dispatcher (Startable, ServiceMethodDispatcher) which can be registered with a ServiceBundle. A ServiceMethodDispatcher is an object that can dispatch method calls to a service.
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();

final CassandraService cassandraService = new CassandraService(config.cassandra);


/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);

/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
        -> new SolrServiceImpl(config.solr));

/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();

/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();

/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
final EventStorageService eventStorageService = new EventStorageService(cassandraService);

//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);



managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
                .initServices( cassandraService,
                               eventStorageService,
                               ingestionService,
                               solrServiceEndpoint
                             )
                .startServer();
Notice this code that creates a RoundRobinServiceWorkerBuilder.

Working with RoundRobinServiceWorkerBuilder

        /* Create the round robin dispatcher with 16 threads. */
        final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
                .roundRobinServiceWorkerBuilder().setWorkerCount(16);
Above we are creating the builder and setting the number of workers for the round robin dispatcher. The default is to set the number equal to the number of available CPUs. Next we need to tell the builder how to create the service impl objects as follows:

Registering a callback to create instance of the service.

        /* Register a callback to create instances. */
        roundRobinServiceWorkerBuilder.setServiceObjectSupplier(() 
              -> new SolrServiceImpl(config.solr));
NOTE: Use RoundRobinServiceWorkerBuilder when the services are stateless (other than connection state), and use ShardedServiceWorkerBuilder if you must maintain sharded state (caches or some such).
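The difference between the two dispatch styles comes down to how a worker is chosen for each call. The following is a minimal sketch of the two selection rules, not QBit's worker implementation; DispatchSketch, RoundRobin, and Sharded are hypothetical names, and hashing the shard key is just one possible sharding rule.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of two worker selection rules; not QBit's dispatchers. */
final class DispatchSketch {

    /** Round robin: each call goes to the next worker in turn. */
    static final class RoundRobin {
        private final int workerCount;
        private final AtomicInteger next = new AtomicInteger();

        RoundRobin(final int workerCount) {
            this.workerCount = workerCount;
        }

        /** Returns the index of the worker that should take the next call. */
        int pick() {
            /* floorMod keeps the index non-negative even after int overflow. */
            return Math.floorMod(next.getAndIncrement(), workerCount);
        }
    }

    /** Sharded: the same key always lands on the same worker. */
    static final class Sharded {
        private final int workerCount;

        Sharded(final int workerCount) {
            this.workerCount = workerCount;
        }

        /** Returns the index of the worker that owns the given shard key. */
        int pick(final Object shardKey) {
            return Math.floorMod(shardKey.hashCode(), workerCount);
        }
    }
}
```

With round robin, any worker may receive any call, which is fine when workers hold no per-key state. With sharding, per-key state such as a cache entry stays on one worker because every call for that key is routed to it.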
ServiceBundle knows how to deal with a collection of addressable ServiceMethodDispatchers. Thus, to use the RoundRobinServiceWorker we need to use a service bundle. Therefore, we create a service bundle and register the service worker with it.

Registering the roundRobinServiceWorker with a service bundle

        /* Build and start the dispatcher. */
        final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
        serviceMethodDispatcher.start();

        /* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
        final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
        bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
        final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
        bundle.start();
Service bundles do not auto flush, and our SolrServiceEndpoint instance uses a proxy interface created from a service bundle. Therefore, we should use a Reactor. A QBit Reactor is owned by a service that is sitting behind a service queue (ServiceQueue). You can register services to be flushed with a reactor, you can register repeating jobs with the reactor, and you can coordinate callbacks with the reactor. The reactor has a process method that needs to be called periodically: during idle times, when batch limits are met (queue is full), and when the queue is empty. We do that by calling the process method as follows:

SolrServiceEndpoint using a reactor object to manage callbacks and flushes

@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {


    private final SolrService solrService;
    private final Reactor reactor;

    public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }

    @OnEvent(IngestionService.NEW_EVENT_CHANNEL)
    public void storeEvent(final Event event) {
        solrService.storeEvent(event);
    }

    @OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
    public void storeTimeSeries(final TimeSeries timeSeries) {
        solrService.storeTimeSeries(timeSeries);
    }


    /**
     * Proxy the request to solr
     *
     * @param queryParams
     * @return
     */
    @RequestMapping(value = "/get", method = RequestMethod.GET)
    public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
        solrService.get(callback, queryParams);
    }


    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}
Notice that the process method of SolrServiceEndpoint uses the QueueCallback annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})), and then all it does is call reactor.process. In the constructor, we registered the solrService service proxy with the reactor.

Registering the solrService with the reactor

 public SolrServiceEndpoint(final SolrService solrService) {
        this.solrService = solrService;
        reactor = ReactorBuilder.reactorBuilder().build();
        reactor.addServiceToFlush(solrService);

    }
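To see why a proxy that does not auto flush needs something to flush it, consider this minimal sketch of a batching proxy. Calls are buffered and only handed to the transport when a batch limit is reached or when flush is called explicitly, which is the role the reactor plays for the solrService proxy. This is an illustration under stated assumptions, not QBit's proxy or Reactor code: BatchingProxySketch, its methods, and the batch limit are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batching proxy; a sketch, not QBit's proxy or Reactor. */
final class BatchingProxySketch<T> {

    private final List<T> pending = new ArrayList<>();
    private final Consumer<List<T>> transport;
    private final int batchSize;

    BatchingProxySketch(final Consumer<List<T>> transport, final int batchSize) {
        this.transport = transport;
        this.batchSize = batchSize;
    }

    /** Buffers a call; delivers the batch only once the limit is reached. */
    void send(final T call) {
        pending.add(call);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Delivers whatever is buffered. Without this, a partial batch just waits. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        transport.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Number of calls still waiting to be delivered. */
    int pendingCount() {
        return pending.size();
    }
}
```

If only two calls arrive and the limit is three, nothing reaches the transport until something calls flush. Calling it from the idle, empty, and limit callbacks is what keeps calls from sitting in the buffer.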