Rick

Rick
Rick

Thursday, May 12, 2016

Understanding the QBit microservices lib's serviceQueue

QBit is made up of queues. There are request queues, response queues and event queues.
serviceQueue is a set of three queues, namely requests (methodCalls), responses and events. The serviceQueue turns an ordinary POJO (plain old Java Object) into a Service Actor. AserviceQueue is building block of QBit.
  • serviceQueue turns a POJO into a Service Actor
  • serviceBundle groups serviceQueues under different addresses, shares a response queue, allows for service pools, serviceSharding, etc.
  • serviceServer exposes a serviceBundle to REST and WebSocket RPC.
QBit allows you to adapt POJOs to become Service Actors. A Service Actor is a form of an active object. Method calls to a Service Actor are delivered asynchronously, and handled on one thread which can handle tens of millions or more method calls per second. Let's demonstrate by creating a simple POJO and turning it into a Service Actor.

Associating POJO with serviceQueue to make a service actor

ServiceQueue serviceQueue;
    ...

        // Create a serviceQueue with a serviceBuilder.
        final ServiceBuilder serviceBuilder = serviceBuilder();

        //Start the serviceQueue.
        serviceQueue = serviceBuilder
                .setServiceObject(new TodoManagerImpl())
                .buildAndStartAll();
The above code registers the POJO TodoManagerImpl with a serviceQueue by using the method serviceBuilder.setServiceObject. The serviceQueue is started by thebuildAndStartAll method of ServiceBuilder.
ServiceQueue is an interface (io.advantageous.qbit.service.ServiceQueue). TheServiceQueue is created with a ServiceBuilder(io.advantageous.qbit.service.ServiceBuilder). You create a Service Actor by associating a POJO with a serviceQueue. You make this association between the serviceQueue and your service POJO with the `ServiceBuilder.
Once started the serviceQueue can handle method calls on behalf of the TodoManagerImpl and recieve events and deliver them to TodoManagerImplTodoManagerImpl can sit behind theserviceQueue. If you only access TodoManagerImpl POJO service from a serviceQueue then it will only ever be accessed by one thread. TodoManagerImpl can handle tens of millions of calls per second, and all of those calls will be thread safe. Here is a simple example of a POJO that we will expose as a Service Actor.

Implementation

package com.mammatustech.todo;

import io.advantageous.qbit.reactive.Callback;

import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

public class TodoManagerImpl {

    private final Map<String, Todo> todoMap = new TreeMap<>();

    public TodoManagerImpl() {
    }

    public void add(final Callback<Boolean> callback, final Todo todo) {
        todoMap.put(todo.getId(), todo);
        callback.resolve(true);
    }

    public void remove(final Callback<Boolean> callback, final String id) {
        final Todo removed = todoMap.remove(id);
        callback.resolve(removed != null);
    }

    public void list(final Callback<ArrayList<Todo>> callback) {
        callback.resolve(new ArrayList<>(todoMap.values()));
    }
}
Notice that this example does not return values, instead it uses the callback to send a response back to the client. A call to callback.resolve(someValue) will send that value to theresponseQueue. Method calls come in on the requestQueue. The responses go out on the theresponseQueue. Let's explore this concept.
The serviceQueue has the following interface.

Partial code listing of serviceQueue showing queues

/**
 * Manages a service that sits behind a queue.
 * created by Richard on 7/21/14.
 *
 * @author rhightower
 */
public interface ServiceQueue extends ... {
...

    Object service();
    SendQueue<MethodCall<Object>> requests();
    SendQueue<Event<Object>> events();
    ReceiveQueue<Response<Object>> responses();
    ...
These methods are not typically accessed. They are for integration and internal usage but they can help you understand QBit microservices a bit better.
You can access the POJO that the serviceQueue is wrapping with service(). You can send method calls directly to the serviceQueue by using the requests() method to get asendQueue (SendQueue<MethodCall<Object>>). You can send events directly to theserviceQueue by using the events() method to get a sendQueue. Note that the sendQueueyou receive will not be thread safe (they implement micro-batching), so each thread will need to get its own copy of an event or methodCall (request) sendQueue. A sendQueue is the client's view of the queue.
On the receiver side (service side) events and methodCalls queues are handled by the same thread so that all events and methodCalls go to the POJO (e.g., TodoManagerImpl) on the same thread. This is what makes that POJO a Service Actor (active object).
Typically to make calls to a Service Actor, you use a service client proxy, which is just an interface. The service client proxy can return Promises or take a Callback as the first or last argument of the method. A promise is a deferred result that you can handle asynchronously. ThePromise interface is similar to ES6 promises.

Service Client Proxy interface

package com.mammatustech.todo;

import io.advantageous.reakt.promise.Promise;
import java.util.List;


public interface TodoManagerClient {
    Promise<Boolean> add(Todo todo);
    Promise<Boolean> remove(String id);
    Promise<List<Todo>> list();
}

Todo POJO to store a Todo item

package com.mammatustech.todo;

public class Todo {

    private final String name;
    private final String description;
    private final long createTime;
    private String id;

    public Todo(String name, String description, long createTime) {
       ...
    }

    //normal getters, equals, hashCode 
}
To create an use a service client proxy you use the serviceQueue.

Creating and using a service client proxy

    TodoManagerClient client;
    ServiceQueue serviceQueue;


        //Create a client proxy to communicate with the service actor.
        client = serviceQueue
              .createProxyWithAutoFlush(TodoManagerClient.class, 
                Duration.milliseconds(5));


//Add an item
        final Promise<Boolean> promise = Promises.blockingPromiseBoolean();

        // Add the todo item.
        client.add(new Todo("write", "Write tutorial", timer.time()))
                .invokeWithPromise(promise);


        assertTrue("The call was successful", promise.success());
        assertTrue("The return from the add call", promise.get());

//Get a list of items

        final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);

        // Get a list of todo items.
        client.list().invokeWithPromise(promiseList);

        // See if the Todo item we created is in the listing.
        final List<Todo> todoList = 
                          promiseList.get().stream()...


//Remove an item
        // Remove promise
        final Promise<Boolean> removePromise = 
                           Promises.blockingPromiseBoolean();
        client.remove(todo.getId())
                     .invokeWithPromise(removePromise);
Note Blocking Promises are great for testing and integration but not something you typically use in your reactive microserivce (sot of defeats the whole purpose).
Here is a simple unit test showing what we have done and talked about so far, after this let's show a non-blocking example and some call coordination.

Unit test to show it is working

package com.mammatustech.todo;

import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.time.Duration;
import io.advantageous.qbit.util.Timer;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;


import java.util.List;
import java.util.stream.Collectors;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TodoManagerImplTest {

    TodoManagerClient client;
    ServiceQueue serviceQueue;
    final Timer timer = Timer.timer();

    @Before
    public void setup() {

        // Create a serviceQueue with a serviceBuilder.
        final ServiceBuilder serviceBuilder = serviceBuilder();

        //Start the serviceQueue.
        serviceQueue = serviceBuilder
                .setServiceObject(new TodoManagerImpl())
                .buildAndStartAll();

        //Create a client proxy to communicate with the service actor.
        client = serviceQueue.createProxyWithAutoFlush(TodoManagerClient.class, Duration.milliseconds(5));

    }

    @Test
    public void test() throws Exception {
        final Promise<Boolean> promise = Promises.blockingPromiseBoolean();

        // Add the todo item.
        client.add(new Todo("write", "Write tutorial", timer.time()))
                .invokeWithPromise(promise);


        assertTrue("The call was successful", promise.success());
        assertTrue("The return from the add call", promise.get());

        final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);

        // Get a list of todo items.
        client.list().invokeWithPromise(promiseList);

        // See if the Todo item we created is in the listing.
        final List<Todo> todoList = promiseList.get().stream()
                .filter(todo -> todo.getName().equals("write")
                        && todo.getDescription().equals("Write tutorial")).collect(Collectors.toList());

        // Make sure we found it.
        assertEquals("Make sure there is one", 1, todoList.size());


        // Remove promise
        final Promise<Boolean> removePromise = Promises.blockingPromiseBoolean();
        client.remove(todoList.get(0).getId()).invokeWithPromise(removePromise);



        final Promise<List<Todo>> promiseList2 = Promises.blockingPromiseList(Todo.class);

        // Make sure it is removed.
        client.list().invokeWithPromise(promiseList2);

        // See if the Todo item we created is removed.
        final List<Todo> todoList2 = promiseList2.get().stream()
                .filter(todo -> todo.getName().equals("write")
                        && todo.getDescription().equals("Write tutorial")).collect(Collectors.toList());

        // Make sure we don't find it.
        assertEquals("Make sure there is one", 0, todoList2.size());

    }

    @After
    public void tearDown() {
        serviceQueue.stop();
    }


}
You can find this source code at this github repo.
Here is a build file for the example so you can see the dependencies.

Build file build.gradle

group 'qbit-ex'
version '1.0-SNAPSHOT'

apply plugin: 'java'


apply plugin: 'application'


mainClassName = "com.mammatustech.todo.TodoServiceMain"


compileJava {
    sourceCompatibility = 1.8
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile 'io.advantageous.qbit:qbit-vertx:1.8.3'
    compile 'io.advantageous.qbit:qbit-admin:1.8.3'
}

Executing a bunch of methods at once

We can execute a bunch of methods at once and use Promises.all to do the next item when they all succeed or Promises.any to something when any of them succeed.

Executing many methods on a service proxy at once

    @Test
    public void testUsingAll() throws Exception {

        /* A list of promises for things we want to do all at once. */
        final List<Promise<Boolean>> promises = new ArrayList<>(3);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean();


        /** Add a todoItem to the client add method */
        final Todo todo = new Todo("write", "Write tutorial", timer.time());
        final Promise<Boolean> promise
                = client.add(todo);
        promises.add(promise);

        /** Add two more. */
        promises.add(client.add(new Todo("callMom", "Call Mom", timer.time())));
        promises.add(client.add(new Todo("callSis", "Call Sister", timer.time())));

        /** Now async wait for them all to come back. */
        Promises.all(promises).then(done -> {
            success.set(true);
            latch.countDown();
        }).catchError(e-> {
            success.set(false);
            latch.countDown();
        });

        /** Invoke the promises. */
        promises.forEach(Promise::invoke);

        /** They are all going to come back async. */
        latch.await();
        assertTrue(success.get());
    }

Thread model

The serviceQueue can be started and stopped. There are several options to start aserviceQueue. You can start it with two threads, one thread for response handling and another thread for request/event handling (startAll()). You can start the serviceQueue with just the request/event handling thread (start()). You can also start it with one thread managing request/event and responses. Caution must be exercised with the last way since if a callback or promise blocks then your serviceQueue will be blocked. Typically you use startAll or you use a serviceBundle where one response queue is shared with many serviceQueues. TheserviceQueue was meant to be composable so you can access the queues and provide your own thread model if needed or desired.

Exception Handling

Typically you handle a exception from a Service Actor by calling callback.reject(exception)to pass the exception downstream to the client or you catch it and handle it in whatever way makes sense. If you do not catch an exception then the thread for your Service Actor will terminate. However, QBit will log the exception that you did not handle and restart a new thread to manage your Service Actor.

Handling calls to other Service Actor

In the QBit microservice lib it is common to call other async services, remote Service Actors, REST services, and async NoSQL database drivers. If you Service Actor is stateful (which is common with high-speed services), then you will want to do use a Reactor. There is theReactor that comes with QBit which is EOL (since we are replacing it with the one we wrote forReakt), and then there is the Reactor that comes Reakt. The serviceQueue allows events/method calls to all come to the Service Actor on one thread. The reactor is a way to also allow method call callbacks to happen on the same thread, and since the callbacks happen on the same thread as the Service Actor access to the Service Actors data (fields, collaborating objects, etc.) are also thread safe. You only need to use a Reactor if you want to handle callback on the same thread as the Service Actor, which is not always needed. You can also use theReactor to handle streaming data on the same thread as the Service Actor. The Reactor can also be used for scheduling async tasks or just scheduling a task to be run on the Service Actoras soon as possible.

Getting notified when you start, stop, etc.

You can get notified of different Service Actor lifecycle events like started, stopped, when the micro batch limit was met, when the request queue is empty, and more. These lifecycle events allow you to do thing in batches and thus effectively pass data from one service to another (both remote and local). The reactor for example has a process method that is usually called when the request queue has reached a limit or is empty. There are two ways to do this. You can use aQueueCallbackHandler with a ServiceBuilder (or ServiceBundle) or you can use the annotation@QueueCallback.

Admin package

The Admin package adds Consul discovery, and StatsD support to QBit microservices, and provides a simplified builder for creating a set of managed services which you can easily expose via REST or WebSocket RPC.
It is quite easy to build bridges into the QBit world and we have done so via Kafka, Vert.x event bus and even JMS. QBit was meant to be composeable so you can pick your messaging platform and plug QBit into it.
Two main packages of note in the QBit admin packages are the ManagedServiceBuilder and theServiceManagementBundle. The ManagedServiceBuilder gives you access to building a group of services and then easily wiring them to the same health monitor, discovery system and metrics/stats system. Whilst the ServiceManagementBundle allows services to interact with common QBit services like stats, health and discovery.
Let's show some simple examples using these that we will continue on in our discussion of theServiceBundle and the ServiceEndpointServer.

No comments:

Post a Comment

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training