QBit has JMS support. The JMS support will be similar for Kafka support and other persistent queues. You can use JMS queues, local queues and Kafka queues with the same interface.
The JMS support works with ActiveMQ with little effort and can be easily be made to work with any JMS implementation via the JmsServiceBuilder
.
Two classes do the bulk of the integration with JMS as follows:
JmsService
does the low level communication with JMS.
JmsServiceBuilder
manages JNDI, creation of sessions, connections, destinations, etc.
Then there are three classes that adapt JMS Queues
and Topics
to QBit Queues
.
JmsTextQueue
exposes a JMS queue or topic as QBit Queue
JmsTextReceiveQueue
exposes a JMS queue or topic as QBit ReceiveQueue
-
JmsTextSenderQueue
exposes a JMS queue or topic as QBit SendQueue
-
JsonQueue
wraps a QBit Queue<String>
and converts items into JSON and from JSON.
EventBusQueueAdapter
moves items from a queue onto the QBit event bus. It can be used to channel events from Kafka, JMS, and/or Redis into the QBit world.
QBit has JMS support. The JMS support will be similar for Kafka support and other persistent queues. You can use JMS queues, local queues and Kafka queues with the same interface.
The JMS support works with ActiveMQ with little effort and can be easily be made to work with any JMS implementation via the
JmsServiceBuilder
.
Two classes do the bulk of the integration with JMS as follows:
JmsService
does the low level communication with JMS.JmsServiceBuilder
manages JNDI, creation of sessions, connections, destinations, etc.
Then there are three classes that adapt JMS
Queues
and Topics
to QBit Queues
.JmsTextQueue
exposes a JMS queue or topic as QBitQueue
JmsTextReceiveQueue
exposes a JMS queue or topic as QBitReceiveQueue
JmsTextSenderQueue
exposes a JMS queue or topic as QBitSendQueue
JsonQueue
wraps a QBitQueue<String>
and converts items into JSON and from JSON.EventBusQueueAdapter
moves items from a queue onto the QBit event bus. It can be used to channel events from Kafka, JMS, and/or Redis into the QBit world.
Receive a messages from a queue
Receive a messages from a queue
/** Create a new JMS Builder which can emit JmsService objects. */
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder
.newJmsServiceBuilder()
.setHost("somehost").setPort(6355)
.setDefaultDestination("foobarQueue");
/** Create a QBit Queue that talks to JMS. */
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
/** Create a QBit ReceiveQueue that talks to JMS. */
final ReceiveQueue<String> receiveQueue = textQueue.receiveQueue();
/** Get a message from JMS. */
String message = receiveQueue.pollWait();
out.println(message);
/** Keep getting messages. */
while (message!=null) {
message = receiveQueue.poll();
out.println(message);
}
out.println("DONE");
/** Create a new JMS Builder which can emit JmsService objects. */
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder
.newJmsServiceBuilder()
.setHost("somehost").setPort(6355)
.setDefaultDestination("foobarQueue");
/** Create a QBit Queue that talks to JMS. */
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
/** Create a QBit ReceiveQueue that talks to JMS. */
final ReceiveQueue<String> receiveQueue = textQueue.receiveQueue();
/** Get a message from JMS. */
String message = receiveQueue.pollWait();
out.println(message);
/** Keep getting messages. */
while (message!=null) {
message = receiveQueue.poll();
out.println(message);
}
out.println("DONE");
Send a messages to a queue
Send a messages to a queue
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
.setDefaultDestination("foobarQueue");
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
final SendQueue<String> sendQueue = textQueue.sendQueue();
sendQueue.send("foo");
for (int i=0; i < 10; i++) {
sendQueue.send("foo" + i);
}
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
.setDefaultDestination("foobarQueue");
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
final SendQueue<String> sendQueue = textQueue.sendQueue();
sendQueue.send("foo");
for (int i=0; i < 10; i++) {
sendQueue.send("foo" + i);
}
Starting up a QBit listener to listen to a queue
Starting up a listener to listent to a queue
final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
personQueue.startListener(personsABQ::add); //Listen to JMS and put everything in ArrayBlockingQueue
//This is just an example not a suggestion on API usage.
final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
personQueue.startListener(personsABQ::add); //Listen to JMS and put everything in ArrayBlockingQueue
//This is just an example not a suggestion on API usage.
Strongly typed queue with JsonQueue.
If you want to not work with String and instead work with strongly typed object, you can combine the JMS support with the JsonQueue
.
If you want to not work with String and instead work with strongly typed object, you can combine the JMS support with the
JsonQueue
.Complete example
import java.util.concurrent.atomic.AtomicReference
import io.advantageous.qbit.admin.{MicroserviceConfig, ManagedServiceBuilder}
import io.advantageous.qbit.system.QBitSystemManager
import scala.util.Properties._
import org.slf4j.LoggerFactory._
object Main extends App{
var qBitSystemManagerAtomicReference: AtomicReference[QBitSystemManager] = null
System.getProperty(MicroserviceConfig.CONTEXT + "file", "/app/conf/service.json")
start(true)
def start(wait:Boolean) {
qBitSystemManagerAtomicReference = new AtomicReference[QBitSystemManager]()
/**
* Logger to log messages.
*/
val managedServiceBuilder: ManagedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder("queue-test-client")
val logger = getLogger("queue.Main")
val brokerURL = envOrElse("BROKER_URL", "http://localhost:8080")
logger.info("STARTING...")
val testClient = new TestQueueClientService(
managedServiceBuilder.getStatServiceBuilder.buildStatsCollector,
brokerURL)
managedServiceBuilder.addEndpointService(testClient)
logger.info("PORT_WEB {}", envOrElse("PORT_WEB", "9090").toInt)
logger.info("BROKER_URL {}", brokerURL)
managedServiceBuilder.setPort(envOrElse("PORT_WEB", "9090").toInt)
managedServiceBuilder.getEndpointServerBuilder
.setUri("/api/v1")
.build
.startServer
configureAdmin(managedServiceBuilder)
logger.info("Queue Broker is open for eBusiness")
qBitSystemManagerAtomicReference.set(managedServiceBuilder.getSystemManager)
if (wait) {
managedServiceBuilder.getSystemManager.waitForShutdown()
}
}
/**
* Configure the admin (we will be able to get rid of this for the last release.
* @param managedServiceBuilder managedServiceBuilder
*/
private def configureAdmin(managedServiceBuilder: ManagedServiceBuilder) {
val portAdmin = envOrElse("PORT_ADMIN", "6666").toInt
managedServiceBuilder.getAdminBuilder.setPort(portAdmin)
managedServiceBuilder.getAdminBuilder.setMicroServiceName("calypso.queue.test")
managedServiceBuilder.getAdminBuilder.build.startServer
}
def shutdown() {
qBitSystemManagerAtomicReference.get().shutDown()
}
}
....
import java.lang
import java.util
import java.util.Collections
import io.advantageous.qbit.annotation.RequestMethod._
import io.advantageous.boon.core.Str
import io.advantageous.qbit.annotation.{QueueCallbackType, QueueCallback, RequestMapping}
import io.advantageous.qbit.http.HTTP
import io.advantageous.qbit.jms.{JmsTextQueue, JmsServiceBuilder}
import io.advantageous.qbit.queue.{JsonQueue, SendQueue, Queue}
import io.advantageous.qbit.reactive.{ReactorBuilder, Reactor}
import io.advantageous.qbit.service.stats.StatsCollector
import io.advantageous.qbit.util.Timer
import org.slf4j.{LoggerFactory, Logger}
@RequestMapping(Array("/queue-test-client"))
class TestQueueClientService(private val statsCollector: StatsCollector,
private val brokerURL : String,
private val reactor: Reactor = ReactorBuilder.reactorBuilder().build(),
private val timer: Timer = Timer.timer()) {
private var time: Long = 0L
private var listenerURL = ""
private val logger: Logger = LoggerFactory.getLogger(classOf[TestQueueClientService])
private var jmsQueue : Option[Queue[Record]] = None
private var jmsSendQueue : Option[SendQueue[Record]] = None
init()
def init(): Unit = {
listenerURL = HTTP.get(brokerURL + "/api/v1/broker/ip/address-port")
val listenerURLParts = Str.split(listenerURL.replace("\"", ""), ':')
val jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder()
jmsServiceBuilder.setHost("somehost")
if (listenerURLParts.length == 2) {
jmsServiceBuilder.setPort(listenerURLParts(1).toInt)
}
jmsQueue = Option(new JsonQueue[Record](classOf[Record], new JmsTextQueue(jmsServiceBuilder)))
jmsSendQueue = Option(jmsQueue.get.sendQueue())
logger.info(s"LISTENER URL $listenerURL")
}
/** Read description in annotation. */
@RequestMapping(value = Array("/send"), summary = "send", description = "send some records",
returnDescription = " just send some records", method = Array(PUT))
def sendRecords: lang.Boolean = {
if (jmsSendQueue.isDefined) {
for (i <- 1 to 100) {
jmsSendQueue.get.send(new Record("hi mom" + i))
}
} else {
init()
}
true
}
/** Read description in annotation. */
@RequestMapping(value = Array("/receive"), summary = "receive records", description = "receive some records",
returnDescription = " just receive some records", method = Array(PUT))
def receiveRecords: util.List[Record] = {
if (jmsQueue.isDefined) {
val receiveQueue = jmsQueue.get.receiveQueue()
var record: Record = receiveQueue.poll()
val list = new util.ArrayList[Record]()
if (record !=null)
do {
list.add(record)
record = receiveQueue.poll()
} while (record != null)
list
} else {
init()
Collections.emptyList()
}
}
/** Time service. */
@RequestMapping(value = Array("/time"), summary = "time", description = "Used to time service to see if it is up",
returnDescription = "just returns current time")
def getTime = time
/** Build number increment this on every unique build to make sure the latest is in Orchard. */
@RequestMapping(value = Array("/build"), summary = "build number",
description = "Used to make sure we have the right version deployed", returnDescription = "returns the build number")
def buildNumber = {
"1-(10-12-2015)"
}
/** Process Reactor stuff. */
@QueueCallback(Array(QueueCallbackType.LIMIT, QueueCallbackType.EMPTY))
def process () {
reactor.process()
time = timer.time
}
/** Ping service. */
@RequestMapping(value = Array("/ping"), summary = "ping", description = "Used to ping service to see if it is up",
returnDescription = "just returns true")
def ping : lang.Boolean = {
true
}
}
import java.util.concurrent.atomic.AtomicReference
import io.advantageous.qbit.admin.{MicroserviceConfig, ManagedServiceBuilder}
import io.advantageous.qbit.system.QBitSystemManager
import scala.util.Properties._
import org.slf4j.LoggerFactory._
object Main extends App{
var qBitSystemManagerAtomicReference: AtomicReference[QBitSystemManager] = null
System.getProperty(MicroserviceConfig.CONTEXT + "file", "/app/conf/service.json")
start(true)
def start(wait:Boolean) {
qBitSystemManagerAtomicReference = new AtomicReference[QBitSystemManager]()
/**
* Logger to log messages.
*/
val managedServiceBuilder: ManagedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder("queue-test-client")
val logger = getLogger("queue.Main")
val brokerURL = envOrElse("BROKER_URL", "http://localhost:8080")
logger.info("STARTING...")
val testClient = new TestQueueClientService(
managedServiceBuilder.getStatServiceBuilder.buildStatsCollector,
brokerURL)
managedServiceBuilder.addEndpointService(testClient)
logger.info("PORT_WEB {}", envOrElse("PORT_WEB", "9090").toInt)
logger.info("BROKER_URL {}", brokerURL)
managedServiceBuilder.setPort(envOrElse("PORT_WEB", "9090").toInt)
managedServiceBuilder.getEndpointServerBuilder
.setUri("/api/v1")
.build
.startServer
configureAdmin(managedServiceBuilder)
logger.info("Queue Broker is open for eBusiness")
qBitSystemManagerAtomicReference.set(managedServiceBuilder.getSystemManager)
if (wait) {
managedServiceBuilder.getSystemManager.waitForShutdown()
}
}
/**
* Configure the admin (we will be able to get rid of this for the last release.
* @param managedServiceBuilder managedServiceBuilder
*/
private def configureAdmin(managedServiceBuilder: ManagedServiceBuilder) {
val portAdmin = envOrElse("PORT_ADMIN", "6666").toInt
managedServiceBuilder.getAdminBuilder.setPort(portAdmin)
managedServiceBuilder.getAdminBuilder.setMicroServiceName("calypso.queue.test")
managedServiceBuilder.getAdminBuilder.build.startServer
}
def shutdown() {
qBitSystemManagerAtomicReference.get().shutDown()
}
}
....
import java.lang
import java.util
import java.util.Collections
import io.advantageous.qbit.annotation.RequestMethod._
import io.advantageous.boon.core.Str
import io.advantageous.qbit.annotation.{QueueCallbackType, QueueCallback, RequestMapping}
import io.advantageous.qbit.http.HTTP
import io.advantageous.qbit.jms.{JmsTextQueue, JmsServiceBuilder}
import io.advantageous.qbit.queue.{JsonQueue, SendQueue, Queue}
import io.advantageous.qbit.reactive.{ReactorBuilder, Reactor}
import io.advantageous.qbit.service.stats.StatsCollector
import io.advantageous.qbit.util.Timer
import org.slf4j.{LoggerFactory, Logger}
@RequestMapping(Array("/queue-test-client"))
class TestQueueClientService(private val statsCollector: StatsCollector,
private val brokerURL : String,
private val reactor: Reactor = ReactorBuilder.reactorBuilder().build(),
private val timer: Timer = Timer.timer()) {
private var time: Long = 0L
private var listenerURL = ""
private val logger: Logger = LoggerFactory.getLogger(classOf[TestQueueClientService])
private var jmsQueue : Option[Queue[Record]] = None
private var jmsSendQueue : Option[SendQueue[Record]] = None
init()
def init(): Unit = {
listenerURL = HTTP.get(brokerURL + "/api/v1/broker/ip/address-port")
val listenerURLParts = Str.split(listenerURL.replace("\"", ""), ':')
val jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder()
jmsServiceBuilder.setHost("somehost")
if (listenerURLParts.length == 2) {
jmsServiceBuilder.setPort(listenerURLParts(1).toInt)
}
jmsQueue = Option(new JsonQueue[Record](classOf[Record], new JmsTextQueue(jmsServiceBuilder)))
jmsSendQueue = Option(jmsQueue.get.sendQueue())
logger.info(s"LISTENER URL $listenerURL")
}
/** Read description in annotation. */
@RequestMapping(value = Array("/send"), summary = "send", description = "send some records",
returnDescription = " just send some records", method = Array(PUT))
def sendRecords: lang.Boolean = {
if (jmsSendQueue.isDefined) {
for (i <- 1 to 100) {
jmsSendQueue.get.send(new Record("hi mom" + i))
}
} else {
init()
}
true
}
/** Read description in annotation. */
@RequestMapping(value = Array("/receive"), summary = "receive records", description = "receive some records",
returnDescription = " just receive some records", method = Array(PUT))
def receiveRecords: util.List[Record] = {
if (jmsQueue.isDefined) {
val receiveQueue = jmsQueue.get.receiveQueue()
var record: Record = receiveQueue.poll()
val list = new util.ArrayList[Record]()
if (record !=null)
do {
list.add(record)
record = receiveQueue.poll()
} while (record != null)
list
} else {
init()
Collections.emptyList()
}
}
/** Time service. */
@RequestMapping(value = Array("/time"), summary = "time", description = "Used to time service to see if it is up",
returnDescription = "just returns current time")
def getTime = time
/** Build number increment this on every unique build to make sure the latest is in Orchard. */
@RequestMapping(value = Array("/build"), summary = "build number",
description = "Used to make sure we have the right version deployed", returnDescription = "returns the build number")
def buildNumber = {
"1-(10-12-2015)"
}
/** Process Reactor stuff. */
@QueueCallback(Array(QueueCallbackType.LIMIT, QueueCallbackType.EMPTY))
def process () {
reactor.process()
time = timer.time
}
/** Ping service. */
@RequestMapping(value = Array("/ping"), summary = "ping", description = "Used to ping service to see if it is up",
returnDescription = "just returns true")
def ping : lang.Boolean = {
true
}
}
Complete example 2
/*
* Copyright (c) 2015. Rick Hightower, Geoff Chandler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
*/
package io.advantageous.qbit.jms.example.events;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.concurrent.PeriodicScheduler;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.events.spi.EventConnector;
import io.advantageous.qbit.events.spi.EventTransferObject;
import io.advantageous.qbit.jms.JmsServiceBuilder;
import io.advantageous.qbit.jms.JmsTextQueue;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Session;
import java.util.concurrent.TimeUnit;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
/**
* EmployeeEventExampleUsingChannelsToSendEvents
* created by rhightower on 2/11/15.
*/
@SuppressWarnings("ALL")
public class EmployeeEventExampleUsingChannelsToSendEventsWithJMS {
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static void main(String... args) throws Exception {
final BrokerService broker; //JMS Broker to make this a self contained example.
final int port; //port to bind to JMS Broker to
/* ******************************************************************************/
/* START JMS BROKER. ************************************************************/
/* Start up JMS Broker. */
port = PortUtils.findOpenPortStartAt(4000);
broker= new BrokerService();
broker.addConnector("tcp://localhost:"+port);
broker.start();
Sys.sleep(5_000);
/* ******************************************************************************/
/* START JMS CLIENTS FOR SERVER A AND B *******************************************/
/* Create a JMS Builder to create JMS Queues. */
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port)
.setDefaultDestination(NEW_HIRE_CHANNEL).setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
/* JMS client for server A. */
final JsonQueue<Employee> employeeJsonQueueServerA =
new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));
/* JMS client for server B. */
final JsonQueue<Employee> employeeJsonQueueServerB =
new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));
/* Send Queue to send messages to JMS broker. */
final SendQueue<Employee> sendQueueA = employeeJsonQueueServerA.sendQueue();
Sys.sleep(1_000);
/* ReceiveQueueB Queue B to receive messages from JMS broker. */
final ReceiveQueue<Employee> receiveQueueB = employeeJsonQueueServerB.receiveQueue();
Sys.sleep(1_000);
/* ******************************************************************************/
/* START EVENT BUS A ************************************************************/
/* Create you own private event bus for Server A. */
final EventManager privateEventBusServerAInternal = EventManagerBuilder.eventManagerBuilder()
.setEventConnector(new EventConnector() {
@Override
public void forwardEvent(EventTransferObject<Object> event) {
if (event.channel().equals(NEW_HIRE_CHANNEL)) {
System.out.println(event);
final Object body = event.body();
final Object[] bodyArray = ((Object[]) body);
final Employee employee = (Employee) bodyArray[0];
System.out.println(employee);
sendQueueA.sendAndFlush(employee);
}
}
})
.setName("serverAEventBus").build();
/* Create a service queue for this event bus. */
final ServiceQueue privateEventBusServiceQueueA = serviceBuilder()
.setServiceObject(privateEventBusServerAInternal)
.setInvokeDynamic(false).build();
final EventManager privateEventBusServerA = privateEventBusServiceQueueA.createProxyWithAutoFlush(EventManager.class,
50, TimeUnit.MILLISECONDS);
/* Create you own private event bus for Server B. */
final EventManager privateEventBusServerBInternal = EventManagerBuilder.eventManagerBuilder()
.setEventConnector(new EventConnector() {
@Override
public void forwardEvent(EventTransferObject<Object> event) {
System.out.println(event);
final Object body = event.body();
final Object[] bodyArray = ((Object[]) body);
final Employee employee = (Employee) bodyArray[0];
System.out.println(employee);
sendQueueA.sendAndFlush(employee);
}
})
.setName("serverBEventBus").build();
/* Create a service queue for this event bus. */
final ServiceQueue privateEventBusServiceQueueB = serviceBuilder()
.setServiceObject(privateEventBusServerBInternal)
.setInvokeDynamic(false).build();
final EventManager privateEventBusServerB = privateEventBusServiceQueueB.createProxyWithAutoFlush(EventManager.class,
50, TimeUnit.MILLISECONDS);
final EventBusProxyCreator eventBusProxyCreator =
QBit.factory().eventBusProxyCreator();
final EmployeeEventManager employeeEventManagerA =
eventBusProxyCreator.createProxy(privateEventBusServerA, EmployeeEventManager.class);
final EmployeeEventManager employeeEventManagerB =
eventBusProxyCreator.createProxy(privateEventBusServerB, EmployeeEventManager.class);
/* ******************************************************************************/
/* LISTEN TO JMS CLIENT B and FORWARD to Event bus. **********************/
/* Listen to JMS client and push to B event bus ****************************/
employeeJsonQueueServerB.startListener(new ReceiveQueueListener<Employee>(){
@Override
public void receive(final Employee employee) {
System.out.println("HERE " + employee);
employeeEventManagerB.sendNewEmployee(employee);
System.out.println("LEFT " + employee);
}
});
final SalaryChangedChannel salaryChangedChannel = eventBusProxyCreator.createProxy(privateEventBusServerA, SalaryChangedChannel.class);
/*
Create your EmployeeHiringService but this time pass the private event bus.
Note you could easily use Spring or Guice for this wiring.
*/
final EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManagerA,
salaryChangedChannel); //Runs on Server A
/* Now create your other service POJOs which have no compile time dependencies on QBit. */
final PayrollService payroll = new PayrollService(); //Runs on Server A
final BenefitsService benefits = new BenefitsService();//Runs on Server A
final VolunteerService volunteering = new VolunteerService();//Runs on Server B
/** Employee hiring service. A. */
ServiceQueue employeeHiringServiceQueue = serviceBuilder()
.setServiceObject(employeeHiring)
.setInvokeDynamic(false).build();
/** Payroll service A. */
ServiceQueue payrollServiceQueue = serviceBuilder()
.setServiceObject(payroll)
.setInvokeDynamic(false).build();
/** Employee Benefits service. A. */
ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
.setServiceObject(benefits)
.setInvokeDynamic(false).build();
/** Community outreach program. B. */
ServiceQueue volunteeringServiceQueue = serviceBuilder()
.setServiceObject(volunteering)
.setInvokeDynamic(false).build();
/* Now wire in the event bus so it can fire events into the service queues.
* For ServerA. */
privateEventBusServerA.joinService(payrollServiceQueue);
privateEventBusServerA.joinService(employeeBenefitsServiceQueue);
/* Now wire in event B bus. */
privateEventBusServerB.joinService(volunteeringServiceQueue);
/* Start Server A bus. */
privateEventBusServiceQueueA.start();
/* Start Server B bus. */
privateEventBusServiceQueueB.start();
employeeHiringServiceQueue.start();
volunteeringServiceQueue.start();
payrollServiceQueue.start();
employeeBenefitsServiceQueue.start();
/** Now create the service proxy like before. */
EmployeeHiringServiceClient employeeHiringServiceClientProxy =
employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);
/** Call the hireEmployee method which triggers the other events. */
employeeHiringServiceClientProxy.hireEmployee(new Employee("Lucas", 1));
flushServiceProxy(employeeHiringServiceClientProxy);
Sys.sleep(5_000);
}
interface EmployeeHiringServiceClient {
void hireEmployee(final Employee employee);
}
@EventChannel
interface SalaryChangedChannel {
void salaryChanged(Employee employee, int newSalary);
}
interface EmployeeEventManager {
@EventChannel(NEW_HIRE_CHANNEL)
void sendNewEmployee(Employee employee);
}
public static class Employee {
final String firstName;
final int employeeId;
public Employee(String firstName, int employeeId) {
this.firstName = firstName;
this.employeeId = employeeId;
}
public String getFirstName() {
return firstName;
}
public int getEmployeeId() {
return employeeId;
}
@Override
public String toString() {
return "Employee{" +
"firstName='" + firstName + '\'' +
", employeeId=" + employeeId +
'}';
}
}
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
public static class BenefitsService {
@OnEvent(NEW_HIRE_CHANNEL)
public void enroll(final Employee employee) {
System.out.printf("Employee enrolled into benefits system employee %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class VolunteerService {
@OnEvent(NEW_HIRE_CHANNEL)
public void invite(final Employee employee) {
System.out.printf("Employee will be invited to the community outreach program %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class PayrollService implements SalaryChangedChannel {
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
}
/*
* Copyright (c) 2015. Rick Hightower, Geoff Chandler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
*/
package io.advantageous.qbit.jms.example.events;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.concurrent.PeriodicScheduler;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.events.spi.EventConnector;
import io.advantageous.qbit.events.spi.EventTransferObject;
import io.advantageous.qbit.jms.JmsServiceBuilder;
import io.advantageous.qbit.jms.JmsTextQueue;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Session;
import java.util.concurrent.TimeUnit;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
/**
* EmployeeEventExampleUsingChannelsToSendEvents
* created by rhightower on 2/11/15.
*/
@SuppressWarnings("ALL")
public class EmployeeEventExampleUsingChannelsToSendEventsWithJMS {
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static void main(String... args) throws Exception {
final BrokerService broker; //JMS Broker to make this a self contained example.
final int port; //port to bind to JMS Broker to
/* ******************************************************************************/
/* START JMS BROKER. ************************************************************/
/* Start up JMS Broker. */
port = PortUtils.findOpenPortStartAt(4000);
broker= new BrokerService();
broker.addConnector("tcp://localhost:"+port);
broker.start();
Sys.sleep(5_000);
/* ******************************************************************************/
/* START JMS CLIENTS FOR SERVER A AND B *******************************************/
/* Create a JMS Builder to create JMS Queues. */
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port)
.setDefaultDestination(NEW_HIRE_CHANNEL).setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
/* JMS client for server A. */
final JsonQueue<Employee> employeeJsonQueueServerA =
new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));
/* JMS client for server B. */
final JsonQueue<Employee> employeeJsonQueueServerB =
new JsonQueue<>(Employee.class, new JmsTextQueue(jmsBuilder));
/* Send Queue to send messages to JMS broker. */
final SendQueue<Employee> sendQueueA = employeeJsonQueueServerA.sendQueue();
Sys.sleep(1_000);
/* ReceiveQueueB Queue B to receive messages from JMS broker. */
final ReceiveQueue<Employee> receiveQueueB = employeeJsonQueueServerB.receiveQueue();
Sys.sleep(1_000);
/* ******************************************************************************/
/* START EVENT BUS A ************************************************************/
/* Create you own private event bus for Server A. */
final EventManager privateEventBusServerAInternal = EventManagerBuilder.eventManagerBuilder()
.setEventConnector(new EventConnector() {
@Override
public void forwardEvent(EventTransferObject<Object> event) {
if (event.channel().equals(NEW_HIRE_CHANNEL)) {
System.out.println(event);
final Object body = event.body();
final Object[] bodyArray = ((Object[]) body);
final Employee employee = (Employee) bodyArray[0];
System.out.println(employee);
sendQueueA.sendAndFlush(employee);
}
}
})
.setName("serverAEventBus").build();
/* Create a service queue for this event bus. */
final ServiceQueue privateEventBusServiceQueueA = serviceBuilder()
.setServiceObject(privateEventBusServerAInternal)
.setInvokeDynamic(false).build();
final EventManager privateEventBusServerA = privateEventBusServiceQueueA.createProxyWithAutoFlush(EventManager.class,
50, TimeUnit.MILLISECONDS);
/* Create you own private event bus for Server B. */
final EventManager privateEventBusServerBInternal = EventManagerBuilder.eventManagerBuilder()
.setEventConnector(new EventConnector() {
@Override
public void forwardEvent(EventTransferObject<Object> event) {
System.out.println(event);
final Object body = event.body();
final Object[] bodyArray = ((Object[]) body);
final Employee employee = (Employee) bodyArray[0];
System.out.println(employee);
sendQueueA.sendAndFlush(employee);
}
})
.setName("serverBEventBus").build();
/* Create a service queue for this event bus. */
final ServiceQueue privateEventBusServiceQueueB = serviceBuilder()
.setServiceObject(privateEventBusServerBInternal)
.setInvokeDynamic(false).build();
final EventManager privateEventBusServerB = privateEventBusServiceQueueB.createProxyWithAutoFlush(EventManager.class,
50, TimeUnit.MILLISECONDS);
final EventBusProxyCreator eventBusProxyCreator =
QBit.factory().eventBusProxyCreator();
final EmployeeEventManager employeeEventManagerA =
eventBusProxyCreator.createProxy(privateEventBusServerA, EmployeeEventManager.class);
final EmployeeEventManager employeeEventManagerB =
eventBusProxyCreator.createProxy(privateEventBusServerB, EmployeeEventManager.class);
/* ******************************************************************************/
/* LISTEN TO JMS CLIENT B and FORWARD to Event bus. **********************/
/* Listen to JMS client and push to B event bus ****************************/
employeeJsonQueueServerB.startListener(new ReceiveQueueListener<Employee>(){
@Override
public void receive(final Employee employee) {
System.out.println("HERE " + employee);
employeeEventManagerB.sendNewEmployee(employee);
System.out.println("LEFT " + employee);
}
});
final SalaryChangedChannel salaryChangedChannel = eventBusProxyCreator.createProxy(privateEventBusServerA, SalaryChangedChannel.class);
/*
Create your EmployeeHiringService but this time pass the private event bus.
Note you could easily use Spring or Guice for this wiring.
*/
final EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManagerA,
salaryChangedChannel); //Runs on Server A
/* Now create your other service POJOs which have no compile time dependencies on QBit. */
final PayrollService payroll = new PayrollService(); //Runs on Server A
final BenefitsService benefits = new BenefitsService();//Runs on Server A
final VolunteerService volunteering = new VolunteerService();//Runs on Server B
/** Employee hiring service. A. */
ServiceQueue employeeHiringServiceQueue = serviceBuilder()
.setServiceObject(employeeHiring)
.setInvokeDynamic(false).build();
/** Payroll service A. */
ServiceQueue payrollServiceQueue = serviceBuilder()
.setServiceObject(payroll)
.setInvokeDynamic(false).build();
/** Employee Benefits service. A. */
ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
.setServiceObject(benefits)
.setInvokeDynamic(false).build();
/** Community outreach program. B. */
ServiceQueue volunteeringServiceQueue = serviceBuilder()
.setServiceObject(volunteering)
.setInvokeDynamic(false).build();
/* Now wire in the event bus so it can fire events into the service queues.
* For ServerA. */
privateEventBusServerA.joinService(payrollServiceQueue);
privateEventBusServerA.joinService(employeeBenefitsServiceQueue);
/* Now wire in event B bus. */
privateEventBusServerB.joinService(volunteeringServiceQueue);
/* Start Server A bus. */
privateEventBusServiceQueueA.start();
/* Start Server B bus. */
privateEventBusServiceQueueB.start();
employeeHiringServiceQueue.start();
volunteeringServiceQueue.start();
payrollServiceQueue.start();
employeeBenefitsServiceQueue.start();
/** Now create the service proxy like before. */
EmployeeHiringServiceClient employeeHiringServiceClientProxy =
employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);
/** Call the hireEmployee method which triggers the other events. */
employeeHiringServiceClientProxy.hireEmployee(new Employee("Lucas", 1));
flushServiceProxy(employeeHiringServiceClientProxy);
Sys.sleep(5_000);
}
interface EmployeeHiringServiceClient {
void hireEmployee(final Employee employee);
}
@EventChannel
interface SalaryChangedChannel {
void salaryChanged(Employee employee, int newSalary);
}
interface EmployeeEventManager {
@EventChannel(NEW_HIRE_CHANNEL)
void sendNewEmployee(Employee employee);
}
public static class Employee {
final String firstName;
final int employeeId;
public Employee(String firstName, int employeeId) {
this.firstName = firstName;
this.employeeId = employeeId;
}
public String getFirstName() {
return firstName;
}
public int getEmployeeId() {
return employeeId;
}
@Override
public String toString() {
return "Employee{" +
"firstName='" + firstName + '\'' +
", employeeId=" + employeeId +
'}';
}
}
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
public static class BenefitsService {
@OnEvent(NEW_HIRE_CHANNEL)
public void enroll(final Employee employee) {
System.out.printf("Employee enrolled into benefits system employee %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class VolunteerService {
@OnEvent(NEW_HIRE_CHANNEL)
public void invite(final Employee employee) {
System.out.printf("Employee will be invited to the community outreach program %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class PayrollService implements SalaryChangedChannel {
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
}
Complete Example 3
package io.advantageous.qbit.jms;
import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
* Created 10/8/15.
*/
public class JmsTest {
private Queue<Person> personQueue;
private SendQueue<Person> personSendQueue;
private ReceiveQueue<Person> personReceiveQueue;
private BrokerService broker;
private int port;
@Before
public void setUp() throws Exception {
port = PortUtils.findOpenPortStartAt(4000);
broker= new BrokerService();
broker.addConnector("tcp://localhost:" + port);
broker.start();
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
.setDefaultDestination("foobarQueue").setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE).setPort(port);
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
personQueue = new JsonQueue<>(Person.class, textQueue);
personSendQueue = personQueue.sendQueue();
personReceiveQueue = personQueue.receiveQueue();
personSendQueue.shouldBatch();
personSendQueue.name();
personSendQueue.size();
personQueue.name();
personQueue.size();
}
@Test
public void testSendConsume() throws Exception {
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
final Person geoff = personReceiveQueue.pollWait();
final Person rick = personReceiveQueue.pollWait();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
assertEquals(true, personQueue.started());
}
@Test
public void testSendConsume2() throws Exception {
personSendQueue.sendAndFlush(new Person("Geoff"));
personSendQueue.sendAndFlush(new Person("Rick"));
final Person geoff = personReceiveQueue.pollWait();
Sys.sleep(100);
final Person rick = personReceiveQueue.poll();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume3() throws Exception {
personSendQueue = personQueue.sendQueueWithAutoFlush(10, TimeUnit.MILLISECONDS);
personSendQueue.sendMany(new Person("Geoff"), new Person("Rick"));
final Person geoff = personReceiveQueue.take();
final Person rick = personReceiveQueue.take();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume4() throws Exception {
personSendQueue = personQueue.sendQueueWithAutoFlush(QBit.factory().periodicScheduler(),
10, TimeUnit.MILLISECONDS);
personSendQueue.sendBatch(Lists.list(new Person("Geoff"), new Person("Rick")));
final Person geoff = personReceiveQueue.take();
final Person rick = personReceiveQueue.take();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume5() throws Exception {
final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));
Iterable<Person> persons = list::iterator;
personSendQueue.sendBatch(persons);
personSendQueue.flushSends();
final Person geoff = personReceiveQueue.pollWait();
final Person rick = personReceiveQueue.pollWait();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume6() throws Exception {
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
Sys.sleep(2000);
final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch();
}
@Test
public void testSendConsume7() throws Exception {
final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));
final Iterable<Person> persons = list::iterator;
personSendQueue.sendBatch(persons);
personSendQueue.flushSends();
final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch(5);
}
@Test
public void testSendConsume8() throws Exception {
final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
personQueue.startListener(personsABQ::add);
Sys.sleep(1000);
int count = 0;
while (personsABQ.size() < 2) {
Sys.sleep(100);
count++;
if (count > 100) break;
}
Sys.sleep(1000);
assertEquals(2, personsABQ.size());
final Person geoff = personsABQ.poll();
final Person rick = personsABQ.poll();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void builder() throws Exception {
JmsServiceBuilder jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder();
jmsServiceBuilder.setJndiSettings(Collections.emptyMap());
jmsServiceBuilder.getJndiSettings();
jmsServiceBuilder.setConnectionFactory(new ConnectionFactory() {
@Override
public Connection createConnection() throws JMSException {
return null;
}
@Override
public Connection createConnection(String userName, String password) throws JMSException {
return null;
}
});
jmsServiceBuilder.getConnectionFactory();
jmsServiceBuilder.setConnectionFactoryName("Foo");
jmsServiceBuilder.getConnectionFactoryName();
jmsServiceBuilder.setContext(null);
jmsServiceBuilder.getContext();
jmsServiceBuilder.setDefaultDestination("foo");
jmsServiceBuilder.getDefaultDestination();
jmsServiceBuilder.setAcknowledgeMode(5);
jmsServiceBuilder.getAcknowledgeMode();
jmsServiceBuilder.setDefaultTimeout(1);
jmsServiceBuilder.getDefaultTimeout();
jmsServiceBuilder.setHost("foo");
jmsServiceBuilder.getHost();
jmsServiceBuilder.setUserName("rick");
jmsServiceBuilder.getUserName();
jmsServiceBuilder.setPassword("foo");
jmsServiceBuilder.getPassword();
jmsServiceBuilder.setProviderURL("");
jmsServiceBuilder.getProviderURL();
jmsServiceBuilder.setProviderURLPattern("");
jmsServiceBuilder.getProviderURLPattern();
jmsServiceBuilder.setConnectionSupplier(null);
jmsServiceBuilder.getConnectionSupplier();
jmsServiceBuilder.setStartConnection(true);
jmsServiceBuilder.isStartConnection();
jmsServiceBuilder.setTransacted(true);
jmsServiceBuilder.isTransacted();
jmsServiceBuilder.setJndiSettings(null);
jmsServiceBuilder.addJndiSetting("foo","bar");
jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port);
jmsServiceBuilder.build().start();
}
@After
public void tearDown() throws Exception {
personQueue.stop();
personReceiveQueue.stop();
personSendQueue.stop();
try {
broker.stop();
} catch (Exception ex) {
ex.printStackTrace();
}
broker = null;
Sys.sleep(1000);
}
private static class Person {
final String name;
private Person(String name) {
this.name = name;
}
}
}
package io.advantageous.qbit.jms;
import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.queue.*;
import io.advantageous.qbit.util.PortUtils;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
* Created 10/8/15.
*/
public class JmsTest {
private Queue<Person> personQueue;
private SendQueue<Person> personSendQueue;
private ReceiveQueue<Person> personReceiveQueue;
private BrokerService broker;
private int port;
@Before
public void setUp() throws Exception {
port = PortUtils.findOpenPortStartAt(4000);
broker= new BrokerService();
broker.addConnector("tcp://localhost:" + port);
broker.start();
final JmsServiceBuilder jmsBuilder = JmsServiceBuilder.newJmsServiceBuilder()
.setDefaultDestination("foobarQueue").setAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE).setPort(port);
final Queue<String> textQueue = new JmsTextQueue(jmsBuilder);
personQueue = new JsonQueue<>(Person.class, textQueue);
personSendQueue = personQueue.sendQueue();
personReceiveQueue = personQueue.receiveQueue();
personSendQueue.shouldBatch();
personSendQueue.name();
personSendQueue.size();
personQueue.name();
personQueue.size();
}
@Test
public void testSendConsume() throws Exception {
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
final Person geoff = personReceiveQueue.pollWait();
final Person rick = personReceiveQueue.pollWait();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
assertEquals(true, personQueue.started());
}
@Test
public void testSendConsume2() throws Exception {
personSendQueue.sendAndFlush(new Person("Geoff"));
personSendQueue.sendAndFlush(new Person("Rick"));
final Person geoff = personReceiveQueue.pollWait();
Sys.sleep(100);
final Person rick = personReceiveQueue.poll();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume3() throws Exception {
personSendQueue = personQueue.sendQueueWithAutoFlush(10, TimeUnit.MILLISECONDS);
personSendQueue.sendMany(new Person("Geoff"), new Person("Rick"));
final Person geoff = personReceiveQueue.take();
final Person rick = personReceiveQueue.take();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume4() throws Exception {
personSendQueue = personQueue.sendQueueWithAutoFlush(QBit.factory().periodicScheduler(),
10, TimeUnit.MILLISECONDS);
personSendQueue.sendBatch(Lists.list(new Person("Geoff"), new Person("Rick")));
final Person geoff = personReceiveQueue.take();
final Person rick = personReceiveQueue.take();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume5() throws Exception {
final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));
Iterable<Person> persons = list::iterator;
personSendQueue.sendBatch(persons);
personSendQueue.flushSends();
final Person geoff = personReceiveQueue.pollWait();
final Person rick = personReceiveQueue.pollWait();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void testSendConsume6() throws Exception {
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
Sys.sleep(2000);
final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch();
}
@Test
public void testSendConsume7() throws Exception {
final List<Person> list = Lists.list(new Person("Geoff"), new Person("Rick"));
final Iterable<Person> persons = list::iterator;
personSendQueue.sendBatch(persons);
personSendQueue.flushSends();
final List<Person> personsBatch = (List<Person>) personReceiveQueue.readBatch(5);
}
@Test
public void testSendConsume8() throws Exception {
final ArrayBlockingQueue<Person> personsABQ = new ArrayBlockingQueue<>(100);
personSendQueue.send(new Person("Geoff"));
personSendQueue.send(new Person("Rick"));
personSendQueue.flushSends();
personQueue.startListener(personsABQ::add);
Sys.sleep(1000);
int count = 0;
while (personsABQ.size() < 2) {
Sys.sleep(100);
count++;
if (count > 100) break;
}
Sys.sleep(1000);
assertEquals(2, personsABQ.size());
final Person geoff = personsABQ.poll();
final Person rick = personsABQ.poll();
assertTrue(geoff.name.equals("Rick") || geoff.name.equals("Geoff"));
assertTrue(rick.name.equals("Rick") || rick.name.equals("Geoff"));
}
@Test
public void builder() throws Exception {
JmsServiceBuilder jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder();
jmsServiceBuilder.setJndiSettings(Collections.emptyMap());
jmsServiceBuilder.getJndiSettings();
jmsServiceBuilder.setConnectionFactory(new ConnectionFactory() {
@Override
public Connection createConnection() throws JMSException {
return null;
}
@Override
public Connection createConnection(String userName, String password) throws JMSException {
return null;
}
});
jmsServiceBuilder.getConnectionFactory();
jmsServiceBuilder.setConnectionFactoryName("Foo");
jmsServiceBuilder.getConnectionFactoryName();
jmsServiceBuilder.setContext(null);
jmsServiceBuilder.getContext();
jmsServiceBuilder.setDefaultDestination("foo");
jmsServiceBuilder.getDefaultDestination();
jmsServiceBuilder.setAcknowledgeMode(5);
jmsServiceBuilder.getAcknowledgeMode();
jmsServiceBuilder.setDefaultTimeout(1);
jmsServiceBuilder.getDefaultTimeout();
jmsServiceBuilder.setHost("foo");
jmsServiceBuilder.getHost();
jmsServiceBuilder.setUserName("rick");
jmsServiceBuilder.getUserName();
jmsServiceBuilder.setPassword("foo");
jmsServiceBuilder.getPassword();
jmsServiceBuilder.setProviderURL("");
jmsServiceBuilder.getProviderURL();
jmsServiceBuilder.setProviderURLPattern("");
jmsServiceBuilder.getProviderURLPattern();
jmsServiceBuilder.setConnectionSupplier(null);
jmsServiceBuilder.getConnectionSupplier();
jmsServiceBuilder.setStartConnection(true);
jmsServiceBuilder.isStartConnection();
jmsServiceBuilder.setTransacted(true);
jmsServiceBuilder.isTransacted();
jmsServiceBuilder.setJndiSettings(null);
jmsServiceBuilder.addJndiSetting("foo","bar");
jmsServiceBuilder = JmsServiceBuilder.newJmsServiceBuilder().setPort(port);
jmsServiceBuilder.build().start();
}
@After
public void tearDown() throws Exception {
personQueue.stop();
personReceiveQueue.stop();
personSendQueue.stop();
try {
broker.stop();
} catch (Exception ex) {
ex.printStackTrace();
}
broker = null;
Sys.sleep(1000);
}
private static class Person {
final String name;
private Person(String name) {
this.name = name;
}
}
}
No comments:
Post a Comment