For some background on why this is important for microservices see Reactive Microservices Monitoring.
QBit supports Metrics, KPI gathering
QBit supports collecting metrics for microservices. The QBit runtime statistics system can be queried, it can be clustered, and it can replicate to any statistics system. The core interfaces for the QBit runtime stats system are in the package io.advantageous.qbit.service.stats. The main interface for collecting stats is StatsCollector.
StatsCollector
package io.advantageous.qbit.service.stats;
import io.advantageous.qbit.client.ClientProxy;
/**
* Collects stats
* This collects key performance indicators: timings, counts and levels/gauges.
* Created by rick on 6/6/15.
*/
public interface StatsCollector extends ClientProxy {
/** Increment a counter by 1.
* This is a short cut for recordCount(name, 1);
* @param name name of the metric, KPI, stat
*/
default void increment(String name) {
}
/**
* Record a count.
* Used to record things like how many users used the site.
*
* @param name name of the metric, KPI, stat
* @param count count to record.
*/
default void recordCount(String name, long count) {
}
/**
* This is used to record things like the count of current threads or
* free system memory or free disk, etc.
* Record Level. Some systems call this a gauge.
* @param name name of the gauge or level
* @param level level
*/
default void recordLevel(String name, long level) {
}
/**
* This is used to record timings.
* This would be things like how long did it take this service to call
* this remote service.
* @param name name of the timing
* @param duration duration
*/
default void recordTiming(String name, long duration) {
}
}
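To make the three kinds of KPI concrete, here is a minimal in-memory sketch. It is not QBit code: `SimpleStatsCollector` is a hypothetical stand-in that only mirrors the four method signatures shown above, and `InMemoryStatsCollector` is an illustrative implementation. It assumes counts accumulate, levels keep only the latest value, and timings are kept as individual samples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in mirroring the four StatsCollector methods shown above. */
interface SimpleStatsCollector {
    void recordCount(String name, long count);
    void recordLevel(String name, long level);
    void recordTiming(String name, long duration);

    /** Shortcut for recordCount(name, 1), as described in the Javadoc above. */
    default void increment(String name) {
        recordCount(name, 1);
    }
}

/** Illustrative only: not thread-safe and keeps everything in memory. */
class InMemoryStatsCollector implements SimpleStatsCollector {
    final Map<String, Long> counts = new HashMap<>();
    final Map<String, Long> levels = new HashMap<>();
    final Map<String, List<Long>> timings = new HashMap<>();

    @Override
    public void recordCount(String name, long count) {
        counts.merge(name, count, Long::sum);   // counts add up
    }

    @Override
    public void recordLevel(String name, long level) {
        levels.put(name, level);                // a level (gauge) is the latest reading
    }

    @Override
    public void recordTiming(String name, long duration) {
        timings.computeIfAbsent(name, k -> new ArrayList<>()).add(duration);
    }
}
```

A real collector would forward these calls to a stats engine rather than hold them in maps; the maps are only here so the semantics are easy to inspect.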
ServiceStatsListener
You will probably never use a StatsCollector directly but a StatsCollectorBuffer instead, as it buffers metric calls to reduce IO and reporting to the stats engine. Another important concept in this package is the ServiceStatsListener. The ServiceStatsListener gets registered on your behalf if you use the ManagedServiceBuilder.
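The idea of buffering metric calls can be sketched as follows. This is not the StatsCollectorBuffer implementation; `BufferedCounts` and its `downstream` consumer are hypothetical names used only to show why buffering reduces IO: many local calls collapse into one aggregated send per metric at flush time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical sketch of count buffering; assumes single-threaded use. */
class BufferedCounts {
    private final Map<String, Long> pending = new HashMap<>();
    private final BiConsumer<String, Long> downstream; // stands in for the stats engine

    BufferedCounts(BiConsumer<String, Long> downstream) {
        this.downstream = downstream;
    }

    /** Accumulates locally; nothing is sent downstream here. */
    void recordCount(String name, long count) {
        pending.merge(name, count, Long::sum);
    }

    /** Sends one aggregated value per metric, clears the buffer, returns how many were sent. */
    int flush() {
        int sent = pending.size();
        pending.forEach(downstream);
        pending.clear();
        return sent;
    }
}
```

With this shape, a thousand calls to `recordCount("x", 1)` between flushes result in a single downstream call carrying the value 1000.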
The ServiceStatsListener is used to intercept queue calls for the ServiceQueue. All services and end-points end up using the ServiceQueue, so this class is able to track stats for services.
Default Service Stat Keys
startBatchCountKey = serviceName + ".startBatchCount";
receiveCountKey = serviceName + ".receiveCount";
receiveTimeKey = serviceName + ".callTimeSample";
this.queueRequestSizeKey = serviceName + ".queueRequestSize";
this.queueResponseSizeKey = serviceName + ".queueResponseSize";
The ${serviceName}.startBatchCount key tracks how many times a batch has been sent. This can tell you how well your batching is set up.
The ${serviceName}.receiveCount key is how many times the service has been called.
The ${serviceName}.callTimeSample key is how long methods take for this service (if enabled, call times are sampled).
The ${serviceName}.queueRequestSize key keeps track of how large the request queue is. A value greater than 0 indicates calls that have not been handled yet. If it continues to rise, the service could be down. (Note that there is a health check to see whether a queue is blocked, in which case the service will be marked unhealthy.)
The ${serviceName}.queueResponseSize key keeps track of how large the response queue is getting. A growing value indicates that responses are not getting drained.
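As a small illustration of how these keys are composed and how the queue size might be read, consider the sketch below. `ServiceStatKeys` and `isRising` are hypothetical helpers, not part of QBit; the key suffixes are the ones listed above, and the "rising" check is only one simple way to spot a request queue that keeps growing.

```java
/** Hypothetical helper that composes the default key names listed above. */
class ServiceStatKeys {
    final String startBatchCount;
    final String receiveCount;
    final String callTimeSample;
    final String queueRequestSize;
    final String queueResponseSize;

    ServiceStatKeys(String serviceName) {
        this.startBatchCount = serviceName + ".startBatchCount";
        this.receiveCount = serviceName + ".receiveCount";
        this.callTimeSample = serviceName + ".callTimeSample";
        this.queueRequestSize = serviceName + ".queueRequestSize";
        this.queueResponseSize = serviceName + ".queueResponseSize";
    }

    /** True when every sample is larger than the one before it (needs at least two samples). */
    static boolean isRising(long[] samples) {
        if (samples.length < 2) {
            return false;
        }
        for (int i = 1; i < samples.length; i++) {
            if (samples[i] <= samples[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```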
All of the classes that we have covered so far are in QBit core. This means that stats and KPI gathering are just part of the QBit system. Monitoring is an integral part of microservices, so it is an integral part of QBit.
StatService and StatsD
The StatService is in the QBit admin package. The StatService interface allows you both to record stats, KPIs, and metrics for microservices and to query them. The StatService can also replicate KPIs (key performance indicators) to replicators, and it does this efficiently.
Let's look at the StatService interface and its comments.
package io.advantageous.qbit.metrics;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.service.stats.Stats;
import io.advantageous.qbit.service.stats.StatsCollector;
/**
* The StatService collects stats, and allows stats to be queried.
* This collects key performance indicators: timings, counts and levels/gauges.
* It also allows internal or external clients to query this system.
*
* Created by rick on 6/6/15.
*/
public interface StatService extends StatsCollector {
/**
* Get the last n Seconds of stats (up to two minutes of stats typically
* kept in memory).
*
* The `Stats` object has the mean, median, etc.
*
* ```java
*
* private final float mean;
* private final float stdDev;
* private final float variance;
* private final long sum;
* private final long max;
* private final long min;
* private final long median;
* ```
* @param callback callback to get Stat
* @param name name metric, KPI, etc.
* @param secondCount secondCount
*/
default void statsForLastSeconds(Callback<Stats> callback, String name,
int secondCount) {
}
/**
* Gets the average of a level over the last n seconds.
*
* @param callback callback
* @param name name of metric, KPI, etc.
* @param secondCount secondCount
*/
default void averageLastLevel(Callback<Long> callback, String name,
int secondCount) {
}
/**
* Gets count of the current minute
*
* @param callback callback
* @param name name of metric
*/
default void currentMinuteCount(Callback<Long> callback, String name) {
}
/**
* Gets count of the current second.
*
* @param callback callback
* @param name name of metric
*/
default void currentSecondCount(Callback<Long> callback, String name) {
}
/**
* Gets count of the last recorded full second.
*
* @param callback callback
* @param name name of metric
*/
default void lastSecondCount(Callback<Long> callback, String name) {
}
/**
* Gets count of the last recorded ten full seconds.
*
* @param callback callback
* @param name name of metric
*/
default void lastTenSecondCount(Callback<Long> callback, String name) {
}
/**
* Gets count of the last recorded five full seconds.
*
* @param callback callback
* @param name name of metric
*/
default void lastFiveSecondCount(Callback<Long> callback, String name) {
}
/**
* Gets count of the last recorded N full seconds.
*
* @param callback callback
* @param name name of metric
*/
default void lastNSecondsCount(Callback<Long> callback, String name,
int secondCount) {
}
/**
* Gets count of the last recorded N full seconds.
* This is more exact if the count overlaps two minutes.
*
* @param callback callback
* @param name name of metric
*/
default void lastNSecondsCountExact(Callback<Long> callback, String name,
int secondCount) {
}
/**
* Gets count of the last recorded ten full seconds.
* This is more exact if the count overlaps two minutes.
*
* @param callback callback
* @param name name of metric
*/
default void lastTenSecondCountExact(Callback<Long> callback, String name) {
}
/**
* Gets count of the last recorded five full seconds.
* This is more exact if the count overlaps two minutes.
*
* @param callback callback
* @param name name of metric
*/
default void lastFiveSecondCountExact(Callback<Long> callback, String name) {
}
/**
* Record a count with a timestamp.
* @param name name of metric
* @param count count
* @param timestamp timestamp
*/
default void recordWithTime(String name, int count, long timestamp) {
}
/**
* Bulk record.
* @param names names of metric
* @param counts counts of metrics
* @param timestamp timestamp
*/
default void recordAll(long timestamp, String[] names, long[] counts) {
}
/**
* Bulk record.
* @param names names of metric
* @param counts counts of metrics
* @param times times
*/
default void recordAllWithTimes(String[] names,
long[] counts, long[] times){
}
}
You can query the metrics system to provide reactive support. For example, you could query the current requests per second to a service and dynamically change the size of buffering to increase throughput.
QBit not only monitors metrics, it also makes the metrics queryable so your microservices can be reactive.
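The requests-per-second idea could look roughly like the sketch below. Everything here is an assumption for illustration: `CountQuery` is a stand-in for an async query such as lastSecondCount(callback, name), it uses java.util.function.Consumer instead of QBit's Callback so the example is self-contained, and the thresholds and batch sizes are arbitrary.

```java
import java.util.function.Consumer;

/** Hypothetical sketch: picks a batch size from a requests-per-second reading. */
class BatchSizeTuner {

    /** Stand-in for an async stats query; the callback receives the count. */
    interface CountQuery {
        void lastSecondCount(Consumer<Long> callback, String name);
    }

    private volatile int batchSize = 10;

    int batchSize() {
        return batchSize;
    }

    /** Arbitrary illustration thresholds: busier services get larger batches. */
    static int sizeFor(long requestsPerSecond) {
        if (requestsPerSecond > 10_000) {
            return 1_000;
        }
        if (requestsPerSecond > 1_000) {
            return 100;
        }
        return 10;
    }

    /** Asks for the last full second's count and adjusts the batch size when the answer arrives. */
    void retune(CountQuery stats, String metricName) {
        stats.lastSecondCount(count -> batchSize = sizeFor(count), metricName);
    }
}
```

A service could call `retune` from a periodic job, passing a key such as its receive count, and use `batchSize()` when configuring its next batch.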
The StatService system that comes with QBit can replicate changes to other systems via the StatReplicator.
/**
* Stat Replicator.
* This is used to replicate stats to another system.
* created by rhightower on 1/28/15.
*/
public interface StatReplicator extends RemoteTCPClientProxy, ServiceFlushable, Stoppable {
void replicateCount(String name, long count, long time);
void replicateLevel(String name, long level, long time);
void replicateTiming(String name, long timing, long time);
}
The QBit Admin package has two built-in collectors. The StatsDReplicator (notice the StatsD) implements StatReplicator and replicates via UDP to a StatsD server (e.g., Graphite, Statsite, and more). StatsD is a wire protocol for sending stats over UDP. The StatsDReplicator implements this wire protocol to talk to a given host and port over UDP. The other built-in collector is the LocalStatsCollector, which exposes stats over a REST endpoint (/__stats/instance) that delivers a JSON version of the stats. It keeps collecting stats until some other system queries the /__stats/instance REST endpoint, and it resets the stats after each REST request. Both the StatsDReplicator and the LocalStatsCollector have builders, but you typically get them built for free by using the ManagedServiceBuilder. We use LocalStatsCollector for Heroku-like environments.
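For readers unfamiliar with StatsD, the wire format is a short text line per metric: `name:value|c` for a counter, `name:value|g` for a gauge (level), and `name:value|ms` for a timing. The sketch below formats such lines and sends one over UDP using only the JDK. It is not the StatsDReplicator source; `StatsDLines` is a hypothetical class, and it assumes non-negative level values (signed gauge values have a different meaning in StatsD).

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the StatsD line format for counts, levels and timings. */
class StatsDLines {

    static String count(String name, long count) {
        return name + ":" + count + "|c";
    }

    static String level(String name, long level) {
        return name + ":" + level + "|g";
    }

    static String timing(String name, long millis) {
        return name + ":" + millis + "|ms";
    }

    /** Fire-and-forget UDP send of a single metric line. */
    static void send(String line, String host, int port) throws IOException {
        byte[] data = line.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length,
                    InetAddress.getByName(host), port));
        }
    }
}
```

For example, `StatsDLines.send(StatsDLines.count("Todo.repo.call.count", 1), host, port)` would emit one counter increment to whatever host and port your StatsD server listens on.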
You can configure StatsD via the ManagedServiceBuilder.
Configure StatsD with ManagedServiceBuilder
if (config.isStatsD()) {
    managedServiceBuilder.setEnableStatsD(true);
    managedServiceBuilder.getStatsDReplicatorBuilder()
            .setHost(config.getStatsDHost());
    if (config.getStatsDPort() != -1) {
        managedServiceBuilder.getStatsDReplicatorBuilder()
                .setPort(config.getStatsDPort());
    }
}
If you are using the JSON config file, you set up StatsD as follows:
{
  "statsD" : true,
  "statsDHost" : "lab99.myhost.com"
}
You can send your own stats, not just the ones that are sent via the default stats gathering. Assume you have a service called TodoService.
Using stats from your own service
/** Create a stats collector. */
final StatsCollector statsCollector = managedServiceBuilder
.getStatServiceBuilder().buildStatsCollector();
final TodoService todoService = new TodoService(statsCollector,
ReactorBuilder.reactorBuilder().build(),
taskRepo,
Timer.timer());
/** Add the todo service to the managedServiceBuilder. */
managedServiceBuilder.addEndpointService(todoService);
Passing stats collector, reactor and timer.
public TodoService(final StatsCollector statsCollector,
final Reactor reactor,
final TaskRepo taskRepo,
final Timer timer) {
this.statsCollector = statsCollector;
this.timer = timer;
this.taskRepo = taskRepo;
this.reactor = reactor;
this.reactor.addServiceToFlush(statsCollector);
Calling reactor.addServiceToFlush and passing the statsCollector ensures that when the service queue that is managing the TodoService is idle or full, all of the stats will be flushed if there are any to save. The statsCollector is the one that does the buffering, as mentioned earlier.
The reactor does not auto flush unless it is told to do so. For now, you always use the reactor with the following queue callback (no magic).
Calling reactor so that it can run jobs, coordinate calls and flush proxies
/** Process Reactor stuff. */
@QueueCallback({QueueCallbackType.LIMIT, QueueCallbackType.EMPTY})
public void process(){
reactor.process();
time = timer.time();
}
Calling reactor.process will flush all calls to the statsCollector, which will then send the stats to the actual StatService, where they will be replicated to all of the replicators.
Using recordCount
/**
* Load TODOs from TodoRepo.
*/
@RequestMapping(value = "/todo", summary = "Load TODOs",
...)
public void loadTodo(final Callback<Boolean> callback) {
final Set<TodoCategory> categories = new HashSet<>(this.categories);
/* If there are no categories or if service is paused, then return right away. */
if (categories.size() > 0 && !stop) {
loadFromTodoRepoCache++;
statsCollector.recordCount("Todo.repo.call.count", 1);
} else {
logger.warn("Service can't load categories count {} or stopped {}",
categories.size(), stop);
return;
}
...
Notice the use of statsCollector.recordCount("Todo.repo.call.count", 1). Since this is just incrementing by one, we can call statsCollector.increment("Todo.repo.call.count") instead.
Using increment
/**
* Load TODOs from TodoRepo.
*/
@RequestMapping(value = "/todo", summary = "Load TODOs",
...)
public void loadTodo(final Callback<Boolean> callback) {
final Set<TodoCategory> categories = new HashSet<>(this.categories);
/* If there are no categories or if service is paused, then return right away. */
if (categories.size() > 0 && !stop) {
loadFromTodoRepoCache++;
statsCollector.increment("Todo.repo.call.count");
} else {
logger.warn("Service can't load categories count {} or stopped {}",
categories.size(), stop);
return;
}
...
Now let's show a timing.
Using statsCollector.recordTiming: timing how long a bunch of async calls took
/**
* Load TODOs from TodoRepo.
*/
@RequestMapping(value = "/todo", summary = "Load TODOs",
...)
public void loadTodo(final Callback<Boolean> callback) {
final Set<TodoCategory> categories = new HashSet<>(this.categories);
/* If there are no categories or if service is paused, then return right away.*/
...
final long startTime = timer.time();
/* For each TodoCategory call TodoRepo to load the todo items. */
categories.forEach(category -> {
final Callback<List<Todo>> todoCacheCallback =
createLoadFromCacheCallback(count, errorCount, category);
taskRepo.loadTodosFromCache(todoCacheCallback, category);
});
/* Coordinate all of the callbacks are done. */
reactor.coordinatorBuilder()
/* If the success count is equal to the
number of categories, we are done. */
.setCoordinator(() -> {
if (logger.isDebugEnabled()) {
logger.debug("COUNT " + count.get());
}
return count.get() == categories.size();
}
)
/* Set the timeout to be seconds times two since
we are calling two services. */
.setTimeoutDuration(config.getTimeoutMakingRemoteCallInSeconds() * 2)
.setTimeoutTimeUnit(TimeUnit.SECONDS)
/* If there were no errors, then return success. */
.setFinishedHandler(() -> {
statsCollector
.recordTiming("Todo.loadCache.time",
timer.time() - startTime);
})
/* Set the timeout handler to return no
success and log that there was a timeout. */
.setTimeOutHandler(() -> {
logger.error("Timeout while loading todo items" );
callback.returnThis(false);
}).build();
...
This records a start time with startTime = timer.time() and then makes a bunch of async calls. When all of the async calls return, we send a timing to record how long the process took using statsCollector.recordTiming. To really understand the call coordination with the QBit reactor, you first need to understand how QBit coordinates calls. You can learn more about this in the QBit Reactive Microservices Tutorial for handling async calls with the reactor.
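If the reactor is unfamiliar, the same "start a clock, fan out, record one timing when everything has returned" pattern can be sketched with the JDK's CompletableFuture. This swaps in CompletableFuture.allOf for the QBit coordinator, so it is an analogy rather than QBit code; `GroupTiming`, `clock`, and `recordTiming` are hypothetical names, with `recordTiming` standing in for a call such as statsCollector.recordTiming with a fixed metric name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Sketch using CompletableFuture instead of the QBit reactor. */
class GroupTiming {

    /**
     * Records one elapsed-time value after every call has completed successfully.
     * If any call fails, no timing is recorded by this sketch.
     */
    static CompletableFuture<Void> timeAll(List<? extends CompletableFuture<?>> calls,
                                           LongSupplier clock,
                                           LongConsumer recordTiming) {
        final long startTime = clock.getAsLong();
        return CompletableFuture
                .allOf(calls.toArray(new CompletableFuture<?>[0]))
                .thenRun(() -> recordTiming.accept(clock.getAsLong() - startTime));
    }
}
```

Passing the clock in as a LongSupplier plays the same role as the injected Timer in the service: tests can control time instead of sleeping.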
Here is a simpler example that times a single call to Cassandra.
Timing how long a single call to Cassandra took
public void executeAsyncCassandraCall(final Callback<ResultSet> callback,
final Statement stmt) {
final long startTime = timer.time();
final ResultSetFuture future = this.session.executeAsync(stmt);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
statsCollector
.recordTiming("Cassandra.load.time",
timer.time() - startTime);
callback.accept(result);
}
@Override
public void onFailure(Throwable t) {
statsCollector
.recordTiming("Cassandra.load.error.time",
timer.time() - startTime);
callback.onError(t);
}
});
}
Notice that we use final long startTime = timer.time() and record one of two timings: either how long the successful call took or how long the failed call took.
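The success/error split can be factored into a small reusable wrapper. The sketch below is hypothetical and uses CompletableFuture rather than the Cassandra driver's futures so that it stays self-contained; `OutcomeTiming`, the `.time` and `.error.time` suffixes, and the `recordTiming` consumer (standing in for statsCollector::recordTiming) are illustration choices, not QBit APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical wrapper that times an async call and records by outcome. */
class OutcomeTiming {

    /** Records under name + ".time" on success, or name + ".error.time" on failure. */
    static <T> CompletableFuture<T> timed(String name,
                                          LongSupplier clock,
                                          BiConsumer<String, Long> recordTiming,
                                          Supplier<CompletableFuture<T>> call) {
        final long startTime = clock.getAsLong(); // read the clock before issuing the call
        return call.get().whenComplete((result, error) -> {
            final String key = (error == null) ? name + ".time" : name + ".error.time";
            recordTiming.accept(key, clock.getAsLong() - startTime);
        });
    }
}
```

Keeping failures under a separate key means slow timeouts do not distort the latency picture of successful calls.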
To store a level, use the same style with recordLevel.
Store a level
statsCollector.recordLevel("Todo.categories.size",
categories.size());
Remember, a level is a gauge: how large is my cache, how many outstanding items are in my queue, etc.
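The StatsCollector Javadoc mentions free memory and thread counts as typical levels. A minimal sketch of sampling those from the JVM might look like this; `JvmLevels` and the metric names are made up for illustration, and the `recordLevel` parameter stands in for a method reference such as statsCollector::recordLevel.

```java
import java.util.function.ObjLongConsumer;

/** Hypothetical sampler that reports a couple of JVM readings as levels. */
class JvmLevels {

    /** Each call reports the current reading; a level replaces the previous value. */
    static void sample(ObjLongConsumer<String> recordLevel) {
        final Runtime runtime = Runtime.getRuntime();
        recordLevel.accept("jvm.mem.free", runtime.freeMemory());
        // Thread.activeCount() is an estimate for the current thread group.
        recordLevel.accept("jvm.thread.count", Thread.activeCount());
    }
}
```

Calling `sample` from a periodic job gives a time series of readings, which is the usual way gauges are consumed.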