Rick

Rick
Rick

Tuesday, October 13, 2015

QBit has JMS, Kafka, Redis support, etc.

QBit has JMS support. The JMS support will be similar for Kafka support and other persistent queues. You can use JMS queues, local queues and Kafka queues with the same interface.
The JMS support works with ActiveMQ with little effort and can be easily be made to work with any JMS implementation via the JmsServiceBuilder.
Two classes do the bulk of the integration with JMS as follows:
  • JmsService does the low level communication with JMS.
  • JmsServiceBuilder manages JNDI, creation of sessions, connections, destinations, etc.
Then there are three classes that adapt JMS Queues and Topics to QBit Queues.
  • JmsTextQueue exposes a JMS queue or topic as QBit Queue
  • JmsTextReceiveQueue exposes a JMS queue or topic as QBit ReceiveQueue
  • JmsTextSenderQueue exposes a JMS queue or topic as QBit SendQueue
  • JsonQueue wraps a QBit Queue<String> and converts items into JSON and from JSON.
  • EventBusQueueAdapter moves items from a queue onto the QBit event bus. It can be used to channel events from Kafka, JMS, and/or Redis into the QBit world.

Receive a messages from a queue

Receive a messages from a queue

        /** Create a new JMS Builder which can emit JmsService objects. */
        final JmsServiceBuilder jmsBuilder = JmsServiceBuilder
                                            .newJmsServiceBuilder()
                                            .setHost("somehost").setPort(6355)
                                            .setDefaultDestination("foobarQueue");


        /** Create a QBit Queue that talks to JMS. */
        final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);


        /** Create a QBit ReceiveQueue that talks to JMS. */
        final ReceiveQueue<String> receiveQueue = textQueue.receiveQueue();



        /** Get a message from JMS. */
        String message  = receiveQueue.pollWait();
        out.println(message);


        /** Keep getting messages. */
        while (message!=null) {
            message = receiveQueue.poll();
            out.println(message);
        }

        out.println("DONE");

Send a messages to a queue

Send a messages to a queue

        final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
                .setDefaultDestination("foobarQueue");
        final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
        final SendQueue<String> sendQueue = textQueue.sendQueue();

        sendQueue.send("foo");
        for (int i=0; i < 10; i++) {
            sendQueue.send("foo" + i);
        }

Starting up a QBit listener to listen to a queue

Starting up a listener to listent to a queue

        final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);

        personSendQueue.send(new Person("Geoff"));
        personSendQueue.send(new Person("Rick"));
        personSendQueue.flushSends();


        personQueue.startListener(personsABQ::add); //Listen to JMS and put everything in ArrayBlockingQueue

      //This is just an example not a suggestion on API usage. 

Strongly typed queue with JsonQueue.

If you want to not work with String and instead work with strongly typed object, you can combine the JMS support with the JsonQueue.

Complete example

import java.util.concurrent.atomic.AtomicReference

import io.advantageous.qbit.admin.{MicroserviceConfig, ManagedServiceBuilder}
import io.advantageous.qbit.system.QBitSystemManager
import scala.util.Properties._
import org.slf4j.LoggerFactory._


object Main extends App{

  var qBitSystemManagerAtomicReference: AtomicReference[QBitSystemManager] = null
  System.getProperty(MicroserviceConfig.CONTEXT + "file", "/app/conf/service.json")



  start(true)


  def start(wait:Boolean) {


    qBitSystemManagerAtomicReference = new AtomicReference[QBitSystemManager]()
    /**
     * Logger to log messages.
     */
    val managedServiceBuilder: ManagedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder("queue-test-client")

    val logger = getLogger("queue.Main")

    val brokerURL = envOrElse("BROKER_URL", "http://localhost:8080")

    logger.info("STARTING...")

    val testClient = new TestQueueClientService(
        managedServiceBuilder.getStatServiceBuilder.buildStatsCollector,
        brokerURL)


    managedServiceBuilder.addEndpointService(testClient)


    logger.info("PORT_WEB {}", envOrElse("PORT_WEB", "9090").toInt)
    logger.info("BROKER_URL {}", brokerURL)

    managedServiceBuilder.setPort(envOrElse("PORT_WEB", "9090").toInt)

    managedServiceBuilder.getEndpointServerBuilder
      .setUri("/api/v1")
      .build
      .startServer


    configureAdmin(managedServiceBuilder)
    logger.info("Queue Broker is open for eBusiness")
    qBitSystemManagerAtomicReference.set(managedServiceBuilder.getSystemManager)
    if (wait) {
      managedServiceBuilder.getSystemManager.waitForShutdown()
    }

  }

  /**
   * Configure the admin (we will be able to get rid of this for the last release.
   * @param managedServiceBuilder managedServiceBuilder
   */
  private def configureAdmin(managedServiceBuilder: ManagedServiceBuilder) {
    val portAdmin = envOrElse("PORT_ADMIN", "6666").toInt
    managedServiceBuilder.getAdminBuilder.setPort(portAdmin)
    managedServiceBuilder.getAdminBuilder.setMicroServiceName("calypso.queue.test")
    managedServiceBuilder.getAdminBuilder.build.startServer
  }



  def shutdown() {
    qBitSystemManagerAtomicReference.get().shutDown()
  }

}

....

import java.lang

import java.util
import java.util.Collections

import io.advantageous.qbit.annotation.RequestMethod._
import io.advantageous.boon.core.Str
import io.advantageous.qbit.annotation.{QueueCallbackType, QueueCallback, RequestMapping}
import io.advantageous.qbit.http.HTTP
import io.advantageous.qbit.jms.{JmsTextQueue, JmsServiceBuilder}
import io.advantageous.qbit.queue.{JsonQueue, SendQueue, Queue}
import io.advantageous.qbit.reactive.{ReactorBuilder, Reactor}
import io.advantageous.qbit.service.stats.StatsCollector
import io.advantageous.qbit.util.Timer
import org.slf4j.{LoggerFactory, Logger}


@RequestMapping(Array("/queue-test-client"))
class TestQueueClientService(private val statsCollector: StatsCollector,
                             private val brokerURL : String,
                             private val reactor: Reactor = ReactorBuilder.reactorBuilder().build(),
                             private val timer: Timer = Timer.timer()) {

  private var time: Long = 0L
  private var listenerURL = ""
  private val logger: Logger = LoggerFactory.getLogger(classOf[TestQueueClientService])
  private var jmsQueue : Option[Queue[Record]] = None
  private var jmsSendQueue : Option[SendQueue[Record]] = None

  init()

  def init(): Unit = {
    listenerURL = HTTP.get(brokerURL + "/api/v1/broker/ip/address-port")
    val listenerURLParts = Str.split(listenerURL.replace("\"", ""), ':')


    val jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder()

    jmsServiceBuilder.setHost("somehost")

    if (listenerURLParts.length == 2) {
      jmsServiceBuilder.setPort(listenerURLParts(1).toInt)
    }

    jmsQueue = Option(new JsonQueue[Record](classOf[Record], new JmsTextQueue(jmsServiceBuilder)))
    jmsSendQueue = Option(jmsQueue.get.sendQueue())


    logger.info(s"LISTENER URL $listenerURL")
  }



  /** Read description in annotation. */
  @RequestMapping(value = Array("/send"), summary = "send", description = "send some records",
    returnDescription = " just send some records", method = Array(PUT))
  def sendRecords: lang.Boolean = {

    if (jmsSendQueue.isDefined) {
      for (i <- 1 to 100) {
        jmsSendQueue.get.send(new Record("hi mom" + i))
      }
    } else {
      init()
    }
    true
  }


  /** Read description in annotation. */
  @RequestMapping(value = Array("/receive"), summary = "receive records", description = "receive some records",
    returnDescription = " just receive some records", method = Array(PUT))
  def receiveRecords: util.List[Record] = {

    if (jmsQueue.isDefined) {
      val receiveQueue = jmsQueue.get.receiveQueue()

      var record: Record = receiveQueue.poll()

      val list = new util.ArrayList[Record]()

      if (record !=null)
        do {
          list.add(record)
          record = receiveQueue.poll()
        } while (record != null)
      list
    } else {
      init()
      Collections.emptyList()
    }
  }

  /** Time service. */
  @RequestMapping(value = Array("/time"), summary = "time", description = "Used to time service to see if it is up",
    returnDescription = "just returns current time")
  def getTime = time

  /** Build number increment this on every unique build to make sure the latest is in Orchard. */
  @RequestMapping(value = Array("/build"), summary = "build number",
    description = "Used to make sure we have the right version deployed", returnDescription = "returns the build number")
  def buildNumber = {
    "1-(10-12-2015)"
  }

  /** Process Reactor stuff. */
  @QueueCallback(Array(QueueCallbackType.LIMIT, QueueCallbackType.EMPTY))
  def process () {
    reactor.process()
    time = timer.time
  }


  /** Ping service. */
  @RequestMapping(value = Array("/ping"), summary = "ping", description = "Used to ping service to see if it is up",
    returnDescription = "just returns true")
  def ping : lang.Boolean = {
    true
  }

}

Complete example 2

/*
 * Copyright (c) 2015. Rick Hightower, Geoff Chandler
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
 */

package io.advantageous.qbit.jms.example.events;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.concurrent.PeriodicScheduler;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.events.spi.EventConnector;
import io.advantageous.qbit.events.spi.EventTransferObject;
import io.advantageous.qbit.jms.JmsServiceBuilder;
import io.advantageous.qbit.jms.JmsTextQueue;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;

import javax.jms.Session;
import java.util.concurrent.TimeUnit;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * EmployeeEventExampleUsingChannelsToSendEvents
 * created by rhightower on 2/11/15.
 */
@SuppressWarnings("ALL")
public class EmployeeEventExampleUsingChannelsToSendEventsWithJMS {


    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";




    public static void main(String... args) throws Exception {


        final BrokerService broker; //JMS Broker to make this a self contained example.
        final int port; //port to bind to JMS Broker to


        /* ******************************************************************************/
        /* START JMS BROKER. ************************************************************/
        /* Start up JMS Broker. */
        port = PortUtils.findOpenPortStartAt(4000);
        broker= new BrokerService();
        broker.addConnector("tcp://localhost:"+port);
        broker.start();

        Sys.sleep(5_000);


        /* ******************************************************************************/
        /* START JMS CLIENTS FOR SERVER A AND B *******************************************/
        /* Create a JMS Builder to create JMS Queues. */
        final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port)
                .setDefaultDestination(NEW_HIRE_CHANNEL).setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);


        /* JMS client for server A. */
        final JsonQueue<Employee> employeeJsonQueueServerA =
                new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));


        /* JMS client for server B. */
        final JsonQueue<Employee> employeeJsonQueueServerB =
                new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));

        /* Send Queue to send messages to JMS broker. */
        final SendQueue<Employee> sendQueueA = employeeJsonQueueServerA.sendQueue();
        Sys.sleep(1_000);




        /*  ReceiveQueueB Queue B to receive messages from JMS broker. */
        final ReceiveQueue<Employee> receiveQueueB = employeeJsonQueueServerB.receiveQueue();
        Sys.sleep(1_000);




        /* ******************************************************************************/
        /* START EVENT BUS A  ************************************************************/
        /* Create you own private event bus for Server A. */
        final EventManager privateEventBusServerAInternal = EventManagerBuilder.eventManagerBuilder()
                .setEventConnector(new EventConnector() {
                    @Override
                    public void forwardEvent(EventTransferObject<Object> event) {

                        if (event.channel().equals(NEW_HIRE_CHANNEL)) {
                            System.out.println(event);
                            final Object body = event.body();
                            final Object[] bodyArray = ((Object[]) body);
                            final Employee employee = (Employee) bodyArray[0];
                            System.out.println(employee);
                            sendQueueA.sendAndFlush(employee);
                        }
                    }
                })
                .setName("serverAEventBus").build();

                /* Create a service queue for this event bus. */
        final ServiceQueue privateEventBusServiceQueueA = serviceBuilder()
                .setServiceObject(privateEventBusServerAInternal)
                .setInvokeDynamic(false).build();

        final EventManager privateEventBusServerA = privateEventBusServiceQueueA.createProxyWithAutoFlush(EventManager.class,
                50, TimeUnit.MILLISECONDS);



        /* Create you own private event bus for Server B. */
        final EventManager privateEventBusServerBInternal = EventManagerBuilder.eventManagerBuilder()
                .setEventConnector(new EventConnector() {
                    @Override
                    public void forwardEvent(EventTransferObject<Object> event) {
                        System.out.println(event);
                        final Object body = event.body();
                        final Object[] bodyArray = ((Object[]) body);
                        final Employee employee = (Employee) bodyArray[0];
                        System.out.println(employee);
                        sendQueueA.sendAndFlush(employee);
                    }
                })
                .setName("serverBEventBus").build();

                /* Create a service queue for this event bus. */
        final ServiceQueue privateEventBusServiceQueueB = serviceBuilder()
                .setServiceObject(privateEventBusServerBInternal)
                .setInvokeDynamic(false).build();


        final EventManager privateEventBusServerB = privateEventBusServiceQueueB.createProxyWithAutoFlush(EventManager.class,
                50, TimeUnit.MILLISECONDS);






        final EventBusProxyCreator eventBusProxyCreator =
                QBit.factory().eventBusProxyCreator();

        final EmployeeEventManager employeeEventManagerA =
                eventBusProxyCreator.createProxy(privateEventBusServerA, EmployeeEventManager.class);

        final EmployeeEventManager employeeEventManagerB =
                eventBusProxyCreator.createProxy(privateEventBusServerB, EmployeeEventManager.class);

        /* ******************************************************************************/
        /* LISTEN TO JMS CLIENT B and FORWARD to Event bus. **********************/
        /* Listen to JMS client and push to B event bus ****************************/
        employeeJsonQueueServerB.startListener(new ReceiveQueueListener<Employee>(){
            @Override
            public void receive(final Employee employee) {
                System.out.println("HERE " + employee);
                employeeEventManagerB.sendNewEmployee(employee);
                System.out.println("LEFT " + employee);

            }
        });


        final SalaryChangedChannel salaryChangedChannel = eventBusProxyCreator.createProxy(privateEventBusServerA, SalaryChangedChannel.class);

        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        final EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManagerA,
                salaryChangedChannel); //Runs on Server A



        /* Now create your other service POJOs which have no compile time dependencies on QBit. */
        final PayrollService payroll = new PayrollService(); //Runs on Server A
        final BenefitsService benefits = new BenefitsService();//Runs on Server A

        final VolunteerService volunteering = new VolunteerService();//Runs on Server B


        /** Employee hiring service. A. */
        ServiceQueue employeeHiringServiceQueue = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();

        /** Payroll service A. */
        ServiceQueue payrollServiceQueue = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();

        /** Employee Benefits service. A. */
        ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();

        /** Community outreach program. B. */
        ServiceQueue volunteeringServiceQueue = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues.
        * For ServerA. */
        privateEventBusServerA.joinService(payrollServiceQueue);
        privateEventBusServerA.joinService(employeeBenefitsServiceQueue);


        /* Now wire in event B bus. */
        privateEventBusServerB.joinService(volunteeringServiceQueue);


        /* Start Server A bus. */
        privateEventBusServiceQueueA.start();


        /* Start Server B bus. */
        privateEventBusServiceQueueB.start();


        employeeHiringServiceQueue.start();
        volunteeringServiceQueue.start();
        payrollServiceQueue.start();
        employeeBenefitsServiceQueue.start();


        /** Now create the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Lucas", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }


    @EventChannel
    interface SalaryChangedChannel {


        void salaryChanged(Employee employee, int newSalary);

    }


    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);


    }

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;
        final SalaryChangedChannel salaryChangedChannel;

        public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
                                     final SalaryChangedChannel salaryChangedChannel) {
            this.eventManager = employeeEventManager;
            this.salaryChangedChannel = salaryChangedChannel;
        }


        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {


            flushServiceProxy(salaryChangedChannel);
            flushServiceProxy(eventManager);
        }


        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {

            flushServiceProxy(salaryChangedChannel);
            flushServiceProxy(eventManager);
        }


        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee(employee);
            salaryChangedChannel.salaryChanged(employee, salary);


        }

    }

    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class PayrollService implements SalaryChangedChannel {

        @Override
        public void salaryChanged(Employee employee, int newSalary) {
            System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), newSalary);

        }
    }
}

Complete Example 3

package io.advantageous.qbit.jms;

import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 *
 * Created  10/8/15.
 */
public class JmsTest {

    private Queue<Person> personQueue;
    private SendQueue<Person> personSendQueue;
    private ReceiveQueue<Person> personReceiveQueue;
    private BrokerService broker;
    private int port;

    @Before
    public void setUp() throws Exception {

        port = PortUtils.findOpenPortStartAt(4000);
        broker= new BrokerService();
        broker.addConnector("tcp://localhost:" + port);
        broker.start();

        final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
                .setDefaultDestination("foobarQueue").setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE).setPort(port);

        final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);

        personQueue = new JsonQueue<>(Person.class, textQueue);
        personSendQueue = personQueue.sendQueue();
        personReceiveQueue = personQueue.receiveQueue();



        personSendQueue.shouldBatch();
        personSendQueue.name();
        personSendQueue.size();
        personQueue.name();
        personQueue.size();
    }

    @Test
    public void testSendConsume() throws Exception {


        personSendQueue.send(new Person("Geoff"));
        personSendQueue.send(new Person("Rick"));
        personSendQueue.flushSends();

        final Person geoff = personReceiveQueue.pollWait();
        final Person rick = personReceiveQueue.pollWait();

        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));


        assertEquals(true, personQueue.started());

    }


    @Test
    public void testSendConsume2() throws Exception {

        personSendQueue.sendAndFlush(new Person("Geoff"));
        personSendQueue.sendAndFlush(new Person("Rick"));

        final Person geoff = personReceiveQueue.pollWait();
        Sys.sleep(100);
        final Person rick = personReceiveQueue.poll();


        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
    }


    @Test
    public void testSendConsume3() throws Exception {

        personSendQueue = personQueue.sendQueueWithAutoFlush(10, TimeUnit.MILLISECONDS);

        personSendQueue.sendMany(new Person("Geoff"), new Person("Rick"));
        final Person geoff = personReceiveQueue.take();
        final Person rick = personReceiveQueue.take();

        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));

    }


    @Test
    public void testSendConsume4() throws Exception {
        personSendQueue = personQueue.sendQueueWithAutoFlush(QBit.factory().periodicScheduler(),
                10, TimeUnit.MILLISECONDS);

        personSendQueue.sendBatch(Lists.list(new Person("Geoff"), new Person("Rick")));
        final Person geoff = personReceiveQueue.take();
        final Person rick = personReceiveQueue.take();

        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));

    }

    @Test
    public void testSendConsume5() throws Exception {
        final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));

        Iterable<Person> persons = list::iterator;


        personSendQueue.sendBatch(persons);
        personSendQueue.flushSends();

        final Person geoff = personReceiveQueue.pollWait();
        final Person rick = personReceiveQueue.pollWait();


        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));

    }

    @Test
    public void testSendConsume6() throws Exception {

        personSendQueue.send(new Person("Geoff"));
        personSendQueue.send(new Person("Rick"));

        personSendQueue.flushSends();

        Sys.sleep(2000);
        final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch();

    }


    @Test
    public void testSendConsume7() throws Exception {
        final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));
        final Iterable<Person> persons = list::iterator;


        personSendQueue.sendBatch(persons);
        personSendQueue.flushSends();

        final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch(5);




    }


    @Test
    public void testSendConsume8() throws Exception {

        final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);

        personSendQueue.send(new Person("Geoff"));
        personSendQueue.send(new Person("Rick"));
        personSendQueue.flushSends();


        personQueue.startListener(personsABQ::add);

        Sys.sleep(1000);
        int count = 0;

        while (personsABQ.size() < 2) {
            Sys.sleep(100);
            count++;
            if (count > 100) break;
        }


        Sys.sleep(1000);
        assertEquals(2, personsABQ.size());
        final Person geoff = personsABQ.poll();
        final Person rick = personsABQ.poll();


        assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
        assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));

    }


    @Test
    public void builder() throws Exception {
        JmsServiceBuilder jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder();
        jmsServiceBuilder.setJndiSettings(Collections.emptyMap());
        jmsServiceBuilder.getJndiSettings();
        jmsServiceBuilder.setConnectionFactory(new ConnectionFactory() {
            @Override
            public Connection createConnection() throws JMSException {
                return null;
            }

            @Override
            public Connection createConnection(String userName, String password) throws JMSException {
                return null;
            }
        });
        jmsServiceBuilder.getConnectionFactory();

        jmsServiceBuilder.setConnectionFactoryName("Foo");
        jmsServiceBuilder.getConnectionFactoryName();
        jmsServiceBuilder.setContext(null);
        jmsServiceBuilder.getContext();
        jmsServiceBuilder.setDefaultDestination("foo");
        jmsServiceBuilder.getDefaultDestination();
        jmsServiceBuilder.setAcknowledgeMode(5);
        jmsServiceBuilder.getAcknowledgeMode();
        jmsServiceBuilder.setDefaultTimeout(1);
        jmsServiceBuilder.getDefaultTimeout();
        jmsServiceBuilder.setHost("foo");
        jmsServiceBuilder.getHost();
        jmsServiceBuilder.setUserName("rick");
        jmsServiceBuilder.getUserName();
        jmsServiceBuilder.setPassword("foo");
        jmsServiceBuilder.getPassword();
        jmsServiceBuilder.setProviderURL("");
        jmsServiceBuilder.getProviderURL();
        jmsServiceBuilder.setProviderURLPattern("");
        jmsServiceBuilder.getProviderURLPattern();
        jmsServiceBuilder.setConnectionSupplier(null);
        jmsServiceBuilder.getConnectionSupplier();
        jmsServiceBuilder.setStartConnection(true);
        jmsServiceBuilder.isStartConnection();
        jmsServiceBuilder.setTransacted(true);
        jmsServiceBuilder.isTransacted();
        jmsServiceBuilder.setJndiSettings(null);
        jmsServiceBuilder.addJndiSetting("foo","bar");



        jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port);

        jmsServiceBuilder.build().start();
    }

    @After
    public void tearDown() throws Exception {

        personQueue.stop();
        personReceiveQueue.stop();
        personSendQueue.stop();

        try {
            broker.stop();
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        broker = null;
        Sys.sleep(1000);
    }


    private static class Person {
        final String name;

        private Person(String name) {
            this.name = name;
        }
    }
}

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training