Rick

Rick
Rick

Sunday, February 8, 2015

benchmarking QBit queues - 200M+ messaging (microservice: HTTP, WebSocket, JSON, Java lib)

Doing some perf benchmarks so I know when refactors make performance better or worse.
This first test is a no op. Just sending a message and counting the messages sent.

QBit Queue speed 200M messages

Graph
Doing some perf benchmarks so I know when refactors make performance better or worse.

QBit with ArrayBlockingQueue/LinkTransferQueue various batches and writer thread counts and implementations

        final LongList times = new LongList();

        for (int writers = 0; writers < 20; writers++) {
            final QueueBuilder qb = queueBuilder()
                                    .setBatchSize(10)
                                    .setSize(10_000_000)
                                    .setArrayBlockingQueue();

            puts("NUM WRITER THREADS", writers+1);
            perfTest(qb, 1, writers+1, 100_000_000, 5_000, times, 10_000);
            System.gc();
            Sys.sleep(1000);

        }
Graph
Not that the above is log for ms. 500 ms is doable for 100M messages with a batch size of 1,000.
A batch size of 10,000 comes out to a rate of 280M messages a second.
Looking at in non-log for time:
Graph
Here is what 200M messages look like.
Graph
At a batch size of 100 it does ok. 1,000 it does quite well. The larger the batch size, the better job it does no giving up the thread so the 1 reader thread can work.
We can see the rate for the larger batch sizes (1,000, 10,000 and 100,000) achieve a rate of 400 M messages a second.
Now pushing it to 400 M messaging, we get
Graph
At this point we are running into garbage collector overhead (I think). You can see the effective rate has slipped. It went down to around 200 M messages a second.
After running the profiler, there really does not appear to be that much garbage collection. It could be an issue of buffer creation time. One could create a feedback system to return spent buffers. The thread handoff cost savings, seems to be canceled out the buffer creation time at a certain level.
Two ways to get around this, is adopt a full ring buffer approach and/or adopt a buffer recycling approach to return spent buffers during idle periods.
Graph
Adding a 10 ms pause in the writer threads every 10,000,000 sends seems to smooth out the chart. Not sure why. But it makes the difference between 1,000, 10,000, and 100,000 mostly go away.
Also tried this test with the LinkedTransferQueue as an option.
Graph
One writer does very poorly as does a batch size of 100. Let's increase the minimum writer to 2, and drop the 100 batch size so we can see the numbers a little better.
Graph
229 M messages a second using QBit LinkedTransferQueue.
Graph
        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue().setCheckEvery(checkEvery);
You have the option for CPU intensive readers, to check to see if the reader (or readers) are busy. If they reader is not busy, then you can send him what you have before you reach the full batch size.
In this perf test, this feature is a wash since the reader is more or less a no-op.
Next, we do 1,000,000 operations for each of the 400 M messages.
Graph
        final int batchSize = 10_000;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = batchSize/10;
        final boolean cpuIntensive = true;
        final int times = 1_000_000;

//         LTQ check every & try transfer
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue()
//                .setCheckEvery(checkEvery).setTryTransfer(true);


        //Check every
        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue()
                .setCheckEvery(checkEvery);

          //LTQ
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();

           //LBQ
//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setSize(10_000_000).setArrayBlockingQueue();

       //warmup
        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(10_000);
        }

Add more intensive CPU operation (added a loop around last one).
Graph
        final int batchSize = 10_000;
        final int totalSends = 100_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = 1000;
        final boolean cpuIntensive = true;
        final int times = 2_000_000_000;


    //        final QueueBuilder queueBuilder = queueBuilder()
    //                .setBatchSize(batchSize)
    //                .setLinkTransferQueue()
    //                .setCheckEvery(checkEvery);//.setTryTransfer(true);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(3_000);
        }
Loop for CPU intensive
  public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive && total % 13 == 0) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }



        private void doSomething(Integer value) {

            long lv = 0;
            for (int j = 0; j < 10; j++) {
                for (int index = 0; index < times; index++) {
                    lv = value * index % 13 + index;
                    lv = lv * 47;
                    lv = lv * 1000;
                    lv = lv * 13 + lv % 31;
                }
                this.answer.set(this.answer.get() + lv);
            }
        }

Code for test before cleanup and LinkedTransferQueue work

I hand edited the different params in the main method of this class.
package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        int total;

        volatile int totalOut;

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery ) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {

        final QueueBuilder queueBuilder = queueBuilder().setBatchSize(100_000).setSize(10_000_000).setArrayBlockingQueue();
        /*

    public static void perfTest(
                         final QueueBuilder queueBuilder,
                         final int readers,
                         final int writers,
                         final int itemsEachThread,
                         final int timeOut) {

         */

        perfTest(queueBuilder, 1, 10, 500_000_000, 50_000, new LongList(), 100, 10, 1_000_000);

        System.gc();
        Sys.sleep(10_000);


        final LongList times = new LongList();

        for (int writers = 0; writers < 20; writers+=3) {
            final QueueBuilder qb = queueBuilder().setBatchSize(100).setSize(10_000_000).setArrayBlockingQueue();

            puts("NUM WRITER THREADS", writers+1);
            perfTest(qb, 1, writers+1, 400_000_000, 5_000, times, 100, 10, 10_000_000);
            System.gc();
            Sys.sleep(5_000);

        }

//        final LongList times = new LongList();
//
//        for (int writers = 0; writers < 20; writers++) {
//            final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
//            puts("NUM WRITER THREADS", writers+1);
//            perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
//            System.gc();
//            Sys.sleep(1000);
//
//        }


        for (Long value : times) {
            puts(value);
        }

        puts(times);
        puts(
                "\nmin    \t", times.min(),
                "\nmax    \t", times.max(),
                "\nmean   \t", times.mean(),
                "\nmedian \t", times.median(),
                "\nstddev \t", times.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

Raw output of first run

--------------------------------------------------------- 
........
--------------------------------------------------------- 

Threads readers      1 
        writers      10 
Message count        100,000,000 
Msg cnt per thrd     10,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            100,100,000 
Read time total  1,041 
Write time       983 
NUM WRITER THREADS 1 
--------------------------------------------------------- 
..........182,866,530
..........356,086,570
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      1 
Message count        500,000,000 
Msg cnt per thrd     500,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,010,000 
Read time total  2,954 
Write time       1,022 
NUM WRITER THREADS 2 
--------------------------------------------------------- 
..........233,081,330
.........
--------------------------------------------------------- 

Threads readers      1 
        writers      2 
Message count        500,000,000 
Msg cnt per thrd     250,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,020,000 
Read time total  2,057 
Write time       2,015 
NUM WRITER THREADS 3 
--------------------------------------------------------- 
..........132,838,200
..........267,134,950
..........401,589,720
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      3 
Message count        500,000,000 
Msg cnt per thrd     166,666,667 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,028,000 
Read time total  3,936 
Write time       3,022 
NUM WRITER THREADS 4 
--------------------------------------------------------- 
..........130,954,760
..........264,850,920
..........399,794,450
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      4 
Message count        500,000,000 
Msg cnt per thrd     125,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,040,000 
Read time total  3,885 
Write time       3,801 
NUM WRITER THREADS 5 
--------------------------------------------------------- 
..........133,938,020
..........270,768,490
..........406,734,990
......
--------------------------------------------------------- 

Threads readers      1 
        writers      5 
Message count        500,000,000 
Msg cnt per thrd     100,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,050,000 
Read time total  3,870 
Write time       3,798 
NUM WRITER THREADS 6 
--------------------------------------------------------- 
..........132,912,550
..........268,018,930
..........402,073,920
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      6 
Message count        500,000,000 
Msg cnt per thrd     83,333,334 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,058,000 
Read time total  3,872 
Write time       3,814 
NUM WRITER THREADS 7 
--------------------------------------------------------- 
..........131,462,020
..........265,798,160
..........399,964,000
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      7 
Message count        500,000,000 
Msg cnt per thrd     71,428,572 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,066,000 
Read time total  3,911 
Write time       3,843 
NUM WRITER THREADS 8 
--------------------------------------------------------- 
..........133,118,780
..........267,053,030
..........400,814,320
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      8 
Message count        500,000,000 
Msg cnt per thrd     62,500,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,080,000 
Read time total  3,885 
Write time       3,819 
NUM WRITER THREADS 9 
--------------------------------------------------------- 
..........134,822,520
..........270,485,380
..........403,792,710
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      9 
Message count        500,000,000 
Msg cnt per thrd     55,555,556 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,085,000 
Read time total  3,878 
Write time       3,807 
NUM WRITER THREADS 10 
--------------------------------------------------------- 
..........133,540,490
..........267,256,550
..........404,794,160
......
--------------------------------------------------------- 

Threads readers      1 
        writers      10 
Message count        500,000,000 
Msg cnt per thrd     50,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,100,000 
Read time total  3,845 
Write time       3,778 
NUM WRITER THREADS 11 
--------------------------------------------------------- 
..........131,918,610
..........268,519,590
..........403,707,110
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      11 
Message count        500,000,000 
Msg cnt per thrd     45,454,546 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,104,000 
Read time total  3,857 
Write time       3,790 
NUM WRITER THREADS 12 
--------------------------------------------------------- 
..........134,007,270
..........267,971,530
..........401,474,590
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      12 
Message count        500,000,000 
Msg cnt per thrd     41,666,667 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,112,000 
Read time total  3,867 
Write time       3,794 
NUM WRITER THREADS 13 
--------------------------------------------------------- 
..........133,613,020
..........266,512,930
..........400,441,870
........
--------------------------------------------------------- 

Threads readers      1 
        writers      13 
Message count        500,000,000 
Msg cnt per thrd     38,461,539 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,123,000 
Read time total  3,960 
Write time       3,895 
NUM WRITER THREADS 14 
--------------------------------------------------------- 
..........131,527,420
..........250,755,900
..........382,014,070
........
--------------------------------------------------------- 

Threads readers      1 
        writers      14 
Message count        500,000,000 
Msg cnt per thrd     35,714,286 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,136,000 
Read time total  4,017 
Write time       3,951 
NUM WRITER THREADS 15 
--------------------------------------------------------- 
..........129,543,750
..........262,070,730
..........392,889,410
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      15 
Message count        500,000,000 
Msg cnt per thrd     33,333,334 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,145,000 
Read time total  3,916 
Write time       3,846 
NUM WRITER THREADS 16 
--------------------------------------------------------- 
..........132,140,000
..........266,103,410
..........398,092,510
........
--------------------------------------------------------- 

Threads readers      1 
        writers      16 
Message count        500,000,000 
Msg cnt per thrd     31,250,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,160,000 
Read time total  3,969 
Write time       3,904 
NUM WRITER THREADS 17 
--------------------------------------------------------- 
..........132,230,880
..........263,316,360
..........397,953,520
........
--------------------------------------------------------- 

Threads readers      1 
        writers      17 
Message count        500,000,000 
Msg cnt per thrd     29,411,765 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,157,000 
Read time total  3,976 
Write time       3,907 
NUM WRITER THREADS 18 
--------------------------------------------------------- 
..........132,937,000
..........267,508,690
..........402,516,870
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      18 
Message count        500,000,000 
Msg cnt per thrd     27,777,778 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,166,000 
Read time total  3,949 
Write time       3,861 
NUM WRITER THREADS 19 
--------------------------------------------------------- 
..........130,062,110
..........265,614,200
..........386,620,890
........
--------------------------------------------------------- 

Threads readers      1 
        writers      19 
Message count        500,000,000 
Msg cnt per thrd     26,315,790 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,175,000 
Read time total  4,009 
Write time       3,944 
NUM WRITER THREADS 20 
--------------------------------------------------------- 
..........134,731,950
..........269,057,010
..........392,053,940
.......
--------------------------------------------------------- 

Threads readers      1 
        writers      20 
Message count        500,000,000 
Msg cnt per thrd     25,000,001 
Batch size           1,000 
Num batches          10,000 
--------------------------------------------------- 

Count            500,200,000 
Read time total  3,957 
Write time       3,895 
[2954, 2057, 3936, 3885, 3870, 3872, 3911, 3885, 3878, 3845, 3857, 3867, 3960, 4017, 3916, 3969, 3976, 3949, 4009, 3957] 

min      2057 
max      4017 
mean     3779 
median   3898 
stddev   450 

LinkedTransfer Queue

For the linked transfer queue. I client up the code a bit.

Cleaned up code using linked transfer queue.

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        int total;

        volatile int totalOut;

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery ) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 100;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 100;
        final int sleepEvery = 10_000_000;

        final QueueBuilder warmUpBuilder = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();
        perfTest(warmUpBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();
        final QueueBuilder qb = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(qb, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(5000);
        }

//        final LongList times = new LongList();
//
//        for (int writers = 0; writers < 20; writers++) {
//            final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
//            puts("NUM WRITER THREADS", writers+1);
//            perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
//            System.gc();
//            Sys.sleep(1000);
//
//        }


        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

Test after adding CPU intensive gak

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        private final boolean cpuIntensive;
        private final int times;
        int total;

        volatile int totalOut;

        public AtomicLong answer = new AtomicLong();

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
            this.cpuIntensive = cpuIntensive;
            this.times = times;
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }

        private void doSomething(Integer value) {

            long lv = 0;
            for (int index = 0; index< times; index++) {
                lv = value * index % 13 + index;
            }
            this.answer.set(lv);
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery,
            boolean cpuIntensive, int times) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue, cpuIntensive, times);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
            puts(testReader.answer.get());
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 10_000;
        final int totalSends = 400_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = batchSize/10;
        final boolean cpuIntensive = true;
        final int times = 1_000_000;


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setLinkTransferQueue()
                .setCheckEvery(checkEvery);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(10_000);
        }



        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

Making it EXTRA Cpu intensive and spikey

package io.advantageous.qbit.perf;

import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/7/15.
 */
public class PerfTest {

    static class TestReader {

        private final boolean cpuIntensive;
        private final int times;
        int total;

        volatile int totalOut;

        public AtomicLong answer = new AtomicLong();

        AtomicBoolean stop = new AtomicBoolean();

        private final Queue<Integer> queue;

        private final ReceiveQueue<Integer> receiveQueue;

        TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
            this.queue = queue;
            this.receiveQueue = queue.receiveQueue();
            this.cpuIntensive = cpuIntensive;
            this.times = times;
        }


        public void stop() {
            stop.set(true);
        }
        public void read() {

            Integer value = receiveQueue.poll();

            while (true) {

                while (value != null) {
                    total += value;

                    if (total % 10 == 0) {
                        totalOut = total;
                    }
                    if (cpuIntensive && total % 13 == 0) {
                        doSomething(value);
                    }
                    value = receiveQueue.poll();
                }

                totalOut = total;
                value = receiveQueue.pollWait();
                if (stop.get()) {
                    return;
                }
            }
        }

        private void doSomething(Integer value) {

            long lv = 0;
            for (int j = 0; j < 10; j++) {
                for (int index = 0; index < times; index++) {
                    lv = value * index % 13 + index;
                    lv = lv * 47;
                    lv = lv * 1000;
                    lv = lv * 13 + lv % 31;
                }
                this.answer.set(this.answer.get() + lv);
            }
        }
    }


    public static String fmt(int num) {
        return String.format("%,d", num);
    }

    public static String fmt(long num) {
        return String.format("%,d", num);
    }

    public static void  perfTest(
            final QueueBuilder queueBuilder,
            final int readers,
            final int writers,
            final int totalCountExpected,
            final int timeOut,
            LongList readTimes,
            int extra, int sleepAmount, int sleepEvery,
            boolean cpuIntensive, int times) throws Exception{

        final int itemsEachThread = totalCountExpected / writers +1;


        final long start = System.currentTimeMillis();

        puts("---------------------------------------------------------");


        final Queue<Integer> queue = queueBuilder.build();
        final List<TestReader> readerList = new ArrayList<>(readers);
        final List<Thread> writeThreadList = new ArrayList<>(writers);
        final List<Thread> readerThreadList = new ArrayList<>(readers);


        for (int index = 0; index < writers; index++) {
            int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
            createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
        }

        final long writeThreadsStarted = System.currentTimeMillis();


        for (int index = 0; index < readers; index++) {

            final TestReader reader = new TestReader(queue, cpuIntensive, times);
            readerList.add(reader);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    reader.read();
                }
            });
            thread.start();
            readerThreadList.add(thread);
        }

        final long readThreadsStarted = System.currentTimeMillis();
        final AtomicLong writeDuration = new AtomicLong();

        final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);

        int localCount = 0;

        int statusCount =0;


        exitLoop:
        while (true) {
            Sys.sleep(10);
            statusCount++;

            if (statusCount % 10 == 0) {
                System.out.print(".");
            }


            if (statusCount % 100 == 0) {
                System.out.println(fmt(localCount));
            }
            if (start - System.currentTimeMillis() > timeOut) {
                break;
            }
            for (TestReader reader : readerList) {
                localCount = reader.totalOut;
                if (localCount >= totalCountExpected) {
                    break exitLoop;
                }
            }
        }


        long end = System.currentTimeMillis();



        for (TestReader testReader : readerList) {
            testReader.stop();
            puts(testReader.answer.get());
        }
        Sys.sleep(100);



        puts("\n---------------------------------------------------------");
        puts(
                "\nThreads readers    \t", readers,
                "\n        writers    \t", writers,
                "\nMessage count      \t", fmt(totalCountExpected),
                "\nMsg cnt per thrd   \t", fmt(itemsEachThread),
                "\nBatch size         \t", fmt(queueBuilder.getBatchSize()),
                "\nNum batches        \t", fmt(queueBuilder.getSize()),
                "\n---------------------------------------------------");


        puts( "\nCount          \t", fmt(localCount),
              "\nRead time total\t", fmt(end - readThreadsStarted),
              "\nWrite time     \t", fmt(writeDuration.get())
                );

        readTimes.add(end - readThreadsStarted);


        try {
            writerThreadCounter.join(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }



    }

    private static Thread writeTimer(final List<Thread> writeThreadList,
                                     final long writeThreadsStarted,
                                     final AtomicLong writeDuration) throws Exception {


        ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

        while (threadListIterator.hasNext()) {
            Thread thread = threadListIterator.next();
            if (!thread.isAlive()) {
                threadListIterator.remove();
            }
        }
        Sys.sleep(10);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run()  {

                Sys.sleep(10);

                ListIterator<Thread> threadListIterator = writeThreadList.listIterator();

                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);


                while (threadListIterator.hasNext()) {
                    Thread thread = threadListIterator.next();
                    if (!thread.isAlive()) {
                        threadListIterator.remove();
                    }
                }
                Sys.sleep(10);
                for (Thread writerThread : writeThreadList) {
                    try {
                        writerThread.join(1000);
                    } catch (InterruptedException e) {
                        continue;
                    }
                }
                writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
            }
        });
        thread.start();
        return thread;
    }

    private static void createWriterThread(final List<Thread> threadList,
                                           final Queue<Integer> queue,
                                           final int itemsEachThread,
                                           final int sleepAmount,
                                           final int sleepEvery) {
        Thread thread = new Thread(new Runnable() {

            @Override
            public void run() {
                final SendQueue<Integer> integerSendQueue = queue.sendQueue();

                int count = 0;

                for (int index = 0; index < itemsEachThread; index++) {
                    count++;
                    if (count > sleepEvery) {
                        count = 0;
                        if (sleepAmount>0) {
                            Sys.sleep(sleepAmount);
                        }
                    }

                    integerSendQueue.send(1);
                }
                integerSendQueue.flushSends();
            }
        });
        threadList.add(thread);
        thread.start();
    }


    public static void main (String... args) throws Exception {


        final int batchSize = 10_000;
        final int totalSends = 100_000_000;
        final int timeout = 5_000;
        final int fudgeFactor = 100;
        final int sleepAmount = 10;
        final int sleepEvery = 10_000_000;
        final int checkEvery = 1000;
        final boolean cpuIntensive = true;
        final int times = 2_000_000_000;


    //        final QueueBuilder queueBuilder = queueBuilder()
    //                .setBatchSize(batchSize)
    //                .setLinkTransferQueue()
    //                .setCheckEvery(checkEvery);//.setTryTransfer(true);


//        final QueueBuilder queueBuilder = queueBuilder()
//                .setBatchSize(batchSize)
//                .setLinkTransferQueue();


        final QueueBuilder queueBuilder = queueBuilder()
                .setBatchSize(batchSize)
                .setSize(10_000_000).setArrayBlockingQueue();

        perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
        System.gc();
        Sys.sleep(10_000);


        final LongList timeMeasurements = new LongList();

        for (int writers = 0; writers < 25; writers+=5) {
            int numThreads = writers == 0 ? writers+2 : writers;
            perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
            Sys.sleep(500);
            System.gc();
            Sys.sleep(3_000);
        }



        for (Long value : timeMeasurements) {
            puts(value);
        }

        puts(timeMeasurements);
        puts(
                "\nmin    \t", timeMeasurements.min(),
                "\nmax    \t", timeMeasurements.max(),
                "\nmean   \t", timeMeasurements.mean(),
                "\nmedian \t", timeMeasurements.median(),
                "\nstddev \t", timeMeasurements.standardDeviation());
    }

    @Before
    public void setup() {

    }
}

If you want to learn more about QBit, follows some of these links:



  • [Detailed Tutorial] QBit microservice example
  • [Doc] Queue Callbacks for QBit queue based services
  • [Quick Start] Building a simple Rest web microservice server with QBit
  • [Quick Start] Building a TODO web microservice client with QBit
  • [Quick Start] Building a TODO web microservice server with QBit
  • [Quick Start] Building boon for the QBit microservice engine
  • [Quick Start] Building QBit the microservice lib for Java
  • [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
  • [Rough Cut] Working with event bus for QBit the microservice engine
  • [Rough Cut] Working with inproc MicroServices
  • [Rough Cut] Working with private event bus for inproc microservices
  • [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
  • [Rough Cut] Working with System Manager for QBit Mircoservice lib
  • [Z Notebook] More benchmarking internal
  • [Z Notebook] Performance testing for REST
  • [Z Notebook] Roadmap
  • Home
  • Introduction to QBit
  • Local Service Proxies
  • QBit Boon New Wave of JSON HTTP and Websocket
  • QBit Docs
  • Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training