Rick

Thursday, February 12, 2015

Working with strongly typed event bus proxies for QBit Java Microservice lib


This example extends these examples on the QBit event bus: Rough Cut: Working with event bus for QBit the microservice engine, and Rough Cut: Working with private event bus for inproc microservices.



In the last example we used the event bus to "call" OnEvent methods on other services, sort of. We had to employ an event manager, and even though it was our own event manager interface, we had to use odd method names like send and sendArray. We also had to pass around event channel names, which felt icky. It all worked fine, but wouldn't it be nice if our code were not tied to an event manager per se? We can still have callbacks and events; the code just doesn't need to know that it is using an event manager.

In this example, we are going to do things a bit differently. We are going to define a strongly typed abstraction over our use of the event manager.

Strongly typed interface to event manager

    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);


        @EventChannel(PAYROLL_ADJUSTMENT_CHANNEL)
        void sendSalaryChange(Employee employee, int newSalary);

    }
Now we are sending out events with methods that abstract our use of an event manager.
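
To make the idea concrete, here is a rough sketch of how a typed event proxy could be built with nothing but the JDK. This is not QBit's implementation. The Channel annotation, RecordingBus, EmployeeEvents and createProxy below are all hypothetical stand-ins that I made up for illustration. The point is only that a dynamic proxy can read the channel name off the method annotation, so the caller never sees it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TypedEventProxySketch {

    /** Hypothetical stand-in for an event channel annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Channel {
        String value();
    }

    /** Hypothetical minimal bus: it only records what was sent. */
    static class RecordingBus {
        final List<String> sent = new ArrayList<>();

        void send(String channel, Object... args) {
            sent.add(channel + " " + Arrays.toString(args));
        }
    }

    /** The typed interface that callers see; no channel names leak out. */
    interface EmployeeEvents {
        @Channel("employee.new")
        void sendNewEmployee(String name);

        @Channel("employee.payroll")
        void sendSalaryChange(String name, int newSalary);
    }

    /** Builds a proxy whose method calls become sends on the annotated channel. */
    static <T> T createProxy(RecordingBus bus, Class<T> type) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[]{type},
                (self, method, args) -> {
                    Channel channel = method.getAnnotation(Channel.class);
                    if (channel == null) {
                        // Only annotated event methods are supported in this sketch.
                        throw new UnsupportedOperationException(method.getName());
                    }
                    bus.send(channel.value(), args == null ? new Object[0] : args);
                    return null; // event methods are void
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        RecordingBus bus = new RecordingBus();
        EmployeeEvents events = createProxy(bus, EmployeeEvents.class);
        events.sendNewEmployee("Rick");
        events.sendSalaryChange("Rick", 100);
        bus.sent.forEach(System.out::println);
    }
}
```

The calling code only ever touches EmployeeEvents, so swapping the bus for something else means changing createProxy and nothing more.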

Using Strongly typed interface to event manager

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;// <----------- Notice we use 
                                                // EmployeeEventManager which is strongly typed

        public EmployeeHiringService (
                       final EmployeeEventManager employeeEventManager) {// <-----------
            this.eventManager = employeeEventManager;
        }

        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {
            flushServiceProxy(eventManager);
        }

        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {
            flushServiceProxy(eventManager);
        }

        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee( employee); // <------------------ 
                                                     //  Strongly typed method call
            eventManager.sendSalaryChange( employee, salary );// <------------------ 
                                                              // Strongly typed method call

        }

    }

To wire this in, do the following:

        /* Get an eventBusProxyCreator which can create strongly typed event interface. */
        final EventBusProxyCreator eventBusProxyCreator = QBit.factory().eventBusProxyCreator();

        /* Create a strongly typed proxy to the event manager. */
        final EmployeeEventManager employeeEventManager =
                eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);


        /*
        Create your EmployeeHiringService, but this time pass the strongly typed proxy
        to the private event bus.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager);


        /** Employee hiring service. */
        Service employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);


        privateEventBusService.start();
        employeeHiringService.start();

        /** Now the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));
I also included strongly typed (no magic method) queue listener callbacks.
        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {
            flushServiceProxy(eventManager);
        }

        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {
            flushServiceProxy(eventManager);
        }
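
Why flush in both callbacks? My working assumption for this sketch is that the proxy buffers outgoing events until it is flushed, so the service flushes when it has handled a batch (the limit) and again when there is nothing left to do (empty). The sketch below is plain Java and not QBit code. BufferingSender and drain are hypothetical names, and the loop plays the role of the queue that would normally fire the callbacks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FlushOnQueueCallbacksSketch {

    /** Hypothetical outgoing proxy that buffers events until flush() is called. */
    static class BufferingSender {
        private final List<String> buffer = new ArrayList<>();
        final List<List<String>> deliveredBatches = new ArrayList<>();

        void send(String event) {
            buffer.add(event);
        }

        void flush() {
            if (!buffer.isEmpty()) {
                deliveredBatches.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    /** Drains the inbox, flushing after every `limit` requests and once more when it is empty. */
    static void drain(Queue<String> inbox, int limit, BufferingSender sender) {
        int handledSinceFlush = 0;
        String request;
        while ((request = inbox.poll()) != null) {
            sender.send("hired:" + request);   // the service's work: emit an event
            if (++handledSinceFlush == limit) {
                sender.flush();                // plays the role of the LIMIT callback
                handledSinceFlush = 0;
            }
        }
        sender.flush();                        // plays the role of the EMPTY callback
    }

    public static void main(String[] args) {
        Queue<String> inbox = new ArrayDeque<>(List.of("a", "b", "c", "d", "e"));
        BufferingSender sender = new BufferingSender();
        drain(inbox, 2, sender);
        // prints [[hired:a, hired:b], [hired:c, hired:d], [hired:e]]
        System.out.println(sender.deliveredBatches);
    }
}
```

Without the final flush, the last event would sit in the buffer until more requests arrived, which is the situation the empty callback is there to avoid.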
As always, you can use your own annotations and enums or use the ones that ship with QBit.
Now we can have a service that has zero ties to QBit. With some elbow grease, you could use the same implementation in both Akka and QBit. I hate framework tie-in, which is why QBit is a lib, not a framework. Use it how and when you want.
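
How can a library honor annotations that it does not own? One approach, and I am only assuming it here for illustration rather than describing QBit's internals, is to match annotations by simple name and read their values reflectively. In the sketch below the OnEvent annotation belongs to the application, and findHandlers is a hypothetical scanner that never references the annotation type at compile time.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class OwnAnnotationSketch {

    /** An application-owned annotation; nothing here imports a library type. */
    @Retention(RetentionPolicy.RUNTIME)
    @interface OnEvent {
        String value();
    }

    static class BenefitsService {
        @OnEvent("employee.new")
        public void enroll(String name) { }

        public void unrelated() { }
    }

    /** Maps channel name to handler method name, matching annotations by simple name only. */
    static Map<String, String> findHandlers(Class<?> serviceClass) {
        Map<String, String> handlers = new TreeMap<>();
        for (Method method : serviceClass.getDeclaredMethods()) {
            for (Annotation annotation : method.getAnnotations()) {
                Class<? extends Annotation> type = annotation.annotationType();
                if (!type.getSimpleName().equals("OnEvent")) {
                    continue;
                }
                try {
                    // Read value() reflectively so the scanner needs no compile-time link to the type.
                    Object channel = type.getMethod("value").invoke(annotation);
                    handlers.put(String.valueOf(channel), method.getName());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("cannot read channel from " + type, e);
                }
            }
        }
        return handlers;
    }

    public static void main(String[] args) {
        // prints {employee.new=enroll}
        System.out.println(findHandlers(BenefitsService.class));
    }
}
```

A scanner in a different package would likely need to call setAccessible(true) on the value method when the annotation type is not public; the sketch skips that because everything lives in one package.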

Complete Example

package io.advantageous.qbit.example.events;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import io.advantageous.qbit.service.ServiceProxyUtils;
import org.boon.core.Sys;


import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/11/15.
 */
public class EmployeeEventExampleUsingEventProxyToSendEvents {



    public static final String NEW_HIRE_CHANNEL = "com.mycompany.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompany.employee.payroll";

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }


    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);


        @EventChannel(PAYROLL_ADJUSTMENT_CHANNEL)
        void sendSalaryChangeEvent(Employee employee, int newSalary);

    }

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;

        public EmployeeHiringService (final EmployeeEventManager employeeEventManager) {
            this.eventManager = employeeEventManager;
        }



        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {
            flushServiceProxy(eventManager);
        }


        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {
            flushServiceProxy(eventManager);
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee( employee);
            eventManager.sendSalaryChangeEvent( employee, salary );


        }

    }



    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        /* Create your own private event bus. */
        EventManager privateEventBus = QBit.factory().createEventManager();

        /* Create a service queue for this event bus. */
        Service privateEventBusService = serviceBuilder()
                .setServiceObject(privateEventBus)
                .setInvokeDynamic(false).build();


        final EventBusProxyCreator eventBusProxyCreator =
                           QBit.factory().eventBusProxyCreator();

        final EmployeeEventManager employeeEventManager =
                       eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);

        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager);



        /* Now create your other service POJOs, which have no compile-time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();



        /** Employee hiring service. */
        Service employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();
        /** Payroll service */
        Service payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();
        /** Employee Benefits service. */
        Service employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();
        /* Community outreach program. */
        Service volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);


        privateEventBusService.start();
        employeeHiringService.start();
        volunteeringService.start();
        payrollService.start();
        employeeBenefitsService.start();


        /** Now create the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }
}

Tutorials on QBit Java Microservice lib

Tutorials are classified:
  • Rough Cut - Little instruction; assumes you know QBit basics and are quick on the uptake
  • Quick Start - Code-centric tutorial focusing more on code than explanation
  • Detailed Tutorial - A lot of theory, explanation and background with some hand-holding
  • Doc - A snippet of documentation with no full example per se
Rough cuts will one day be a Quick Start or even a Detailed Tutorial, but right now they are just documentation.

What is QBit again?

QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library, not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model, or, more accurately, Active Objects. QBit does not use a disruptor. It uses regular Java Queues. QBit can do north of 100 million ping pong calls per second, which is an amazing speed (I have seen it as high as 200M). QBit also supports calling services via REST and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc.
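
If "a Java class whose methods are executed behind service queues" sounds abstract, the sketch below shows the general Active Object idea using only the JDK. It is not how QBit is built. Counter and run are hypothetical names, and a single-threaded executor stands in for a service queue. Several caller threads enqueue calls, but only one thread ever touches the POJO, so the POJO needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ServiceBehindQueueSketch {

    /** Plain POJO with unsynchronized state; safe only because one thread touches it. */
    static class Counter {
        private int count;

        void increment() {
            count++;
        }
    }

    /** Enqueues increments from several caller threads and returns the final count. */
    static int run(int callers, int callsPerCaller) {
        Counter counter = new Counter();
        // A single-threaded executor is a queue plus one worker thread.
        ExecutorService serviceQueue = Executors.newSingleThreadExecutor();
        List<Thread> callerThreads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            Thread caller = new Thread(() -> {
                for (int j = 0; j < callsPerCaller; j++) {
                    serviceQueue.execute(counter::increment); // enqueue, do not call directly
                }
            });
            callerThreads.add(caller);
            caller.start();
        }
        try {
            for (Thread caller : callerThreads) {
                caller.join();
            }
            serviceQueue.shutdown(); // already-queued calls still run
            if (!serviceQueue.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("service queue did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.count;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```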

QBit lingo

QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but I might one day write a version in Rust or Go or C# (but that would require a large payday).
Service: A POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events. It may have one thread managing events, method calls, and responses, or two threads: one for method calls and events and the other for responses, so response handlers do not block the service. One thread is faster unless responses block. Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.
ServiceBundle: Many POJOs behind one response queue and many receive queues. There may be one thread for all responses or not. There can also be one receive queue.
Queue: A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, idle. You can listen to these events from services that sit behind a queue. You don't have to use Services. You can use Queues directly.
ServiceServer: A ServiceBundle that is exposed to REST and WebSocket communication.
EventBus: A way to send a lot of messages to services that may be loosely coupled.
ClientProxy: A way to invoke a service through an async interface. The service can be inproc (same process) or remoted over WebSocket.
Non-blocking: QBit is a non-blocking lib. You use CallBacks via Java 8 Lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks.
Speed: There is a lot of room for improvement with speed, but QBit is already VERY fast: 200M+ TPS inproc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc. More work needs to be done to improve speed, but it is now fast enough that I am focusing more on usability.
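
To illustrate the ClientProxy and Non-blocking entries above, here is a small JDK-only sketch of an async client interface whose result arrives through a Java 8 lambda callback. None of these names come from QBit: PayrollService, PayrollClient, proxyFor and askOnce are hypothetical. For brevity the callback runs on the service thread here, which is a simplification I chose for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CallbackProxySketch {

    /** Hypothetical service implementation: ordinary Java that returns a value. */
    static class PayrollService {
        int salaryFor(String name) {
            return name.length() * 100;
        }
    }

    /** Hypothetical async client view: no return value, the result arrives via callback. */
    interface PayrollClient {
        void salaryFor(String name, Consumer<Integer> callback);
    }

    /** Wraps the service so each call is enqueued and answered through the callback. */
    static PayrollClient proxyFor(PayrollService service, ExecutorService serviceQueue) {
        return (name, callback) ->
                serviceQueue.execute(() -> callback.accept(service.salaryFor(name)));
    }

    /** Demo helper: makes one async call and waits for the callback so the result can be shown. */
    static int askOnce(String name) {
        ExecutorService serviceQueue = Executors.newSingleThreadExecutor();
        try {
            PayrollClient client = proxyFor(new PayrollService(), serviceQueue);
            CompletableFuture<Integer> result = new CompletableFuture<>();
            client.salaryFor(name, result::complete); // returns immediately
            return result.get(5, TimeUnit.SECONDS);   // block only for the demo
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            serviceQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(askOnce("Rick")); // prints 400
    }
}
```

Real callers would not block on the result the way askOnce does; they would pass a lambda and carry on.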

Learn more about QBit:


  • [Detailed Tutorial] QBit microservice example
  • [Doc] Queue Callbacks for QBit queue based services
  • [Quick Start] Building a simple Rest web microservice server with QBit
  • [Quick Start] Building a TODO web microservice client with QBit
  • [Quick Start] Building a TODO web microservice server with QBit
  • [Quick Start] Building boon for the QBit microservice engine
  • [Quick Start] Building QBit the microservice lib for Java
  • [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
  • [Rough Cut] Working with event bus for QBit the microservice engine
  • [Rough Cut] Working with inproc MicroServices
  • [Rough Cut] Working with private event bus for inproc microservices
  • [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
  • [Rough Cut] Working with System Manager for QBit Mircoservice lib
  • [Z Notebook] More benchmarking internal
  • [Z Notebook] Performance testing for REST
  • [Z Notebook] Roadmap
  • Home
  • Introduction to QBit
  • Local Service Proxies
  • QBit Boon New Wave of JSON HTTP and Websocket
  • QBit Docs
