QBit Microservice Lib Working With Workers Sharded and Pooled
This tutorial continues where the callbacks tutorial leaves off. See QBit Microservice Lib Working With CallBacks before you tackle this one.
Imagining an app - CPU Bound and IO Bound
Let's recall the app from the first example on callbacks. Remember, this app will be a recommendation engine. Think of Cupid.com or DatesRUs.com or iTunes match or NetFlix suggestions or Amazon.com "Customers who bought this also bought these other fine products".
Now, we are not building a real recommendation engine, although QBit has been used for similar things.
The trick with an example is to keep the concepts clear without the clutter of a real-world implementation, so it can be followed.
RecommendationService is our CPU-intensive service, the recommendation engine itself. We are going to shard its instances. Any user data gets pushed into its shard. We replicate non-user data, and shard user data so it lives alongside the rules that operate on users.
In our hypothetical example, RecommendationService is very CPU-intensive. By sharding, we can run on X CPUs without duplicating all of the user data, which is a lot of data. The service has to iterate through products and user data and pick a match. It is a classic N+1. In our example, we do all of this in real time based on the latest user activity, to the last second. What page did they just view? What product did they just bookmark? What product did they buy? What product did their friends buy? What products are people in their demographic buying right now? This is real-time analytics. This does not mean that there is no machine learning or Hadoop batch job churning data somewhere. But the churned data is mixed with the data science, pre-chewed caches and counters, and up-to-the-second user activity to make a decision based on data in memory and historic data (blended). RecommendationService is the tip of the iceberg, but it is brute force, exhaustive and fast.
UserDataService is our heavy IO service. As much as we would like to, we do not know which RecommendationService node a user might land on; we can't if we are truly elastic. Are they using an XBox on the west coast, or an iPhone in Hawaii? When and how they will hit our recommendation engine, who knows. UserDataService manages editing, backup, and syncing of user data. It keeps most users in memory and also manages replicating and storing user data. Once users get loaded into the system, we can keep them in memory for a while. We just sync any changes to the user back to the UserDataService. Since UserDataService is IO bound, we will create a pool of them.
This is what I want you to think about when I am talking about UserDataService and RecommendationService: UserDataService is IO bound; RecommendationService is CPU bound.
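The rest of the tutorial calls methods on two async client proxy interfaces that QBit generates proxies for. A minimal sketch of what they might look like is below; the method names match the usage later in this tutorial, but the exact signatures (and the stand-in Callback type) are assumptions, not QBit's actual API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ClientInterfacesSketch {

    /** Stand-in for QBit's Callback type: just a consumer of the async result. */
    public interface Callback<T> extends Consumer<T> { }

    /** Async proxy interface for the CPU-bound recommendation engine.
     *  userName is the first non-callback argument, so it can serve as a shard key. */
    public interface RecommendationServiceClient {
        void recommend(Callback<List<String>> recommendationsCallback, String userName);
    }

    /** Async proxy interface for the IO-bound user data service (hypothetical method). */
    public interface UserDataServiceClient {
        void loadUser(Callback<String> userCallback, String userName);
    }

    /** Toy in-proc implementation so the interface can be exercised directly. */
    public static class InProcRecommendationService implements RecommendationServiceClient {
        @Override
        public void recommend(Callback<List<String>> callback, String userName) {
            // A real engine would blend in-memory user data with rules; we return canned picks.
            callback.accept(Arrays.asList("product-1-for-" + userName, "product-2-for-" + userName));
        }
    }

    public static void main(String[] args) {
        new InProcRecommendationService()
                .recommend(recs -> System.out.println("Recommendations: " + recs), "Bob");
    }
}
```

The callback-first argument order mirrors the calls in the full listing at the end of this tutorial.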
In this tutorial we will cover Sharded Workers and Workers pools.
Creating a pool of IO Workers
To create a pool of IO workers, we will use the io.advantageous.qbit.service.dispatchers.ServiceWorkers.workers method to create a pooled ServiceWorkers instance. Then we will create service queues that wrap our UserDataService and add those queues to our userDataServiceWorkers.
Creating a worker pool of services
private static UserDataServiceClient createUserDataServiceClientProxy(
        final ServiceBundle serviceBundle,
        final int numWorkers) {

    final ServiceWorkers userDataServiceWorkers = workers();

    for (int index = 0; index < numWorkers; index++) {
        ServiceQueue userDataService = serviceBuilder()
                .setServiceObject(new UserDataService())
                .build();
        userDataService.startCallBackHandler();
        userDataServiceWorkers.addService(userDataService);
    }

    userDataServiceWorkers.start();
    serviceBundle.addServiceConsumer("workers", userDataServiceWorkers);
    return serviceBundle.createLocalProxy(UserDataServiceClient.class, "workers");
}
Now we have a pool of IO workers. Every call to the UserDataServiceClient proxy will go to a different service queue instance, which will go to a different UserDataService. The calls are round robin.
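Conceptually, the pooled ServiceWorkers dispatcher behaves like a round-robin selector over N service queues. Here is a minimal sketch of that idea (an illustration of the concept, not QBit's actual implementation):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Minimal round-robin selector, illustrating how a worker pool
 *  spreads successive calls across N workers. */
public class RoundRobin<T> {

    private final List<T> workers;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobin(List<T> workers) {
        this.workers = workers;
    }

    /** Each call returns the next worker in rotation, wrapping at the end. */
    public T next() {
        int index = (int) (counter.getAndIncrement() % workers.size());
        return workers.get(index);
    }
}
```

With eight UserDataService queues in the pool, eight successive calls would each land on a different worker, and the ninth would land back on the first.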
That takes care of our heavy IO. We can create just the right number of workers to talk to our backend database or key/value store. Next we need to create our CPU-intensive service, our recommendation engine, so that we can utilize all of our CPU cores when evaluating user data. Instead of copying user data to each shard, each shard will hold a portion of the users.
This is very much like message-driven beans, except that you have more methods than just onMessage and you get the benefits of a high-speed queue system.
Creating CPU-intensive shards to maximize CPU-bound services and use all of your cores
In this example, we will use a canned shard rule which shards on the hashCode of the first argument to each method. We want that first argument to be something like userName, or some other object that gives us a nice, consistent hashCode to divvy up users, so we can execute the CPU-intensive rules right next to the actual user data that we have in memory. We use the method io.advantageous.qbit.service.dispatchers.ServiceWorkers.shardOnFirstArgumentWorkers, and there are many such methods on the ServiceWorkers class. You can also create your own ShardRules and pass them to the ServiceWorkers.shardedWorkers method.
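A shard rule of this kind just maps a method call to a worker index; the canned rule effectively hashes the first argument. Below is a sketch of that hashing. QBit's real ShardRule interface takes more context (such as the method name and full argument array), so treat this signature as an assumption made for illustration:

```java
/** Sketch of a first-argument shard rule: map the first argument's
 *  hashCode onto one of numWorkers shards. */
public class FirstArgShardRule {

    /** Returns a stable worker index in [0, numWorkers) for the given call arguments. */
    public static int pickShard(Object[] args, int numWorkers) {
        // Math.abs alone is unsafe for Integer.MIN_VALUE, so mask the sign bit instead.
        int hash = args[0].hashCode() & 0x7fffffff;
        return hash % numWorkers;
    }
}
```

Because the same userName always hashes to the same shard, a given user's calls always hit the worker that holds that user's in-memory data.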
Other than that, the code looks pretty similar to what we did with the IO-bound workers. We pass the service queue client proxy userDataServiceClient from the last creation method as an argument to this one, so that this recommendationService can call userDataServiceClient as needed.
Creating a sharded set of services
private static RecommendationServiceClient createRecommendationServiceClientProxy(
        final ServiceBundle serviceBundle,
        final UserDataServiceClient userDataServiceClient,
        int numWorkers) {

    final ServiceWorkers recommendationShardedWorkers = shardOnFirstArgumentWorkers();

    for (int index = 0; index < numWorkers; index++) {
        RecommendationService recommendationServiceImpl =
                new RecommendationService(userDataServiceClient);
        ServiceQueue serviceQueue = serviceBuilder()
                .setServiceObject(recommendationServiceImpl)
                .build();
        serviceQueue.startCallBackHandler();
        recommendationShardedWorkers.addService(serviceQueue);
    }

    recommendationShardedWorkers.start();
    serviceBundle.addServiceConsumer("recommendation", recommendationShardedWorkers);
    return serviceBundle.createLocalProxy(RecommendationServiceClient.class, "recommendation");
}
Each time the service queue client proxy RecommendationServiceClient is called, it will select one of the N RecommendationService service queues to handle the method call. If one instance could only handle 20,000 recommendation lists per second, then with 5 CPU cores we can approach 100,000 recommendation lists per second.
Putting it all together
The complete example with the changes for worker pools and sharded pools
package io.advantageous.qbit.example;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.service.ServiceBundle;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.service.dispatchers.ServiceWorkers;
import org.boon.core.Sys;

import java.util.List;

import static io.advantageous.qbit.service.ServiceBundleBuilder.serviceBundleBuilder;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
import static io.advantageous.qbit.service.dispatchers.ServiceWorkers.shardOnFirstArgumentWorkers;
import static io.advantageous.qbit.service.dispatchers.ServiceWorkers.workers;
import static org.boon.Lists.list;

/**
 * Created by rhightower on 2/20/15.
 */
public class PrototypeMain {

    public static void main(String... args) {

        QBit.factory().systemEventManager();

        final ServiceBundle serviceBundle = serviceBundleBuilder()
                .setAddress("/root").build();
        serviceBundle.start();

        final UserDataServiceClient userDataServiceClient =
                createUserDataServiceClientProxy(serviceBundle, 8);

        final RecommendationServiceClient recommendationServiceClient =
                createRecommendationServiceClientProxy(serviceBundle,
                        userDataServiceClient, 4);

        List<String> userNames = list("Bob", "Joe", "Scott", "William");

        userNames.forEach(userName ->
                recommendationServiceClient.recommend(recommendations -> {
                    System.out.println("Recommendations for: " + userName);
                    recommendations.forEach(recommendation ->
                            System.out.println("\t" + recommendation));
                }, userName)
        );

        flushServiceProxy(recommendationServiceClient);
        Sys.sleep(1000);
    }

    private static RecommendationServiceClient createRecommendationServiceClientProxy(
            final ServiceBundle serviceBundle,
            final UserDataServiceClient userDataServiceClient,
            int numWorkers) {

        final ServiceWorkers recommendationShardedWorkers = shardOnFirstArgumentWorkers();

        for (int index = 0; index < numWorkers; index++) {
            RecommendationService recommendationServiceImpl =
                    new RecommendationService(userDataServiceClient);
            ServiceQueue serviceQueue = serviceBuilder()
                    .setServiceObject(recommendationServiceImpl)
                    .build();
            serviceQueue.startCallBackHandler();
            recommendationShardedWorkers.addService(serviceQueue);
        }

        recommendationShardedWorkers.start();
        serviceBundle.addServiceConsumer("recommendation", recommendationShardedWorkers);
        return serviceBundle.createLocalProxy(RecommendationServiceClient.class, "recommendation");
    }

    private static UserDataServiceClient createUserDataServiceClientProxy(
            final ServiceBundle serviceBundle,
            final int numWorkers) {

        final ServiceWorkers userDataServiceWorkers = workers();

        for (int index = 0; index < numWorkers; index++) {
            ServiceQueue userDataService = serviceBuilder()
                    .setServiceObject(new UserDataService())
                    .build();
            userDataService.startCallBackHandler();
            userDataServiceWorkers.addService(userDataService);
        }

        userDataServiceWorkers.start();
        serviceBundle.addServiceConsumer("workers", userDataServiceWorkers);
        return serviceBundle.createLocalProxy(UserDataServiceClient.class, "workers");
    }
}
What is QBit again?
QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library, not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly, or you can create a service. QBit services can be exposed via WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment-model threading and is similar to the Actor model, though a better description would be Active Objects. QBit does not use a disruptor; it uses regular Java queues. QBit can do north of 100 million ping-pong calls per second, which is an amazing speed (we have seen as high as 200M). QBit also supports calling services via REST and WebSocket. QBit is microservices in the pure web sense: JSON, HTTP, WebSocket, etc. QBit uses micro-batching to push messages through the pipe (queue, IO, etc.) faster and to reduce thread hand-off.
QBit lingo
QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but we could one day write a version in Rust or Go or C# (but that would require a large payday).
Service A POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events. (A service may have one thread managing events, method calls, and responses, or two: one for method calls and events, and the other for responses, so response handlers do not block the service. One thread is faster unless responses block.) Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.
ServiceBundle Many POJOs behind one response queue and many receive queues. There may be one thread for all responses, or not. There can also be just one receive queue.
Queue A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, and idle. You can listen to these events from services that sit behind a queue. You don't have to use services; you can use queues directly. In QBit, you have sender queues and receiver queues. They are separated to support micro-batching.
ServiceServer ServiceBundle that is exposed to REST and WebSocket communication.
EventBus EventBus is a way to send a lot of messages to services that may be loosely coupled.
ClientProxy ClientProxy is a way to invoke a service through an async interface; the service can be in-proc (same process) or remote over WebSocket.
Non-blocking QBit is a non-blocking lib. You use callbacks via Java 8 lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks. QBit takes an object-oriented approach to service development, so services look like the normal Java services that you already write, but the services live behind a queue/thread. This is not a new concept. Microsoft did this with DCOM/COM and called it active objects. Akka does it with actors and calls them strongly typed Actors. The important concept is that you get the speed of reactive and actor-style messaging, but you develop in a natural OOP approach. QBit is not the first, and QBit is not the only.
Speed QBit is VERY fast. There is of course a lot of room for improvement, but it already does 200M+ TPS in-proc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc. More work needs to be done to improve speed, but it is now fast enough that we are focusing more on usability. The JSON support uses Boon by default, which is up to 4x faster than other JSON parsers for the REST/JSON and WebSocket/JSON use cases.
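Micro-batching comes up in both the Queue entry and the speed numbers above: messages are accumulated and handed off in chunks to cut per-message thread hand-off cost. A minimal sketch of the idea (an illustration only, not QBit's queue implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Minimal micro-batching sender: buffer up to batchSize items,
 *  then hand the whole batch off downstream at once. */
public class MicroBatcher<T> {

    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private List<T> buffer = new ArrayList<>();

    public MicroBatcher(int batchSize, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    /** Buffer an item; hand off automatically once the batch is full. */
    public void send(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hand off whatever is buffered (QBit proxies expose a similar explicit flush). */
    public void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(buffer);
            buffer = new ArrayList<>();
        }
    }
}
```

This is also why the full listing above calls flushServiceProxy before sleeping: a partially filled batch sits in the buffer until it is flushed.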
Would you like to learn more about QBit?
- [Detailed Tutorial] QBit microservice example
- [Detailed Tutorial] Building a single page; Todo List Application with QBit
- [Detailed Tutorial] Working with inproc MicroServices within QBit.
- [Doc] QBit Java microservice lib auto flushing service queue proxies
- [Doc] QBit Java microservice lib introducing EventBus replication and EventBus connectors
- [Doc] QBit microservice Java lib Using auto flushing client proxies
- [Doc] QBit Microservice WebSocket wire protocol draft 01 JSON ASCII method batching RPC
- [Doc] Queue Callbacks for QBit queue based services
- [Doc] Using QBit microservice lib's HttpClient GET, POST, et al, JSON, Java 8 Lambda
- [Doc] Using QBit microservice lib's WebSocket support
- [Quick Start] Building a simple Rest web microservice server with QBit
- [Quick Start] Building a single page; Todo List Application with QBit
- [Quick Start] building a single web page application with QBit
- [Quick Start] Building a TODO web microservice client with QBit
- [Quick Start] Building a TODO web microservice server with QBit
- [Quick Start] Building boon for the QBit microservice engine
- [Quick Start] Building QBit the microservice lib for Java
- [Quick Start] Working with inproc MicroServices within QBit
- [Rough Cut] Delivering up Single Page Applications from QBit Java JSON Microservice lib
- [Rough Cut] QBit Java Microservice Lib Working With Workers Sharded and Pooled
- [Rough Cut] QBit Microservice Lib Working With CallBacks
- [Rough Cut] QBit Microservices using Service Workers and sharded service workers
- [Rough Cut] Using QBit microservice lib's REST support with URI Params
- [Rough Cut] Using QBit microservice lib with Spring Boot
- [Rough Cut] Working with event bus for QBit the microservice engine
- [Rough Cut] Working with inproc MicroServices
- [Rough Cut] Working with private event bus for inproc microservices
- [Rough Cut] Working with strongly typed event bus proxies for QBit Java Microservice lib
- [Rough Cut] Working with System Manager for QBit Mircoservice lib
- [Z Blog] Qbit servlet support also a preview of using jetty and the QBit microservice lib together
- [Z Notebook] More benchmarking internal
- [Z Notebook] Performance testing for REST
- [Z Notebook] Roadmap