Rick


Tuesday, June 21, 2016

KPI Microservices Monitoring with QBit


Mammatus Tech
We recently revised this. Here is the old version. You can see how much QBit and Reakt have progressed.
A lot has been written on the subject of Microservices Monitoring, but monitoring is an overloaded term. There is service health monitoring, which can be done with tools like Mesosphere/Marathon, Nomad, Consul, etc. There is KPI monitoring, which is done with tools like Grafana, Graphite, InfluxDB, StatsD, etc. There is log monitoring and search with tools like the ELK stack (Elasticsearch, Logstash and Kibana) and Splunk, where you can easily trace logs down to the request or the client ID in a request header. And then there is system monitoring (JVM, slow query logs, network traffic, etc.) with tools like systemd, and more. You will want all of this when you are doing Microservices Development.
The more insight you have into your system, the easier it will be to support and debug. Microservices imply async distributed development. Doing async distributed development without monitoring is like running with scissors.
To summarize, Microservices Monitoring is:
  • KPI Monitoring (e.g., StatsD, Grafana, Graphite, InfluxDB, etc.)
  • Health Monitoring (e.g., Consul, Nomad, Mesosphere/Marathon, Heroku, etc.)
  • Log monitoring (e.g., ELK stack, Splunk, etc.)
QBit supports ELK/Splunk by providing support for MDC (mapped diagnostic context). QBit also supports systems that monitor health, like Mesosphere/Marathon, Heroku, Consul, Nomad, etc. It has an internal health system that all QBit service actors check in with, and that status then gets rolled up to those external systems.
In this tutorial we cover only KPI monitoring for microservices, which is sometimes called Metrics Monitoring or Stats Monitoring. KPI stands for Key Performance Indicators. These are the things you really care about: whether your system is up and running, how hard it is getting hit, and how it is performing.
At the heart of the QBit KPI system is the metrics collector. QBit uses the Metrik MetricsCollector interface for tracking Microservice KPIs.

Metrik Interface for tracking KPIs

public interface MetricsCollector {

    default void increment(final String name) {
        recordCount(name, 1);
    }

    default void recordCount(String name, long count) {
    }

    default void recordLevel(String name, long level) {
    }

    default void recordTiming(String name, long duration) {
    }

}
We record three kinds of KPI: counts per time period, the current level (a gauge at this instant in time), and timings (how long something took).
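To make the three kinds of KPI concrete, here is a minimal sketch of an in-memory collector. The interface is a local copy of the listing above so that the example compiles on its own. InMemoryMetricsCollector and the stat names are made up for illustration and are not part of QBit or Metrik.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MetricsCollectorDemo {

    /** Local copy of the interface listed above, so that this sketch compiles on its own. */
    interface MetricsCollector {
        default void increment(final String name) { recordCount(name, 1); }
        default void recordCount(String name, long count) { }
        default void recordLevel(String name, long level) { }
        default void recordTiming(String name, long duration) { }
    }

    /** Hypothetical in-memory collector for illustration; not a QBit or Metrik class. */
    static class InMemoryMetricsCollector implements MetricsCollector {
        final Map<String, Long> counts = new ConcurrentHashMap<>();
        final Map<String, Long> levels = new ConcurrentHashMap<>();
        final Map<String, Long> lastTimings = new ConcurrentHashMap<>();

        @Override
        public void recordCount(String name, long count) {
            counts.merge(name, count, Long::sum); // counts accumulate
        }

        @Override
        public void recordLevel(String name, long level) {
            levels.put(name, level); // a level is a gauge: the latest value wins
        }

        @Override
        public void recordTiming(String name, long duration) {
            lastTimings.put(name, duration); // keep only the most recent duration
        }
    }

    public static void main(String[] args) {
        InMemoryMetricsCollector metrics = new InMemoryMetricsCollector();

        metrics.increment("addTodo.called");      // count of 1
        metrics.recordCount("addTodo.called", 2); // count of 2 more

        metrics.recordLevel("todo.count", 5);
        metrics.recordLevel("todo.count", 3);     // replaces the 5

        long start = System.nanoTime();
        // ... the work being timed would go here ...
        metrics.recordTiming("addTodo.time", (System.nanoTime() - start) / 1_000_000);

        System.out.println(metrics.counts); // {addTodo.called=3}
        System.out.println(metrics.levels); // {todo.count=3}
    }
}
```

A real collector would flush the counts every time period instead of holding them forever. This one only shows how the three method families differ.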

Demonstrating using QBit metrics

This guide assumes you have read the main overview of QBit and have gone through the first tutorials. You should be able to follow along if you have not, but it will be easier if you have at least skimmed the docs and worked through the first set of tutorials.
Let's show it. First we need to build. Use Gradle as follows:

build.gradle

group 'qbit-ex'
version '1.0-SNAPSHOT'

apply plugin: 'java'


apply plugin: 'application'


mainClassName = "com.mammatustech.todo.TodoServiceMain"


compileJava {
    sourceCompatibility = 1.8
}

repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.11'
    compile 'io.advantageous.reakt:reakt:2.8.17'
    compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
    compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'
}
Read the comments in all of the code listings.
In this example we will use StatsD, but QBit is not limited to StatsD for Microservice KPI monitoring. In fact, QBit can do amazing things with its StatsService, like clustered rate limiting based on OAuth header info, but that is beyond the scope of this tutorial.
StatsD is a protocol for sending KPI messages over UDP. Digital Ocean has a really nice description of StatsD.
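To give a feel for what goes over the wire, here is a rough sketch that formats a StatsD line and sends it as a single UDP datagram. It uses only the JDK. StatsDSketch, format and send are hypothetical names for illustration, and this is not how QBit sends stats internally. A StatsD line has the shape name:value|type, where the type is c for a count, g for a gauge (level) and ms for a timing.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of QBit. */
class StatsDSketch {

    /** Builds one StatsD line, e.g. "addTodo.called:1|c". */
    static String format(String name, long value, String type) {
        return name + ":" + value + "|" + type;
    }

    /** Sends one line as a single UDP datagram. UDP is fire and forget, so there is no reply. */
    static void send(String host, int port, String line) throws IOException {
        byte[] payload = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(
                    payload, payload.length, InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes a StatsD daemon is listening on localhost:8125; adjust to your setup.
        send("localhost", 8125, format("addTodo.called", 1, "c"));   // count
        send("localhost", 8125, format("todo.count", 42, "g"));      // gauge (level)
        send("localhost", 8125, format("addTodo.time", 12, "ms"));   // timing
    }
}
```

Because UDP does not wait for an acknowledgement, sending a stat does not block the service even when the StatsD server is down.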
The easiest way to set up StatsD is to use Docker. Docker is a great tool for development, and using Docker with Nomad, CoreOS, Mesosphere/Marathon, etc. is a great way to deploy Docker containers, but at a minimum you should be using the Docker tools for development.
Set up the StatsD server stack by using this public Docker container.
QBit ships with StatsD support in the qbit-admin lib (jar). It has done this for a long time.
We will connect to StatsD with this URI.

URI to connect to with StatsD

final URI statsdURI = URI.create("udp://192.168.99.100:8125");
Depending on how you have Docker set up, your URI might look a bit different. If you are running the Docker tools on a Mac, then that should be your URI. (On Linux the above IP is likely to be localhost, not 192.168.99.100. Go through the Docker tool tutorials if you are lost at this point. It will be worth your time. I promise. I promise.invoke promise.)
If you have not already followed the instructions in the StatsD, Grafana, InfluxDB Docker container docs, do so now.
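Since the address differs between machines, one option is to read it from the environment and fall back to the tutorial's address. This is only a sketch. The variable name STATSD_URI and the class StatsDUriResolver are assumptions made up for this example and are not something QBit reads on its own.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of QBit. */
class StatsDUriResolver {

    /** The address used in this tutorial (Docker tools on a Mac). */
    static final String DEFAULT_URI = "udp://192.168.99.100:8125";

    /** Returns the override if one was given, otherwise the tutorial default. */
    static URI resolve(String override) {
        if (override == null || override.trim().isEmpty()) {
            return URI.create(DEFAULT_URI);
        }
        return URI.create(override.trim());
    }

    public static void main(String[] args) {
        // STATSD_URI is a made-up variable name, e.g. STATSD_URI=udp://localhost:8125 on Linux.
        final URI statsdURI = resolve(System.getenv("STATSD_URI"));
        System.out.println("StatsD URI: " + statsdURI);
    }
}
```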

Running Docker

docker run -d \
  --name docker-statsd-influxdb-grafana \
  -p 3003:9000 \
  -p 3004:8083 \
  -p 8086:8086 \
  -p 22022:22 \
  -p 8125:8125/udp \
  samuelebistoletti/docker-statsd-influxdb-grafana
The above yields

Servers

Host port   Container port   Service          Purpose

3003        9000             grafana          to see the results
8086        8086             influxdb         to store the results
3004        8083             influxdb-admin   to query the results
8125        8125             statsd           server that listens to StatsD UDP messages
22022       22               sshd
If you want to see the metrics and check that this is working, go through the InfluxDB tutorial and look around at the measurements with influxdb-admin. InfluxDB is a time series database. Grafana lets you see pretty graphs and charts of the microservice KPIs that we are collecting. You will want to learn Grafana as well.
We use the host and port of the URI to connect to the StatsD daemon that is running in the Docker container.
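As a small illustration of pulling the host and port out of such a URI, here is a sketch that uses only java.net. StatsDAddress and toAddress are hypothetical names, and QBit may well do this differently inside enableStatsD.

```java
import java.net.InetSocketAddress;
import java.net.URI;

/** Hypothetical helper for illustration; not part of QBit. */
class StatsDAddress {

    /** Extracts host and port from a udp:// URI without doing a DNS lookup. */
    static InetSocketAddress toAddress(URI uri) {
        if (!"udp".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("Expected a udp:// URI but got: " + uri);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("URI needs a host and a port: " + uri);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        InetSocketAddress address = toAddress(URI.create("udp://192.168.99.100:8125"));
        System.out.println(address.getHostString() + ":" + address.getPort()); // 192.168.99.100:8125
    }
}
```

Failing fast on a missing port is a deliberate choice here, since a URI like udp://localhost would otherwise yield port -1.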

Setting up StatsD by using QBit managedServiceBuilder

       ...
        managedServiceBuilder.enableStatsD(URI.create("udp://192.168.99.100:8125"));
        managedServiceBuilder.getContextMetaBuilder().setTitle("TodoMicroService");
We covered using and setting up the managedServiceBuilder in the first tutorials, and the complete code listing is below. You could use managedServiceBuilder to create a statsCollector as follows:

You could use managedServiceBuilder to create the StatsCollector/MetricsCollector

        StatsCollector statsCollector = managedServiceBuilder.createStatsCollector();

        /* Start the service. */
        managedServiceBuilder.addEndpointService(new TodoService(reactor, statsCollector))
Since services typically deal with the health system, the reactor (callback management, task management, repeating tasks), and the stats collector, we created a ServiceManagementBundle, which is a facade over the health system, stats, and the reactor.

Better way to work with stats, health and the reactor

        /** Create the management bundle for this service. */
        final ServiceManagementBundle serviceManagementBundle =
                serviceManagementBundleBuilder().setServiceName("TodoServiceImpl")
                        .setManagedServiceBuilder(managedServiceBuilder).build();
The QBit StatsCollector interface extends the Metrik MetricsCollector interface (from QBit 1.5 onwards). ServiceManagementBundle has a stats method that returns a StatsCollector, and it also exposes the common stats methods directly as facade methods.
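The facade idea can be sketched in plain Java. Every type below (Stats, Health, Tasks, Bundle) is a made-up stand-in so that the example runs on its own. None of them is a QBit class, and the real ServiceManagementBundle has more to it than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Facade sketch; every type here is a hypothetical stand-in, not a QBit class. */
class FacadeSketch {

    /** Stand-in for a stats collector. */
    static class Stats {
        final Map<String, Long> counts = new HashMap<>();
        void increment(String name) { counts.merge(name, 1L, Long::sum); }
    }

    /** Stand-in for a health check-in. */
    static class Health {
        boolean ok = true;
        void setFailing() { ok = false; }
    }

    /** Stand-in for a reactor: tasks wait until process() is called. */
    static class Tasks {
        private final List<Runnable> pending = new ArrayList<>();
        void defer(Runnable task) { pending.add(task); }
        void process() {
            List<Runnable> batch = new ArrayList<>(pending);
            pending.clear();
            batch.forEach(Runnable::run);
        }
    }

    /** The facade: the service receives one object instead of three. */
    static class Bundle {
        private final Stats stats = new Stats();
        private final Health health = new Health();
        private final Tasks tasks = new Tasks();

        Stats stats() { return stats; }
        Tasks reactor() { return tasks; }

        // Common operations are exposed directly so callers do not reach through.
        void increment(String name) { stats.increment(name); }
        void setFailing() { health.setFailing(); }
        boolean isOk() { return health.ok; }
        void process() { tasks.process(); }
    }

    public static void main(String[] args) {
        Bundle mgmt = new Bundle();
        mgmt.reactor().defer(() -> mgmt.increment("i.am.alive"));
        mgmt.process();
        System.out.println(mgmt.stats().counts); // {i.am.alive=1}
    }
}
```

The service constructor then takes a single argument, which is the shape the TodoServiceImpl constructor in this tutorial has.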

Using the StatsCollector.

Then we just need to use it.

Using the StatsCollector to collect KPIs about our service

For kicks, we track the KPI todoservice.i.am.alive every three seconds.

Tracking KPI i.am.alive

@RequestMapping("/todo-service")
public class TodoServiceImpl implements TodoService {


    private final Map<String, Todo> todoMap = new TreeMap<>();

    private final ServiceManagementBundle mgmt;

    public TodoServiceImpl(ServiceManagementBundle mgmt) {
        this.mgmt = mgmt;
        /** Send stat count i.am.alive every three seconds.  */
        mgmt.reactor().addRepeatingTask(Duration.ofSeconds(3),
                () -> mgmt.increment("i.am.alive"));

    }

Tracking calls to add method

    @Override
    @POST(value = "/todo")
    public Promise<Boolean> addTodo(final Todo todo) {
        return invokablePromise(promise -> {
            /** Send KPI addTodo.called every time the addTodo method gets called. */
            mgmt.increment("addTodo.called");
            todoMap.put(todo.getId(), todo);
            promise.accept(true);
        });
    }

Tracking calls to remove method

    @Override
    @DELETE(value = "/todo")
    public final Promise<Boolean> removeTodo(final @RequestParam("id") String id) {
        return invokablePromise(promise -> {
            /** Send KPI removeTodo.called every time the removeTodo method gets called. */
            mgmt.increment("removeTodo.called");
            todoMap.remove(id);
            promise.accept(true);
        });
    }

The reactor only runs its callbacks and repeating tasks when its process method gets called. You can wire that up yourself with @QueueCallback as follows:

Managing callbacks and repeating tasks

    @QueueCallback({EMPTY, IDLE, LIMIT})
    public void process() {
        reactor.process();
    }
But you do not need to if you use the serviceManagementBundle. Just specify it when you add the service to the managedServiceBuilder.
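To see why something has to keep calling process, here is a toy reactor that only checks its repeating tasks when process runs. MiniReactor is a hypothetical stand-in and not QBit's Reactor. The clock is passed in so that the example does not need to sleep.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

class MiniReactorSketch {

    /** Hypothetical miniature reactor for illustration; not QBit's Reactor. */
    static class MiniReactor {

        private static final class Repeating {
            final long intervalMillis;
            final Runnable task;
            long nextRun;

            Repeating(long intervalMillis, Runnable task, long nextRun) {
                this.intervalMillis = intervalMillis;
                this.task = task;
                this.nextRun = nextRun;
            }
        }

        private final LongSupplier clockMillis;
        private final List<Repeating> tasks = new ArrayList<>();

        MiniReactor(LongSupplier clockMillis) {
            this.clockMillis = clockMillis;
        }

        void addRepeatingTask(Duration interval, Runnable task) {
            long millis = interval.toMillis();
            tasks.add(new Repeating(millis, task, clockMillis.getAsLong() + millis));
        }

        /** Runs every task whose interval has elapsed. Nothing runs unless this is called. */
        void process() {
            long now = clockMillis.getAsLong();
            for (Repeating repeating : tasks) {
                if (now >= repeating.nextRun) {
                    repeating.task.run();
                    repeating.nextRun = now + repeating.intervalMillis;
                }
            }
        }
    }

    public static void main(String[] args) {
        long[] now = {0};   // fake clock in milliseconds
        int[] alive = {0};  // stands in for the i.am.alive count

        MiniReactor reactor = new MiniReactor(() -> now[0]);
        reactor.addRepeatingTask(Duration.ofSeconds(3), () -> alive[0]++);

        reactor.process();  // too early, nothing runs
        now[0] = 3_000;
        reactor.process();  // three seconds elapsed, task runs
        now[0] = 6_000;
        reactor.process();  // runs again

        System.out.println(alive[0]); // 2
    }
}
```

In this toy there is no background thread, so the tasks run on whichever thread calls process.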

Adding service to managedServiceBuilder with a serviceManagementBundle

        /* Start the service. */
        managedServiceBuilder
                //Register TodoServiceImpl
                .addEndpointServiceWithServiceManagmentBundle(todoService, serviceManagementBundle)
                //Build and start the server.
                .startApplication();

Complete example

Todo.java

package com.mammatustech.todo;

public class Todo {

    private  String id;

    private final String name;
    private final String description;
    private final long createTime;

    public Todo(String name, String description, long createTime) {
        this.name = name;
        this.description = description;
        this.createTime = createTime;

        this.id = name + "::" + createTime;
    }


    public String getId() {
        if (id == null) {
            this.id = name + "::" + createTime;
        }
        return id;
    }

    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }

    public long getCreateTime() {
        return createTime;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Todo todo = (Todo) o;

        if (createTime != todo.createTime) return false;
        return !(name != null ? !name.equals(todo.name) : todo.name != null);

    }

    @Override
    public int hashCode() {
        int result = name != null ? name.hashCode() : 0;
        result = 31 * result + (int) (createTime ^ (createTime >>> 32));
        return result;
    }
}

TodoServiceImpl.java to show tracking KPIs.

package com.mammatustech.todo;

import io.advantageous.qbit.admin.ServiceManagementBundle;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.RequestMethod;
import io.advantageous.qbit.annotation.RequestParam;
import io.advantageous.qbit.annotation.http.DELETE;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.annotation.http.POST;
import io.advantageous.reakt.promise.Promise;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

import static io.advantageous.reakt.promise.Promises.invokablePromise;


/**
 * Default port for admin is 7777.
 * Default port for main endpoint is 8888.
 * <p>
 * <pre>
 * <code>
 *
 *     Access the service:
 *
 *    $ curl http://localhost:8888/v1/...
 *
 *
 *     To see swagger file for this service:
 *
 *    $ curl http://localhost:7777/__admin/meta/
 *
 *     To see health for this service:
 *
 *    $ curl http://localhost:8888/__health -v
 *     Returns "ok" if all registered health systems are healthy.
 *
 *     OR if same port endpoint health is disabled then:
 *
 *    $ curl http://localhost:7777/__admin/ok -v
 *     Returns "true" if all registered health systems are healthy.
 *
 *
 *     A node is a service, service bundle, queue, or server endpoint that is being monitored.
 *
 *     List all service nodes or endpoints
 *
 *    $ curl http://localhost:7777/__admin/all-nodes/
 *
 *
 *      List healthy nodes by name:
 *
 *    $ curl http://localhost:7777/__admin/healthy-nodes/
 *
 *      List complete node information:
 *
 *    $ curl http://localhost:7777/__admin/load-nodes/
 *
 *
 *      Show service stats and metrics
 *
 *    $ curl http://localhost:8888/__stats/instance
 * </code>
 * </pre>
 */
@RequestMapping("/todo-service")
public class TodoServiceImpl implements TodoService {


    private final Map<String, Todo> todoMap = new TreeMap<>();

    private final ServiceManagementBundle mgmt;

    public TodoServiceImpl(ServiceManagementBundle mgmt) {
        this.mgmt = mgmt;
        /** Send stat count i.am.alive every three seconds.  */
        mgmt.reactor().addRepeatingTask(Duration.ofSeconds(3),
                () -> mgmt.increment("i.am.alive"));

    }


    @Override
    @POST(value = "/todo")
    public Promise<Boolean> addTodo(final Todo todo) {
        return invokablePromise(promise -> {
            /** Send KPI addTodo.called every time the addTodo method gets called. */
            mgmt.increment("addTodo.called");
            todoMap.put(todo.getId(), todo);
            promise.accept(true);
        });
    }


    @Override
    @DELETE(value = "/todo")
    public final Promise<Boolean> removeTodo(final @RequestParam("id") String id) {
        return invokablePromise(promise -> {
            /** Send KPI removeTodo.called every time the removeTodo method gets called. */
            mgmt.increment("removeTodo.called");
            todoMap.remove(id);
            promise.accept(true);
        });
    }


    @Override
    @GET(value = "/todo", method = RequestMethod.GET)
    public final Promise<ArrayList<Todo>> listTodos() {
        return invokablePromise(promise -> {
            /** Send KPI listTodos.called every time the listTodos method gets called. */
            mgmt.increment("listTodos.called");
            promise.accept(new ArrayList<>(todoMap.values()));
        });
    }


}

TodoServiceMain.java showing how to configure StatsD QBit for MicroService KPI tracking

package com.mammatustech.todo;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.admin.ServiceManagementBundle;

import java.net.URI;

import static io.advantageous.qbit.admin.ManagedServiceBuilder.managedServiceBuilder;
import static io.advantageous.qbit.admin.ServiceManagementBundleBuilder.serviceManagementBundleBuilder;

public class TodoServiceMain {


    public static void main(final String... args) throws Exception {



        /* Create the ManagedServiceBuilder which manages a clean shutdown, health, stats, etc. */
        final ManagedServiceBuilder managedServiceBuilder = managedServiceBuilder()
                .setRootURI("/v1") //Defaults to services
                .setPort(8888); //Defaults to 8080 or environment variable PORT

        managedServiceBuilder.enableStatsD(URI.create("udp://192.168.99.100:8125"));
        managedServiceBuilder.getContextMetaBuilder().setTitle("TodoMicroService");

        /** Create the management bundle for this service. */
        final ServiceManagementBundle serviceManagementBundle =
                serviceManagementBundleBuilder().setServiceName("TodoServiceImpl")
                        .setManagedServiceBuilder(managedServiceBuilder).build();

        final TodoService todoService = new TodoServiceImpl(serviceManagementBundle);

        /* Start the service. */
        managedServiceBuilder
                //Register TodoServiceImpl
                .addEndpointServiceWithServiceManagmentBundle(todoService, serviceManagementBundle)
                //Build and start the server.
                .startApplication();

        /* Start the admin builder which exposes health end-points and swagger meta data. */
        managedServiceBuilder.getAdminBuilder().build().startServer();

        System.out.println("Todo Server and Admin Server started");

    }
}

TodoService interface

package com.mammatustech.todo;

import io.advantageous.reakt.promise.Promise;

import java.util.ArrayList;

public interface TodoService {
    Promise<Boolean> addTodo(Todo todo);

    Promise<Boolean> removeTodo(String id);

    Promise<ArrayList<Todo>> listTodos();
}

TodoServiceImplTest that shows how to unit test with Reakt

package com.mammatustech.todo;

import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.admin.ServiceManagementBundle;
import io.advantageous.qbit.queue.QueueCallBackHandler;
import io.advantageous.qbit.service.ServiceBuilder;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static io.advantageous.qbit.admin.ManagedServiceBuilder.managedServiceBuilder;
import static io.advantageous.qbit.admin.ServiceManagementBundleBuilder.serviceManagementBundleBuilder;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TodoServiceImplTest {
    @Test
    public void test() throws Exception {
        final TodoService todoService = createTodoService();

        final Todo rick = new Todo("foo", "rick", 1L);

        //Add Rick
        assertTrue(todoService
                .addTodo(rick)
                .invokeAsBlockingPromise().get());


        //Add Diana
        assertTrue(todoService
                .addTodo(new Todo("bar", "diana", 1L))
                .invokeAsBlockingPromise().get());

        //Remove Rick
        assertTrue(todoService.removeTodo(rick.getId())
                .invokeAsBlockingPromise().get());

        //Make sure Diana is in the listTodos
        assertTrue(todoService.listTodos()
                .invokeAsBlockingPromise()
                .get()
                .stream()
                .filter(
                        todo -> todo.getDescription().equals("diana")

                )
                .findFirst()
                .isPresent()
        );


        //Make sure Rick is not in the listTodos
        assertFalse(todoService.listTodos()
                .invokeAsBlockingPromise()
                .get()
                .stream()
                .filter(
                        todo -> todo.getDescription().equals("rick")

                )
                .findFirst()
                .isPresent()
        );

    }

    private TodoService createTodoService() {
        /* Create the ManagedServiceBuilder which manages a clean shutdown, health, stats, etc. */
        final ManagedServiceBuilder managedServiceBuilder = managedServiceBuilder(); //Port defaults to 8080 or environment variable PORT


        /** Create the management bundle for this service. */
        final ServiceManagementBundle serviceManagementBundle =
                serviceManagementBundleBuilder().setServiceName("TodoService")
                        .setManagedServiceBuilder(managedServiceBuilder).build();

        final TodoService todoServiceImpl = new TodoServiceImpl(serviceManagementBundle);


        return ServiceBuilder.serviceBuilder().setServiceObject(todoServiceImpl).addQueueCallbackHandler(
                new QueueCallBackHandler() {
                    @Override
                    public void queueProcess() {
                        serviceManagementBundle.process();
                    }
                })
                .buildAndStartAll()
                .createProxyWithAutoFlush(TodoService.class, 50, TimeUnit.MILLISECONDS);

    }
}