I am running some perf benchmarks so I can tell when a refactor makes performance better or worse.
This first test is a no-op: it just sends messages and counts the messages sent.
QBit Queue speed 200M messages
QBit with ArrayBlockingQueue/LinkedTransferQueue: various batch sizes, writer thread counts, and implementations
final LongList times = new LongList();

for (int writers = 0; writers < 20; writers++) {
    final QueueBuilder qb = queueBuilder()
            .setBatchSize(10)
            .setSize(10_000_000)
            .setArrayBlockingQueue();
    puts("NUM WRITER THREADS", writers+1);
    perfTest(qb, 1, writers+1, 100_000_000, 5_000, times, 10_000);
    System.gc();
    Sys.sleep(1000);
}
Note that the chart above uses a log scale for time in ms. 500 ms is doable for 100M messages with a batch size of 1,000.
A batch size of 10,000 comes out to a rate of 280M messages a second.
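The rates quoted here are just message count divided by elapsed time. A minimal sketch of that arithmetic follows; `ThroughputSketch` and `messagesPerSecond` are hypothetical names used for illustration and are not part of QBit or the test harness.

```java
// Hypothetical helper (not part of QBit): converts a message count and an
// elapsed time in milliseconds into a rate in messages per second.
final class ThroughputSketch {

    static long messagesPerSecond(long messageCount, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        // Multiply first so integer division does not throw away precision.
        return messageCount * 1_000L / elapsedMillis;
    }

    public static void main(String... args) {
        // 100M messages in 500 ms, the figure quoted for a batch size of 1,000.
        long rate = messagesPerSecond(100_000_000L, 500L);
        System.out.println(String.format("%,d messages/second", rate));
    }
}
```

Going the other way, 280M messages a second for 100M messages implies a read time of roughly 357 ms.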
Looking at the same results with a linear (non-log) time scale:
Here is what 200M messages look like.
At a batch size of 100 it does OK; at 1,000 it does quite well. The larger the batch size, the better it does at not giving up the thread, so the one reader thread can keep working.
The larger batch sizes (1,000, 10,000 and 100,000) achieve a rate of 400 M messages a second.
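To make the batching effect concrete, here is a minimal sketch of a sender that collects items locally and hands a whole batch to the queue in one operation. It is an illustration under assumptions, not QBit's actual `SendQueue` implementation: `BatchingSenderSketch` is a hypothetical class, and only the method names `send` and `flushSends` are borrowed from the test code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch (not QBit's SendQueue): one queue hand-off per batch
// instead of one hand-off per message.
final class BatchingSenderSketch<T> {
    private final BlockingQueue<List<T>> queue;
    private final int batchSize;
    private List<T> batch;

    BatchingSenderSketch(BlockingQueue<List<T>> queue, int batchSize) {
        this.queue = queue;
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
    }

    void send(T item) {
        batch.add(item);
        if (batch.size() >= batchSize) {
            flushSends();
        }
    }

    void flushSends() {
        if (batch.isEmpty()) {
            return;
        }
        try {
            queue.put(batch); // the only point where the writer touches the shared queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off a batch", e);
        }
        batch = new ArrayList<>(batchSize); // a fresh buffer for every batch
    }

    public static void main(String... args) {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(16);
        BatchingSenderSketch<Integer> sender = new BatchingSenderSketch<>(queue, 1_000);
        for (int i = 0; i < 2_500; i++) {
            sender.send(1);
        }
        sender.flushSends(); // push the final partial batch
        System.out.println("hand-offs: " + queue.size());
    }
}
```

With a batch size of 1,000, the 2,500 sends above cost three queue operations (two full batches plus the final partial one), which is why larger batches give the writer fewer chances to contend with the reader.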
Now pushing it to 400 M messages, we get:
At this point we are running into garbage collector overhead (I think). You can see the effective rate has slipped. It went down to around 200 M messages a second.
After running the profiler, there really does not appear to be that much garbage collection. It could be an issue of buffer creation time. One could create a feedback system to return spent buffers. At a certain level, the thread handoff cost savings seem to be canceled out by the buffer creation time.
There are two ways to get around this: adopt a full ring buffer approach, and/or adopt a buffer recycling approach that returns spent buffers during idle periods.
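The buffer recycling idea could look roughly like the following. This is only a sketch of one possible feedback path, assuming batches are plain `List` buffers; `BufferPoolSketch`, `acquire` and `release` are hypothetical names, and nothing here reflects how QBit actually manages its buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical buffer pool (an assumption for illustration, not QBit code):
// readers return spent buffers, writers reuse them instead of allocating.
final class BufferPoolSketch<T> {
    private final ConcurrentLinkedQueue<List<T>> spent = new ConcurrentLinkedQueue<>();
    private final int batchSize;

    BufferPoolSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Writer side: reuse a spent buffer if one is available, otherwise allocate.
    List<T> acquire() {
        List<T> buffer = spent.poll();
        return buffer != null ? buffer : new ArrayList<T>(batchSize);
    }

    // Reader side: once a batch has been drained, clear it and hand it back.
    void release(List<T> buffer) {
        buffer.clear();
        spent.offer(buffer);
    }

    public static void main(String... args) {
        BufferPoolSketch<Integer> pool = new BufferPoolSketch<>(1_000);
        List<Integer> first = pool.acquire();   // pool is empty, so this allocates
        first.add(42);
        pool.release(first);                    // reader is done with the batch
        List<Integer> second = pool.acquire();  // the same buffer comes back, emptied
        System.out.println("recycled: " + (first == second));
    }
}
```

The trade-off is an extra queue operation per batch on the return path, so whether this beats plain allocation would have to be measured rather than assumed.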
Adding a 10 ms pause in the writer threads every 10,000,000 sends seems to smooth out the chart. Not sure why. But it makes the difference between 1,000, 10,000, and 100,000 mostly go away.
Also tried this test with the LinkedTransferQueue as an option.
One writer does very poorly as does a batch size of 100. Let's increase the minimum writer to 2, and drop the 100 batch size so we can see the numbers a little better.
229 M messages a second using QBit LinkedTransferQueue.
final QueueBuilder queueBuilder = queueBuilder()
        .setBatchSize(batchSize)
        .setLinkTransferQueue().setCheckEvery(checkEvery);
For CPU-intensive readers, you have the option to check whether the reader (or readers) is busy. If the reader is not busy, you can send it what you have before you reach the full batch size.
In this perf test, this feature is a wash since the reader is more or less a no-op.
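One way such a check could work is sketched below, using `LinkedTransferQueue.hasWaitingConsumer()` from the JDK to detect an idle reader. This is an assumption about the general technique, not a description of what `setCheckEvery` does inside QBit; `CheckEverySenderSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical sketch (not QBit's implementation): flush a partial batch
// early when a consumer is already parked waiting on the queue.
final class CheckEverySenderSketch<T> {
    private final LinkedTransferQueue<List<T>> queue;
    private final int batchSize;
    private final int checkEvery;
    private List<T> batch;

    CheckEverySenderSketch(LinkedTransferQueue<List<T>> queue, int batchSize, int checkEvery) {
        if (checkEvery <= 0) {
            throw new IllegalArgumentException("checkEvery must be positive");
        }
        this.queue = queue;
        this.batchSize = batchSize;
        this.checkEvery = checkEvery;
        this.batch = new ArrayList<>(batchSize);
    }

    void send(T item) {
        batch.add(item);
        int size = batch.size();
        if (size >= batchSize) {
            flushSends();                       // normal case: the batch is full
        } else if (size % checkEvery == 0 && queue.hasWaitingConsumer()) {
            flushSends();                       // reader is idle: send what we have
        }
    }

    void flushSends() {
        if (batch.isEmpty()) {
            return;
        }
        queue.offer(batch);                     // the queue is unbounded, so this never blocks
        batch = new ArrayList<>(batchSize);
    }

    public static void main(String... args) {
        LinkedTransferQueue<List<Integer>> queue = new LinkedTransferQueue<>();
        CheckEverySenderSketch<Integer> sender = new CheckEverySenderSketch<>(queue, 10, 5);
        for (int i = 0; i < 9; i++) {
            sender.send(1);
        }
        // No reader thread is waiting in this demo, so nothing was flushed early.
        System.out.println("batches queued before the batch is full: " + queue.size());
    }
}
```

The idle check only runs every `checkEvery` sends, so its overhead stays small compared with checking on every message.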
Next, we do 1,000,000 operations for each of the 400 M messages.
final int batchSize = 10_000;
final int totalSends = 400_000_000;
final int timeout = 5_000;
final int fudgeFactor = 100;
final int sleepAmount = 10;
final int sleepEvery = 10_000_000;
final int checkEvery = batchSize/10;
final boolean cpuIntensive = true;
final int times = 1_000_000;

// LTQ check every & try transfer
// final QueueBuilder queueBuilder = queueBuilder()
//         .setBatchSize(batchSize)
//         .setLinkTransferQueue()
//         .setCheckEvery(checkEvery).setTryTransfer(true);

//Check every
final QueueBuilder queueBuilder = queueBuilder()
        .setBatchSize(batchSize)
        .setLinkTransferQueue()
        .setCheckEvery(checkEvery);

//LTQ
// final QueueBuilder queueBuilder = queueBuilder()
//         .setBatchSize(batchSize)
//         .setLinkTransferQueue();

//LBQ
// final QueueBuilder queueBuilder = queueBuilder()
//         .setBatchSize(batchSize)
//         .setSize(10_000_000).setArrayBlockingQueue();

//warmup
perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
System.gc();
Sys.sleep(10_000);

final LongList timeMeasurements = new LongList();

for (int writers = 0; writers < 25; writers+=5) {
    int numThreads = writers == 0 ? writers+2 : writers;
    perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
    Sys.sleep(500);
    System.gc();
    Sys.sleep(10_000);
}
Next, a more intensive CPU operation (I added a loop around the last one).
final int batchSize = 10_000;
final int totalSends = 100_000_000;
final int timeout = 5_000;
final int fudgeFactor = 100;
final int sleepAmount = 10;
final int sleepEvery = 10_000_000;
final int checkEvery = 1000;
final boolean cpuIntensive = true;
final int times = 2_000_000_000;

// final QueueBuilder queueBuilder = queueBuilder()
//         .setBatchSize(batchSize)
//         .setLinkTransferQueue()
//         .setCheckEvery(checkEvery);//.setTryTransfer(true);

// final QueueBuilder queueBuilder = queueBuilder()
//         .setBatchSize(batchSize)
//         .setLinkTransferQueue();

final QueueBuilder queueBuilder = queueBuilder()
        .setBatchSize(batchSize)
        .setSize(10_000_000).setArrayBlockingQueue();

perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
System.gc();
Sys.sleep(10_000);

final LongList timeMeasurements = new LongList();

for (int writers = 0; writers < 25; writers+=5) {
    int numThreads = writers == 0 ? writers+2 : writers;
    perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
    Sys.sleep(500);
    System.gc();
    Sys.sleep(3_000);
}
Loop for CPU intensive
public void read() {
    Integer value = receiveQueue.poll();
    while (true) {
        while (value != null) {
            total += value;
            if (total % 10 == 0) {
                totalOut = total;
            }
            if (cpuIntensive && total % 13 == 0) {
                doSomething(value);
            }
            value = receiveQueue.poll();
        }
        totalOut = total;
        value = receiveQueue.pollWait();
        if (stop.get()) {
            return;
        }
    }
}

private void doSomething(Integer value) {
    long lv = 0;
    for (int j = 0; j < 10; j++) {
        for (int index = 0; index < times; index++) {
            lv = value * index % 13 + index;
            lv = lv * 47;
            lv = lv * 1000;
            lv = lv * 13 + lv % 31;
        }
        this.answer.set(this.answer.get() + lv);
    }
}
Code for test before cleanup and LinkedTransferQueue work
I hand-edited the different params in the main method of this class between runs.
package io.advantageous.qbit.perf;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;
/**
* Created by rhightower on 2/7/15.
*/
public class PerfTest {
static class TestReader {
int total;
volatile int totalOut;
AtomicBoolean stop = new AtomicBoolean();
private final Queue<Integer> queue;
private final ReceiveQueue<Integer> receiveQueue;
TestReader(final Queue<Integer> queue) {
this.queue = queue;
this.receiveQueue = queue.receiveQueue();
}
public void stop() {
stop.set(true);
}
public void read() {
Integer value = receiveQueue.poll();
while (true) {
while (value != null) {
total += value;
if (total % 10 == 0) {
totalOut = total;
}
value = receiveQueue.poll();
}
totalOut = total;
value = receiveQueue.pollWait();
if (stop.get()) {
return;
}
}
}
}
public static String fmt(int num) {
return String.format("%,d", num);
}
public static String fmt(long num) {
return String.format("%,d", num);
}
public static void perfTest(
final QueueBuilder queueBuilder,
final int readers,
final int writers,
final int totalCountExpected,
final int timeOut,
LongList readTimes,
int extra, int sleepAmount, int sleepEvery ) throws Exception{
final int itemsEachThread = totalCountExpected / writers +1;
final long start = System.currentTimeMillis();
puts("---------------------------------------------------------");
final Queue<Integer> queue = queueBuilder.build();
final List<TestReader> readerList = new ArrayList<>(readers);
final List<Thread> writeThreadList = new ArrayList<>(writers);
final List<Thread> readerThreadList = new ArrayList<>(readers);
for (int index = 0; index < writers; index++) {
int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
}
final long writeThreadsStarted = System.currentTimeMillis();
for (int index = 0; index < readers; index++) {
final TestReader reader = new TestReader(queue);
readerList.add(reader);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
reader.read();
}
});
thread.start();
readerThreadList.add(thread);
}
final long readThreadsStarted = System.currentTimeMillis();
final AtomicLong writeDuration = new AtomicLong();
final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);
int localCount = 0;
int statusCount =0;
exitLoop:
while (true) {
Sys.sleep(10);
statusCount++;
if (statusCount % 10 == 0) {
System.out.print(".");
}
if (statusCount % 100 == 0) {
System.out.println(fmt(localCount));
}
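// Elapsed time is now minus start; the reversed subtraction is never positive, so the timeout could never fire.
if (System.currentTimeMillis() - start > timeOut) {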
break;
}
for (TestReader reader : readerList) {
localCount = reader.totalOut;
if (localCount >= totalCountExpected) {
break exitLoop;
}
}
}
long end = System.currentTimeMillis();
for (TestReader testReader : readerList) {
testReader.stop();
}
Sys.sleep(100);
puts("\n---------------------------------------------------------");
puts(
"\nThreads readers \t", readers,
"\n writers \t", writers,
"\nMessage count \t", fmt(totalCountExpected),
"\nMsg cnt per thrd \t", fmt(itemsEachThread),
"\nBatch size \t", fmt(queueBuilder.getBatchSize()),
"\nNum batches \t", fmt(queueBuilder.getSize()),
"\n---------------------------------------------------");
puts( "\nCount \t", fmt(localCount),
"\nRead time total\t", fmt(end - readThreadsStarted),
"\nWrite time \t", fmt(writeDuration.get())
);
readTimes.add(end - readThreadsStarted);
try {
writerThreadCounter.join(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Thread writeTimer(final List<Thread> writeThreadList,
final long writeThreadsStarted,
final AtomicLong writeDuration) throws Exception {
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Sys.sleep(10);
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
for (Thread writerThread : writeThreadList) {
try {
writerThread.join(1000);
} catch (InterruptedException e) {
continue;
}
}
writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
}
});
thread.start();
return thread;
}
private static void createWriterThread(final List<Thread> threadList,
final Queue<Integer> queue,
final int itemsEachThread,
final int sleepAmount,
final int sleepEvery) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
final SendQueue<Integer> integerSendQueue = queue.sendQueue();
int count = 0;
for (int index = 0; index < itemsEachThread; index++) {
count++;
if (count > sleepEvery) {
count = 0;
if (sleepAmount>0) {
Sys.sleep(sleepAmount);
}
}
integerSendQueue.send(1);
}
integerSendQueue.flushSends();
}
});
threadList.add(thread);
thread.start();
}
public static void main (String... args) throws Exception {
final QueueBuilder queueBuilder = queueBuilder().setBatchSize(100_000).setSize(10_000_000).setArrayBlockingQueue();
/*
public static void perfTest(
final QueueBuilder queueBuilder,
final int readers,
final int writers,
final int itemsEachThread,
final int timeOut) {
*/
perfTest(queueBuilder, 1, 10, 500_000_000, 50_000, new LongList(), 100, 10, 1_000_000);
System.gc();
Sys.sleep(10_000);
final LongList times = new LongList();
for (int writers = 0; writers < 20; writers+=3) {
final QueueBuilder qb = queueBuilder().setBatchSize(100).setSize(10_000_000).setArrayBlockingQueue();
puts("NUM WRITER THREADS", writers+1);
perfTest(qb, 1, writers+1, 400_000_000, 5_000, times, 100, 10, 10_000_000);
System.gc();
Sys.sleep(5_000);
}
// final LongList times = new LongList();
//
// for (int writers = 0; writers < 20; writers++) {
// final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
// puts("NUM WRITER THREADS", writers+1);
// perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
// System.gc();
// Sys.sleep(1000);
//
// }
for (Long value : times) {
puts(value);
}
puts(times);
puts(
"\nmin \t", times.min(),
"\nmax \t", times.max(),
"\nmean \t", times.mean(),
"\nmedian \t", times.median(),
"\nstddev \t", times.standardDeviation());
}
@Before
public void setup() {
}
}
Raw output of first run
---------------------------------------------------------
........
---------------------------------------------------------
Threads readers 1
writers 10
Message count 100,000,000
Msg cnt per thrd 10,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 100,100,000
Read time total 1,041
Write time 983
NUM WRITER THREADS 1
---------------------------------------------------------
..........182,866,530
..........356,086,570
.......
---------------------------------------------------------
Threads readers 1
writers 1
Message count 500,000,000
Msg cnt per thrd 500,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,010,000
Read time total 2,954
Write time 1,022
NUM WRITER THREADS 2
---------------------------------------------------------
..........233,081,330
.........
---------------------------------------------------------
Threads readers 1
writers 2
Message count 500,000,000
Msg cnt per thrd 250,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,020,000
Read time total 2,057
Write time 2,015
NUM WRITER THREADS 3
---------------------------------------------------------
..........132,838,200
..........267,134,950
..........401,589,720
.......
---------------------------------------------------------
Threads readers 1
writers 3
Message count 500,000,000
Msg cnt per thrd 166,666,667
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,028,000
Read time total 3,936
Write time 3,022
NUM WRITER THREADS 4
---------------------------------------------------------
..........130,954,760
..........264,850,920
..........399,794,450
.......
---------------------------------------------------------
Threads readers 1
writers 4
Message count 500,000,000
Msg cnt per thrd 125,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,040,000
Read time total 3,885
Write time 3,801
NUM WRITER THREADS 5
---------------------------------------------------------
..........133,938,020
..........270,768,490
..........406,734,990
......
---------------------------------------------------------
Threads readers 1
writers 5
Message count 500,000,000
Msg cnt per thrd 100,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,050,000
Read time total 3,870
Write time 3,798
NUM WRITER THREADS 6
---------------------------------------------------------
..........132,912,550
..........268,018,930
..........402,073,920
.......
---------------------------------------------------------
Threads readers 1
writers 6
Message count 500,000,000
Msg cnt per thrd 83,333,334
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,058,000
Read time total 3,872
Write time 3,814
NUM WRITER THREADS 7
---------------------------------------------------------
..........131,462,020
..........265,798,160
..........399,964,000
.......
---------------------------------------------------------
Threads readers 1
writers 7
Message count 500,000,000
Msg cnt per thrd 71,428,572
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,066,000
Read time total 3,911
Write time 3,843
NUM WRITER THREADS 8
---------------------------------------------------------
..........133,118,780
..........267,053,030
..........400,814,320
.......
---------------------------------------------------------
Threads readers 1
writers 8
Message count 500,000,000
Msg cnt per thrd 62,500,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,080,000
Read time total 3,885
Write time 3,819
NUM WRITER THREADS 9
---------------------------------------------------------
..........134,822,520
..........270,485,380
..........403,792,710
.......
---------------------------------------------------------
Threads readers 1
writers 9
Message count 500,000,000
Msg cnt per thrd 55,555,556
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,085,000
Read time total 3,878
Write time 3,807
NUM WRITER THREADS 10
---------------------------------------------------------
..........133,540,490
..........267,256,550
..........404,794,160
......
---------------------------------------------------------
Threads readers 1
writers 10
Message count 500,000,000
Msg cnt per thrd 50,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,100,000
Read time total 3,845
Write time 3,778
NUM WRITER THREADS 11
---------------------------------------------------------
..........131,918,610
..........268,519,590
..........403,707,110
.......
---------------------------------------------------------
Threads readers 1
writers 11
Message count 500,000,000
Msg cnt per thrd 45,454,546
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,104,000
Read time total 3,857
Write time 3,790
NUM WRITER THREADS 12
---------------------------------------------------------
..........134,007,270
..........267,971,530
..........401,474,590
.......
---------------------------------------------------------
Threads readers 1
writers 12
Message count 500,000,000
Msg cnt per thrd 41,666,667
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,112,000
Read time total 3,867
Write time 3,794
NUM WRITER THREADS 13
---------------------------------------------------------
..........133,613,020
..........266,512,930
..........400,441,870
........
---------------------------------------------------------
Threads readers 1
writers 13
Message count 500,000,000
Msg cnt per thrd 38,461,539
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,123,000
Read time total 3,960
Write time 3,895
NUM WRITER THREADS 14
---------------------------------------------------------
..........131,527,420
..........250,755,900
..........382,014,070
........
---------------------------------------------------------
Threads readers 1
writers 14
Message count 500,000,000
Msg cnt per thrd 35,714,286
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,136,000
Read time total 4,017
Write time 3,951
NUM WRITER THREADS 15
---------------------------------------------------------
..........129,543,750
..........262,070,730
..........392,889,410
.......
---------------------------------------------------------
Threads readers 1
writers 15
Message count 500,000,000
Msg cnt per thrd 33,333,334
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,145,000
Read time total 3,916
Write time 3,846
NUM WRITER THREADS 16
---------------------------------------------------------
..........132,140,000
..........266,103,410
..........398,092,510
........
---------------------------------------------------------
Threads readers 1
writers 16
Message count 500,000,000
Msg cnt per thrd 31,250,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,160,000
Read time total 3,969
Write time 3,904
NUM WRITER THREADS 17
---------------------------------------------------------
..........132,230,880
..........263,316,360
..........397,953,520
........
---------------------------------------------------------
Threads readers 1
writers 17
Message count 500,000,000
Msg cnt per thrd 29,411,765
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,157,000
Read time total 3,976
Write time 3,907
NUM WRITER THREADS 18
---------------------------------------------------------
..........132,937,000
..........267,508,690
..........402,516,870
.......
---------------------------------------------------------
Threads readers 1
writers 18
Message count 500,000,000
Msg cnt per thrd 27,777,778
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,166,000
Read time total 3,949
Write time 3,861
NUM WRITER THREADS 19
---------------------------------------------------------
..........130,062,110
..........265,614,200
..........386,620,890
........
---------------------------------------------------------
Threads readers 1
writers 19
Message count 500,000,000
Msg cnt per thrd 26,315,790
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,175,000
Read time total 4,009
Write time 3,944
NUM WRITER THREADS 20
---------------------------------------------------------
..........134,731,950
..........269,057,010
..........392,053,940
.......
---------------------------------------------------------
Threads readers 1
writers 20
Message count 500,000,000
Msg cnt per thrd 25,000,001
Batch size 1,000
Num batches 10,000
---------------------------------------------------
Count 500,200,000
Read time total 3,957
Write time 3,895
[2954, 2057, 3936, 3885, 3870, 3872, 3911, 3885, 3878, 3845, 3857, 3867, 3960, 4017, 3916, 3969, 3976, 3949, 4009, 3957]
min 2057
max 4017
mean 3779
median 3898
stddev 450
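As a cross-check on the summary above, the mean and median can be recomputed from the printed read times with plain Java. This is a sketch independent of Boon's `LongList`; `ReadTimeStatsSketch` is a hypothetical name, and the array is copied from the list printed in the raw output.

```java
import java.util.Arrays;

// Hypothetical helper (not part of the test harness): recomputes the mean
// and median of the read times printed in the raw output.
final class ReadTimeStatsSketch {

    // Read times in ms, copied from the list printed in the raw output.
    static final long[] READ_TIMES_MS = {
        2954, 2057, 3936, 3885, 3870, 3872, 3911, 3885, 3878, 3845,
        3857, 3867, 3960, 4017, 3916, 3969, 3976, 3949, 4009, 3957
    };

    static double mean(long[] values) {
        long sum = 0;
        for (long value : values) {
            sum += value;
        }
        return (double) sum / values.length;
    }

    static double median(long[] values) {
        long[] sorted = values.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int middle = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[middle]
                : (sorted[middle - 1] + sorted[middle]) / 2.0;
    }

    public static void main(String... args) {
        System.out.println("mean   " + mean(READ_TIMES_MS));
        System.out.println("median " + median(READ_TIMES_MS));
    }
}
```

The mean works out to 3,778.5 ms, which matches the reported 3779 after rounding, and the median is 3,898 ms. The two fastest runs (one and two writer threads) pull the mean below the median.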
LinkedTransferQueue
For the LinkedTransferQueue runs, I cleaned up the code a bit.
Cleaned-up code using the LinkedTransferQueue:
package io.advantageous.qbit.perf;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;
/**
* Created by rhightower on 2/7/15.
*/
public class PerfTest {
static class TestReader {
int total;
volatile int totalOut;
AtomicBoolean stop = new AtomicBoolean();
private final Queue<Integer> queue;
private final ReceiveQueue<Integer> receiveQueue;
TestReader(final Queue<Integer> queue) {
this.queue = queue;
this.receiveQueue = queue.receiveQueue();
}
public void stop() {
stop.set(true);
}
public void read() {
Integer value = receiveQueue.poll();
while (true) {
while (value != null) {
total += value;
if (total % 10 == 0) {
totalOut = total;
}
value = receiveQueue.poll();
}
totalOut = total;
value = receiveQueue.pollWait();
if (stop.get()) {
return;
}
}
}
}
public static String fmt(int num) {
return String.format("%,d", num);
}
public static String fmt(long num) {
return String.format("%,d", num);
}
public static void perfTest(
final QueueBuilder queueBuilder,
final int readers,
final int writers,
final int totalCountExpected,
final int timeOut,
LongList readTimes,
int extra, int sleepAmount, int sleepEvery ) throws Exception{
final int itemsEachThread = totalCountExpected / writers +1;
final long start = System.currentTimeMillis();
puts("---------------------------------------------------------");
final Queue<Integer> queue = queueBuilder.build();
final List<TestReader> readerList = new ArrayList<>(readers);
final List<Thread> writeThreadList = new ArrayList<>(writers);
final List<Thread> readerThreadList = new ArrayList<>(readers);
for (int index = 0; index < writers; index++) {
int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
}
final long writeThreadsStarted = System.currentTimeMillis();
for (int index = 0; index < readers; index++) {
final TestReader reader = new TestReader(queue);
readerList.add(reader);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
reader.read();
}
});
thread.start();
readerThreadList.add(thread);
}
final long readThreadsStarted = System.currentTimeMillis();
final AtomicLong writeDuration = new AtomicLong();
final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);
int localCount = 0;
int statusCount =0;
exitLoop:
while (true) {
Sys.sleep(10);
statusCount++;
if (statusCount % 10 == 0) {
System.out.print(".");
}
if (statusCount % 100 == 0) {
System.out.println(fmt(localCount));
}
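// Elapsed time is now minus start; the reversed subtraction is never positive, so the timeout could never fire.
if (System.currentTimeMillis() - start > timeOut) {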
break;
}
for (TestReader reader : readerList) {
localCount = reader.totalOut;
if (localCount >= totalCountExpected) {
break exitLoop;
}
}
}
long end = System.currentTimeMillis();
for (TestReader testReader : readerList) {
testReader.stop();
}
Sys.sleep(100);
puts("\n---------------------------------------------------------");
puts(
"\nThreads readers \t", readers,
"\n writers \t", writers,
"\nMessage count \t", fmt(totalCountExpected),
"\nMsg cnt per thrd \t", fmt(itemsEachThread),
"\nBatch size \t", fmt(queueBuilder.getBatchSize()),
"\nNum batches \t", fmt(queueBuilder.getSize()),
"\n---------------------------------------------------");
puts( "\nCount \t", fmt(localCount),
"\nRead time total\t", fmt(end - readThreadsStarted),
"\nWrite time \t", fmt(writeDuration.get())
);
readTimes.add(end - readThreadsStarted);
try {
writerThreadCounter.join(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Thread writeTimer(final List<Thread> writeThreadList,
final long writeThreadsStarted,
final AtomicLong writeDuration) throws Exception {
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Sys.sleep(10);
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
for (Thread writerThread : writeThreadList) {
try {
writerThread.join(1000);
} catch (InterruptedException e) {
continue;
}
}
writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
}
});
thread.start();
return thread;
}
private static void createWriterThread(final List<Thread> threadList,
final Queue<Integer> queue,
final int itemsEachThread,
final int sleepAmount,
final int sleepEvery) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
final SendQueue<Integer> integerSendQueue = queue.sendQueue();
int count = 0;
for (int index = 0; index < itemsEachThread; index++) {
count++;
if (count > sleepEvery) {
count = 0;
if (sleepAmount>0) {
Sys.sleep(sleepAmount);
}
}
integerSendQueue.send(1);
}
integerSendQueue.flushSends();
}
});
threadList.add(thread);
thread.start();
}
public static void main (String... args) throws Exception {
final int batchSize = 100;
final int totalSends = 400_000_000;
final int timeout = 5_000;
final int fudgeFactor = 100;
final int sleepAmount = 100;
final int sleepEvery = 10_000_000;
final QueueBuilder warmUpBuilder = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();
perfTest(warmUpBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery);
System.gc();
Sys.sleep(10_000);
final LongList timeMeasurements = new LongList();
final QueueBuilder qb = queueBuilder().setBatchSize(batchSize).setLinkTransferQueue();
for (int writers = 0; writers < 25; writers+=5) {
int numThreads = writers == 0 ? writers+2 : writers;
perfTest(qb, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery);
Sys.sleep(500);
System.gc();
Sys.sleep(5000);
}
// final LongList times = new LongList();
//
// for (int writers = 0; writers < 20; writers++) {
// final QueueBuilder qb = queueBuilder().setBatchSize(100).setLinkTransferQueue();
// puts("NUM WRITER THREADS", writers+1);
// perfTest(queueBuilder, 1, writers+1, 500_000_000, 5_000, times, 10_000);
// System.gc();
// Sys.sleep(1000);
//
// }
for (Long value : timeMeasurements) {
puts(value);
}
puts(timeMeasurements);
puts(
"\nmin \t", timeMeasurements.min(),
"\nmax \t", timeMeasurements.max(),
"\nmean \t", timeMeasurements.mean(),
"\nmedian \t", timeMeasurements.median(),
"\nstddev \t", timeMeasurements.standardDeviation());
}
@Before
public void setup() {
}
}
Test after adding CPU intensive gak
package io.advantageous.qbit.perf;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;
/**
* Created by rhightower on 2/7/15.
*/
public class PerfTest {
static class TestReader {
private final boolean cpuIntensive;
private final int times;
int total;
volatile int totalOut;
public AtomicLong answer = new AtomicLong();
AtomicBoolean stop = new AtomicBoolean();
private final Queue<Integer> queue;
private final ReceiveQueue<Integer> receiveQueue;
TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
this.queue = queue;
this.receiveQueue = queue.receiveQueue();
this.cpuIntensive = cpuIntensive;
this.times = times;
}
public void stop() {
stop.set(true);
}
public void read() {
Integer value = receiveQueue.poll();
while (true) {
while (value != null) {
total += value;
if (total % 10 == 0) {
totalOut = total;
}
if (cpuIntensive) {
doSomething(value);
}
value = receiveQueue.poll();
}
totalOut = total;
value = receiveQueue.pollWait();
if (stop.get()) {
return;
}
}
}
private void doSomething(Integer value) {
long lv = 0;
for (int index = 0; index< times; index++) {
lv = value * index % 13 + index;
}
this.answer.set(lv);
}
}
public static String fmt(int num) {
return String.format("%,d", num);
}
public static String fmt(long num) {
return String.format("%,d", num);
}
public static void perfTest(
final QueueBuilder queueBuilder,
final int readers,
final int writers,
final int totalCountExpected,
final int timeOut,
LongList readTimes,
int extra, int sleepAmount, int sleepEvery,
boolean cpuIntensive, int times) throws Exception{
final int itemsEachThread = totalCountExpected / writers +1;
final long start = System.currentTimeMillis();
puts("---------------------------------------------------------");
final Queue<Integer> queue = queueBuilder.build();
final List<TestReader> readerList = new ArrayList<>(readers);
final List<Thread> writeThreadList = new ArrayList<>(writers);
final List<Thread> readerThreadList = new ArrayList<>(readers);
for (int index = 0; index < writers; index++) {
int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
}
final long writeThreadsStarted = System.currentTimeMillis();
for (int index = 0; index < readers; index++) {
final TestReader reader = new TestReader(queue, cpuIntensive, times);
readerList.add(reader);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
reader.read();
}
});
thread.start();
readerThreadList.add(thread);
}
final long readThreadsStarted = System.currentTimeMillis();
final AtomicLong writeDuration = new AtomicLong();
final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);
int localCount = 0;
int statusCount =0;
exitLoop:
while (true) {
Sys.sleep(10);
statusCount++;
if (statusCount % 10 == 0) {
System.out.print(".");
}
if (statusCount % 100 == 0) {
System.out.println(fmt(localCount));
}
if (System.currentTimeMillis() - start > timeOut) {
break;
}
for (TestReader reader : readerList) {
localCount = reader.totalOut;
if (localCount >= totalCountExpected) {
break exitLoop;
}
}
}
long end = System.currentTimeMillis();
for (TestReader testReader : readerList) {
testReader.stop();
puts(testReader.answer.get());
}
Sys.sleep(100);
puts("\n---------------------------------------------------------");
puts(
"\nThreads readers \t", readers,
"\n writers \t", writers,
"\nMessage count \t", fmt(totalCountExpected),
"\nMsg cnt per thrd \t", fmt(itemsEachThread),
"\nBatch size \t", fmt(queueBuilder.getBatchSize()),
"\nNum batches \t", fmt(queueBuilder.getSize()),
"\n---------------------------------------------------");
puts( "\nCount \t", fmt(localCount),
"\nRead time total\t", fmt(end - readThreadsStarted),
"\nWrite time \t", fmt(writeDuration.get())
);
readTimes.add(end - readThreadsStarted);
try {
writerThreadCounter.join(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Thread writeTimer(final List<Thread> writeThreadList,
final long writeThreadsStarted,
final AtomicLong writeDuration) throws Exception {
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Sys.sleep(10);
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
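Sys.sleep(10);
// The iterator above is exhausted, so start a fresh one for the second pass.
threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {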
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
for (Thread writerThread : writeThreadList) {
try {
writerThread.join(1000);
} catch (InterruptedException e) {
continue;
}
}
writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
}
});
thread.start();
return thread;
}
private static void createWriterThread(final List<Thread> threadList,
final Queue<Integer> queue,
final int itemsEachThread,
final int sleepAmount,
final int sleepEvery) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
final SendQueue<Integer> integerSendQueue = queue.sendQueue();
int count = 0;
for (int index = 0; index < itemsEachThread; index++) {
count++;
if (count > sleepEvery) {
count = 0;
if (sleepAmount>0) {
Sys.sleep(sleepAmount);
}
}
integerSendQueue.send(1);
}
integerSendQueue.flushSends();
}
});
threadList.add(thread);
thread.start();
}
public static void main (String... args) throws Exception {
final int batchSize = 10_000;
final int totalSends = 400_000_000;
final int timeout = 5_000;
final int fudgeFactor = 100;
final int sleepAmount = 10;
final int sleepEvery = 10_000_000;
final int checkEvery = batchSize/10;
final boolean cpuIntensive = true;
final int times = 1_000_000;
final QueueBuilder queueBuilder = queueBuilder()
.setBatchSize(batchSize)
.setLinkTransferQueue()
.setCheckEvery(checkEvery);
// final QueueBuilder queueBuilder = queueBuilder()
// .setBatchSize(batchSize)
// .setLinkTransferQueue();
// final QueueBuilder queueBuilder = queueBuilder()
// .setBatchSize(batchSize)
// .setSize(10_000_000).setArrayBlockingQueue();
perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
System.gc();
Sys.sleep(10_000);
final LongList timeMeasurements = new LongList();
for (int writers = 0; writers < 25; writers+=5) {
int numThreads = writers == 0 ? writers+2 : writers;
perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
Sys.sleep(500);
System.gc();
Sys.sleep(10_000);
}
for (Long value : timeMeasurements) {
puts(value);
}
puts(timeMeasurements);
puts(
"\nmin \t", timeMeasurements.min(),
"\nmax \t", timeMeasurements.max(),
"\nmean \t", timeMeasurements.mean(),
"\nmedian \t", timeMeasurements.median(),
"\nstddev \t", timeMeasurements.standardDeviation());
}
@Before
public void setup() {
}
}
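The rates quoted earlier (for example, 100M messages in about 500 ms) presumably come from dividing the message count by the read time that `perfTest` records in milliseconds. Here is a minimal sketch of that conversion. `ThroughputSketch` and `messagesPerSecond` are hypothetical names used only for illustration; they are not part of QBit or of the test above.

```java
public class ThroughputSketch {

    // Converts a message count and an elapsed time in milliseconds
    // into messages per second. Hypothetical helper, not a QBit API.
    static double messagesPerSecond(long messageCount, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return messageCount * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Example figures only: 100M messages read in 500 ms.
        double rate = messagesPerSecond(100_000_000L, 500L);
        System.out.printf("%,.0f messages/second%n", rate);
    }
}
```

The same helper could be applied to each value in `timeMeasurements` to report a rate next to the raw read time.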
Making it extra CPU intensive and spiky
package io.advantageous.qbit.perf;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.ReceiveQueue;
import io.advantageous.qbit.queue.SendQueue;
import org.boon.collections.LongList;
import org.boon.core.Sys;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static io.advantageous.qbit.queue.QueueBuilder.queueBuilder;
import static org.boon.Boon.puts;
/**
* Created by rhightower on 2/7/15.
*/
public class PerfTest {
static class TestReader {
private final boolean cpuIntensive;
private final int times;
int total;
volatile int totalOut;
public AtomicLong answer = new AtomicLong();
AtomicBoolean stop = new AtomicBoolean();
private final Queue<Integer> queue;
private final ReceiveQueue<Integer> receiveQueue;
TestReader(final Queue<Integer> queue, boolean cpuIntensive, int times) {
this.queue = queue;
this.receiveQueue = queue.receiveQueue();
this.cpuIntensive = cpuIntensive;
this.times = times;
}
public void stop() {
stop.set(true);
}
public void read() {
Integer value = receiveQueue.poll();
while (true) {
while (value != null) {
total += value;
if (total % 10 == 0) {
totalOut = total;
}
if (cpuIntensive && total % 13 == 0) {
doSomething(value);
}
value = receiveQueue.poll();
}
totalOut = total;
value = receiveQueue.pollWait();
if (stop.get()) {
return;
}
}
}
private void doSomething(Integer value) {
long lv = 0;
for (int j = 0; j < 10; j++) {
for (int index = 0; index < times; index++) {
lv = value * index % 13 + index;
lv = lv * 47;
lv = lv * 1000;
lv = lv * 13 + lv % 31;
}
this.answer.set(this.answer.get() + lv);
}
}
}
public static String fmt(int num) {
return String.format("%,d", num);
}
public static String fmt(long num) {
return String.format("%,d", num);
}
public static void perfTest(
final QueueBuilder queueBuilder,
final int readers,
final int writers,
final int totalCountExpected,
final int timeOut,
LongList readTimes,
int extra, int sleepAmount, int sleepEvery,
boolean cpuIntensive, int times) throws Exception{
final int itemsEachThread = totalCountExpected / writers +1;
final long start = System.currentTimeMillis();
puts("---------------------------------------------------------");
final Queue<Integer> queue = queueBuilder.build();
final List<TestReader> readerList = new ArrayList<>(readers);
final List<Thread> writeThreadList = new ArrayList<>(writers);
final List<Thread> readerThreadList = new ArrayList<>(readers);
for (int index = 0; index < writers; index++) {
int amountEachThread = itemsEachThread + (totalCountExpected%writers) + extra;
createWriterThread(writeThreadList, queue, amountEachThread, sleepAmount, sleepEvery);
}
final long writeThreadsStarted = System.currentTimeMillis();
for (int index = 0; index < readers; index++) {
final TestReader reader = new TestReader(queue, cpuIntensive, times);
readerList.add(reader);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
reader.read();
}
});
thread.start();
readerThreadList.add(thread);
}
final long readThreadsStarted = System.currentTimeMillis();
final AtomicLong writeDuration = new AtomicLong();
final Thread writerThreadCounter = writeTimer(writeThreadList, writeThreadsStarted, writeDuration);
int localCount = 0;
int statusCount =0;
exitLoop:
while (true) {
Sys.sleep(10);
statusCount++;
if (statusCount % 10 == 0) {
System.out.print(".");
}
if (statusCount % 100 == 0) {
System.out.println(fmt(localCount));
}
if (System.currentTimeMillis() - start > timeOut) {
break;
}
for (TestReader reader : readerList) {
localCount = reader.totalOut;
if (localCount >= totalCountExpected) {
break exitLoop;
}
}
}
long end = System.currentTimeMillis();
for (TestReader testReader : readerList) {
testReader.stop();
puts(testReader.answer.get());
}
Sys.sleep(100);
puts("\n---------------------------------------------------------");
puts(
"\nThreads readers \t", readers,
"\n writers \t", writers,
"\nMessage count \t", fmt(totalCountExpected),
"\nMsg cnt per thrd \t", fmt(itemsEachThread),
"\nBatch size \t", fmt(queueBuilder.getBatchSize()),
"\nNum batches \t", fmt(queueBuilder.getSize()),
"\n---------------------------------------------------");
puts( "\nCount \t", fmt(localCount),
"\nRead time total\t", fmt(end - readThreadsStarted),
"\nWrite time \t", fmt(writeDuration.get())
);
readTimes.add(end - readThreadsStarted);
try {
writerThreadCounter.join(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static Thread writeTimer(final List<Thread> writeThreadList,
final long writeThreadsStarted,
final AtomicLong writeDuration) throws Exception {
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Sys.sleep(10);
ListIterator<Thread> threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
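Sys.sleep(10);
// The iterator above is exhausted, so start a fresh one for the second pass.
threadListIterator = writeThreadList.listIterator();
while (threadListIterator.hasNext()) {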
Thread thread = threadListIterator.next();
if (!thread.isAlive()) {
threadListIterator.remove();
}
}
Sys.sleep(10);
for (Thread writerThread : writeThreadList) {
try {
writerThread.join(1000);
} catch (InterruptedException e) {
continue;
}
}
writeDuration.set(System.currentTimeMillis() - writeThreadsStarted);
}
});
thread.start();
return thread;
}
private static void createWriterThread(final List<Thread> threadList,
final Queue<Integer> queue,
final int itemsEachThread,
final int sleepAmount,
final int sleepEvery) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
final SendQueue<Integer> integerSendQueue = queue.sendQueue();
int count = 0;
for (int index = 0; index < itemsEachThread; index++) {
count++;
if (count > sleepEvery) {
count = 0;
if (sleepAmount>0) {
Sys.sleep(sleepAmount);
}
}
integerSendQueue.send(1);
}
integerSendQueue.flushSends();
}
});
threadList.add(thread);
thread.start();
}
public static void main (String... args) throws Exception {
final int batchSize = 10_000;
final int totalSends = 100_000_000;
final int timeout = 5_000;
final int fudgeFactor = 100;
final int sleepAmount = 10;
final int sleepEvery = 10_000_000;
final int checkEvery = 1000;
final boolean cpuIntensive = true;
final int times = 2_000_000_000;
// final QueueBuilder queueBuilder = queueBuilder()
// .setBatchSize(batchSize)
// .setLinkTransferQueue()
// .setCheckEvery(checkEvery);//.setTryTransfer(true);
// final QueueBuilder queueBuilder = queueBuilder()
// .setBatchSize(batchSize)
// .setLinkTransferQueue();
final QueueBuilder queueBuilder = queueBuilder()
.setBatchSize(batchSize)
.setSize(10_000_000).setArrayBlockingQueue();
perfTest(queueBuilder, 1, 10, totalSends, 50_000, new LongList(), fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
System.gc();
Sys.sleep(10_000);
final LongList timeMeasurements = new LongList();
for (int writers = 0; writers < 25; writers+=5) {
int numThreads = writers == 0 ? writers+2 : writers;
perfTest(queueBuilder, 1, numThreads, totalSends, timeout, timeMeasurements, fudgeFactor, sleepAmount, sleepEvery, cpuIntensive, times);
Sys.sleep(500);
System.gc();
Sys.sleep(3_000);
}
for (Long value : timeMeasurements) {
puts(value);
}
puts(timeMeasurements);
puts(
"\nmin \t", timeMeasurements.min(),
"\nmax \t", timeMeasurements.max(),
"\nmean \t", timeMeasurements.mean(),
"\nmedian \t", timeMeasurements.median(),
"\nstddev \t", timeMeasurements.standardDeviation());
}
@Before
public void setup() {
}
}
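What makes this variant spiky is the `total % 13 == 0` guard in `read()`. The writers always send the value 1, so with a single reader the running total grows by one per message and the expensive `doSomething` call fires on every 13th message. The sketch below counts those calls for a given number of messages. `SpikeSketch` and `expensiveCalls` are hypothetical names for illustration, and the sketch assumes one reader and a value of 1 per message.

```java
public class SpikeSketch {

    // Counts how many of the first `messages` reads would take the
    // expensive path, assuming every message carries the value 1 so
    // the running total grows by exactly 1 per message.
    static int expensiveCalls(int messages) {
        int total = 0;
        int calls = 0;
        for (int i = 0; i < messages; i++) {
            total += 1;
            if (total % 13 == 0) {
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // Example only: how many of 100 messages hit the slow path.
        System.out.println(expensiveCalls(100));
    }
}
```

Roughly one message in thirteen stalls the reader, which is what lets the queue back up in bursts instead of draining at a steady pace.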
If you want to learn more about QBit, follow some of these links:
[Detailed Tutorial] QBit microservice example
[Doc] Queue Callbacks for QBit queue based services
[Quick Start] Building a simple Rest web microservice server with QBit
[Quick Start] Building a TODO web microservice client with QBit
[Quick Start] Building a TODO web microservice server with QBit
[Quick Start] Building boon for the QBit microservice engine
[Quick Start] Building QBit the microservice lib for Java
[Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
[Rough Cut] Working with event bus for QBit the microservice engine
[Rough Cut] Working with inproc MicroServices
[Rough Cut] Working with private event bus for inproc microservices
[Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
[Rough Cut] Working with System Manager for QBit Microservice lib
[Z Notebook] More benchmarking internal
[Z Notebook] Performance testing for REST
[Z Notebook] Roadmap
Home
Introduction to QBit
Local Service Proxies
QBit Boon New Wave of JSON HTTP and Websocket
QBit Docs