QBit event channels are unlike classical Java event listeners. The channel is both the interface the producer uses to send the event and the interface the consumer implements to receive it. A clear advantage of this strongly typed channel approach is that you can use the IDE to quickly see what implements the channel methods (your event listeners) and what calls them (your event producers). The call itself is asynchronous. QBit event channels are part of QBit's event bus system, which uses a microservice approach for communication, Consul for clustering, and JSON and WebSocket for high-speed replication that is reader tolerant.
Event channels are now strongly typed interfaces which are IDE friendly
So I was on my way home from the city yesterday, and my buddy and I discussed using command objects as event objects in an event bus, so that the type of the command object becomes the channel, mostly for clarity. The command objects would have served as the event objects.
The struggle
We were trying to come up with a way to have a strongly typed event bus without passing around a lot of strings. I was arguing that strings as addresses are common, as in JMS, RabbitMQ, HTTP URLs, WebSocket URLs, etc. I like the idea of having a string as an address. But as my buddy argued, it just had a certain smell to it. We had events and @OnEvent annotations sort of spread throughout the code base, and it was somewhat hard to see where events were sent from and sent to.
He was quite enamored with the idea of using a command object as the event object. And I was quite against it. I've been on too many projects where there was just a proliferation of command objects and the code became very hard to maintain and understand. It brought back memories which were not good memories. A big part of the discussion was why. Why do you want command objects? His reasons were sound: clarity and the ability to refactor and comprehend the code base using standard tools that are readily available, i.e., your IDE. I think this back-and-forth brought out a better solution than either just using strings or using the command object. It was a meeting of the minds. It was a confluence. It was a successful brainstorming session. We avoided groupthink through constructive confrontation and no one got hurt.
Just to reiterate, he made the point that there was no way from the IDE to clearly see exactly where the events were coming from or going. We did a little bit of brainstorming. We didn't want to give up the integration capabilities with things like JMS, STOMP, Kafka, 0MQ, etc. for the event bus. But we also wanted a clear, clean way to associate events, their publishers and their consumers in a way that is IDE friendly. We view IDE friendly as developer friendly.
Typed channels are born
After some back-and-forth and some heated discussion about command objects and event objects, we came up with the concept of the typed channel. This does not replace the way that we did things before. It augments it. It extends it. Underneath the covers, QBit is still using string addresses, but if you do not provide a string address then QBit will use the fully qualified class name of the interface and its methods as the event channels. You no longer have to use @OnEvent in your service; you can instead implement the channel interface.
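To make the mapping concrete, here is a minimal sketch in plain Java of how a typed channel can sit on top of string addresses. It is not QBit's implementation: ToyEventBus, CarParkedChannel, createSender and register are names made up for this illustration, and the toy bus delivers synchronously on the caller's thread, whereas QBit delivers asynchronously through queues. The only point is that the address can be derived from the fully qualified interface name plus the method name, so neither the sender nor the receiver has to spell it out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy string-addressed bus; a stand-in for illustration, not QBit's EventManager. */
class ToyEventBus {
    private final Map<String, List<Consumer<Object[]>>> listeners = new HashMap<>();

    void subscribe(String address, Consumer<Object[]> listener) {
        listeners.computeIfAbsent(address, key -> new ArrayList<>()).add(listener);
    }

    void publish(String address, Object[] args) {
        for (Consumer<Object[]> listener : listeners.getOrDefault(address, List.of())) {
            listener.accept(args);
        }
    }
}

/** Hypothetical channel interface, named in the somethingHappened style. */
interface CarParkedChannel {
    void carParked(String plate, int level);
}

class TypedChannelSketch {

    /** Address = fully qualified interface name + "." + method name. */
    static String addressOf(Class<?> channelType, Method method) {
        return channelType.getName() + "." + method.getName();
    }

    /** Sender side: a proxy that turns each method call into a publish on the derived address. */
    @SuppressWarnings("unchecked")
    static <T> T createSender(ToyEventBus bus, Class<T> channelType) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            bus.publish(addressOf(channelType, method), methodArgs == null ? new Object[0] : methodArgs);
            return null; // channel methods return void
        };
        return (T) Proxy.newProxyInstance(
                channelType.getClassLoader(), new Class<?>[] {channelType}, handler);
    }

    /** Receiver side: subscribe each channel method of the receiver to its derived address. */
    static <T> void register(ToyEventBus bus, Class<T> channelType, T receiver) {
        for (Method method : channelType.getMethods()) {
            bus.subscribe(addressOf(channelType, method), eventArgs -> {
                try {
                    method.invoke(receiver, eventArgs);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            });
        }
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        CarParkedChannel receiver =
                (plate, level) -> System.out.println("parked " + plate + " on level " + level);
        register(bus, CarParkedChannel.class, receiver);

        // The producer only sees the typed interface, never the string address.
        CarParkedChannel channel = createSender(bus, CarParkedChannel.class);
        channel.carParked("ABC-123", 2);
    }
}
```

In this sketch, a string-based listener could subscribe to the same derived address directly on the bus, which is roughly the role @OnEvent plays alongside typed channels.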
Channels are unlike classical Java listeners in that the channel is both the interface to send the event from the producer and the interface to receive the event from the consumer. This is unlike anything we have seen before but I'm sure someone else has done it. I have a long history of thinking that I have come up with something very new & clever, then finding out that somebody wrote a college thesis about it 20 years ago.
A clear advantage to this strongly typed channel approach is you can use the IDE to quickly see who implements the methods of a Channel interface and then you get a list of all of your listeners. You can also see who calls the channel interface and then you get a list of all of your producers. When someone calls a channel interface or rather an implementation of a Channel interface, it is like they're calling each of the subscribers but of course the call is asynchronous.
We found through trial and error that it does not make sense to call a channel method onEvent because it looks weird when you call it from the sender. It also does not make sense to call a channel method something like sendEvent, because then it looks weird when you implement it in the event receiver. We've come up with the channel naming convention somethingHappened, like rickSlept, carParked, buildingBurned, transactionCompleted, etc. This does not look weird for the receiver which implements the interface or the sender that calls the channel. Naming is hard, but when you get it right the code makes more sense.
QBit Event Bus quick review
Example and test
Much easier to explain things with code.
unit test for this new feature
public class BoonEventChannelTestAndOnEvent extends TimedTesting {
public AtomicInteger eventCount = new AtomicInteger();
We define a new service that sends messages, and then we define some services that listen to messages from it.
Event Channel
@EventChannel
interface MyChannelInterface {
void somethingHappened(int i, String foo);
}
The above is the event channel interface. As you can see, it declares a method called somethingHappened. Of course, yours would declare other methods like transactionSuccessful or employeeHired, etc. The method name is the thing that happened. It is the event. The @EventChannel annotation can be passed a channel name, but if it is not passed a name then the name becomes the fully qualified class name of the interface.
Next up we define the actual sender service. This service will use the channel to send events. Later we will define three services that listen to the events from the above channel. Then we will invoke a method on the sender service and it will send a message over the bus that the three other services are listening to.
Sender Service implementation and interface
class MyServiceEventSender {
final MyChannelInterface channel;
MyServiceEventSender(MyChannelInterface channel) {
this.channel = channel;
}
public void someServiceMethod() {
channel.somethingHappened(1, "foo");
}
}
interface MyServiceInterface {
void someServiceMethod();
}
Notice that we pass the channel to the constructor.
Now let's define two services that listen to this channel using the channel interface.
Two services that are listeners by implementing the channel interface
class MyServiceEventReceiver implements MyChannelInterface{
@Override
public void somethingHappened(int i, String foo) {
eventCount.incrementAndGet();
System.out.println("MyServiceEventReceiver bar" + i + " foo " + foo);
}
}
class MyServiceEventReceiver2 implements MyChannelInterface{
@Override
public void somethingHappened(int i, String foo) {
eventCount.incrementAndGet();
System.out.println("MyServiceEventReceiver2 bar2" + i + " foo " + foo);
}
}
Now let us define a third service that listens. This time the service will listen with the @OnEvent annotation.
Listening with @OnEvent annotation
class MyServiceEventReceiver3 {
@OnEvent("io.advantageous.qbit.events.impl.MyChannelInterface.somethingHappened")
public void onSomethingHappened(int i, String foo) {
eventCount.incrementAndGet();
System.out.println("MyServiceEventReceiver3 bar3" + i + " foo " + foo);
}
}
Notice that we are using the fully qualified class name plus the event method as the address.
Now for our test, first wire up the services
@Test
public void test() throws Exception {
eventCount.set(0);
//Create the event bus and the channel
final EventManager eventManager = QBit.factory().systemEventManager();
final EventBusProxyCreator eventBusProxyCreator = QBit.factory().eventBusProxyCreator();
/* Create a channel. */
final MyChannelInterface channel = eventBusProxyCreator.createProxy(eventManager, MyChannelInterface.class);
//Sender service, impl, serviceQueue and client proxy.
/* Create the sender service. */
final MyServiceEventSender serviceSender = new MyServiceEventSender(channel);
/* Create the service queue for the sender. */
final ServiceQueue serviceSenderQueue = serviceBuilder().setServiceObject(serviceSender).build();
/* Create the client interface for the sender. */
final MyServiceInterface serverSenderClient = serviceSenderQueue.createProxyWithAutoFlush(
MyServiceInterface.class, 100, TimeUnit.MILLISECONDS);
//Create the receiver services.
final MyServiceEventReceiver receiverService = new MyServiceEventReceiver();
final MyServiceEventReceiver2 receiverService2 = new MyServiceEventReceiver2();
final MyServiceEventReceiver3 receiverService3 = new MyServiceEventReceiver3();
final ServiceQueue receiverServiceQueue = serviceBuilder().setServiceObject(receiverService).build();
final ServiceQueue receiverServiceQueue2 = serviceBuilder().setServiceObject(receiverService2).build();
final ServiceQueue receiverServiceQueue3 = serviceBuilder().setServiceObject(receiverService3).build();
/*Start the services. */
serviceSenderQueue.start();
receiverServiceQueue.start();
receiverServiceQueue2.start();
receiverServiceQueue3.start();
The above is all wiring and would typically be done in a Spring configuration file, a Guice module, or some application builder class.
Now we send the event and wait for all three listeners to get the message.
Test
/* Now send a message with the client. */
serverSenderClient.someServiceMethod();
waitForTrigger(5, o -> eventCount.get() == 3);
assertEquals(3, eventCount.get());
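Because delivery is asynchronous, the test cannot assert right after the call. It has to wait until the listeners have run or a timeout expires. The sketch below shows one way to write such a wait in plain Java. awaitCondition is a hypothetical helper written for this illustration. It is not the waitForTrigger method from TimedTesting, whose implementation is not shown in this article.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class AwaitSketch {

    /**
     * Polls the condition until it becomes true or the timeout elapses.
     * Returns true if the condition was met, false if we gave up.
     */
    static boolean awaitCondition(long timeout, TimeUnit unit, BooleanSupplier condition) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return condition.getAsBoolean(); // one last check at the deadline
            }
            try {
                Thread.sleep(10); // short pause between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return condition.getAsBoolean();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger eventCount = new AtomicInteger();

        // Stand-in for three listeners that receive the event on another thread.
        Thread listeners = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                eventCount.incrementAndGet();
            }
        });
        listeners.start();

        boolean allArrived = awaitCondition(5, TimeUnit.SECONDS, () -> eventCount.get() == 3);
        System.out.println("all three events arrived: " + allArrived);
    }
}
```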
You can change the default names and the above test will still work
Changing event channel names
@EventChannel ("FOO")
interface MyChannelInterface {
void somethingHappened(int i, String foo);
}
class MyServiceEventReceiver3 {
@OnEvent("FOO.somethingHappened")
public void onSomethingHappened(int i, String foo) {
eventCount.incrementAndGet();
System.out.println("MyServiceEventReceiver3 bar3"
+ i + " foo " + foo);
}
}
Nothing else in the example changed. You have the strongly typed aspect and you have string addresses. Pie and cake and ice cream! Yummy! This is a good thing for integrating with the likes of JMS, ActiveMQ, RabbitMQ, 0MQ, and Kafka.
This would also work.
You can rename the channel as you see fit.
@EventChannel ("FOO")
static interface MyChannelInterface {
@EventChannel ("bam")
void somethingHappened(int i, String foo);
}
class MyServiceEventReceiver3 {
@OnEvent("FOO.bam")
public void onSomethingHappened(int i, String foo) {
eventCount.incrementAndGet();
System.out.println("MyServiceEventReceiver3 bar3"
+ i + " foo " + foo);
}
}
Keep in mind that none of the other classes had to change at all.
This one only works for OnEvent not the others
static interface MyChannelInterface {
@EventChannel ("FOO.bam")
void somethingHappened(int i, String foo);
}
If you are using the strongly typed channels, then you have to mark the interface itself with @EventChannel. The above annotates only the method, so it would still work for the @OnEvent listener but not for the service listeners that implement the channel interface.
Revisiting HR example
If you read the original article regarding channels, then you may recall the employee HR system with events. Let's revisit that example and use this new style of event management.
We have two channels, one like the old and one like the new:
Event Channels in HR Example
@EventChannel
interface SalaryChangedChannel {
void salaryChanged(Employee employee, int newSalary);
}
interface EmployeeEventManager {
@EventChannel(NEW_HIRE_CHANNEL)
void sendNewEmployee(Employee employee);
}
A channel name is just the name passed to the @EventChannel annotation on the method, unless an annotation is also defined at the class level, in which case the name becomes classChannelName + "." + methodChannelName.
This method defines the naming behavior of channel names.
public static String createChannelName(final String channelPrefix, final String classChannelNamePart, final String methodChannelNamePart) {
//If Channel prefix is null then just use class channel name and method channel name
if (channelPrefix == null) {
//If the class channel name is null just return the method channel name.
if (classChannelNamePart == null) {
return methodChannelNamePart;
} else {
//Channel name takes the form ${classChannelNamePart.methodChannelNamePart}
return Str.join('.', classChannelNamePart, methodChannelNamePart);
}
} else {
//If classChannelNamePart null then channel name takes the form ${channelPrefix.methodChannelNamePart}
if (classChannelNamePart == null) {
return Str.join('.', channelPrefix, methodChannelNamePart);
} else {
//Nothing was null so the channel name takes the form ${channelPrefix.classChannelNamePart.methodChannelNamePart}
return Str.join('.', channelPrefix, classChannelNamePart, methodChannelNamePart);
}
}
}
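A few worked cases may help here. The sketch below restates the rule above in plain Java, with String.join standing in for Boon's Str.join, and runs it on sample inputs. The "hr" prefix is a made-up value used only for illustration.

```java
class ChannelNameSketch {

    /** Same branching as the method above; null parts are simply left out of the name. */
    static String createChannelName(String channelPrefix, String classPart, String methodPart) {
        if (channelPrefix == null) {
            return classPart == null
                    ? methodPart
                    : String.join(".", classPart, methodPart);
        }
        return classPart == null
                ? String.join(".", channelPrefix, methodPart)
                : String.join(".", channelPrefix, classPart, methodPart);
    }

    public static void main(String[] args) {
        // Only the method is annotated: the method name is the whole address.
        System.out.println(createChannelName(null, null, "FOO.bam"));               // FOO.bam
        // Class named FOO, method left at its default name.
        System.out.println(createChannelName(null, "FOO", "somethingHappened"));    // FOO.somethingHappened
        // Class named FOO, method renamed to bam.
        System.out.println(createChannelName(null, "FOO", "bam"));                  // FOO.bam
        // With a prefix in front of the class and method parts.
        System.out.println(createChannelName("hr", "FOO", "bam"));                  // hr.FOO.bam
        // With a prefix but no class-level name.
        System.out.println(createChannelName("hr", null, "bam"));                   // hr.bam
    }
}
```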
The interface-level name is only captured if the interface is annotated. That behavior is defined by this internal method:
public static <T> String getClassEventChannelName(ClassMeta<T> classMeta, AnnotationData classAnnotation) {
//They could even use enum as we are getting a string value
String classEventBusName = classAnnotation != null
&& classAnnotation.getValues().get("value") !=null ? classAnnotation.getValues().get("value").toString() : null;
if (Str.isEmpty(classEventBusName) && classAnnotation !=null) {
classEventBusName = classMeta.longName();
}
return classEventBusName;
}
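The three outcomes of that method (no annotation, annotation without a value, annotation with a value) can be sketched with nothing but standard Java reflection. The @Channel annotation and the three interfaces below are hypothetical stand-ins defined for this example. They are not QBit's @EventChannel or Boon's ClassMeta.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical stand-in for an event channel annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Channel {
    String value() default "";
}

@Channel
interface DefaultNamedChannel {
    void somethingHappened(int i, String foo);
}

@Channel("FOO")
interface FooChannel {
    void somethingHappened(int i, String foo);
}

interface UnannotatedChannel {
    void somethingHappened(int i, String foo);
}

class ClassChannelNameSketch {

    /**
     * No annotation: null, so no class-level part is contributed.
     * Annotation without a value: the fully qualified name of the interface.
     * Annotation with a value: that value.
     */
    static String classChannelName(Class<?> type) {
        Channel annotation = type.getAnnotation(Channel.class);
        if (annotation == null) {
            return null;
        }
        return annotation.value().isEmpty() ? type.getName() : annotation.value();
    }

    public static void main(String[] args) {
        System.out.println(classChannelName(DefaultNamedChannel.class));
        System.out.println(classChannelName(FooChannel.class));
        System.out.println(classChannelName(UnannotatedChannel.class));
    }
}
```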
Here is our EmployeeHiringService example revisited.
EmployeeHiringService
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
Then we just change the payroll service to implement the channel (it was using @OnEvent):
Payroll Service
public static class PayrollService implements SalaryChangedChannel{
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library, not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model, though a better description would be Active Objects. QBit does not use a disruptor. It uses regular Java Queues. QBit can do north of 100 million ping pong calls per second, which is an amazing speed (seen as high as 200M). QBit also supports calling services via REST and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc. QBit uses micro batching to push messages through the pipe (queue, IO, etc.) faster by reducing thread hand-off.
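The micro batching idea can be sketched in a few lines of plain Java. This is only an illustration of the general technique, not QBit's queue code: BatchingSender is a made-up class, it assumes a single producer thread, and the batch size of 4 is arbitrary. Messages are buffered locally and handed to the shared queue one batch at a time, so ten messages cost three hand-offs instead of ten. The explicit flush plays a role similar to the flushServiceProxy calls in the queue callbacks of the HR example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Buffers items locally and hands them to the queue one batch at a time. Not thread-safe. */
class BatchingSender<T> {
    private final BlockingQueue<List<T>> queue;
    private final int batchSize;
    private List<T> batch = new ArrayList<>();

    BatchingSender(BlockingQueue<List<T>> queue, int batchSize) {
        this.queue = queue;
        this.batchSize = batchSize;
    }

    /** Adds to the local batch; flushes automatically when the batch is full. */
    void send(T item) {
        batch.add(item);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    /** Hands the current batch to the queue, if there is anything in it. */
    void flush() {
        if (!batch.isEmpty()) {
            queue.add(batch);
            batch = new ArrayList<>();
        }
    }
}

class MicroBatchSketch {
    public static void main(String[] args) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        BatchingSender<Integer> sender = new BatchingSender<>(queue, 4);

        for (int i = 0; i < 10; i++) {
            sender.send(i);
        }
        sender.flush(); // push the partial last batch, e.g. when no more requests are pending

        System.out.println(queue.size() + " hand-offs for 10 messages"); // 3 hand-offs for 10 messages
    }
}
```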
QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but we could one day write a version in Rust or Go or C# (but that would require a large payday).