Rick

Rick
Rick

Sunday, May 22, 2016

Understanding the QBit microservices lib's serviceEndpointServer

The ServiceEndpointServer essentially exposes a ServiceBundle to WebSocket and REST remote calls. This document is using the Todo example from the discussion of ServiceQueue and the ServiceBundle.
In fact, you can use ServiceEndpointServer very similar to the way we used ServiceBundle.

Creating a serviceEndpointServer

import io.advantageous.qbit.server.EndpointServerBuilder;
import io.advantageous.qbit.server.ServiceEndpointServer;
import static io.advantageous.qbit.server.EndpointServerBuilder.endpointServerBuilder;
...

        /* Create the serviceBundleBuilder. */
        final EndpointServerBuilder endpointServerBuilder = 
                                 endpointServerBuilder();

        endpointServerBuilder.addService(auditorAddress, 
                                          new AuditorImpl());


        /* Create the service endpoint server. */
        serviceEndpointServer = endpointServerBuilder.build();
We use a EndpointServerBuilder to build a serviceEndpointServer. You can add services to the builder or you can add them directly to the serviceEndpointServer.
Note you can use EndpointServerBuilder but most examples will use theManagedServiceBuilder which has the benefit of wiring the services it creates into the microservice health check system and the microservice statistics/monitoring/distributed MDC logging systems that QBit provides.
The serviceEndpointServer has a serviceBundle.

Using serviceEndpointServer's serviceBundle

        /* Create a service client proxy for the auditor. */
        auditor = serviceEndpointServer.serviceBundle()
                 .createLocalProxy(Auditor.class, auditorAddress);

        /* Create a todo manager and pass the 
            client proxy of the auditor to it. */
        final TodoManagerImpl todoManager = 
                                   new TodoManagerImpl(auditor);

        // Add the todoManager to the serviceBundle.
        serviceEndpointServer.serviceBundle()
                .addServiceObject(todoAddress, todoManager);

        /* Create a client proxy to communicate 
           with the service actor. */
        client = serviceEndpointServer.serviceBundle()
          .createLocalProxy(TodoManagerClient.class, 
          todoAddress);
Note if we wanted to hide access to the auditor, we could put the auditor in anotherserviceQueue or serviceBundle that was not accessible to WebSocket or REST.
We can use the proxy client just like we did before. We can create a local microservice actor proxy client. The only real difference is that auto flush is built into serviceEndpointServer and notserviceBundle.

Example of making local calls to the TodoService

        /* A list of promises for things we want to do all at once. */
        final List<Promise<Boolean>> promises = new ArrayList<>(3);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean();


        /** Add a todoItem to the client add method */
        final Todo todo = new Todo("write", "Write tutorial", timer.time());
        final Promise<Boolean> promise
                = client.add(todo);
        promises.add(promise);

        /** Add two more. */
        promises.add(client.add(new Todo("callMom", "Call Mom", timer.time())));
        promises.add(client.add(new Todo("callSis", "Call Sister", timer.time())));

        /** Now async wait for them all to come back. */
        Promises.all(promises).then(done -> {
            success.set(true);
            latch.countDown();
        }).catchError(e -> {
            success.set(false);
            latch.countDown();
        });

        /** Invoke the promises. */
        promises.forEach(Promise::invoke);


        /** They are all going to come back async. */
        latch.await();
        assertTrue(success.get());
Ok. Up until this point, nothing is really different than before. The TodoManagerImpl is now accessible via REST and WebSocket.

Using TodoManager service over WebSocket

import io.advantageous.qbit.client.Client;
import io.advantageous.qbit.client.ClientBuilder;

...
        //REMOVE THIS Create a client proxy to communicate with the service actor.
        //REMOVE client = serviceEndpointServer.serviceBundle()
        //REMOVE    .createLocalProxy(TodoManagerClient.class, todoAddress);

        /* Start the service endpoint server 
                    and wait until it starts. */
        serviceEndpointServer.startServerAndWait();



        /* Create the WebSocket Client Builder. */
        final ClientBuilder clientBuilder = ClientBuilder.clientBuilder();

        /** Build the webSocketClient. */
        webSocketClient = clientBuilder.setHost("localhost")
                                        .setPort(8080)
                                        .build();

        /* Create a REMOTE client proxy to communicate with the service actor. */
        client = webSocketClient.createProxy(TodoManagerClient.class, todoAddress);

        /* Start the remote client. */
        webSocketClient.start();

        ...


    @After
    public void tearDown() throws Exception{
        Thread.sleep(100);
        serviceEndpointServer.stop(); //stop the server
        webSocketClient.stop(); //stop the client
    }
The client like the service endpoint server also auto-flushes. You can use the remote client (remote microservice client proxy) just like before (when we showed the local microservice client proxy).

Remote client gets used just like the local client.

        /* A list of promises for things we want to do all at once. */
        final List<Promise<Boolean>> promises = new ArrayList<>(3);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean();


        /** Add a todoItem to the client add method */
        final Todo todo = new Todo("write", "Write tutorial", timer.time());
        final Promise<Boolean> promise
                = client.add(todo);
        promises.add(promise);

        /** Add two more. */
        promises.add(client.add(new Todo("callMom", "Call Mom", timer.time())));
        promises.add(client.add(new Todo("callSis", "Call Sister", timer.time())));

        /** Now async wait for them all to come back. */
        Promises.all(promises).then(done -> {
            success.set(true);
            latch.countDown();
        }).catchError(e -> {
            success.set(false);
            latch.countDown();
        });

        /** Invoke the promises. */
        promises.forEach(Promise::invoke);


        /** They are all going to come back async. */
        latch.await();
        assertTrue(success.get());
To expose the TodoManagerImpl to REST, we will define a main method to start the server. Then we will add @RequestMapping@POST@PUT@DELETE/@RequestParam, and @GET.

Adding @RequestMapping@POST@PUT,@DELETE/@RequestParam, and @GET

package com.mammatustech.todo;
...
import io.advantageous.qbit.annotation.*;
import io.advantageous.qbit.annotation.http.DELETE;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.annotation.http.PUT;
import io.advantageous.qbit.reactive.Callback;
...
@RequestMapping("/todo-service")
public class TodoManagerImpl {

    private final Map<String, Todo> todoMap = new TreeMap<>();
    private final Auditor auditor;

    public TodoManagerImpl(final Auditor auditor) {
        this.auditor = auditor;
    }


    @GET("/todo/count")
    public int size() {
        return todoMap.size();
    }



    @PUT("/todo/")
    public void add(final Callback<Boolean> callback, final Todo todo) {
        todoMap.put(todo.getId(), todo);
        auditor.audit("add", "added new todo");
        callback.resolve(true);
    }

    @DELETE("/todo/")
    public void remove(final Callback<Boolean> callback,
                       @RequestParam("id") final String id) {
        final Todo removed = todoMap.remove(id);

        auditor.audit("add", "removed new todo");
        callback.resolve(removed != null);
    }

    @GET("/todo/")
    public void list(final Callback<List<Todo>> callback) {
        auditor.audit("list", "auditor added");
        callback.accept(new ArrayList<>(todoMap.values()));
    }
...
}
The main method just creates the microservices and starts the server.

Main method to start the service

package com.mammatustech.todo;

import io.advantageous.qbit.server.EndpointServerBuilder;
import io.advantageous.qbit.server.ServiceEndpointServer;

import static io.advantageous.qbit.server.EndpointServerBuilder.endpointServerBuilder;

public class TodoServiceMain {

    public static void main(final String... args) {


        /** Object address to the auditorService service actor. */
        final String auditorAddress = "auditorService";


        /* Create the serviceBundleBuilder. */
        final EndpointServerBuilder endpointServerBuilder = 
                                     endpointServerBuilder();

        endpointServerBuilder.setPort(8080).setUri("/");

        endpointServerBuilder.addService(auditorAddress, 
                                      new AuditorImpl());


        /* Create the service server. */
        final ServiceEndpointServer serviceEndpointServer = 
                               endpointServerBuilder.build();


        /* Create a service client proxy for the auditor. */
        final Auditor auditor = serviceEndpointServer
                .serviceBundle()
                .createLocalProxy(Auditor.class, auditorAddress);

        /* Create a todo manager and pass 
              the client proxy of the auditor to it. */
        final TodoManagerImpl todoManager = 
                          new TodoManagerImpl(auditor);

        // Add the todoManager to the serviceBundle.
        serviceEndpointServer.addService(todoManager);

        /* Start the service endpoint server 
             and wait until it starts. */
        serviceEndpointServer.startServerAndWait();

        System.out.println("Started");
    }

}
No RESTful microservice is proven to be RESTful without some curl script.

curl accessing service

echo "Todo item list before "
curl http://localhost:8080/todo-service/todo/
echo

echo "Count of Todo items "
curl http://localhost:8080/todo-service/todo/count
echo

echo "PUT a TODO item"
curl -X PUT http://localhost:8080/todo-service/todo/ \
-H 'Content-Type: application/json' \
-d '{"name":"wash-car", "description":"Take the car to the car wash", "createTime":1463950095000}'
echo


echo "Todo item list after add "
curl http://localhost:8080/todo-service/todo/
echo

echo "Count of Todo items after add "
curl http://localhost:8080/todo-service/todo/count
echo

echo "Remove a TODO item"
curl -X DELETE http://localhost:8080/todo-service/todo/?id=wash-car::1463950095000
echo


echo "Todo item list after add "
curl http://localhost:8080/todo-service/todo/
echo

echo "Count of Todo items after add "
curl http://localhost:8080/todo-service/todo/count
echo

$ ./curl-test.sh 
Todo item list before 
[]
Count of Todo items 
0
PUT a TODO item
true
Todo item list after add 
[{"name":"wash-car","description":"Take the car to the car wash","createTime":1463950095000,"id":"wash-car::1463950095000"}]
Count of Todo items after add 
1
Remove a TODO item
true
Todo item list after add 
[]
Count of Todo items after add 
0
For completeness, here is the build file.

Build file

group 'qbit-ex'
version '1.0-SNAPSHOT'

apply plugin: 'java'


apply plugin: 'application'


mainClassName = "com.mammatustech.todo.TodoServiceMain"


compileJava {
    sourceCompatibility = 1.8
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile 'io.advantageous.qbit:qbit-vertx:1.9.1'
    compile 'io.advantageous.qbit:qbit-admin:1.9.1'
    compile 'io.advantageous.reakt:reakt:2.8.15'
}

Conclusion

ServiceEndpointServer exposes a ServiceBundle as a remote accessible microservice whose methods can be invoked over WebSocket and HTTP/REST. Remote proxies can be created with QBit Client/ClientBuilder. The ServiceEndpointServer and the Client are both auto flushing (interval duration of flush is configurable from their respective builders).
To learn more about QBit and REST see Restful QBit tutorial and Resourceful RESTful Microservices tutorial.
To learn about the ManagedServiceBuilder please read QBit Batteries included which covers health, stats and microservice monitoring. The QBit batteries included also covers using QBit with Swagger. QBit can generate swagger JSON from all of its services which you can then use to generate clients for other platforms.

Sunday, May 15, 2016

Understanding the QBit microervices lib's serviceBundle

Understanding the serviceBundle

The serviceBundle is a collection of services sitting behind serviceQueue's. You use aserviceBundle when you want to share a response queue and a response queue thread. TheserviceBundle can also share the same thread for the request queue but that is not the default. The ServiceEndpointServer which is used to expose service actors as remotemicroservices via REST and WebSocket uses the serviceBundle.
The serviceBundle is also used to add other forms of services, like service pools, and sharded services.
Let's walk through an example. We will use the Todo example that we used for serviceQueue's. Since we are covering ServiceBundle, we will add another service called Auditor and its implementation called AuditorImpl. We will change the TodoManagerImpl to use the Auditor.
Let's review our Todo example. The Todo example, has a TodoManagerClient interface.

TodoManagerClient

package com.mammatustech.todo;

import io.advantageous.reakt.promise.Promise;
import java.util.List;

public interface TodoManagerClient {
    Promise<Boolean> add(Todo todo);
    Promise<Boolean> remove(String id);
    Promise<List<Todo>> list();
}
This is the interface we will use to invoke async methods.
To this we will add a new service called Auditor.

Auditor

package com.mammatustech.todo;

interface Auditor {
    void audit(final String operation, final String log);
}
We will keep the implementation simple so we can focus on QBit and the serviceBundle.

AuditorImpl

package com.mammatustech.todo;

public class AuditorImpl implements Auditor {

    public void audit(final String operation, final String log) {

        System.out.printf("operations %s, message %s log\n", 
                                       operation, log);
    }
}
Now to mix things up a bit and since we are talking about a serviceBundle, we will pass anAuditor instance to the constructor of the TodoManagerImpl.

AuditorImpl

package com.mammatustech.todo;

import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;

import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

public class TodoManagerImpl {

    private final Map<String, Todo> todoMap = new TreeMap<>();
    private final Auditor auditor;

    public TodoManagerImpl(final Auditor auditor) {
        this.auditor = auditor;
    }

    public void add(final Callback<Boolean> callback, 
                                        final Todo todo) {
        todoMap.put(todo.getId(), todo);
        auditor.audit("add", "added new todo");
        callback.resolve(true);
    }

    public void remove(final Callback<Boolean> callback, 
                                        final String id) {
        final Todo removed = todoMap.remove(id);

        auditor.audit("add", "removed new todo");
        callback.resolve(removed != null);
    }

    public void list(final Callback<ArrayList<Todo>> callback) {
        auditor.audit("list", "auditor added");
        callback.accept(new ArrayList<>(todoMap.values()));
    }

    @QueueCallback({QueueCallbackType.LIMIT,
                    QueueCallbackType.EMPTY,
                    QueueCallbackType.IDLE})
    public void process() {
        flushServiceProxy(auditor);
    }
...
}
Note that the addremovelist all use the auditor instance. Unlike the serviceQueuethere is no auto flush feature. This is typically because serviceBundless contain manyserviceQueues. If you wanted to get auto-flush going with a serviceQueue in a bundle, then you add the serviceQueue to the bundle or you look up the serviceQueue from the bundle and then use the serviceQueue to create the auto flush client proxy. This is usually not needed as manually flushing at the right time is better for thread hand off performance and IO performance. QBit uses micro-batching to optimize sending operations to other local and remote service actors.

QueueCallbacks

Since the TodoManagerImpl is using another service actor, we will flush operations to that actor when the processing queue for the TodoManagerImpl is idle, empty or reached its limit.

TodoManager using QueueCallbacks

package com.mammatustech.todo;
...
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;

import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

public class TodoManagerImpl {
...

    @QueueCallback({QueueCallbackType.LIMIT,
                    QueueCallbackType.EMPTY,
                    QueueCallbackType.IDLE})
    public void process() {
        flushServiceProxy(auditor);
    }
...
You can do this with annotaitons. (You can also do this without using annotations, which will show later.). The above @QueueCallback annotation says if the processing queue is empty (QueueCallbackType.EMPTY, no more requests or events in the queue), or if the request processing queue is idle (QueueCallbackType.IDLE, not busy at all), or if we have hit the queue limit (QueueCallbackType.LIMIT can only happen under heavy load or if you set the limit very low). A queue limit of ten would have ten times less thread handoff time than a queue limit of size 1 (under heavy load). If the auditor were a remote service, having a larger batch size than 1 would save on the cost of the IO operations.
You can turn off micro-batching by setting the processing queue to 1.
Later when we introduce the Reactor you can set up a reoccurring job that fires every 10ms or 100ms to flush collaborating services like the auditor.
You can use QueueCallbacks with any serviceQueue and with any serviceBundle.
There are other QueueCallbacks to get notified with the services has shutdown and when it has started.

QueueCallback for init and shutdown

public class TodoManagerImpl {
...

    @QueueCallback({QueueCallbackType.INIT})
    public void init() {
        auditor.audit("init", "init service");
    }

    @QueueCallback({QueueCallbackType.SHUTDOWN})
    public void shutdown() {
        System.out.println("operation shutdown, shutdown service");
        flushServiceProxy(auditor);
    }
The init operation would get called once when the serviceQueue for the microservice actorstarts up. The shutdown operation would get called once when the when the microservice actorshuts down.
Let's create a serviceBundle and add the auditor and todoManager services to it, and run them.

Using the service bundle with the auditor and todoManager services

    /** Object address to the todoManagerImpl service actor. */
    private final String todoAddress = "todoService";
    /** Object address to the auditorService service actor. */
    private final String auditorAddress = "auditorService";
    /** Service Bundle */
    private ServiceBundle serviceBundle;
    /** Client service proxy to the todoManager */
    private TodoManagerClient client;
    /** Client service proxy to the auditor. */
    private Auditor auditor;

            /* Create the serviceBundleBuilder. */
        final ServiceBundleBuilder serviceBundleBuilder = serviceBundleBuilder();

        /* Create the service bundle. */
        serviceBundle = serviceBundleBuilder.build();

        /* Add the AuditorImpl instance to the serviceBundle. */
        serviceBundle.addServiceObject(auditorAddress, new AuditorImpl());

        /* Create a service client proxy for the auditor. */
        auditor = serviceBundle.createLocalProxy(Auditor.class, auditorAddress);

        /* Create a todo manager and pass the 
            client proxy of the auditor to it. */
        final TodoManagerImpl todoManager = new TodoManagerImpl(auditor);

        // Add the todoManager to the serviceBundle.
        serviceBundle
                .addServiceObject(todoAddress, todoManager);

        /* Create a client proxy to communicate 
            with the service actor. */
        client = serviceBundle
              .createLocalProxy(TodoManagerClient.class, 
                                             todoAddress);

        // Start the service bundle.
        serviceBundle.start();
Above we create the serviceBundleBuilder which can be used to the response and request queue size, types, batch size, and more. Then we create the serviceBundle. Next we add theauditor microservice actor to the serviceBundle under the address specified byauditorAddress. Next we create a service client proxy for the auditor microservice actor that we can pass to the TodoManagerImpl. We then add the TodoManagerImpl to form themicroservice actor for the TodoManager Service. Next we create a client of the TodoManagerService to test with. Then we start the serviceBundle.
 To use the `todoManager` service proxy client aka `client`, the code is much like it was before with the `serviceQueue` example except now we will flush (since by default the queue batch size is greater than 1). 

Using the todoManager microservice client proxy

        final Promise<Boolean> promise = Promises.blockingPromiseBoolean();

        // Add the todo item.
        client.add(new Todo("write", "Write tutorial", timer.time()))
                .invokeWithPromise(promise);
        flushServiceProxy(client);


        assertTrue("The call was successful", promise.success());
        assertTrue("The return from the add call", promise.get());

        final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);

        // Get a list of todo items.
        client.list().invokeWithPromise(promiseList);

        // Call flush since this is not an auto-flush. */
        flushServiceProxy(client);


        // See if the Todo item we created is in the listing.
        final List<Todo> todoList = promiseList.get().stream()
                .filter(todo -> todo.getName().equals("write")
                        && todo.getDescription().equals("Write tutorial")).collect(Collectors.toList());

        // Make sure we found it.
        assertEquals("Make sure there is one", 1, todoList.size());


        // Remove promise
        final Promise<Boolean> removePromise = 
                      Promises.blockingPromiseBoolean();
        client.remove(todoList.get(0).getId())
                 .invokeWithPromise(removePromise);
        flushServiceProxy(client);


        final Promise<List<Todo>> promiseList2 = 
                Promises.blockingPromiseList(Todo.class);

        // Make sure it is removed.
        client.list().invokeWithPromise(promiseList2);
        flushServiceProxy(client);

        // See if the Todo item we created is removed.
        final List<Todo> todoList2 = promiseList2.get().stream()
                .filter(todo -> todo.getName().equals("write")
                        && todo.getDescription()
                         .equals("Write tutorial"))
                         .collect(Collectors.toList());

        // Make sure we don't find it.
        assertEquals("Make sure there is one",  
                             0, todoList2.size());

        flushServiceProxy(client);
We can also repeat the async example were we executed more than one operation at a time.

Making async calls and coordinating with Promises

        /* A list of promises for things we 
          want to do all at once. */
        final List<Promise<Boolean>> promises = 
                                      new ArrayList<>(3);
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean success = new AtomicBoolean();


        /** Add a todoItem to the client add method */
        final Todo todo = new Todo("write", "Write tutorial", 
                                           timer.time());
        final Promise<Boolean> promise
                = client.add(todo);
        promises.add(promise);

        /** Add two more. */
        promises.add(client.add(new Todo("callMom", 
                                "Call Mom", timer.time())));
        promises.add(client.add(new Todo("callSis", 
                                "Call Sister", timer.time())));

        /** Now async wait for them all to come back. */
        Promises.all(promises).then(done -> {
            success.set(true);
            latch.countDown();
        }).catchError(e -> {
            success.set(false);
            latch.countDown();
        });

        /** Invoke the promises. */
        promises.forEach(Promise::invoke);
        flushServiceProxy(client);


        /** They are all going to come back async. */
        latch.await();
        assertTrue(success.get());
Please note that you can explicitly flush an client microservice proxy, it will also flush if you go over the limit for the request queue, or you can set the batch size to 1.
Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training