Rick

Monday, March 16, 2015

Using Consul from Java for QBit MicroService lib

Using Consul from QBit, the Java, JSON, WebSocket and REST microservice library, for health monitoring, clustering and service discovery

QBit is a microservice framework focused on making it easy to develop and use services written in Java that can be exposed via WebSocket/JSON and REST/JSON interfaces.
Consul is a service discovery system that provides a microservice style interface to services, service topology and service health.
Early steps have been taken to integrate QBit with Consul.

Consul

Consul provides service discovery, health monitoring, and config services.

With service discovery you can look up services, which are organized in the topology of your datacenters. Consul uses client agents and Raft to provide a consistent view of services, and it also uses Raft to provide a consistent view of configuration. Consul provides a microservice interface to a replicated view of your service topology and its configuration. Consul can monitor and change the service topology based on the health of individual nodes.
Consul provides scalable distributed health checks. Consul does only minimal datacenter-to-datacenter communication, so each datacenter has its own Consul cluster. Consul provides a domain model for managing the topology of datacenters, server nodes, and services running on server nodes, along with their configuration and current health status.
Consul is like combining the features of a DNS server, a consistent key/value store like etcd, the service discovery features of ZooKeeper, and health monitoring like Nagios, all rolled up into one consistent system. Essentially, Consul is all the bits you need for a coherent domain service model that provides service discovery, replicated config, service topology, and health status. Consul also provides a nice REST interface and Web UI to see your service topology and distributed service config.
Consul organizes your services in a Catalog called the Service Catalog and then provides a DNS and REST/HTTP/JSON interface to it.
To use Consul you start up an agent process. The Consul agent process is a long-running daemon on every member of the Consul cluster. The agent process can be run in server mode or client mode. Consul agent clients run on every physical server or OS virtual machine (whichever makes more sense) that hosts services. The clients use gossip and RPC calls to stay in sync with Consul.
A client (a Consul agent running in client mode) forwards requests to a server (a Consul agent running in server mode). Clients are mostly stateless. The client uses LAN gossip to the server nodes to communicate changes.
A server (a Consul agent running in server mode) is like a client agent but with more tasks. The Consul servers use the Raft quorum mechanism to elect a leader. The Consul servers maintain cluster state such as the Service Catalog. The leader manages a consistent view of config key/value pairs and of service health and topology. Consul servers also handle WAN gossip to other datacenters. Consul server nodes forward queries to the leader and forward queries to other datacenters.
A datacenter is what you would expect: anything that allows fast communication between nodes, with few or no hops and little or no routing; in short, high-speed communication. This could be an Amazon EC2 availability zone, a networking environment like a subnet, or any private, low latency, high bandwidth network. You can decide exactly what datacenter means for your organization.
Consul server nodes use consensus to agree on who is the leader. They use a transactional finite state machine (FSM) to make sure all server nodes are in lock step on critical tasks. The consensus machinery consists of a replicated log, the FSM, a peer set (who gets the replicated log), a quorum (a majority of peers must agree), committed entries, and a leader. The end result is a consistent view of configuration and of live services and their topology.
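The "majority of peers agree" rule is plain arithmetic, and a small sketch makes it concrete. The class and method names below are made up for illustration and are not part of Consul or QBit; the only assumption is that a quorum is a strict majority of the peer set.

```java
// Sketch: majority-quorum arithmetic for a Raft-style peer set.
// QuorumMath is a hypothetical helper, not a Consul or QBit class.
final class QuorumMath {

    /** Smallest number of peers that forms a strict majority of the peer set. */
    static int quorumSize(int peers) {
        if (peers < 1) {
            throw new IllegalArgumentException("peer set must not be empty");
        }
        return peers / 2 + 1;
    }

    /** How many peers can be lost while a majority can still be formed. */
    static int failureTolerance(int peers) {
        return peers - quorumSize(peers);
    }

    public static void main(String... args) {
        for (int peers = 1; peers <= 5; peers++) {
            System.out.printf("peers=%d quorum=%d tolerated failures=%d%n",
                    peers, quorumSize(peers), failureTolerance(peers));
        }
    }
}
```

Under this rule a peer set of three can lose one server and a peer set of five can lose two, while a fourth server adds no extra tolerance over three.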
Consul is built on top of Serf. Serf is a full gossip protocol and provides membership, failure detection, and event broadcast mechanisms. Serf provides the clustering for the server nodes. Consul uses LAN gossip for clients and servers in the same local network or datacenter. Consul also uses WAN gossip, but only between servers.
With Consul you run three to five servers per datacenter. You use WAN gossip to communicate over the Internet or wide area networks. Consul also provides RPC (remote procedure call), a request/response mechanism that lets a client make a request of a server to, for example, join a cluster or leave a cluster.
Each Consul client agent maintains its own set of service registrations, check registrations, and health information. Clients also update health checks and local state.
Local agent state (agent client node) is different from catalog state (agent server node). The local agent notifies the catalog of changes. Updates are synced right away from the client agent to the Service Catalog, which is stored on the cluster of Consul servers.
The local agents (agents running in client mode) check that their view of the world matches the catalog, and if it does not, the catalog is updated. For example, when a client registers a new service check with a client agent, the client agent notifies the Consul server to update the catalog so that it knows this service check exists. When a check is removed from the local client agent, it is also removed from the catalog, which lives in the cluster of Consul servers.
If an agent runs a health check and the status changes, then an update is sent to the catalog. The agent is the authority for its node (client or server) and for the services that exist on that node. The agent's scope is the server or virtual machine running the agent. The catalog's scope is a single datacenter, local area network, or EC2 availability zone. Changes are synchronized when a change is made and also periodically, every minute to ten minutes depending on the size of the cluster.
Consul provides the following REST endpoints for interacting with Consul:
  • kv - key value
  • agent - api for dealing with agent
  • catalog - dealing with datacenter catalog
  • health - show health checks
  • session - group operations and manage consistent view
  • event - fire fast gossip based events
  • acl - setup access control lists and security for Consul
  • status - check status
Each endpoint provides operations that take JSON or request params and deliver JSON responses. In addition to programmatically configuring Consul via REST, you can use JSON config files stored locally to bootstrap config and service topology.
Consul provides a series of health checks. The most common is called the dead man's switch, or time to live health check: the service has to dial in with a status update every X time period. The other health check is the HTTP ping every N period of time, where HTTP code 200-299 means PASS, HTTP code 429 is WARN, and any other HTTP status code is FAIL. Lastly, you can have the client agent (or server agent) run a script periodically. If the process returns 0, the check passes; if it returns 1, it is considered a warning; any other process return value is considered a FAIL.
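The pass/warn/fail rules for the HTTP and script checks can be written down as two small functions. This is only a sketch of the rules as described above; CheckStatusRules and its Status enum are hypothetical names, not part of Consul or of the Consul Client lib.

```java
// Sketch: the PASS / WARN / FAIL rules for HTTP and script health checks.
// CheckStatusRules is a hypothetical helper written for this article.
final class CheckStatusRules {

    enum Status { PASS, WARN, FAIL }

    /** HTTP check: 2xx passes, 429 warns, everything else fails. */
    static Status fromHttpStatus(int httpStatus) {
        if (httpStatus >= 200 && httpStatus <= 299) {
            return Status.PASS;
        }
        if (httpStatus == 429) {
            return Status.WARN;
        }
        return Status.FAIL;
    }

    /** Script check: exit code 0 passes, 1 warns, everything else fails. */
    static Status fromExitCode(int exitCode) {
        switch (exitCode) {
            case 0:
                return Status.PASS;
            case 1:
                return Status.WARN;
            default:
                return Status.FAIL;
        }
    }

    public static void main(String... args) {
        System.out.println("HTTP 204 -> " + fromHttpStatus(204));
        System.out.println("HTTP 429 -> " + fromHttpStatus(429));
        System.out.println("exit 2   -> " + fromExitCode(2));
    }
}
```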

QBit Microservice Library for Java using Consul

QBit is an actor/reactive style Java microservice engine that provides an event bus, REST and WebSocket using JSON.
QBit added the ability to cluster servers running QBit with Consul for its event bus.
At first we started looking at Consul Client, which is from Orbitz. It seemed to be the most complete Java client API for Consul and the one most like what we would write ourselves. After further investigation and some trial and error, we decided that adding 13+ extra jar files for JSON parsing and HTTP calls was a bit much for a framework like QBit, which already does JSON parsing, marshaling, and HTTP calls. Therefore we forked the code, ripped out the HTTP lib, the JSON parser, and a few other random things, and used the QBit equivalents. There was nothing wrong with the libs that Consul Client was using; we just did not want an extra 13 jar files for something so intrinsic to QBit. You can find the QBit version of Consul Client as a subproject of QBit, the microservice, JSON, WebSocket, REST library for Java.
QBit has an event bus already. It can be remote via WebSocket and REST. It can be replicated via WebSocket. But the QBit event bus requires mostly programmatic configuration. You could in theory have a config file and configure nodes based on it. That is basically what we are doing, but we use the Consul Client lib to do it.
Prior to the integration with Consul, QBit had an EventRemoteReplicatorService and an EventBusReplicationClientBuilder, which configured a remote EventConnector that talks to the EventRemoteReplicatorService. The EventRemoteReplicatorService is a WebSocket endpoint for method invocations and is itself an EventConnector. The EventConnector interface defines a method for passing events between event buses. In the core of QBit there is the EventConnectorHub, which takes a collection of EventConnectors and provides a single EventConnector interface for the entire collection. An EventManager manages an EventBus, and it takes an EventConnector, which can be an EventConnectorHub.
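The hub idea (one connector that stands in for a whole collection of connectors) is a composite, and a stripped-down sketch shows the shape. SimpleConnector and SimpleConnectorHub are hypothetical stand-ins invented for this example; QBit's real EventConnector and EventConnectorHub have their own signatures, which are not reproduced here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a connector that forwards an event somewhere. */
interface SimpleConnector {
    void forward(String channel, Object body);
}

/** A hub that is itself a connector and fans each event out to its members. */
final class SimpleConnectorHub implements SimpleConnector {

    // Copy-on-write so members can be added or removed while events are flowing.
    private final List<SimpleConnector> members = new CopyOnWriteArrayList<>();

    void add(SimpleConnector connector) {
        members.add(connector);
    }

    void remove(SimpleConnector connector) {
        members.remove(connector);
    }

    int size() {
        return members.size();
    }

    @Override
    public void forward(String channel, Object body) {
        for (SimpleConnector member : members) {
            member.forward(channel, body);
        }
    }

    public static void main(String... args) {
        SimpleConnectorHub hub = new SimpleConnectorHub();
        hub.add((channel, body) -> System.out.println("node A got " + channel + ": " + body));
        hub.add((channel, body) -> System.out.println("node B got " + channel + ": " + body));
        hub.forward("mom", "hi mom");
    }
}
```

Because the hub implements the same interface as its members, whatever consumes a connector does not need to know whether it is talking to one remote node or to many.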
As part of this effort we added an EventBusRing, which talks to Consul and periodically adds or removes members from an EventConnectorHub based on changes in the services topology reported by Consul.
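Adding and removing members based on topology changes boils down to comparing two sets: the nodes currently connected and the nodes Consul reports as healthy. The sketch below shows one way to compute that difference. MembershipDiff is a hypothetical helper, and node ids are plain strings here for simplicity; QBit's own rebuild logic may work differently, for example by rebuilding the hub from scratch.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Sketch: work out which peers to connect and which to drop.
// MembershipDiff is a hypothetical helper, not a QBit class.
final class MembershipDiff {

    final Set<String> toAdd;
    final Set<String> toRemove;

    private MembershipDiff(Set<String> toAdd, Set<String> toRemove) {
        this.toAdd = Collections.unmodifiableSet(toAdd);
        this.toRemove = Collections.unmodifiableSet(toRemove);
    }

    /**
     * @param current node ids that are connected right now
     * @param healthy node ids reported as healthy by service discovery
     */
    static MembershipDiff between(Set<String> current, Set<String> healthy) {
        Set<String> toAdd = new HashSet<>(healthy);
        toAdd.removeAll(current);      // healthy but not yet connected

        Set<String> toRemove = new HashSet<>(current);
        toRemove.removeAll(healthy);   // connected but no longer healthy

        return new MembershipDiff(toAdd, toRemove);
    }
}
```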
EventBusRingBuilder is used to construct an EventBusRing. You can pass the EventBusRingBuilder an existing EventManager, or it can create a new EventManager. With the EventBusRingBuilder you can specify where Consul is located, how often to poll Consul, how long the long poll to Consul for topology changes should last, how often to ping Consul to say that this event bus node is still alive, and the name of the event bus.

EventBusRing

public class EventBusRing implements Startable, Stoppable {

    …
    private AtomicReference<Consul> consul = new AtomicReference<>();

    private AtomicInteger lastIndex = new AtomicInteger();


    public EventBusRing(…) {
        this.consul.set(Consul.consul(consulHost, consulPort));
        …
    }
    …
}

In the start method, we start consul, which starts the QBit HttpClient class that talks REST/JSON to Consul, and we start the local service queue. We also start the replicator service by calling startServerReplicator, which exposes a WebSocket endpoint that other instances running EventBusRing can talk to; that is, it starts an EventRemoteReplicatorService microservice that speaks WebSocket and the QBit/JSON wire protocol. Then we register the local event bus with Consul by calling registerLocalBusInConsul.

EventBusRing start

    public void start() {

        consul.get().start();

        if (eventServiceQueue != null) {
            eventServiceQueue.start();
        }

        startServerReplicator();

        registerLocalBusInConsul();

        healthyNodeMonitor = periodicScheduler.repeat(this::healthyNodeMonitor, peerCheckTimeInterval, peerCheckTimeUnit);

        if (replicationServerCheckInIntervalInSeconds > 2) {
            consulCheckInMonitor  = periodicScheduler.repeat(this::checkInWithConsul, replicationServerCheckInIntervalInSeconds / 2, TimeUnit.SECONDS);
        } else {
            consulCheckInMonitor  = periodicScheduler.repeat(this::checkInWithConsul, 100, TimeUnit.MILLISECONDS);
        }
    }
Notice that we start two sets of scheduled tasks using the QBit periodicScheduler, which is a timer for scheduling tasks. We periodically call healthyNodeMonitor and checkInWithConsul. The healthyNodeMonitor method does a long poll operation to Consul to check for changes to the list of healthy services. The checkInWithConsul method uses the Consul dead man's switch (time to live health check) to notify Consul that this service, this event bus, is still alive.
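To see why the check-in runs at half the configured interval, it helps to model the dead man's switch directly. The sketch below is a simplified in-memory model, not Consul's implementation: TtlCheckSketch is a hypothetical class, time is passed in explicitly so the behavior is easy to follow, and the check is treated as not passing until the first check-in arrives. The checkInPeriodMillis method mirrors the rule in start(): half the interval (integer division) when the interval is more than 2 seconds, otherwise every 100 milliseconds.

```java
// Sketch: a simplified model of a time-to-live (dead man's switch) check.
// TtlCheckSketch is hypothetical and only illustrates the timing rules.
final class TtlCheckSketch {

    /** Same rule as start(): half the TTL, or 100 ms for very short TTLs. */
    static long checkInPeriodMillis(int ttlSeconds) {
        if (ttlSeconds > 2) {
            return (ttlSeconds / 2) * 1000L;
        }
        return 100L;
    }

    private final long ttlMillis;
    private long lastPassMillis = -1L; // -1 means "never checked in"

    TtlCheckSketch(int ttlSeconds) {
        this.ttlMillis = ttlSeconds * 1000L;
    }

    /** The service dials in and resets the timer. */
    void pass(long nowMillis) {
        lastPassMillis = nowMillis;
    }

    /** Passing only if a check-in arrived within the last TTL window. */
    boolean isPassing(long nowMillis) {
        return lastPassMillis >= 0 && nowMillis - lastPassMillis <= ttlMillis;
    }
}
```

Checking in at half the TTL means a single late or lost check-in does not immediately mark the node as dead; the next one still lands inside the window.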
The registerLocalBusInConsul calls the consul agent endpoint to register this service with Consul.

EventBusRing registerLocalBusInConsul

    private void registerLocalBusInConsul() {
        consul.get().agent().registerService(replicationPortLocal, 
                replicationServerCheckInIntervalInSeconds, eventBusName, localEventBusId, tag);
    }
The default eventBusName is “eventBus”. The default id (localEventBusId) is “eventBus-” + UUID. The tag is settable via the builder, as is the event bus name. The replicationServerCheckInIntervalInSeconds is settable via the builder (EventBusRingBuilder) and specifies how many seconds may pass without a check-in before Consul considers this eventBus service dead.
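On the wire, registering a service with a TTL check amounts to sending a small JSON document to the agent's service register endpoint (/v1/agent/service/register). The sketch below builds such a document by hand so the fields are visible. ServiceRegistrationJson is a hypothetical helper, not the code QBit uses (QBit has its own JSON marshaling), and it assumes the values contain no characters that need JSON escaping.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: the JSON body for registering a service with a TTL check.
// ServiceRegistrationJson is hypothetical; values are assumed to need no escaping.
final class ServiceRegistrationJson {

    static String build(String id, String name, int port, long ttlSeconds, List<String> tags) {
        String tagJson = tags.stream()
                .map(tag -> "\"" + tag + "\"")
                .collect(Collectors.joining(",", "[", "]"));

        return "{"
                + "\"ID\":\"" + id + "\","
                + "\"Name\":\"" + name + "\","
                + "\"Tags\":" + tagJson + ","
                + "\"Port\":" + port + ","
                + "\"Check\":{\"TTL\":\"" + ttlSeconds + "s\"}"
                + "}";
    }

    public static void main(String... args) {
        System.out.println(build("eventBus-1234", "eventBus", 9000, 10,
                java.util.Arrays.asList("dev")));
    }
}
```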
The healthyNodeMonitor is used to periodically get a list of healthy event bus nodes from Consul.

EventBusRing healthyNodeMonitor

    private void healthyNodeMonitor() {

        try {
            rebuildHub(getHealthyServices());
        } catch (Exception ex) {
            logger.error("unable to contact consul or problems rebuilding event hub", ex);
            Consul oldConsul = consul.get();
            consul.compareAndSet(oldConsul, startNewConsul(oldConsul));
        }

    }
The healthyNodeMonitor calls getHealthyServices and passes the results of getHealthyServices to rebuildHub.
The method getHealthyServices uses the Consul REST health endpoint to get a list of healthy service nodes from Consul.

EventBusRing getHealthyServices

    private List<ServiceHealth> getHealthyServices() {
        final ConsulResponse<List<ServiceHealth>> consulResponse = consul.get().health()
                .getHealthyServices(eventBusName, datacenter, tag, requestOptions);
        this.lastIndex.set(consulResponse.getIndex());

        final List<ServiceHealth> healthyServices = consulResponse.getResponse();

        if (debug) {
            showHealthyServices(healthyServices);
        }
        buildRequestOptions();
        return healthyServices;
    }
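The long poll works because Consul's health endpoint accepts the last index the caller has seen and holds the request open until something changes or a wait time elapses. The sketch below builds the URL for such a request against /v1/health/service/<name>, using the passing, tag, index, and wait query parameters of Consul's HTTP API. HealthQueryUri is a hypothetical helper, not part of QBit or Consul Client, and it assumes the service name and tag need no URL encoding.

```java
import java.net.URI;

// Sketch: the URL for a (possibly blocking) query for healthy service nodes.
// HealthQueryUri is hypothetical; inputs are assumed to need no URL encoding.
final class HealthQueryUri {

    /**
     * @param index last index seen from Consul, or 0 for a plain, non-blocking query
     * @param wait  maximum time Consul should hold the request open, for example "10s"
     */
    static URI healthyServices(String host, int port, String service,
                               String tag, long index, String wait) {
        StringBuilder query = new StringBuilder("passing"); // only nodes whose checks pass
        if (tag != null && !tag.isEmpty()) {
            query.append("&tag=").append(tag);
        }
        if (index > 0) {
            // Supplying an index turns the request into a long poll.
            query.append("&index=").append(index).append("&wait=").append(wait);
        }
        return URI.create("http://" + host + ":" + port
                + "/v1/health/service/" + service + "?" + query);
    }

    public static void main(String... args) {
        System.out.println(healthyServices("localhost", 8500, "eventBus", null, 0, "10s"));
        System.out.println(healthyServices("localhost", 8500, "eventBus", "dev", 42, "10s"));
    }
}
```

Each response carries the current index (the X-Consul-Index header), which is what gets stored in lastIndex and fed into the next request.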
We also have to check in our current node with Consul periodically so that it is not marked unhealthy. As you recall, this is done by a periodic task backed by the checkInWithConsul method.

EventBusRing checkInWithConsul

    private void checkInWithConsul() {
        try {
            consul.get().agent().pass(localEventBusId, "still running");
        } catch (NotRegisteredException ex) {
            registerLocalBusInConsul();
        } catch (Exception ex) {
            Consul oldConsul = consul.get();
            consul.compareAndSet(oldConsul, startNewConsul(oldConsul));
        }
    }
The checkInWithConsul method calls the pass operation of the Consul REST agent endpoint to notify Consul that this service node is still passing. If Consul does not know this service exists, then we reregister it. If we get any other exception, then we drop our Consul instance and create a new one.
To test this, I run a bunch of these:

EventBusRingBuilderTest

// QBit and Boon imports (including static imports) omitted
public class EventBusRingBuilderTest {

    EventBusRing eventBusRing;

    int port = 0;

    public void setup() {

        // grabs the first local port in this range that is free
        port = useOneOfThePortsInThisRange(9000, 9900);
        EventBusRingBuilder eventBusRingBuilder = eventBusRingBuilder();
        eventBusRingBuilder.setConsulHost("localhost");
        eventBusRingBuilder.setReplicationPortLocal(port);

        eventBusRing = eventBusRingBuilder.build();
        eventBusRing.start();
    }

    public void test() {

        // print every event that arrives on the "mom" channel
        eventBusRing.eventManager().register("mom", new EventConsumer<Object>() {
            @Override
            public void listen(Event<Object> event) {
                puts(event.channel(), event.body());
            }
        });

        // send one event per second, tagged with this node's port
        for (int index = 0; index < 100; index++) {
            Sys.sleep(1000);
            eventBusRing.eventManager().send("mom", "hi mom " + port);
        }
    }

    public static void main(String... args) {

        EventBusRingBuilderTest test = new EventBusRingBuilderTest();
        test.setup();
        test.test();
    }
}

Then I start dropping them. I can also open up the Consul GUI and take a peek at what is healthy and what is not.
This is just the start. We plan on perhaps integrating with other similar frameworks, and perhaps even abstracting some of the clustering details of Consul behind a common clustering interface. The project etcd is on our list and was in fact our first attempt, but Consul seemed to be more like what we were trying to build with etcd, so it seemed like a natural first pick. Expect more integration between QBit and Consul.

What is QBit again?

QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library, not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model, or, to use a better description, Active Objects. QBit does not use a disruptor. It uses regular Java Queues. QBit can do north of 100 million ping pong calls per second, which is an amazing speed (seen as high as 200M). QBit also supports calling services via REST and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc. QBit uses micro batching to push messages through the pipe (queue, IO, etc.) faster and to reduce thread hand-off.

QBit lingo

QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but we could one day write a version in Rust or Go or C# (but that would require a large payday).
Service: POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events. It may have one thread managing events, method calls, and responses, or two threads: one for method calls and events and the other for responses, so that response handlers do not block the service. One thread is faster unless responses block. Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.
ServiceBundle: Many POJOs behind one response queue and many receive queues. There may be one thread for all responses or not. There can also be just one receive queue.
Queue: A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, idle. You can listen to these events from services that sit behind a queue. You don't have to use Services. You can use Queues directly. In QBit, you have sender queues and receiver queues. They are separated to support micro-batching.
ServiceServer: ServiceBundle that is exposed to REST and WebSocket communication.
EventBus: EventBus is a way to send a lot of messages to services that may be loosely coupled.
ClientProxy: ClientProxy is a way to invoke a service through an async interface. The service can be inproc (same process) or remoted over WebSocket.
Non-blocking: QBit is a non-blocking lib. You use CallBacks via Java 8 Lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks. QBit takes an object-oriented approach to service development, so services look like normal Java services that you already write, but the services live behind a queue/thread. This is not a new concept. Microsoft did this with DCOM/COM and called it active objects. Akka does it with actors and calls them strongly typed Actors. The important concept is that you get the speed of reactive and actor style messaging but you develop in a natural OOP approach. QBit is not the first. QBit is not the only.
Speed: QBit is VERY fast. There is of course a lot of room for improvement. But already it does 200M+ TPS inproc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc. More work needs to be done to improve speed, but it is now fast enough that we are focusing more on usability. The JSON support uses Boon by default, which is up to 4x faster than other JSON parsers for the REST/JSON, WebSocket/JSON use case.

To learn more about QBit see the following:

  • Home
  • [Detailed Tutorial] QBit microservice example
  • [Detailed Tutorial] Building a single page; Todo List Application with QBit
  • [Detailed Tutorial] Working with inproc MicroServices within QBit.
  • [Doc] QBit Java microservice lib auto flushing service queue proxies
  • [Doc] QBit Java microservice lib introducing EventBus replication and EventBus connectors
  • [Doc] QBit microservice Java lib Using auto flushing client proxies
  • [Doc] QBit Microservice WebSocket wire protocol draft 01 JSON ASCII method batching RPC
  • [Doc] Queue Callbacks for QBit queue based services
  • [Doc] Using QBit microservice lib's HttpClient GET, POST, et al, JSON, Java 8 Lambda
  • [Doc] Using QBit microservice lib's WebSocket support
  • [Quick Start] Building a simple Rest web microservice server with QBit
  • [Quick Start] Building a single page; Todo List Application with QBit
  • [Quick Start] building a single web page application with QBit
  • [Quick Start] Building a TODO web microservice client with QBit
  • [Quick Start] Building a TODO web microservice server with QBit
  • [Quick Start] Building boon for the QBit microservice engine
  • [Quick Start] Building QBit the microservice lib for Java
  • [Quick Start] Working with inproc MicroServices within QBit
  • [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
  • [Rough Cut] QBit Java Microservice Lib Working With Workers Sharded and Pooled
  • [Rough Cut] QBit Microservice Lib Working With CallBacks
  • [Rough Cut] QBit Microservices using Service Workers and sharded service workers
  • [Rough Cut] Using QBit microservice lib with Spring Boot
  • [Rough Cut] Using QBit microservice lib's REST support with URI Params
  • [Rough Cut] Working with event bus for QBit the microservice engine
  • [Rough Cut] Working with inproc MicroServices
  • [Rough Cut] Working with private event bus for inproc microservices
  • [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
  • [Rough Cut] Working with System Manager for QBit Mircoservice lib
  • [Z Blog] Qbit servlet support also a preview of using jetty and the QBit microservice lib together
  • [Z Blog] Using Consul from QBit, the Java, JSON, WebSocket and REST microservice library, for health monitoring, clustering and service discovery
  • [Z Notebook] More benchmarking internal
  • [Z Notebook] Performance testing for REST
  • [Z Notebook] Roadmap
  • Introduction to QBit
  • Local Service Proxies
  • OLD Home
  • QBit Boon New Wave of JSON HTTP and Websocket
  • QBit Docs