Rick

Monday, November 30, 2015

CallbackBuilder and generics for Reactive Java Microservices

The CallbackBuilder is used to create callbacks. Callbacks have error handlers, timeout handlers and return handlers.

Setting up error handlers, timeout handlers and callback handlers with a callback builder.

                callbackBuilder
                .setCallback(ResultSet.class, resultSet -> 
                                             statusCallback.accept(resultSet!=null))
                .setOnTimeout(() -> statusCallback.accept(false))
                .setOnError(error -> statusCallback.onError(error))
                .build(ResultSet.class);

        this.addEventStorageRecordAsync(callbackBuilder.build(), storageRec);
The CallbackBuilder has many helper methods for dealing with common Java types like Optional, Map, List, Collection, Set, String, primitive types and wrappers.
This allows you to quickly build callbacks without navigating the complexity of generics. Let's cover a small example.
First, let's define a basic service that uses List, Map and Optional.

Basic service to drive the example

package io.advantageous.qbit.example.callback;


import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Maps;
import io.advantageous.qbit.reactive.Callback;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class EmployeeServiceImpl implements EmployeeService {

    @Override
    public void getEmployeesAsMap(final Callback<Map<String, Employee>> empMapCallback) {

        empMapCallback.returnThis(Maps.map("rick", new Employee("Rick")));
    }

    @Override
    public void getEmployeesAsList(final Callback<List<Employee>> empListCallback) {

        empListCallback.returnThis(Lists.list(new Employee("Rick")));
    }


    @Override
    public void findEmployeeByName(final Callback<Optional<Employee>> employeeCallback,
                                   final String name) {

        if ("Rick".equals(name)) { // null-safe comparison
            employeeCallback.returnThis(Optional.of(new Employee("Rick")));
        } else {
            employeeCallback.returnThis(Optional.empty());
        }
    }

}
The interface for the above looks like this:

Basic interface to drive the example

package io.advantageous.qbit.example.callback;

import io.advantageous.qbit.reactive.Callback;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface EmployeeService {
    void getEmployeesAsMap(Callback<Map<String, Employee>> empMapCallback);

    void getEmployeesAsList(Callback<List<Employee>> empListCallback);

    void findEmployeeByName(Callback<Optional<Employee>> employeeCallback,
                            String name);
}
If you are familiar with QBit, all of the above should already make sense. If not, I suggest going through the home page of the wiki and coming back here after you skim it.
To show how to use the CallbackBuilder, we will define a basic REST service called CompanyRestService.

CompanyRestService to demonstrate CallbackBuilder

/**
 * To access this service
 * curl http://localhost:8080/emap
 {"rick":{"name":"Rick"}}
 */
@RequestMapping("/")
public class CompanyRestService {

    private final Logger logger = LoggerFactory.getLogger(CompanyRestService.class);
    private final EmployeeService employeeService;

    public CompanyRestService(EmployeeService employeeService) {
        this.employeeService = employeeService;
    }

...

    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.LIMIT})
    public void process(){
        ServiceProxyUtils.flushServiceProxy(employeeService);
    }


QBit uses micro-batching, which helps optimize message passing between service queues (service actors). The QueueCallback annotation lets us capture when our request queue is empty or when it has hit its limit. The limit is usually the batch size, but it could be something else, such as an important message arriving. Whenever we hit our limit or our request queue is empty, we flush to the downstream service by calling ServiceProxyUtils.flushServiceProxy. This should be mostly review.
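To make the flush-on-empty-or-limit idea concrete, here is a minimal, self-contained sketch. It does not use QBit at all: MicroBatcher, send and onQueueEmpty are hypothetical names invented for illustration, and the real QBit queues are certainly more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical micro-batching buffer for illustration; not a QBit class. */
class MicroBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private final List<T> buffer = new ArrayList<>();

    MicroBatcher(int batchSize, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    /** Buffer a message and flush once the batch limit is reached (the LIMIT case). */
    void send(T message) {
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Called when the inbound queue has drained (the EMPTY case). */
    void onQueueEmpty() {
        flush();
    }

    /** Hand everything buffered so far to the downstream consumer as one batch. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        downstream.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}

class MicroBatcherDemo {
    static List<List<String>> run() {
        final List<List<String>> batches = new ArrayList<>();
        final MicroBatcher<String> batcher = new MicroBatcher<String>(2, batches::add);
        batcher.send("a");
        batcher.send("b");      // limit hit: the first batch is flushed
        batcher.send("c");
        batcher.onQueueEmpty(); // queue drained: the partial batch is flushed
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is that a message is never stuck waiting for a full batch: either the limit triggers the flush, or the empty-queue signal does.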
As you can see, the EmployeeService has a lot of methods that take generic types like Optional, List and Map. When we want to call a downstream service that is going to return a map, list or optional, we have helper methods to make the construction of the callback easier.

CompanyRestService Calling getEmployeesAsMap using CallbackBuilder.wrap

@RequestMapping("/")
public class CompanyRestService {

...
    @RequestMapping("/emap")
    public void employeeMap(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
        callbackBuilder.wrap(empMapCallback); //Forward to error handling, timeout, and callback defined in empMapCallback
        employeeService.getEmployeesAsMap(callbackBuilder.build());

    }
In this case, we use the wrap method. This will forward errors, timeouts and the callback return to the empMapCallback.
To run this, we need to start up the application.

Starting up the REST application.

    public static void main(final String... args) throws Exception {

        /** Create a ManagedServiceBuilder which simplifies QBit wiring. */
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder().setRootURI("/");
        managedServiceBuilder.enableLoggingMappedDiagnosticContext();

        /** Create a service queue for the employee service. */
        final ServiceQueue employeeServiceQueue = managedServiceBuilder.createServiceBuilderForServiceObject(
                new EmployeeServiceImpl()).buildAndStartAll();

        /** Add a CompanyRestService passing it a client proxy to the employee service. */
        managedServiceBuilder.addEndpointService(
                new CompanyRestService(employeeServiceQueue.createProxy(EmployeeService.class)));

        /** Start the server. */
        managedServiceBuilder.startApplication();

    }
If we wanted to copy and mutate the map before it is serialized, we could use withMapCallback to capture the async return (that is, the value passed to the Callback) from the employeeService.

CompanyRestService using withMapCallback and delegate

@RequestMapping("/")
public class CompanyRestService {
...

    @RequestMapping("/emap2")
    public void employeeMap2(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
        callbackBuilder.delegate(empMapCallback); //Forward to error handling and timeout defined in empMapCallback

        callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
            logger.info("GET MAP {}", employeeMap);
            empMapCallback.returnThis(employeeMap);
        });
        employeeService.getEmployeesAsMap(callbackBuilder.build());

    }
In this case we forward only the error handling and timeout handling to empMapCallback, and then we create a custom return handler using withMapCallback.
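The difference between wrap and delegate can be sketched without QBit. The types below (MiniCallback, MiniBuilder, withCallback) are hypothetical stand-ins written for this illustration only; they mirror the behavior described in this post, not the actual QBit implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical callback with return, error and timeout handlers; not a QBit type. */
class MiniCallback<T> {
    final Consumer<T> onReturn;
    final Consumer<Throwable> onError;
    final Runnable onTimeout;

    MiniCallback(Consumer<T> onReturn, Consumer<Throwable> onError, Runnable onTimeout) {
        this.onReturn = onReturn;
        this.onError = onError;
        this.onTimeout = onTimeout;
    }
}

/** Hypothetical builder showing wrap (forward everything) versus delegate (errors and timeouts only). */
class MiniBuilder<T> {
    private Consumer<T> onReturn = value -> { };
    private Consumer<Throwable> onError = error -> { };
    private Runnable onTimeout = () -> { };

    /** Forward return, error and timeout handling to the target. */
    MiniBuilder<T> wrap(MiniCallback<T> target) {
        this.onReturn = target.onReturn;
        return delegate(target);
    }

    /** Forward only error and timeout handling; the return handler is left to the caller. */
    MiniBuilder<T> delegate(MiniCallback<T> target) {
        this.onError = target.onError;
        this.onTimeout = target.onTimeout;
        return this;
    }

    MiniBuilder<T> withCallback(Consumer<T> handler) {
        this.onReturn = handler;
        return this;
    }

    MiniCallback<T> build() {
        return new MiniCallback<T>(onReturn, onError, onTimeout);
    }
}

class WrapVsDelegateDemo {
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final MiniCallback<String> client = new MiniCallback<String>(
                value -> events.add("client got " + value),
                error -> events.add("client error " + error.getMessage()),
                () -> events.add("client timeout"));

        // wrap: the value goes straight through to the client callback.
        new MiniBuilder<String>().wrap(client).build().onReturn.accept("rick");

        // delegate plus a custom return handler: intercept the value, then forward it.
        final MiniCallback<String> delegated = new MiniBuilder<String>()
                .delegate(client)
                .withCallback(value -> {
                    events.add("intercepted " + value);
                    client.onReturn.accept(value.toUpperCase());
                })
                .build();
        delegated.onReturn.accept("rick");
        delegated.onError.accept(new IllegalStateException("boom"));
        delegated.onTimeout.run();
        return events;
    }
}
```

In this sketch, wrap is simply delegate plus forwarding of the return value, which is why delegate is the one to reach for when you need to inspect or change the result before the client sees it.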

CompanyRestService using withMapCallback and delegateWithLogging

@RequestMapping("/")
public class CompanyRestService {
...

    @RequestMapping("/emap3")
    public void employeeMap3(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
        // Forward to error handling and timeout defined in empMapCallback, but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(empMapCallback, logger, "employeeMap3");
        callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
            logger.info("GET MAP {}", employeeMap);
            empMapCallback.returnThis(employeeMap);
        });
        employeeService.getEmployeesAsMap(callbackBuilder.build());
    }
If you want to handle error logging and timeout logging in the context of this service's log, you can simply use the delegateWithLogging method. This will set up some basic logging for error handling and timeouts.
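Conceptually, this kind of logging delegate decorates the error and timeout handlers: log first, tagged with the operation name, then forward. The sketch below is an assumption about the general pattern, not QBit's code. LoggingDelegateSketch and its methods are hypothetical, and a plain Consumer of String stands in for the logger so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of log-then-forward handlers; not QBit code. */
class LoggingDelegateSketch {

    /** Wrap an error handler so it logs with the operation name and then forwards the error. */
    static Consumer<Throwable> loggingErrorHandler(final String operation,
                                                   final Consumer<String> log,
                                                   final Consumer<Throwable> target) {
        return error -> {
            log.accept("Error in " + operation + ": " + error.getMessage());
            target.accept(error);
        };
    }

    /** Wrap a timeout handler so it logs with the operation name and then forwards the timeout. */
    static Runnable loggingTimeoutHandler(final String operation,
                                          final Consumer<String> log,
                                          final Runnable target) {
        return () -> {
            log.accept("Timeout in " + operation);
            target.run();
        };
    }

    static List<String> run() {
        final List<String> lines = new ArrayList<>();
        final Consumer<Throwable> onError = loggingErrorHandler("employeeMap3", lines::add,
                error -> lines.add("client notified of error"));
        final Runnable onTimeout = loggingTimeoutHandler("employeeMap3", lines::add,
                () -> lines.add("client notified of timeout"));

        onError.accept(new RuntimeException("db down"));
        onTimeout.run();
        return lines;
    }
}
```

The operation name is what ties a log line back to the call that failed, so it pays to use a distinct name per endpoint.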
We of course also have methods that work with List, Collection, Set and so on.

Working with list by using withListCallback

@RequestMapping("/")
public class CompanyRestService {
...

    @RequestMapping("/elist")
    public void employeeList(final Callback<List<Employee>> empListCallback) {

        final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
        // Forward to error handling and timeout defined in empListCallback, but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(empListCallback, logger, "employeeList");
        callbackBuilder.withListCallback(Employee.class, employeeList -> {
            logger.info("GET List {}", employeeList);
            empListCallback.returnThis(employeeList);
        });
        employeeService.getEmployeesAsList(callbackBuilder.build());
    }
The above works as you would expect. Let's mix things up a bit. We will call findEmployeeByName, which may or may not return an employee.

Working with optional by using withOptionalCallback

@RequestMapping("/")
public class CompanyRestService {
...

    @RequestMapping("/find")
    public void findEmployee(final Callback<Employee> employeeCallback,
                             @RequestParam("name") final String name) {

        final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
        // Forward to error handling and timeout defined in employeeCallback,
        // but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
        callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {


            if (employeeOptional.isPresent()) {
                employeeCallback.returnThis(employeeOptional.get());
            } else {
                employeeCallback.onError(new Exception("Employee not found"));
            }
        });
        employeeService.findEmployeeByName(callbackBuilder.build(), name);
    }
To work with Optional, we use withOptionalCallback. Here we return an error if the employee is not found, and we return the Employee object if it is found.
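If several endpoints need the same present-or-error logic, it can be pulled into a small helper. The following is only a sketch under that assumption: OptionalUnwrapSketch and unwrapOrError are hypothetical names, and plain java.util.function.Consumer instances stand in for the callback's return and error handlers.

```java
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical helper that turns an Optional result into a value or an error. */
class OptionalUnwrapSketch {

    /** Build an Optional handler that forwards a present value or reports an error. */
    static <T> Consumer<Optional<T>> unwrapOrError(final Consumer<T> onValue,
                                                   final Consumer<Throwable> onError,
                                                   final String message) {
        return optional -> {
            if (optional.isPresent()) {
                onValue.accept(optional.get());
            } else {
                onError.accept(new Exception(message));
            }
        };
    }

    /** Feed one lookup result through the handler and describe what happened. */
    static String run(final Optional<String> lookupResult) {
        final StringBuilder outcome = new StringBuilder();
        final Consumer<Optional<String>> handler = OptionalUnwrapSketch.<String>unwrapOrError(
                name -> outcome.append("found ").append(name),
                error -> outcome.append("error: ").append(error.getMessage()),
                "Employee not found");
        handler.accept(lookupResult);
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(Optional.of("Rick")));
        System.out.println(run(Optional.empty()));
    }
}
```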
/**
 * You need this if you want to do error handling (Exception) from a callback.
 * Callback Builder
 * created by rhightower on 3/23/15.
 */
@SuppressWarnings("UnusedReturnValue")
public class CallbackBuilder {


    /**
     * Builder method to set callback handler that takes a list
     * @param componentClass  componentClass
     * @param callback callback
     * @param <T> T
     * @return this
     */
    public <T> CallbackBuilder withListCallback(final Class<T> componentClass,
                                                       final Callback<List<T>> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a set
     * @param componentClass  componentClass
     * @param callback callback
     * @param <T> T
     * @return this
     */
    public <T> CallbackBuilder withSetCallback(final Class<T> componentClass,
                                                     final Callback<Set<T>> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a collection
     * @param componentClass  componentClass
     * @param callback callback
     * @param <T> T
     * @return this
     */
    public <T> CallbackBuilder withCollectionCallback(final Class<T> componentClass,
                                                            final Callback<Collection<T>> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a map
     * @param keyClass  keyClass
     * @param valueClass  valueClass
     * @param callback callback
     * @param <K> key type
     * @param <V> value type
     * @return this
     */
    public <K, V> CallbackBuilder withMapCallback(final Class<K> keyClass,
                                                        final Class<V> valueClass,
                                                        final Callback<Map<K, V>> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a boolean
     * @param callback callback
     * @return this
     */
    public CallbackBuilder withBooleanCallback(final Callback<Boolean> callback) {
        this.callback = callback;
        return this;
    }

    /**
     * Builder method to set callback handler that takes an integer
     * @param callback callback
     * @return this
     */
    public CallbackBuilder withIntCallback(final Callback<Integer> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a long
     * @param callback callback
     * @return this
     */
    public CallbackBuilder withLongCallback(final Callback<Long> callback) {
        this.callback = callback;
        return this;
    }


    /**
     * Builder method to set callback handler that takes a string
     * @param callback callback
     * @return this
     */
    public CallbackBuilder withStringCallback(final Callback<String> callback) {
        this.callback = callback;
        return this;
    }



    /**
     * Builder method to set callback handler that takes an optional string
     * @param callback callback
     * @return this
     */
    public CallbackBuilder withOptionalStringCallback(final Callback<Optional<String>> callback) {
        this.callback = callback;
        return this;
    }



    /**
     * Builder method to set callback handler that takes an optional value
     * @param cls  cls
     * @param callback callback
     * @param <T> T
     * @return this
     */
    public <T> CallbackBuilder withOptionalCallback(final Class<T> cls, final Callback<Optional<T>> callback) {
        this.callback = callback;
        return this;
    }
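Notice that in the methods above the Class arguments are never read in the method body. Their job appears to be purely to pin down the generic type so that a lambda's parameter type can be inferred. The self-contained sketch below illustrates that technique. TypeTokenSketch and withListHandler are hypothetical names, and the unchecked cast in build is an assumption about how a non-generic builder can hand back a typed handler.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical non-generic builder that uses a Class token only for type inference. */
class TypeTokenSketch {
    private Consumer<?> handler;

    /** componentClass is never read; it only lets the compiler infer T for the lambda. */
    <T> TypeTokenSketch withListHandler(final Class<T> componentClass,
                                        final Consumer<List<T>> handler) {
        this.handler = handler;
        return this;
    }

    /** Unchecked by design in this sketch: the caller's target type decides T. */
    @SuppressWarnings("unchecked")
    <T> Consumer<T> build() {
        return (Consumer<T>) handler;
    }

    static int run() {
        final int[] totalLength = {0};
        final Consumer<List<String>> built = new TypeTokenSketch()
                .withListHandler(String.class, names -> {
                    // names is inferred as List<String>, so String methods are available.
                    for (String name : names) {
                        totalLength[0] += name.length();
                    }
                })
                .build();
        built.accept(Arrays.asList("Rick", "Bob"));
        return totalLength[0];
    }
}
```

The trade-off is that the compiler can no longer check that the handler type matches what the downstream method expects, so a mismatch would only surface at runtime.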

Read more about callback builders and how to handle errors, timeouts and downstream calls.

Reactor

Suppose EmployeeService were really talking to downstream remote services, or perhaps to Cassandra or Redis, and you want to add a timeout, say 10 seconds, for this downstream system.
In that case our example can use the QBit Reactor, and the easiest way to do that is to subclass BaseService.

Using QBit Reactor from the BaseService

package io.advantageous.qbit.example.callback;

import io.advantageous.qbit.admin.ManagedServiceBuilder;

import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.RequestParam;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.CallbackBuilder;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.BaseService;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.qbit.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@RequestMapping("/")
public class CompanyRestServiceUsingReactor extends BaseService {


    private final Logger logger = LoggerFactory.getLogger(CompanyRestServiceUsingReactor.class);
    private final EmployeeService employeeService;

    public CompanyRestServiceUsingReactor(Reactor reactor,
                                          Timer timer,
                                          StatsCollector statsCollector,
                                          EmployeeService employeeService) {
        super(reactor, timer, statsCollector);
        this.employeeService = employeeService;
        reactor.addServiceToFlush(employeeService);
    }



    @RequestMapping("/emap")
    public void employeeMap(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder =  super.reactor.callbackBuilder();
        callbackBuilder.wrap(empMapCallback); //Forward to error handling, timeout, and callback defined in empMapCallback
        employeeService.getEmployeesAsMap(callbackBuilder.build());

    }


    @RequestMapping("/emap2")
    public void employeeMap2(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
        callbackBuilder.delegate(empMapCallback); //Forward to error handling and timeout defined in empMapCallback

        callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
            logger.info("GET MAP {}", employeeMap);
            empMapCallback.returnThis(employeeMap);
        });
        employeeService.getEmployeesAsMap(callbackBuilder.build());

    }


    @RequestMapping("/emap3")
    public void employeeMap3(final Callback<Map<String, Employee>> empMapCallback) {

        final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
        // Forward to error handling and timeout defined in empMapCallback, but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(empMapCallback, logger, "employeeMap3");
        callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
            logger.info("GET MAP {}", employeeMap);
            empMapCallback.returnThis(employeeMap);
        });
        employeeService.getEmployeesAsMap(callbackBuilder.build());
    }


    @RequestMapping("/elist")
    public void employeeList(final Callback<List<Employee>> empListCallback) {

        final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
        // Forward to error handling and timeout defined in empListCallback, but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(empListCallback, logger, "employeeList");
        callbackBuilder.withListCallback(Employee.class, employeeList -> {
            logger.info("GET List {}", employeeList);
            empListCallback.returnThis(employeeList);
        });
        employeeService.getEmployeesAsList(callbackBuilder.build());
    }


    @RequestMapping("/find")
    public void findEmployee(final Callback<Employee> employeeCallback,
                             @RequestParam("name") final String name) {

        final long startTime = super.time;

        final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
        // Forward to error handling and timeout defined in employeeCallback, but install some additional logging for
        // timeout and error handling that associates the error and timeout handling with this call.
        callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
        callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {


            super.recordTiming("findEmployee", time - startTime);
            if (employeeOptional.isPresent()) {

                employeeCallback.returnThis(employeeOptional.get());
            } else {
                employeeCallback.onError(new Exception("Employee not found"));
            }
        });
        employeeService.findEmployeeByName(callbackBuilder.build(), name);
    }


    public static void main(final String... args) throws Exception {

        /** Create a ManagedServiceBuilder which simplifies QBit wiring. */
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder().setRootURI("/");
        managedServiceBuilder.enableLoggingMappedDiagnosticContext();

        /** Create a service queue for the employee service. */
        final ServiceQueue employeeServiceQueue = managedServiceBuilder.createServiceBuilderForServiceObject(
                new EmployeeServiceImpl()).buildAndStartAll();

        /** Add a CompanyRestServiceUsingReactor passing it a reactor, timer, stats collector and a client proxy to the employee service. */
        managedServiceBuilder.addEndpointService(
                new CompanyRestServiceUsingReactor(
                        ReactorBuilder.reactorBuilder().setDefaultTimeOut(10).setTimeUnit(TimeUnit.SECONDS).build(),
                        Timer.timer(),
                        managedServiceBuilder.getStatServiceBuilder().buildStatsCollector(),
                        employeeServiceQueue.createProxy(EmployeeService.class)));

        /** Start the server. */
        managedServiceBuilder.startApplication();

    }
}
Notice that the callbackBuilder is now constructed from the reactor (final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();).
To learn more about the Reactor, please read Reactively handling async calls with QBit Reactive Microservices.
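To give a feel for what reactor-managed timeouts mean, here is a small self-contained sketch. It is not the QBit Reactor: MiniReactor, PendingCall, callback and process are hypothetical names, and the clock is passed in explicitly so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical pending call that completes at most once. */
class PendingCall<T> {
    final long deadlineMillis;
    final Consumer<T> onReturn;
    final Runnable onTimeout;
    boolean done;

    PendingCall(long deadlineMillis, Consumer<T> onReturn, Runnable onTimeout) {
        this.deadlineMillis = deadlineMillis;
        this.onReturn = onReturn;
        this.onTimeout = onTimeout;
    }

    /** Deliver the reply unless the call already completed or timed out. */
    void returnThis(T value) {
        if (done) {
            return;
        }
        done = true;
        onReturn.accept(value);
    }
}

/** Hypothetical reactor that tracks pending calls and expires them; not the QBit Reactor. */
class MiniReactor {
    private final long timeoutMillis;
    private final List<PendingCall<?>> pending = new ArrayList<>();

    MiniReactor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Create a tracked callback whose deadline is now plus the default timeout. */
    <T> PendingCall<T> callback(long nowMillis, Consumer<T> onReturn, Runnable onTimeout) {
        final PendingCall<T> call = new PendingCall<T>(nowMillis + timeoutMillis, onReturn, onTimeout);
        pending.add(call);
        return call;
    }

    /** Called periodically: drop completed calls and time out the ones past their deadline. */
    void process(long nowMillis) {
        final Iterator<PendingCall<?>> iterator = pending.iterator();
        while (iterator.hasNext()) {
            final PendingCall<?> call = iterator.next();
            if (call.done) {
                iterator.remove();
            } else if (nowMillis >= call.deadlineMillis) {
                call.done = true;
                iterator.remove();
                call.onTimeout.run();
            }
        }
    }
}

class MiniReactorDemo {
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final MiniReactor reactor = new MiniReactor(10_000); // 10 seconds, as in the example above
        final PendingCall<String> fast = reactor.callback(0,
                value -> events.add("fast returned " + value), () -> events.add("fast timed out"));
        final PendingCall<String> slow = reactor.callback(0,
                value -> events.add("slow returned " + value), () -> events.add("slow timed out"));

        fast.returnThis("rick");     // reply arrives in time
        reactor.process(10_000);     // deadline reached: the slow call times out
        slow.returnThis("too late"); // ignored, the call already timed out
        return events;
    }
}
```

In this sketch, the key design point is that the timeout and the late reply race for the same "done" flag, so the client is notified exactly once.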

Stats

When you use the BaseService, you also have access to the stats system.

Stats from BaseService

    @RequestMapping("/find")
    public void findEmployee(final Callback<Employee> employeeCallback,
                             @RequestParam("name") final String name) {

        final long startTime = super.time;

        final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
        callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
        callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {


            /** Record timing. */
            super.recordTiming("findEmployee", time - startTime);

            if (employeeOptional.isPresent()) {
                /* Increment count of employees found. */
                super.incrementCount("employeeFound");
                employeeCallback.returnThis(employeeOptional.get());
            } else {
                /* Increment count of employees not found. */
                super.incrementCount("employeeNotFound");
                employeeCallback.onError(new Exception("Employee not found"));
            }
        });
        employeeService.findEmployeeByName(callbackBuilder.build(), name);
    }
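To see what recording a timing and incrementing a count amount to, here is a tiny in-memory stand-in. MiniStats is a hypothetical class written for this illustration and says nothing about how the QBit stats system stores or reports its data; the fake clock exists only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stats collector for illustration; not the QBit StatsCollector. */
class MiniStats {
    private final Map<String, Long> counts = new HashMap<>();
    private final Map<String, List<Long>> timings = new HashMap<>();

    /** Add one to the named counter. */
    void incrementCount(final String name) {
        counts.merge(name, 1L, Long::sum);
    }

    /** Remember one duration sample for the named timer. */
    void recordTiming(final String name, final long durationMillis) {
        timings.computeIfAbsent(name, key -> new ArrayList<>()).add(durationMillis);
    }

    long count(final String name) {
        return counts.getOrDefault(name, 0L);
    }

    List<Long> timings(final String name) {
        return timings.getOrDefault(name, Collections.<Long>emptyList());
    }
}

class MiniStatsDemo {
    static MiniStats run() {
        final MiniStats stats = new MiniStats();
        final String[] lookups = {"Rick", "Bob", "Rick"};
        long clock = 0; // fake clock in milliseconds
        for (String name : lookups) {
            final long startTime = clock;
            clock += 5; // pretend each lookup takes 5 ms
            stats.recordTiming("findEmployee", clock - startTime);
            stats.incrementCount("Rick".equals(name) ? "employeeFound" : "employeeNotFound");
        }
        return stats;
    }
}
```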