Rick

Rick
Rick

Wednesday, March 18, 2015

Working with Event Channels and strongly typed event bus with the QBit Microservice Java, JSON, WebSocket Lib


Working with Event Channels and strongly typed event bus with the QBit Microservice Java, JSON, WebSocket Lib


QBit event channels are unlike classical Java event listeners. The channel is both the interface to send the event from the producer and the interface to receive the event. A clear advantage to this strongly typed channel approach is you can use the IDE to quickly see what implements the channel methods, i.e., event listeners and what calls it to see all of your event producers (call is asynchronous). QBit event channels are part of QBit's event bus system which uses a microservice approach for communication and Consul for clustering and JSON and WebSocket for high-speed replication that is reader tolerant.

Event channels are now strongly typed interfaces which are IDE friendly

So I was on my way home from the city yesterday and me and my buddy discussed using command objects as event objects in an event bus so the type of the command object becomes the channel, mostly for clarity. The command objects would've been for the event objects.

The struggle

We were trying to come up with a way to have a strongly typed event bus without passing around a lot of strings. I was arguing that the strings as addresses are common like JMS, RabbitMQ, HTTP URLs, WebsSocket URLS, etc. I like the idea of having a string as an address. But as my buddy argued, it just had a certain smell to it. We had events and OnEvent annotations sort of spread throughout the code base, and it was somewhat hard to see where events were sent from and sent to.
He was quite enamored with the idea of using a command object as the event object. And I was quite against it. I've been on too many projects where they're just a proliferation of command objects and the code became very hard to maintain and understand. It brought back memories which were not good memories. A big part of the discussion was why. Why do you want command objects? His reasons were sound: clarity and the ability to re-factor and comprehend the code base using standard tools that are readily available, i,e., your IDE. I think this back-and-forth brought out a better solution than either just using strings or using the command object. It was a meeting of the minds. It was a confluence. It was a successful brainstorming session. We avoided groupthink through constructive confrontation and no one got hurt.
Just to reiterate, he made the point that there was no way from the IDE to clearly see exactly where the events were coming from or going. We did a little bit of brainstorming. We didn't want to give up the integration capabilities with things like JMS, STOMP, Kafka, 0MQ, etc. for the event bus. But we also wanted a clear, clean way to associate events, their publishers and their consumers in a way that is IDE friendly. We view IDE friendly as developer friendly.

Typed channels are born

After some back-and-forth and some heated discussion about command objects and events objects, we came up with the concept of the typed channel. This does not replace the way that we did things before. It augments it. It extends it. Underneath the covers, QBit is still using string addresses, but if you do not provide a string address then QBit will use the fully qualified class name of the interface and it's methods as the event channels. You no longer have to use @OnEvent from your service, you could instead implement the channel interface.
Channels are unlike classical Java listeners in that the channel is both the interface to send the event from the producer and the interface to receive the event from the consumer. This is unlike anything we have seen before but I'm sure someone else has done it. I have a long history of thinking that I have come up with something very new & clever, then finding out that somebody wrote a college thesis about it 20 years ago.
A clear advantage to this strongly typed channel approach is you can use the IDE to quickly see who implements the methods of a Channel interface and then you get a list of all of your listeners. You can also see who calls the channel interface and then you get a list of all of your producers. When someone calls a channel interface or rather an implementation of a Channel interface, it is like they're calling each of the subscribers but of course the call is asynchronous.
We found through trial and error that it does not make sense to call a channel methodonEvent because it looks weird when you call it from the sender. It also does not make sense to calling a channel method something like sendEvent, because then it looks weird when you implement it in the event receiver. We've come up with the channel naming convention somethingHappened like rickSleptcarParkedbuildingBurned,transactionCompleted, etc. This does not look weird for the receiver which implements the interface or the sender that calls the channel. Naming is hard, but when you get it right the code makes more sense.

QBit Event Bus quick review

If you are new to QBit, fret not, here is some background information on the QBit Event Bus before we dive back into typed channels: QBit Java MicroService Lib Event Bus. This one shows examples of using @OnEvent: QBit Java MicroService Lib Event Bus using strongly typed interfaces. What we were calling strongly typed event bus interface, we are now simply calling event channels. Progress was made as described above. QBit event bus can be private per module or the shared system bus. QBit Event busses can be replicated over WebSocket. We even have integration with Consul to automatically cluster a QBit event bus and provide replication. The QBit event bus replicates by marshaling JSON calls in batches over WebSocket. QBit knows about the other nodes by using the microservice that provides service discovery called Consul. JSON allows QBit to implement a bus that uses the tolerant reader technique from microservices. QBit fully embraces microservices and its associated REST, WebSocket, JSON and dumb but fast pipes.

Example and test

Much easier to explain things with code.

unit test for this new feature

public class BoonEventChannelTestAndOnEvent extends TimedTesting {

    public  AtomicInteger eventCount = new AtomicInteger();

We define a new Service that sends messages. And then we defined some services that listen to Messages from the service that sends messages.

Event Channel

    @EventChannel
    interface MyChannelInterface {
        void somethingHappened(int i, String foo);
    }
The above is the event channel interface as you can see it takes a method calledsomethingHappened of course yours would take other types of methods liketransactionSuccessful or employeeHired etc. The method name is the thing that happened, It is the event. The @EventChannel annotation can be passed a channel name, but if it is not pasty name then the name becomes the fully qualified class name of the interface.
Next up we define the actual sender service. This service will use the channel to send events. Later will define three services that listen to the events from the above chance. Then we will evoke a method on the sender service and it will send a message over the bus that the three other methods are listening to.

Sender Service implementation and interface

    class MyServiceEventSender {

        final MyChannelInterface channel;

        MyServiceEventSender(MyChannelInterface channel) {
            this.channel = channel;
        }

        public void someServiceMethod() {
            channel.somethingHappened(1, "foo");
        }
    }

    interface MyServiceInterface {

        void someServiceMethod();
    }
Notice that we pass the channel to the constructor.
Now lets define two services that listen to this channel using the channel interface.

Two services that are listeners by implementing the channel interface

    class MyServiceEventReceiver implements MyChannelInterface{

        @Override
        public void somethingHappened(int i, String foo) {

            eventCount.incrementAndGet();
            System.out.println("MyServiceEventReceiver bar" + i + " foo " + foo);
        }
    }


    class MyServiceEventReceiver2 implements MyChannelInterface{

        @Override
        public void somethingHappened(int i, String foo) {

            eventCount.incrementAndGet();

            System.out.println("MyServiceEventReceiver2 bar2" + i + " foo " + foo);

        }
    }
Now let us define a third service that listens. This time the service will listen with the @OnEvent annotation.

Listening with @OnEvent annotation

    class MyServiceEventReceiver3 {

        @OnEvent("io.advantageous.qbit.events.impl.MyChannelInterface.somethingHappened")
        public void onSomethingHappened(int i, String foo) {

            eventCount.incrementAndGet();

            System.out.println("MyServiceEventReceiver2 bar2" + i + " foo " + foo);

        }
    }
Notice that we are using the fully qualified class name plus the event method as the address.

Now for our test, first wire up the services

    @Test
    public void test() throws Exception {

        eventCount.set(0);

        //Create the event bus and the channel
        final EventManager eventManager = QBit.factory().systemEventManager();
        final EventBusProxyCreator eventBusProxyCreator = QBit.factory().eventBusProxyCreator();
        /* Create a channel. */
        final MyChannelInterface channel = eventBusProxyCreator.createProxy(eventManager, MyChannelInterface.class);

        //Sender service, impl, serviceQueue and client proxy.
        /* Create the sender service. */
        final MyServiceEventSender serviceSender = new MyServiceEventSender(channel);
        /* Create the service queue for the sender. */
        final ServiceQueue serviceSenderQueue = serviceBuilder().setServiceObject(serviceSender).build();
        /* Create the client interface for the sender. */
        final MyServiceInterface serverSenderClient = serviceSenderQueue.createProxyWithAutoFlush(
                MyServiceInterface.class, 100, TimeUnit.MILLISECONDS);


        //Create the receiver services.
        final MyServiceEventReceiver receiverService = new MyServiceEventReceiver();
        final MyServiceEventReceiver2 receiverService2 = new MyServiceEventReceiver2();
        final MyServiceEventReceiver3 receiverService3 = new MyServiceEventReceiver3();
        final ServiceQueue receiverServiceQueue = serviceBuilder().setServiceObject(receiverService).build();
        final ServiceQueue receiverServiceQueue2 = serviceBuilder().setServiceObject(receiverService2).build();
        final ServiceQueue receiverServiceQueue3 = serviceBuilder().setServiceObject(receiverService3).build();


        /*Start the services. */
        serviceSenderQueue.start();
        receiverServiceQueue.start();
        receiverServiceQueue2.start();
        receiverServiceQueue3.start();

The above is all wiring and would typically be done in a spring configuration file or a guice module or in some application builder class.
Now we send the event and wait for all three listeners to get the message.

Test

        /* Now send a message with the client. */
        serverSenderClient.someServiceMethod();


        waitForTrigger(5, o -> eventCount.get() == 3);

        assertEquals(3, eventCount.get());
You can change the default names and the above test will still work

Changing event channel names

    @EventChannel ("FOO")
    interface MyChannelInterface {
        void somethingHappened(int i, String foo);
    }


    class MyServiceEventReceiver3 {


        @OnEvent("FOO.somethingHappened")
        public void onSomethingHappened(int i, String foo) {

            eventCount.incrementAndGet();

            System.out.println("MyServiceEventReceiver2 bar2" 
               + i + " foo " + foo);

        }
    }
Nothing else in the example changed. You have the strongly typed aspect and you have string addresses. Pie and cake and ice cream! Yummy! This is a good thing for integrating with the likes of JMS, ActiveMQ, RabbitMQ, 0MQ, and Kafka.
This would also work.

You can rename the channel as you see fit.

    @EventChannel ("FOO")
    static interface MyChannelInterface {

        @EventChannel ("bam")
        void somethingHappened(int i, String foo);
    }

    class MyServiceEventReceiver3 {


        @OnEvent("FOO.bam")
        public void onSomethingHappened(int i, String foo) {

            eventCount.incrementAndGet();

            System.out.println("MyServiceEventReceiver2 bar2" 
                 + i + " foo " + foo);

        }
    }
Keep in mind that none of the other classes had to change at all.

This one only works for OnEvent not the others

    static interface MyChannelInterface {

        @EventChannel ("FOO.bam")
        void somethingHappened(int i, String foo);
    }
If you are using the strongly typed channels, then you have to mark the class as @EventChannel so the above would still work for the @OnEvent listener but not the other service listeners.

Revisiting HR example

If you read the original article regarding channels, the you may recall the employee HR system with events. Let's revisit that example and use this new style of event management.
We have two channels, on like the old, and one like the new:

Event Channels in HR Example

    @EventChannel
    interface SalaryChangedChannel {


        void salaryChanged(Employee employee, int newSalary);

    }


    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);



    }
A channel name is just the name passed to the event channel annotation method unless an annotation is defined at the class level then the name becomes classChannelName + "." + methodChannel name.
This method defines the naming behavior of channel names.
    public static String createChannelName(final String channelPrefix, final String classChannelNamePart, final String methodChannelNamePart) {


        //If Channel prefix is null then just use class channel name and method channel name
        if (channelPrefix == null) {

            //If the class channel name is null just return the method channel name.
            if (classChannelNamePart == null) {
                return methodChannelNamePart;
            } else {

                //Channel name takes the form ${classChannelNamePart.methodChannelNamePart}
                return Str.join('.', classChannelNamePart, methodChannelNamePart);
            }
        } else {
            //If classChannelNamePart null then channel name takes the form ${channelPrefix.methodChannelNamePart}
            if (classChannelNamePart == null) {
                return Str.join('.', channelPrefix, methodChannelNamePart);
            } else {
                //Nothing was null so the channel name takes the form ${channelPrefix.classChannelNamePart.methodChannelNamePart}
                return Str.join('.', channelPrefix, classChannelNamePart, methodChannelNamePart);
            }
        }
    }
The interface level name is only captured if the interface is annotated which is defined by this internal method:
    public static <T> String getClassEventChannelName(ClassMeta<T> classMeta, AnnotationData classAnnotation) {
        //They could even use enum as we are getting a string value

        String classEventBusName = classAnnotation != null
                && classAnnotation.getValues().get("value") !=null ? classAnnotation.getValues().get("value").toString() : null;

        if (Str.isEmpty(classEventBusName) && classAnnotation !=null) {
            classEventBusName = classMeta.longName();
        }
        return classEventBusName;
    }
Here is our EmployeeHiringService example revisited.

EmployeeHiringService

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;
        final SalaryChangedChannel salaryChangedChannel;

        public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
                                     final SalaryChangedChannel salaryChangedChannel) {
            this.eventManager = employeeEventManager;
            this.salaryChangedChannel = salaryChangedChannel;
        }


        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {


            flushServiceProxy(salaryChangedChannel);
            flushServiceProxy(eventManager);
        }


        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {

            flushServiceProxy(salaryChangedChannel);
            flushServiceProxy(eventManager);
        }


        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee(employee);
            salaryChangedChannel.salaryChanged(employee, salary);


        }

    }
Then the we just change the payroll service to implement the channel (it was using the @OnEvent):

Payroll Service

    public static class PayrollService implements SalaryChangedChannel{

        @Override
        public void salaryChanged(Employee employee, int newSalary) {
            System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added 
                    to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), newSalary);

        }
    }




What is QBit again?

QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model or a better description would be Active Objects. QBit does not use a disruptor. It uses regular Java Queues. QBit can do north of 100 million ping pong calls per second which is an amazing speed (seen as high as 200M). QBit also supports calling services via REST, and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc. QBit uses micro batching to push messages through the pipe (queue, IO, etc.) faster to reduce thread hand-off.

QBit lingo

QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but we could one day write a version in Rust or Go or C# (but that would require a large payday).
Service POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events (May have one thread managing events, method calls, and responses or two one for method calls and events and the other for responses so response handlers do not block service. One is faster unless responses block). Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.
ServiceBundle Many POJOs behind one response queue and many receive queues. There may be one thread for all responses or not. They also can be one receive queue.
Queue A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, idle. You can listen to these events from services that sit behind a queue. You don't have to use Services. You can use Queue's direct. In QBit, you have sender queues and receivers queues. They are separated to support micro-batching.
ServiceServer ServiceBundle that is exposed to REST and WebSocket communication.
EventBus EventBus is a way to send a lot of messages to services that may be loosely coupled.
ClientProxy ClientProxy is a way to invoke service through async interface, service can be inproc (same process) or remoted over WebSocket.
Non-blocking QBit is a non-blocking lib. You use CallBacks via Java 8 Lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks. QBit takes an object-oriented approach to service development so services look like normal Java services that you already write, but the services live behind a queue/thread. This is not a new concept. Microsoft did this with DCOM/COM and called it active objects. Akka does it with actors and called them strongly typed Actors. The important concepts is that you get the speed of reactive and actor style messaging but you develop in a natural OOP approach. QBit is not the first. QBit is not the only.
Speed QBit is VERY fast. There is a of course a lot of room for improvement. But already 200M+ TPS inproc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc. More work needs to be done to improve speed, but now it is fast enough where we are focusing more on usability. The JSON support uses Boon by default which is up to 4x faster than other JSON parsers for the REST/JSON, WebSocket/JSON use case.

To learn more about QBit see the following:

  • Home
  • [Detailed Tutorial] QBit microservice example
  • [Detailed Tutorial] Building a single page; Todo List Application with QBit
  • [Detailed Tutorial] Working with inproc MicroServices within QBit.
  • [Doc] QBit Java microservice lib auto flushing service queue proxies
  • [Doc] QBit Java microservice lib introducing EventBus replication and EventBus connectors
  • [Doc] QBit microservice Java lib Using auto flushing client proxies
  • [Doc] QBit Microservice WebSocket wire protocol draft 01 JSON ASCII method batching RPC
  • [Doc] Queue Callbacks for QBit queue based services
  • [Doc] Using QBit microservice lib's HttpClient GET, POST, et al, JSON, Java 8 Lambda
  • [Doc] Using QBit microservice lib's WebSocket support
  • [Quick Start] Building a simple Rest web microservice server with QBit
  • [Quick Start] Building a single page; Todo List Application with QBit
  • [Quick Start] building a single web page application with QBit
  • [Quick Start] Building a TODO web microservice client with QBit
  • [Quick Start] Building a TODO web microservice server with QBit
  • [Quick Start] Building boon for the QBit microservice engine
  • [Quick Start] Building QBit the microservice lib for Java
  • [Quick Start] Working with inproc MicroServices within QBit
  • [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
  • [Rough Cut] QBit Java Microservice Lib Working With Workers Sharded and Pooled
  • [Rough Cut] QBit Microservice Lib Working With CallBacks
  • [Rough Cut] QBit Microservices using Service Workers and sharded service workers
  • [Rough Cut] Using QBit microservice lib with Spring Boot
  • [Rough Cut] Using QBit microservice lib's REST support with URI Params
  • [Rough Cut] Working with event bus for QBit the microservice engine
  • [Rough Cut] Working with Event Channels and strongly typed event bus with the QBit Microservice Java, JSON, WebSocket Lib
  • [Rough Cut] Working with inproc MicroServices
  • [Rough Cut] Working with private event bus for inproc microservices
  • [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
  • [Rough Cut] Working with System Manager for QBit Mircoservice lib
  • [Z Blog] Qbit servlet support also a preview of using jetty and the QBit microservice lib together
  • [Z Blog] Using Consul from QBit, the Java, JSON, WebSocket and REST microservice library, for health monitoring, clustering and service discovery
  • [Z Notebook] More benchmarking internal
  • [Z Notebook] Performance testing for REST
  • [Z Notebook] Roadmap
  • Introduction to QBit
  • Local Service Proxies
  • OLD Home
  • QBit Boon New Wave of JSON HTTP and Websocket
  • QBit Docs

  • Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training