Rick

Saturday, December 12, 2015

Micro-batching and QBit - tuning micro-batches

Intro

I have been working on QBit for two years or so, and not alone: others have contributed as well. QBit has improved by leaps and bounds, we have used it in anger on several projects, and it continues to improve. I'd like to tell you how it all started, and then walk through some code and perf tests.

Early days

QBit started with a project where we had a CPU-intensive user preference engine running in memory. The engine could calculate around 30 thousand user requests a second. It looked at user actions, which were summarized and stored in memory, and then generated a list of recommendations, with some rules-based mix-ins driven by users' stated preferences versus their actual actions. I did not design the algorithm, but I did rewrite most of the original with a focus on basic performance. There was a working prototype, but (the way it was written) it would not handle the load we were expecting.
The application could get up to 100 million users. And the users would not trickle in: there were peak hours each week when almost all of the users would be on the system, and for the rest of the week the load slowed to a trickle. In production, I think the application never had more than 13 million users (I am not sure about this; at most it was 20 million), but we had to assume the worst (or best).
We tried several approaches. We ended up using Vertx and running 9 verticles (this is based on my memory of the project). A verticle in Vertx is like a module that has its own event loop. We had an IO verticle and 8 verticles running the rules engine. The first issue was that we could not get much throughput. Each front-end request would look at the user id, hash it, and then pick one of the 8 verticles running the rules preference engine. In a tight loop, the rules preference engine could easily do 30K requests per second (generating a recommendation list), but in this scenario the whole system with 8 verticles could only do 15K to 30K requests per second. We used wrk and some custom Lua scripts to pound out the worst-case scenario.
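The routing step is easy to picture in code. Below is a minimal plain-Java sketch, not the original Vertx code: `UserRouter` and `workerFor` are hypothetical names, and `String.hashCode()` is an assumption standing in for whatever hash the project actually used.

```java
import java.util.List;

// Hypothetical sketch (not the original Vertx code): route each request to one of
// N rules-engine workers by hashing the user id, so every request for a given user
// lands on the same worker and its in-memory state.
final class UserRouter {
    private final int workerCount;

    UserRouter(int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        this.workerCount = workerCount;
    }

    // Math.floorMod keeps the index in [0, workerCount) even when hashCode() is negative.
    int workerFor(String userId) {
        return Math.floorMod(userId.hashCode(), workerCount);
    }

    public static void main(String[] args) {
        UserRouter router = new UserRouter(8); // 8 rules-engine workers, as in the text
        for (String user : List.of("alice", "bob", "carol")) {
            System.out.println(user + " -> worker " + router.workerFor(user));
        }
    }
}
```

Because the same user always hashes to the same worker, each worker can keep that user's summarized stats in memory without sharing them across threads.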
I thought about making the preference engine faster. But it seemed to me that the real problem, or rather the problem burning us the most, was getting more requests into the preference engine at a time. Also, since we were constantly modifying the engine as the requirements changed, I felt optimizing it would be a waste when we were having a hard time using what we had, because of the time lost in message passing. We had already tried several designs and the project was on a tight timeframe; we needed some quick wins or we were dead in the water.
Before this project, I had read through the Mechanical Sympathy blog and related papers as best I could, and I had already employed their ideas and techniques in production to great effect. I also had a few mentors in this space. This was not my first time at bat with these ideas. I had some previous success, but this project was the primary driver for QBit, so I talk about it here.
My idea for this project was simple. The workload, in my opinion, was hard to adapt to a disruptor (at least based on my skill set at the time; I am still not sure it made sense for this use case, but I leave the idea open for future experimentation). What if I sent a list of requests instead of a single request to the rules engine? (The rules engines were running in memory, and we were using the Vertx event bus to issue requests.) Under peak load, each machine could get thousands of requests per second, not just to generate the list of recommendations but also to update the stats that influenced the generation of the recommendations. The application had to be fast, but it also had to be flexible, as the requirements were changing all of the time (even days before launch). Later we also added some logic for the rules engine to say, "Hey, I am pretty busy, make the request lists bigger." This seemed to work well. We were allowing the back pressure to control the batch size.
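Here is a minimal sketch of the "send a list instead of a single request" idea, assuming a plain `java.util.concurrent` queue between a producer and one worker thread (the project itself used the Vertx event bus, which this does not model). `ListHandOff` and its `run` method are hypothetical; integers stand in for requests, and the method reports how many queue hand-offs were needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a producer hands requests to a worker thread in lists
// instead of one at a time, so each queue transfer (thread hand-off) carries
// up to batchSize requests. Integers stand in for real requests.
final class ListHandOff {

    /** Returns {requestsProcessed, queueTransfers}. */
    static long[] run(int requestCount, int batchSize) {
        final BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(64);
        final List<Integer> endOfStream = new ArrayList<>(); // sentinel, compared by identity
        final AtomicLong processed = new AtomicLong();
        final AtomicLong transfers = new AtomicLong();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> received = queue.take();
                    if (received == endOfStream) {
                        return;
                    }
                    transfers.incrementAndGet();
                    for (int request : received) {
                        processed.incrementAndGet(); // stand-in for the real per-request work
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            List<Integer> batch = new ArrayList<>(batchSize);
            for (int i = 0; i < requestCount; i++) {
                batch.add(i);
                if (batch.size() == batchSize) {
                    queue.put(batch); // one hand-off for batchSize requests
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) {
                queue.put(batch); // send the partial last batch
            }
            queue.put(endOfStream);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new long[] {processed.get(), transfers.get()};
    }

    public static void main(String[] args) {
        for (int batchSize : new int[] {1, 1_000}) {
            long[] r = run(1_000_000, batchSize);
            System.out.printf("batchSize=%,d processed=%,d hand-offs=%,d%n", batchSize, r[0], r[1]);
        }
    }
}
```

With a batch size of 1, every request costs one hand-off; with a batch size of 1,000, a million requests need only 1,000 hand-offs. The back-pressure variant (bigger lists when the engine is busy) is not modeled here.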
We came up with what I called, at the time, the poor man's disruptor. It was a confluence of ideas from reading Akka in Action and the LMAX papers, working with Vertx, and my own empirical observations. Now, after hearing a few talks on Apache Spark, I think the more correct term is micro-batching, but I did not know that at the time. I had used micro-batching before this on another project, the disk-batcher, which was able to write user events to disk at a rate of 720 MB per second, but this was the first time I used the technique for a CPU-intensive application.
The short of it is this: it worked. It was not perfect, but we were able to improve the per-server throughput from 30K TPS to 150K TPS, and later to 200K TPS. We would later employ the same sort of technique to send event data to the backend for storage (data backup) and to stream users into memory as the load increased. We got these numbers while a full array of servers was being hit by a cloud load-testing company.
A similar application in production was using 2,000 servers. Another application, at a competitor, was using 150 servers. Our final product could have run on six servers or fewer in production; we ran it on 13 for reliability. It could handle 10x to 100x the load of the other solutions. It was truly a high-speed, in-memory microservice. The actual load never reached what it could handle. It seemed like whenever we had a problem, micro-batching was the way to solve it at scale.
By the way, I had promised I could run the whole service on three servers in production, so I missed that goal, but striving for it kept me focused. If I had had twice the time to work on the project, maybe we could have done it, but there was a time-to-market aspect as well. It was not just that the code had to run fast; we had to get the project done fast, so the real trick was finding the right techniques to meet the goals.
The idea behind micro-batching is simple, at least in my definition of it: try to increase throughput by reducing the thread hand-off of messages, sending many messages at one time. If a certain period of time passes, send what you have. Under heavy load, send only when you have reached the batch size or the timeout has expired. You can also send what you have if the processing side tells you it is not busy, or you can create larger batches if the processing side (or your response monitoring) tells you that the processor is busier. If possible, detect heavier load and create larger batches. Micro-batching makes the application slower under light load, but much, much faster under high load. The trick is how to balance and tweak the batch size and the batch timeout (and set up back-pressure signals). I don't think we fully solved this problem, but we created something that works, and we have employed it to handle load that surprises a lot of people.
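The rules above (flush on size or timeout, grow batches under load) can be sketched as a small, single-threaded class. This is an illustration under stated assumptions, not QBit's implementation: `MicroBatcher`, `offer`, `tick`, and `consumerBusy` are hypothetical names, the caller passes in the current time so the logic stays deterministic, and doubling/halving the batch size is just one possible policy for reacting to a busy signal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the micro-batching rules described above (not QBit's code):
// flush when the batch is full OR when maxWaitNanos has passed since the first
// buffered item; grow the batch size when the consumer reports it is busy.
final class MicroBatcher<T> {
    private final int minBatchSize;
    private final int maxBatchSize;
    private final long maxWaitNanos;
    private int batchSize;
    private List<T> buffer = new ArrayList<>();
    private long firstItemAtNanos;

    MicroBatcher(int minBatchSize, int maxBatchSize, long maxWaitNanos) {
        this.minBatchSize = minBatchSize;
        this.maxBatchSize = maxBatchSize;
        this.maxWaitNanos = maxWaitNanos;
        this.batchSize = minBatchSize;
    }

    /** Buffers an item; returns a batch to send, or null to keep buffering. */
    List<T> offer(T item, long nowNanos) {
        if (buffer.isEmpty()) {
            firstItemAtNanos = nowNanos; // the timeout clock starts with the first item
        }
        buffer.add(item);
        return (buffer.size() >= batchSize || timedOut(nowNanos)) ? drain() : null;
    }

    /** Called periodically (e.g. by a timer): returns a batch only if the timeout expired. */
    List<T> tick(long nowNanos) {
        return (!buffer.isEmpty() && timedOut(nowNanos)) ? drain() : null;
    }

    /** Back-pressure signal: a busy consumer gets bigger batches, an idle one smaller. */
    void consumerBusy(boolean busy) {
        batchSize = busy ? Math.min(batchSize * 2, maxBatchSize)
                         : Math.max(batchSize / 2, minBatchSize);
    }

    int batchSize() {
        return batchSize;
    }

    private boolean timedOut(long nowNanos) {
        return nowNanos - firstItemAtNanos >= maxWaitNanos;
    }

    private List<T> drain() {
        List<T> out = buffer;
        buffer = new ArrayList<>(); // hand the old list off; never reuse it
        return out;
    }
}
```

Under light load, an item can sit in the buffer for up to `maxWaitNanos`, which is exactly the latency cost described above; under heavy load, batches fill before the timeout and the per-message hand-off cost is amortized.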
One of the issues we had was a lot of the micro-batching was hand-coded (by me). I had the same sort of idea repeated in six or seven areas of the application with slight modifications. All hand tuned. Then we started to create a reusable library that I called QBit. It had nothing to do with REST per se. (Although I also wrote a lot of code to handle REST and JSON parsing for the various calls that this service ended up supporting).
I threw most of the original code away and started QBit from a clean slate. Early QBit code still exists in a few projects, but QBit itself is on GitHub.
On a more recent project using QBit, we did some initial load testing and found we were handling 10x more load than we actually got in production, so we did not have a chance to tune it. I honestly think we could have increased the performance (of the app, not QBit) by another 10x, but there was no need, which was a bit of a disappointment for me. QBit was good enough out of the box.
The first application (pre-QBit) was fairly cool. We used Vertx at scale. We implemented in-memory, CPU-intensive applications at scale. We employed micro-batching. The ideas from that project are in QBit. QBit could not exist without that experience, and it would not be where it is today without the several other applications we have built with QBit since then.
One issue we had in making these approaches more widespread was that other developers were not familiar with async programming. And there was no easy way (pre-QBit) to do things like define a REST endpoint and coordinate calls to other async services (in-memory and remote). It was still easier to write applications with traditional Spring MVC, and if an application did not need the throughput, writing it with Vertx and the hodgepodge of libs we wrote (pre-QBit) did not make sense. For high-load applications, it was worth it. For lower-load applications, it was not.
QBit was the framework I started in order to make applications like the one we built easier to produce. I find that low-load services often start getting a lot more load over time. I could write a whole article on this point. But basically, if you write a useful service, others (in the same company or elsewhere) start integrating with it in new ways, with load demands you never expected, so it is better to have a lot of headroom. QBit tries to make async microservice development easier, so you can write even your smaller services with it.

Detour: Microservices, reactive programming and 12-factor applications

What does QBit have to do with Microservices? When we came up with the design for the preference engine, people said it was a Microservice. This was a while ago.
I ignore all such terms as Microservice until someone tells me what it means. It saves time. Then I go and study it. There are so many ideas out there to study, and if you chased them all down, there would be no time to write code. I learn by doing. Reading and then doing some more. To me the ideas behind Microservices really clicked.
Apparently, I had been on many projects that employed Microservice ideas before I knew what the term meant. I thought I was doing RESTful SOA, simple SOA, or whatever. Once I found out what Microservices meant, I was sure that this is what I wanted to encourage with QBit.

12-Factor Microservices

More recently, people have got me interested in reactive streaming, reactive programming and 12-factor microservice development. Again, we have been doing many of these things in one shape or form, but giving the ideas a name gives them power, as they become easier to communicate. For a while now, after working on some of these high-speed, cloud-deployed services, I have been wanting to add many of these concepts to QBit and to the applications we have written with QBit and Vertx (and I have added some of them in an ad hoc way). I see that Vertx 3 added a lot of the same things that I added to QBit for microservice monitoring. The time has come for 12-factor microservice development. Vertx 3 did such a good job that I decided to make it possible to embed QBit inside Vertx (as it originally was) as well as to use Vertx as a network lib.

Back to micro-batching

QBit supports micro-batching; it is built into QBit's core. One of the first experiments I did with QBit was trying to find a reasonably fast implementation of micro-batching (although at the time I did not know the technique was called micro-batching; I was calling it a batching queue, or the poor man's disruptor).

Code walk-through

Let's do some perf testing.
The very core of QBit is the queue. Use these JVM flags when running the tests; they fix the heap size and select the G1 garbage collector.

Set up the heap and garbage collector

-Xms4g -Xmx4g -XX:+UseG1GC
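To confirm that the flags actually took effect, you can ask the running JVM which collectors it is using and how large the heap may grow. This sketch uses only the standard `java.lang.management` API; the class name `JvmSettingsCheck` is hypothetical. Collector names vary by JDK version, so treat the "G1" prefix mentioned in the comment as typical rather than guaranteed.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Sketch: report which collectors and how much heap the JVM actually got,
// to check that flags such as -Xms4g -Xmx4g -XX:+UseG1GC were applied.
final class JvmSettingsCheck {
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
        System.out.println("Max heap (MB): " + maxHeapMb);
        // With G1 enabled the names typically start with "G1" (e.g. "G1 Young Generation").
        System.out.println("Collectors: " + collectorNames());
    }
}
```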

Trade class

    public class Trade {
        final String name;
        final long amount;

        public Trade(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        public String getName() {
            return name;
        }

        public long getAmount() {
            return amount;
        }
    }
We have a simple Trade class. We will send trades through the queue at a rate of 100 million per second. Here is how we construct the queue.

Create a queue

        final QueueBuilder queueBuilder = QueueBuilder
                .queueBuilder()
                    .setName("trades")
                    .setBatchSize(batchSize)
                    .setSize(size)
                    .setPollWait(pollWait);


        final Queue<Trade> queue = queueBuilder.build();
The size is the capacity of the underlying java.util queue (where one is used). The batch size is how many messages are sent together in each batch, unless flush is called first. The poll wait is how long the listener waits after a poll returns null before polling again.
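To make the three settings concrete, here is a hypothetical receiving loop built on `java.util.concurrent`; it is not QBit's listener. A bounded `BlockingQueue` of lists stands in for the underlying queue (its capacity plays the role of `size`), each list is one batch of up to `batchSize` items, and the loop parks for `pollWait` whenever a non-blocking poll returns null. The millisecond unit for `pollWait` is an assumption.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

// Hypothetical sketch (not QBit's implementation) of a listener loop that
// backs off for pollWait after an empty poll instead of spinning.
final class PollWaitListener<T> implements Runnable {
    private final BlockingQueue<List<T>> queue; // bounded: its capacity plays the role of `size`
    private final Consumer<T> listener;
    private final long pollWaitMillis;          // assumed unit: milliseconds
    private volatile boolean running = true;

    PollWaitListener(BlockingQueue<List<T>> queue, Consumer<T> listener, long pollWaitMillis) {
        this.queue = queue;
        this.listener = listener;
        this.pollWaitMillis = pollWaitMillis;
    }

    /** Asks the loop to exit once the queue has been drained. */
    void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running || !queue.isEmpty()) {
            List<T> batch = queue.poll(); // non-blocking; null when the queue is empty
            if (batch == null) {
                // Nothing to do: wait pollWait before trying again instead of burning CPU.
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(pollWaitMillis));
                continue;
            }
            for (T item : batch) {
                listener.accept(item); // one hand-off delivered many items
            }
        }
    }
}
```

Parking after an empty poll trades a little latency for not burning a core while idle; a larger `pollWait` saves more CPU under light load but delays the first message after a quiet period.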
To listen to trades coming into the queue, we will use a simple mechanism.

Increment an atomic long

        final AtomicLong tradeCounter = new AtomicLong();

        queue.startListener(item -> {

            tradeCounter.incrementAndGet();
        });
We could make this more efficient.
Micro-batching happens on the client (sending) side of the equation.

SendQueue

        final SendQueue<Trade> tradeSendQueue = queue.sendQueue();
        for (int c = 0; c < tradeCount; c++) {
            tradeSendQueue.send(new Trade("ibm", 100L));
        }
        tradeSendQueue.flushSends();
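The point of the `SendQueue` is that each producer thread buffers sends locally and only touches the shared queue once per batch. A minimal sketch of that idea, assuming a shared `BlockingQueue` of lists; `BatchingSendQueue` is a hypothetical class, not QBit's `SendQueue` implementation, and it is meant to be used by a single producer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of client-side micro-batching in the spirit of SendQueue
// (not QBit's implementation): the owning producer thread buffers sends locally
// with no synchronization and only touches the shared queue once per batchSize
// messages, or when flushSends() is called.
final class BatchingSendQueue<T> {
    private final BlockingQueue<List<T>> shared;
    private final int batchSize;
    private List<T> buffer;

    BatchingSendQueue(BlockingQueue<List<T>> shared, int batchSize) {
        this.shared = shared;
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>(batchSize);
    }

    void send(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flushSends();
        }
    }

    void flushSends() {
        if (buffer.isEmpty()) {
            return; // nothing buffered, so no hand-off
        }
        try {
            shared.put(buffer); // blocks if the shared queue is full (back pressure)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending batch", e);
        }
        buffer = new ArrayList<>(batchSize); // never reuse a list the consumer may be reading
    }
}
```

As in the loop above, `flushSends()` must be called at the end, or the last partial batch never leaves the producer thread.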
We created a helper method to run messages through the queue (over and over).

Method to run our perf test

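    private static void run(int runs, int tradeCount,
                            int batchSize, int checkEvery,
                            int numThreads, int pollWait,
                            int size) {

        final QueueBuilder queueBuilder = QueueBuilder
                .queueBuilder()
                    .setName("trades")
                    .setBatchSize(batchSize)
                    .setSize(size)
                    .setPollWait(pollWait);

        // ... the rest of run() (building the queue, starting the listener,
        // running the sender threads and printing the timing) is in the
        // complete code listing.
    }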
You can specify the numThreads (how many threads), runs (how many runs), and the parameters we talked about before.
With this setup:

Perf test

    public static void main(final String... args) throws Exception {

        final int runs = 76;
        final int tradeCount = 220_000;
        final int batchSize = 1_000;
        final int checkEvery = 0;
        final int numThreads = 6;
        final int pollWait = 1_000;
        final int size = 1_000_000;

        for (int index = 0; index < 100; index++) {
            run(runs, tradeCount, batchSize, checkEvery, numThreads, pollWait, size);
        }
    }
With this setup, we got the following results:
DONE traded 100,320,000 in 1001 ms 
batchSize = 1,000, checkEvery = 0, threads= 6

DONE traded 100,320,000 in 999 ms 
batchSize = 1,000, checkEvery = 0, threads= 6 

DONE traded 100,320,000 in 987 ms 
batchSize = 1,000, checkEvery = 0, threads= 6 
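As a sanity check on those numbers: each test sends runs × tradeCount × threads messages, and dividing by the elapsed time gives the rate. `ThroughputMath` is a hypothetical helper for this arithmetic only.

```java
// Worked arithmetic for the runs above: total messages per test and the implied rate.
final class ThroughputMath {
    static long totalTrades(int runs, int tradeCount, int threads) {
        return (long) runs * tradeCount * threads;
    }

    static double messagesPerSecond(long messages, long millis) {
        return messages * 1000.0 / millis;
    }

    public static void main(String[] args) {
        long total = totalTrades(76, 220_000, 6); // 76 * 220,000 * 6 = 100,320,000
        for (long ms : new long[] {1001, 999, 987}) {
            System.out.printf("%,d ms -> %,.0f msgs/s%n", ms, messagesPerSecond(total, ms));
        }
    }
}
```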
It takes ten or so runs for the GC etc. to tune itself.
We consistently get over 100M messages per second. It took a while to tweak runs, tradeCount, etc. to reach 100M messages per second. If we drop to 50,000, we can be a lot more flexible with the number of threads, trade count per thread, etc.
Here is the complete code listing to get 100M TPS.

Complete code listing for 100 M TPS test

package io.advantageous.qbit.example.perf;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.SendQueue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class QueuePerfMain {

    public static class Trade {
        final String name;
        final long amount;

        public Trade(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        public String getName() {
            return name;
        }

        public long getAmount() {
            return amount;
        }
    }

    public static void main(final String... args) throws Exception {

        final int runs = 76;
        final int tradeCount = 220_000;
        final int batchSize = 1_000;
        final int checkEvery = 0;
        final int numThreads = 6;
        final int pollWait = 1_000;
        final int size = 1_000_000;


        for (int index = 0; index < 10; index++) {
            run(2, 1000, 10, 3, 2, 100, size);
        }

        for (int index = 0; index < 100; index++) {
            run(runs, tradeCount, batchSize, checkEvery, numThreads, pollWait, size);
        }
    }


    private static void run(int runs, int tradeCount, int batchSize, int checkEvery, int numThreads, int pollWait, int size) {

        final QueueBuilder queueBuilder = QueueBuilder
                .queueBuilder()
                    .setName("trades")
                    .setBatchSize(batchSize)
                    .setSize(size)
                    .setPollWait(pollWait);


        final int totalTrades = tradeCount * runs * numThreads;

        if (checkEvery > 0) {
            queueBuilder.setLinkTransferQueue();
            queueBuilder.setCheckEvery(checkEvery);
            queueBuilder.setBatchSize(batchSize);
        }

        final Queue<Trade> queue = queueBuilder.build();
        final AtomicLong tradeCounter = new AtomicLong();

        queue.startListener(item -> {
            item.getAmount();
            item.getName();
            tradeCounter.incrementAndGet();
        });


        final long startRun = System.currentTimeMillis();

        for (int r = 0; r < runs; r++) {
            runThreads(tradeCount, numThreads, queue, tradeCounter);
        }
        System.out.printf("DONE traded %,d in %d ms \nbatchSize = %,d, checkEvery = %,d, threads= %,d \n\n",
                totalTrades,
                System.currentTimeMillis() - startRun,
                batchSize,
                checkEvery,
                numThreads);
        queue.stop();

    }

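    private static void runThreads(int tradeCount, int numThreads, Queue<Trade> queue, AtomicLong tradeCounter) {
        // The listener's counter is cumulative across runs, so reset it here; otherwise
        // every run after the first sees the target already reached and stops waiting.
        tradeCounter.set(0);
        final List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {

            final Thread thread = new Thread(() -> {
                sendMessages(queue, tradeCount);
            });
            thread.start();
            threads.add(thread);
        }
        // Check every 10 ms until the listener has received all trades sent in this run.
        for (int index = 0; index < 100000; index++) {
            Sys.sleep(10);
            if (tradeCounter.get() >= (tradeCount * numThreads)) {
                break;
            }
        }
    }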

    private static void sendMessages(final Queue<Trade> queue, final int tradeCount) {
        final SendQueue<Trade> tradeSendQueue = queue.sendQueue();
        for (int c = 0; c < tradeCount; c++) {
            tradeSendQueue.send(new Trade("ibm", 100L));
        }
        tradeSendQueue.flushSends();
    }


}

There are ways to optimize the test. I have similar tests running at up to 200M TPS, but their code is a lot harder to follow. This is decent speed, and the code is easy to follow and explain.
To compare micro-batching with no micro-batching, I reduced the message count to 48 million instead of 100 million. When I tried running 100 million messages with no batching, it seemed to hang for a long time. I am sure it would have finished eventually, but I am not that patient.
Without batching, it takes about 3,000 milliseconds on average to process 48 million messages, with a really wide standard deviation. With a batch size of 1,000, it takes about 550 milliseconds, with a very tight standard deviation. The difference becomes more pronounced as the service becomes more CPU intensive.
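Turning those timings into rates makes the gap easier to see. The figures below are the approximate averages quoted above (3,000 ms and 550 ms for 48 million messages); `BatchingSpeedup` is a hypothetical helper.

```java
// Worked arithmetic: message rates with and without micro-batching, and the speedup.
final class BatchingSpeedup {
    static double rate(long messages, long millis) {
        return messages * 1000.0 / millis;
    }

    public static void main(String[] args) {
        long messages = 48_000_000L;
        double unbatched = rate(messages, 3_000); // 16,000,000 msgs/s
        double batched = rate(messages, 550);     // about 87,300,000 msgs/s
        System.out.printf("unbatched %,.0f/s, batched %,.0f/s, speedup %.1fx%n",
                unbatched, batched, batched / unbatched);
    }
}
```

That is roughly 16 million messages per second without batching versus about 87 million with a batch size of 1,000, a speedup of about 5.5x.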
More to come.