Rick


Thursday, February 5, 2015

QBit event bus for microservices, Round 2, using a private event bus.

This is the second part of this Rough Cut on event bus for microservices.
So I said this before and I will say this again. QBit is a lib not a framework. You can use it with Spring, Vertx, Jetty, etc. It does not want to take over your world.

QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library, not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model, or, to use a better description, Active Objects. QBit does not use a disruptor. It uses regular Java queues. QBit can do north of 100 million in-process ping-pong calls per second (seen as high as 200M). QBit also supports calling services via REST and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc.
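To make "a Java class whose methods are executed behind service queues" concrete, here is a rough sketch using nothing but the JDK. It is not QBit code and not how QBit is implemented; the names ServiceQueueSketch, QueuedService and Counter are made up for illustration. The point is only that one worker thread drains the queue, so the POJO itself needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-JDK sketch of a POJO behind a queue; hypothetical names, not QBit's implementation. */
class ServiceQueueSketch {

    /** The POJO. Only the queue's worker thread ever touches it, so no locks are needed. */
    static class Counter {
        private int count;
        final List<Integer> seen = new ArrayList<>();

        void increment() {
            seen.add(++count);
        }
    }

    /** Runs every submitted call on a single worker thread, in submission order. */
    static class QueuedService {
        private static final Runnable STOP = () -> { };
        private final BlockingQueue<Runnable> calls = new LinkedBlockingQueue<>();
        private final Thread worker = new Thread(this::drain, "service-queue");

        void start() {
            worker.start();
        }

        /** Enqueues a call and returns immediately; the caller never runs the method itself. */
        void call(Runnable methodCall) {
            calls.add(methodCall);
        }

        /** Enqueues a stop marker and waits for the worker to finish. */
        void stopAndWait() {
            calls.add(STOP);
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void drain() {
            try {
                for (Runnable next = calls.take(); next != STOP; next = calls.take()) {
                    next.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Sends n increment calls through the queue and returns what the POJO saw. */
    static List<Integer> run(int n) {
        Counter counter = new Counter();
        QueuedService service = new QueuedService();
        service.start();
        for (int i = 0; i < n; i++) {
            service.call(counter::increment);
        }
        service.stopAndWait();
        return counter.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [1, 2, 3]
    }
}
```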
QBit services and busses were designed to be composable. You can wire in QBit Services into many event bus systems. This makes it easy to wire a service to listen to events coming from Kafka, RabbitMQ or something else.
But if you look at the last example, with the system event bus, it is starting to look a lot like EJB. WHICH IS NOT A GOOD THING FOR QBIT. Well, you don't have to use the system event bus. QBit ain't EJB.
So if you want to use the system event bus, we make it easy. But you can use a QBit event bus and still have no compile time dependencies on QBit. You would do this by using a private event bus.
You would also use a private event bus if you had a module of related services that needed to communicate with each other, and with future listeners that might be added to that module, but not with the other services that might be in the same process. In this scenario, you could still use the system event bus for inter-module communication. You can even define your own private bus as your system event bus. There is no requirement to use the system event bus at all. QBit uses the system event bus to register services for metrics gathering and for inproc service discovery.
Remember that even annotations are not a compile time dependency, because you can define your own, and if they have the same name and params as QBit's, then QBit will happily use them (same for Boon, this is a theme).
QBit is interface by convention like Go and Rust. It is not compile time dependent interfaces. You can develop QBit services in standalone jar files that do not have any compile time dependencies on QBit.
Ok.. Enough monologue, let's get cracking.
Continuing our last example of the event bus.
First step: define your own interface to the QBit event bus.
    interface EventManagerClient {

        <T> void send(String channel, T event);

        <T> void sendArray(String channel, T... event);

        void clientProxyFlush();
    }
Proxy client interfaces are by convention. If the interface matches, then it works. The method clientProxyFlush is a special method that will flush the events. When you do things by convention instead of by interface, it is easier to mix and match different versions of jars. I learned this trick from Go. Go is strongly typed, but a type satisfies an interface just by having the right methods. QBit and Boon do the same thing but for Java.
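If "interface by convention" sounds like magic, a rough sketch with java.lang.reflect.Proxy may help. This is an assumption about the general technique, not QBit's actual proxy code; ConventionProxySketch, EventSender, RecordingBus and proxyByConvention are hypothetical names. The target class never implements the client interface. The call goes through because a method with the same name and parameter types exists.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Sketch of matching an interface to a target by method name and parameter types only. */
class ConventionProxySketch {

    /** Client-side interface, owned by the caller. */
    interface EventSender {
        void send(String channel, Object event);
    }

    /** Does NOT implement EventSender; it merely has a method with a matching signature. */
    public static class RecordingBus {
        final List<String> sent = new ArrayList<>();

        public void send(String channel, Object event) {
            sent.add(channel + ":" + event);
        }
    }

    /** Builds a proxy that forwards each interface call to the same-named method on target. */
    static <T> T proxyByConvention(Class<T> iface, Object target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Method match = target.getClass()
                    .getMethod(method.getName(), method.getParameterTypes());
            return match.invoke(target, args);
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    static List<String> demo() {
        RecordingBus bus = new RecordingBus();
        EventSender sender = proxyByConvention(EventSender.class, bus);
        sender.send("employee.new", "Rick");
        return bus.sent;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [employee.new:Rick]
    }
}
```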
To get this example to work, we only have to change one class and then we have to do some wiring to let the private bus know about the other services.
    public static class EmployeeHiringService {

        private final EventManagerClient eventManager;

        public EmployeeHiringService (final EventManagerClient eventManager) {
            this.eventManager = eventManager;
        }

        void queueEmpty() {
            eventManager.clientProxyFlush();
        }

        void queueLimit() {
            eventManager.clientProxyFlush();
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events through the injected private event bus client
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);


        }

    }
We added an EventManagerClient and we use it to send messages. Your code is not tied to QBit at all. This code could work somewhere else.
QBit has callbacks for Queue events like queue is idle, queue just received a bunch of messages, queue is busy, queue reached its batch limit, queue is shutdown, queue ate my cat (just seeing if you are paying attention).
To handle when the queue has reached a batch limit, we use queueLimit. To handle when the queue is empty, we use queueEmpty.
        void queueEmpty() {
            eventManager.clientProxyFlush();
        }

        void queueLimit() {
            eventManager.clientProxyFlush();
        }

The method names, like the annotations, are by convention, not by interface. This allows us to add more callbacks in QBit without breaking your code. This is the Go programming concept of "yeah, go ahead and use interfaces, but make them a convention, not a hard-and-fast rule" so you can mix and match versions and evolve the software. QBit adopts this philosophy, as does Boon. It is not the right fit for everything. A domain model could and should have strong interfaces. A lib like QBit should not.
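Here is one way a library could find callbacks by name instead of by interface. Treat it as a sketch of the idea under my own assumptions, not QBit's internals; CallbackByConventionSketch and invokeIfPresent are hypothetical. A callback the service does not declare is simply skipped, which is why adding new callback names later cannot break an older service.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Sketch of invoking queue callbacks by method name only. */
class CallbackByConventionSketch {

    /** Declares two callbacks by name; it implements no callback interface. */
    static class HiringService {
        final List<String> log = new ArrayList<>();

        void queueEmpty() {
            log.add("flush on empty");
        }

        void queueLimit() {
            log.add("flush on limit");
        }
    }

    /** Invokes the no-arg method with this name if the service declares it. */
    static boolean invokeIfPresent(Object service, String callbackName) {
        try {
            Method callback = service.getClass().getDeclaredMethod(callbackName);
            callback.setAccessible(true);
            callback.invoke(service);
            return true;
        } catch (NoSuchMethodException e) {
            return false; // the service did not opt in to this callback
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static List<String> demo() {
        HiringService service = new HiringService();
        List<String> result = new ArrayList<>();
        for (String name : new String[] {"queueEmpty", "queueLimit", "queueIdle"}) {
            result.add(name + "=" + invokeIfPresent(service, name));
        }
        result.addAll(service.log);
        return result;
    }

    public static void main(String[] args) {
        // prints [queueEmpty=true, queueLimit=true, queueIdle=false, flush on empty, flush on limit]
        System.out.println(demo());
    }
}
```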
To add to the "Don't add compile time dependencies to QBit", I added an annotation for OnEvent to the example:
    @Target({ ElementType.METHOD, ElementType.FIELD })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface OnEvent {


        /* The channel you want to listen to. */
        String value() default "";

        /* The consume is the last object listening to this event.
           An event channel can have many subscribers but only one consume.
         */
        boolean consume() default false;


    }
The last example used the OnEvent from QBit, this example defines its own. So now our services truly are QBit agnostic. They don't care if QBit adds more callbacks or more annotations. It all works by convention.
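For the curious, matching an annotation "by convention" can be done by comparing its simple name and reading its members reflectively. The sketch below is my own illustration of that technique, not QBit's scanner; AnnotationByNameSketch and findListeners are hypothetical names. Notice that findListeners never refers to the OnEvent type itself, only to the string "OnEvent".

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of discovering listener methods through an annotation matched by name. */
class AnnotationByNameSketch {

    /** A locally defined annotation; the scanner below never references this type. */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface OnEvent {
        String value() default "";
    }

    static class BenefitsService {
        @OnEvent("employee.new")
        public void enroll(String name) {
        }

        public void notAListener() {
        }
    }

    /** Maps channel name to listener method name, matching annotations by simple name. */
    static Map<String, String> findListeners(Class<?> serviceClass) {
        Map<String, String> listeners = new TreeMap<>();
        for (Method method : serviceClass.getDeclaredMethods()) {
            for (Annotation annotation : method.getAnnotations()) {
                Class<? extends Annotation> type = annotation.annotationType();
                if (!type.getSimpleName().equals("OnEvent")) {
                    continue;
                }
                try {
                    // Read the "value" member reflectively, again by name only.
                    Method valueMember = type.getDeclaredMethod("value");
                    valueMember.setAccessible(true);
                    String channel = (String) valueMember.invoke(annotation);
                    listeners.put(channel, method.getName());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        System.out.println(findListeners(BenefitsService.class)); // prints {employee.new=enroll}
    }
}
```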
Next we create our own event bus:
        /* Create your own private event bus. */
        EventManager privateEventBus = QBit.factory().createEventManager();
Then we wrap that bad boy into a service queue so we can call it async.
        /* Create a service queue for this event bus. */
        Service privateEventBusService = serviceBuilder()
                .setServiceObject(privateEventBus)
                .setInvokeDynamic(false).build();
Now we need a way to call the event bus.
        /*
         Create a proxy client for the queued event bus service.
         */
        EventManagerClient eventBus = privateEventBusService.createProxy(EventManagerClient.class);
The client proxy will create MethodCall objects and enqueue those objects onto the queue for the EventManager. This way the calls are async. And the EventManager does not need to do a thread sync to send messages.
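A rough plain-JDK sketch of that "turn the call into an object and enqueue it" step might look like the following. It is an illustration under my own assumptions: the MethodCall class here is a made-up stand-in, not QBit's MethodCall, and the queue is drained on the calling thread only to keep the demo deterministic. In a real service the drain would happen on the service's own thread.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Sketch of a client proxy that enqueues calls instead of executing them. */
class MethodCallProxySketch {

    /** Hypothetical record of one call: method name plus arguments. */
    static final class MethodCall {
        final String name;
        final Object[] args;

        MethodCall(String name, Object[] args) {
            this.name = name;
            this.args = args == null ? new Object[0] : args;
        }
    }

    interface BusClient {
        void send(String channel, Object event);
    }

    public static class Bus {
        final List<String> delivered = new ArrayList<>();

        public void send(String channel, Object event) {
            delivered.add(channel + "=" + event);
        }
    }

    /** Caller side: each call becomes a MethodCall on the queue; nothing runs yet. */
    static <T> T enqueueingProxy(Class<T> iface, Queue<MethodCall> queue) {
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    queue.add(new MethodCall(method.getName(), args));
                    return null; // this sketch only supports void methods
                }));
    }

    /** Service side: drain the queue and invoke the matching public method on the target. */
    static void drain(Queue<MethodCall> queue, Object target) {
        for (MethodCall call = queue.poll(); call != null; call = queue.poll()) {
            for (Method method : target.getClass().getMethods()) {
                if (method.getName().equals(call.name)
                        && method.getParameterCount() == call.args.length) {
                    try {
                        method.invoke(target, call.args);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                    break;
                }
            }
        }
    }

    static List<String> demo() {
        Queue<MethodCall> queue = new ConcurrentLinkedQueue<>();
        Bus bus = new Bus();
        BusClient client = enqueueingProxy(BusClient.class, queue);
        client.send("employee.new", "Rick"); // returns immediately, nothing delivered yet
        List<String> log = new ArrayList<>();
        log.add("before drain: " + bus.delivered);
        drain(queue, bus);
        log.add("after drain: " + bus.delivered);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [before drain: [], after drain: [employee.new=Rick]]
    }
}
```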
Next we need to create the EmployeeHiringService and give it this client proxy to the private event manager.
        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(eventBus);
Ok.. Now we are in the home stretch. Just create your services like you did before. These are just POJOs that are QBit agnostic except for some coding conventions.
        /* Now create your other service POJOs which have no compile time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();
This should be familiar by now. Wire your POJOs up to make them high-speed services.
        /** Employee hiring service. */
        Service employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();
        /** Payroll service */
        Service payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();
        /** Employee Benefits service. */
        Service employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();
        /* Community outreach program service. */
        Service volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();
Now this is NEW. Pay attention. This is an important step.
Wire the private event bus to the services that are consuming the messages.
        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);
SIDE NOTE: The reason we can wire another event bus system into services is that a service publishes an event queue as part of its public interface, so anyone or anything can create Event objects and publish them to a service. (Look at the interface for Service and then look at the eventReceiveQueue method.)
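To picture what joining a service to a bus buys you, here is a toy model in plain Java. It is only a sketch under my own assumptions: the Bus class, its join and send methods, and the channel names are hypothetical and are not the QBit EventManager API. Each "service" is reduced to its event queue, and two bus instances show that a private bus and a second (say, system) bus do not see each other's traffic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model of a private event bus that fans events out to joined service queues. */
class PrivateBusSketch {

    /** Hypothetical minimal bus: channel name -> event queues of the joined services. */
    static class Bus {
        private final Map<String, List<Queue<Object>>> channels = new HashMap<>();

        void join(String channel, Queue<Object> serviceEventQueue) {
            channels.computeIfAbsent(channel, key -> new ArrayList<>()).add(serviceEventQueue);
        }

        /** Puts the event on every joined queue and returns how many queues received it. */
        int send(String channel, Object event) {
            List<Queue<Object>> listeners =
                    channels.getOrDefault(channel, Collections.emptyList());
            for (Queue<Object> queue : listeners) {
                queue.add(event);
            }
            return listeners.size();
        }
    }

    static String demo() {
        Bus privateBus = new Bus();
        Bus systemBus = new Bus(); // a separate instance, nothing joined to it

        Queue<Object> benefitsEvents = new ConcurrentLinkedQueue<>();
        Queue<Object> volunteerEvents = new ConcurrentLinkedQueue<>();
        privateBus.join("employee.new", benefitsEvents);
        privateBus.join("employee.new", volunteerEvents);

        int viaSystem = systemBus.send("employee.new", "Rick");
        int viaPrivate = privateBus.send("employee.new", "Rick");
        return "system=" + viaSystem + " private=" + viaPrivate
                + " benefits=" + benefitsEvents + " volunteer=" + volunteerEvents;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints system=0 private=2 benefits=[Rick] volunteer=[Rick]
    }
}
```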
That is it. The rest is the same.
        /** Now create the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);
output
Hired employee Employee{firstName='Rick', employeeId=1}
Employee will be invited to the community outreach program Rick 1
Employee added to payroll  Rick 1 100
Employee enrolled into benefits system employee Rick 1 

Full code listing

package io.advantageous.qbit.example.events;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import org.boon.core.Sys;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/5/15.
 */
public class EmployeeEventExampleUsingStandaloneEventBus {

    @Target({ ElementType.METHOD, ElementType.FIELD })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface OnEvent {


        /* The channel you want to listen to. */
        String value() default "";

        /* The consume is the last object listening to this event.
           An event channel can have many subscribers but only one consume.
         */
        boolean consume() default false;


    }


    public static final String NEW_HIRE_CHANNEL = "com.mycompany.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompany.employee.payroll";

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }

    interface EventManagerClient {

        <T> void send(String channel, T event);

        <T> void sendArray(String channel, T... event);

        void clientProxyFlush();
    }


    public static class EmployeeHiringService {

        final EventManagerClient eventManager;

        public EmployeeHiringService (final EventManagerClient eventManager) {
            this.eventManager = eventManager;
        }

        void queueEmpty() {
            eventManager.clientProxyFlush();
        }

        void queueLimit() {
            eventManager.clientProxyFlush();
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);


        }

    }



    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        /* Create your own private event bus. */
        EventManager privateEventBus = QBit.factory().createEventManager();

        /* Create a service queue for this event bus. */
        Service privateEventBusService = serviceBuilder()
                .setServiceObject(privateEventBus)
                .setInvokeDynamic(false).build();

        /*
         Create a proxy client for the queued event bus service.
         */
        EventManagerClient eventBus = privateEventBusService.createProxy(EventManagerClient.class);


        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(eventBus);



        /* Now create your other service POJOs which have no compile time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();



        /** Employee hiring service. */
        Service employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();
        /** Payroll service */
        Service payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();
        /** Employee Benefits service. */
        Service employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();
        /* Community outreach program. */
        Service volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);

        /** Now create the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }
}

QBit lingo

QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but I might one day write a version in Rust or Go or C# (but that would require a large payday).

Service 
A POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events. It may have one thread managing events, method calls, and responses, or two threads: one for method calls and events and the other for responses, so response handlers do not block the service. One thread is faster unless responses block. Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.

ServiceBundle 
Many POJOs behind one response queue and many receive queues. There may be one thread for all responses or not. There can also be a single receive queue.

Queue
A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, and idle. You can listen to these events from services that sit behind a queue. You don't have to use services. You can use queues directly.
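As a small illustration of where batching events would fire, here is a plain-JDK sketch. The drain loop, the batch limit parameter and the log markers are my own invention and say nothing about how QBit's queues are written. It just shows a "limit" event after every full batch and an "empty" event once the queue runs dry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Sketch of a batching drain loop with limit and empty events. */
class BatchingQueueSketch {

    /** Drains the queue, logging items plus the points where the events would fire. */
    static List<String> drain(Queue<String> queue, int batchLimit) {
        List<String> log = new ArrayList<>();
        int inBatch = 0;
        for (String item = queue.poll(); item != null; item = queue.poll()) {
            log.add(item);
            if (++inBatch == batchLimit) {
                log.add("<limit>"); // a listener could flush its outgoing calls here
                inBatch = 0;
            }
        }
        log.add("<empty>"); // nothing left to process, flush whatever is pending
        return log;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(queue, 2)); // prints [a, b, <limit>, c, <empty>]
    }
}
```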

ServiceServer 
ServiceBundle that is exposed to REST and WebSocket communication

EventBus 
EventBus is a way to send a lot of messages to services that may be loosely coupled

ClientProxy 
A way to invoke a service through an async interface. The service can be inproc (same process) or remoted over WebSocket.

Non-blocking
QBit is a non-blocking lib. You use callbacks via Java 8 lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks.
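The callback style looks roughly like this in plain Java 8. It is a sketch of the calling pattern only, using a JDK single-thread executor as a stand-in for a service queue; AdderService and its methods are hypothetical and not part of QBit. The caller hands over a lambda and moves on, and the result arrives later on the service's thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Sketch of a non-blocking call that reports its result through a lambda callback. */
class CallbackSketch {

    /** Hypothetical service: work runs on its own thread, results go to the callback. */
    static class AdderService {
        private final ExecutorService queue = Executors.newSingleThreadExecutor();

        /** Returns immediately; the callback fires later on the service thread. */
        void add(int a, int b, Consumer<Integer> callback) {
            queue.execute(() -> callback.accept(a + b));
        }

        /** Stops accepting work and waits for queued calls to finish. */
        void shutdownAndWait() {
            queue.shutdown();
            try {
                queue.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static int run() {
        AdderService service = new AdderService();
        AtomicInteger result = new AtomicInteger();
        service.add(2, 3, result::set); // no blocking here
        service.shutdownAndWait();      // only the demo waits, so it can read the result
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 5
    }
}
```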

Microservice lib
QBit speaks HTTP, HTTP pipeline, WebSocket, and JSON. QBit is the high-speed microservice lib for Java.

Speed
There is a lot of room for improvement with speed. But already QBit is VERY fast.
200M+ TPS inproc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc.
More work needs to be done to improve speed, but it is now fast enough that I am working more on usability.

Besides, if I make it too fast, then no one will publish benchmarks, and then I can't publish follow-up benchmarks that kick their ass. Where would be the fun? There is no story if there is no conflict.

Learn more about QBit.


  • [Detailed Tutorial] QBit microservice example
  • [Doc] Queue Callbacks for QBit queue based services
  • [Quick Start] Building a simple Rest web microservice server with QBit
  • [Quick Start] Building a TODO web microservice client with QBit
  • [Quick Start] Building a TODO web microservice server with QBit
  • [Quick Start] Building boon for the QBit microservice engine
  • [Quick Start] Building QBit the microservice lib for Java
  • [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
  • [Rough Cut] Working with event bus for QBit the microservice engine
  • [Rough Cut] Working with inproc MicroServices
  • [Rough Cut] Working with private event bus for inproc microservices
  • [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
  • [Rough Cut] Working with System Manager for QBit Microservice lib
  • [Z Notebook] More benchmarking internal
  • [Z Notebook] Performance testing for REST
  • [Z Notebook] Roadmap
  • Home
  • Introduction to QBit
  • Local Service Proxies
  • QBit Boon New Wave of JSON HTTP and Websocket
  • QBit Docs