Rick

Friday, November 27, 2015

QBit Microservices Lib: EventBus using Consul and QBit to wire together an event bus

QBit, the microservices library for Java, has an event system. You have likely seen it if you have read through the QBit documentation.
QBit allows the event bus to be connected to remote instances of QBit, forming a cluster.
It does this through the ServiceDiscovery that QBit provides.
By default, the EventBusClusterBuilder uses the ConsulServiceDiscoveryBuilder if you do not provide it with a ServiceDiscovery. You can read more about Consul and Service Discovery.
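
To make the idea of a clustered event bus concrete, here is a toy, in-memory model written with plain Java collections. It is not QBit code and does not reflect how QBit replicates events over the network. The class ToyEventBusNode and its methods are hypothetical names made up for this sketch, and the peers are wired by hand where a real cluster would learn about them through service discovery.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: ToyEventBusNode is a hypothetical class, not part of QBit.
class ToyEventBusNode {

    // Local listeners keyed by channel name, for example "myevent".
    private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

    // Peers that service discovery would normally supply; here they are added by hand.
    private final List<ToyEventBusNode> peers = new ArrayList<>();

    void addPeer(final ToyEventBusNode peer) {
        peers.add(peer);
    }

    void listen(final String channel, final Consumer<Object> listener) {
        listeners.computeIfAbsent(channel, key -> new ArrayList<>()).add(listener);
    }

    // Deliver to local listeners first, then replicate once to every known peer.
    void send(final String channel, final Object event) {
        deliverLocally(channel, event);
        for (final ToyEventBusNode peer : peers) {
            peer.deliverLocally(channel, event);
        }
    }

    // Replicated events are only delivered locally, so they are never forwarded again.
    private void deliverLocally(final String channel, final Object event) {
        final List<Consumer<Object>> channelListeners =
                listeners.getOrDefault(channel, Collections.emptyList());
        for (final Consumer<Object> listener : channelListeners) {
            listener.accept(event);
        }
    }

    public static void main(final String... args) {
        final ToyEventBusNode first = new ToyEventBusNode();
        final ToyEventBusNode second = new ToyEventBusNode();
        first.addPeer(second);

        final List<Object> seenOnSecond = new ArrayList<>();
        second.listen("myevent", seenOnSecond::add);

        // Sent on the first node, observed by a listener on the second node.
        first.send("myevent", "hello");
        System.out.println("second node saw: " + seenOnSecond);
    }
}
```

The point of the sketch is the shape of the behavior: a listener registered on one node receives an event that was sent on another node. In QBit, the EventBusCluster takes care of finding the peers and moving the events between processes.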
To construct an event bus, you first start up Consul.

Using Consul for Microservice Service Discovery

consul agent -server -bootstrap-expect 1 -dc dc1 -data-dir /tmp/consulqbit -ui-dir ./support/ui/
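
Before starting the QBit nodes, it can help to confirm that the Consul agent is up. The following is a minimal sketch using only the JDK HTTP client (Java 11 or later). It assumes the agent listens on Consul's default HTTP port 8500 on localhost and queries the /v1/status/leader endpoint. The class name ConsulLeaderCheck and its methods are hypothetical, and treating an empty JSON string as "no leader yet" is an assumption of this sketch.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of QBit or Consul.
class ConsulLeaderCheck {

    // Builds a GET request for Consul's status endpoint on the given host and port.
    static HttpRequest leaderRequest(final String host, final int port) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/v1/status/leader"))
                .timeout(Duration.ofSeconds(2))
                .GET()
                .build();
    }

    // Returns true when the agent answers 200 with a non-empty leader address.
    static boolean hasLeader(final String host, final int port) {
        try {
            final HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(leaderRequest(host, port), HttpResponse.BodyHandlers.ofString());
            // Assumption: with no leader elected the body is an empty JSON string ("").
            final String body = response.body().trim();
            return response.statusCode() == 200 && !body.isEmpty() && !body.equals("\"\"");
        } catch (IOException e) {
            // Connection refused or timed out: the agent is not reachable.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String... args) {
        // 8500 is Consul's default HTTP port; adjust if the agent is configured differently.
        System.out.println("Consul has a leader: " + hasLeader("localhost", 8500));
    }
}
```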
Next you use the EventBusClusterBuilder to construct an event bus cluster as follows:

EventBusClusterBuilder

        final EventBusClusterBuilder eventBusClusterBuilder = EventBusClusterBuilder.eventBusClusterBuilder();
        eventBusClusterBuilder.setEventBusName("event-bus");
        eventBusClusterBuilder.setReplicationPortLocal(replicatorPort);
        final EventBusCluster eventBusCluster = eventBusClusterBuilder.build();
        eventBusCluster.start();
Then you inject the event manager from the eventBusCluster into the service builders.

Inject the event manager from EventBusCluster into service builders

        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder
                .managedServiceBuilder().setRootURI("/")
                .setEventManager(eventBusCluster.eventManagerImpl())
                .setPort(webPort);
You inject client proxies of the event manager into other services.

Inject client proxies of the event manager into other services

        final EventExampleService eventExampleService = new EventExampleService(
                eventBusCluster.createClientEventManager(),
                "event.",
                ReactorBuilder.reactorBuilder().build(),
                Timer.timer(),
                managedServiceBuilder.getStatServiceBuilder().buildStatsCollector());

        managedServiceBuilder.addEndpointService(eventExampleService);
Here is a complete example:

Complete example

package io.advantageous.qbit.example.event.bus;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.Listen;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.http.DELETE;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.annotation.http.POST;
import io.advantageous.qbit.eventbus.EventBusCluster;
import io.advantageous.qbit.eventbus.EventBusClusterBuilder;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.BaseService;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.qbit.util.Timer;

import java.util.ArrayList;
import java.util.List;

/**
 * curl http://localhost:8080/event/
 * curl -X POST  -H "Content-Type: application/json"  http://localhost:8080/event -d '{"id":"123", "message":"hello"}'
 */
@RequestMapping("/")
public class EventExampleService extends BaseService{

    private final EventManager eventManager;
    private final List<MyEvent> events = new ArrayList<>();

    public EventExampleService(final EventManager eventManager,
                               final String statKeyPrefix,
                               final Reactor reactor,
                               final Timer timer,
                               final StatsCollector statsCollector) {
        super(statKeyPrefix, reactor, timer, statsCollector);
        this.eventManager = eventManager;
        reactor.addServiceToFlush(eventManager);
    }

    @POST("/event")
    public boolean sendEvent(MyEvent event) {
        eventManager.sendArguments("myevent", event);
        return true;
    }


    @DELETE("/event/")
    public boolean clearEvents() {
        events.clear();
        return true;
    }

    @GET("/event/")
    public List<MyEvent> getEvents() {
        return events;
    }

    @Listen("myevent")
    public void listenEvent(final MyEvent event) {
        events.add(event);
    }

    public static void run(final int webPort, final int replicatorPort)  {

        final EventBusClusterBuilder eventBusClusterBuilder = EventBusClusterBuilder.eventBusClusterBuilder();
        eventBusClusterBuilder.setEventBusName("event-bus");
        eventBusClusterBuilder.setReplicationPortLocal(replicatorPort);
        final EventBusCluster eventBusCluster = eventBusClusterBuilder.build();
        eventBusCluster.start();


        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder
                .managedServiceBuilder().setRootURI("/")
                .setEventManager(eventBusCluster.eventManagerImpl())
                .setPort(webPort);

        final EventExampleService eventExampleService = new EventExampleService(
                eventBusCluster.createClientEventManager(),
                "event.",
                ReactorBuilder.reactorBuilder().build(),
                Timer.timer(),
                managedServiceBuilder.getStatServiceBuilder().buildStatsCollector());
        managedServiceBuilder.addEndpointService(eventExampleService);

        managedServiceBuilder.getEndpointServerBuilder().build().startServerAndWait();



    }
    public static void main(final String... args)  {

        run(8080, 7070);

    }
}

....

package io.advantageous.qbit.example.event.bus;

public class MyEvent {

    private String id;
    private String message;


    public String getId() {
        return id;
    }

    public MyEvent setId(String id) {
        this.id = id;
        return this;
    }

    public String getMessage() {
        return message;
    }

    public MyEvent setMessage(String message) {
        this.message = message;
        return this;
    }
}
...
package io.advantageous.qbit.example.event.bus;

public class SecondEventExample {
    public static void main(final String... args)  {

        EventExampleService.run(6060, 5050);

    }

}

....
package io.advantageous.qbit.example.event.bus;

public class ThirdEventExample {

    public static void main(final String... args)  {
        EventExampleService.run(4040, 3030);
    }
}

To run this example, start Consul, then start EventExampleService, SecondEventExample, and ThirdEventExample, and then use the following curl commands.

Curl commands to exercise the example

## Send an event
$ curl -X POST  -H "Content-Type: application/json"  http://localhost:8080/event -d '{"id":"123", "message":"hello"}'
true

## See that the event is on each of the nodes.
$ curl http://localhost:8080/event/
[{"id":"123","message":"hello"}]

$ curl http://localhost:6060/event/
[{"id":"123","message":"hello"}]

$ curl http://localhost:4040/event/
[{"id":"123","message":"hello"}]
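
The same checks can be driven from Java instead of curl. The following is a minimal sketch using the JDK HTTP client (Java 11 or later), assuming the three nodes from the example are running on localhost ports 8080, 6060, and 4040. The class name EventExampleClient and its methods are hypothetical and are not part of the QBit example. The JSON body is built by simple string concatenation, which is only safe for values that need no escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the example endpoints; not part of QBit.
class EventExampleClient {

    // Mirrors: curl -X POST -H "Content-Type: application/json" http://localhost:<port>/event -d '{...}'
    static HttpRequest postEvent(final int port, final String id, final String message) {
        // Assumes id and message contain no characters that require JSON escaping.
        final String json = "{\"id\":\"" + id + "\", \"message\":\"" + message + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:" + port + "/event"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Mirrors: curl http://localhost:<port>/event/
    static HttpRequest listEvents(final int port) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:" + port + "/event/"))
                .GET()
                .build();
    }

    public static void main(final String... args) throws Exception {
        final HttpClient client = HttpClient.newHttpClient();

        // Send one event to the first node.
        final HttpResponse<String> sent =
                client.send(postEvent(8080, "123", "hello"), HttpResponse.BodyHandlers.ofString());
        System.out.println("POST 8080 -> " + sent.body());

        // Ask every node which events it has seen.
        for (final int port : new int[]{8080, 6060, 4040}) {
            final HttpResponse<String> listed =
                    client.send(listEvents(port), HttpResponse.BodyHandlers.ofString());
            System.out.println("GET " + port + " -> " + listed.body());
        }
    }
}
```

If a node is queried immediately after the POST and the event has not shown up yet, wait a moment and query again before concluding that the event was not delivered.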
