Saturday, February 27, 2016

Step 6 Using ZooKeeper as the cluster manager

At some point we decided to deploy to EC2/Mesos and to connect the Vert.x nodes using ZooKeeper.
Here we just want to validate that this works: we start two instances of our app, send a broadcast message on the event bus, and check that both nodes receive it.
You can find the source for this step here.

Installing ZooKeeper

Install ZooKeeper

$ brew install zookeeper

Run ZooKeeper

$ zkServer start
To test the broadcast, we will add an extra REST endpoint to our example app.


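public class WebVerticle extends AbstractVerticle {

    // ... fields and the router setup in start() are elided ...

        /* Register a new handler to send broadcast message. */
        router.route("/broadcast/*").handler(event -> broadcastHello(event.request()));

    // ...

    private void broadcastHello(HttpServerRequest request) {
        // Publish to every handler on the address, on every clustered node.
        // HELLO_WORLD_ADDRESS is a placeholder: use the address HelloWorldService listens on.
        vertx.eventBus().publish(HELLO_WORLD_ADDRESS,
                HelloWorldOperations.BROADCAST_HELLO_TO_ALL_NODES.toString());
        request.response().end("SENT BROADCAST");
    }
}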
eventBus.publish delivers the message to every handler registered on that address, on all connected nodes.
We then handle the event bus message in the service from the first example that we created.
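To make the difference between publishing and sending concrete, here is a toy in-memory model written with only the Java standard library. It is not Vert.x code and says nothing about how Vert.x implements clustering; the class ToyEventBus, its methods, and the "hello.world" address are made up for illustration. It only mirrors the delivery rule: publish reaches every registered handler, while send reaches one handler, picked round-robin.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory bus: illustrates delivery semantics only, not Vert.x internals. */
class ToyEventBus {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    /** Register a handler (think: one per node) on an address. */
    void consumer(String address, Consumer<String> handler) {
        handlers.computeIfAbsent(address, k -> new ArrayList<>()).add(handler);
    }

    /** publish: every handler on the address gets the message. Returns the delivery count. */
    int publish(String address, String body) {
        List<Consumer<String>> targets =
                handlers.getOrDefault(address, Collections.emptyList());
        targets.forEach(handler -> handler.accept(body));
        return targets.size();
    }

    /** send: a single handler gets the message, chosen round-robin. Returns the delivery count. */
    int send(String address, String body) {
        List<Consumer<String>> targets =
                handlers.getOrDefault(address, Collections.emptyList());
        if (targets.isEmpty()) {
            return 0;
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        targets.get(index % targets.size()).accept(body);
        return 1;
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        bus.consumer("hello.world", body -> System.out.println("node1 got " + body));
        bus.consumer("hello.world", body -> System.out.println("node2 got " + body));

        bus.publish("hello.world", "BROADCAST_HELLO_TO_ALL_NODES"); // both handlers run
        bus.send("hello.world", "SAY_HELLO_WORLD");                 // only one handler runs
    }
}
```

This is why the broadcast endpoint uses publish: with send, only one of the two nodes would print the message.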

HelloWorldService.kt Kotlin

    private fun dispatchMessage(message: Message<Any>) {

        try {
            val operation = HelloWorldOperations.valueOf(message.body().toString())

            /** when expression that handles the various operations. */
            when (operation) {
                HelloWorldOperations.SAY_HELLO_WORLD -> message.reply("HELLO WORLD FROM KOTLIN")

                HelloWorldOperations.BROADCAST_HELLO_TO_ALL_NODES -> println("GOT HELLO BROADCAST!!!!!!!!!!!!!")
                else -> {
                    logger.error("Unable to handle operation {}", operation)
                    message.reply("Unsupported operation")
                }
            }
        } catch (ex: Exception) {
            // The original catch block is not shown; this one is an assumption.
            logger.error("Unable to handle message " + message.body(), ex)
        }
    }
Pretty simple. We will start up two instances and then send a broadcast message.

Send a broadcast message

$ curl http://localhost:8080/broadcast/
Both nodes should then print the broadcast debug message.

Output from both nodes

Note that the second node we start will complain that it cannot bind to port 8080. This is OK: only the first node serves HTTP, and the nodes talk to each other over the clustered Vert.x event bus, not over port 8080.
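The bind complaint is ordinary operating system behavior and has nothing to do with clustering. As a small sketch using only the Java standard library (the class PortConflictDemo is hypothetical, and an OS-assigned free port stands in for 8080), a second listener on a port that is already taken is rejected with a BindException:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.ServerSocket;

class PortConflictDemo {

    /** Returns true if a second listener on an already-bound port is rejected. */
    static boolean secondBindIsRejected() {
        // Port 0 asks the OS for any free port; it stands in for 8080 here.
        try (ServerSocket first = new ServerSocket(0)) {
            int port = first.getLocalPort();
            try (ServerSocket second = new ServerSocket(port)) {
                return false; // unexpected: both listeners were bound
            } catch (BindException e) {
                return true; // same kind of failure the second node reports
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second bind rejected: " + secondBindIsRejected());
    }
}
```

If both nodes need to serve HTTP on the same machine, each would need its own port; for this test one HTTP entry point is enough.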
Before we can run this, we need to install the Vert.x ZooKeeper support, which is not yet distributed with Vert.x.

Installing the Vert.x ZooKeeper Cluster Manager

Clone the vertx-zookeeper repo and build it with Maven as follows.
$ git clone https://github.com/vert-x3/vertx-zookeeper.git
$ cd vertx-zookeeper
$ mvn clean install -Dmaven.test.skip=true

Modify the Gradle build to use ZooKeeper

We need to add the cluster manager as a dependency of our project and create a task that runs the shadow jar file with the cluster options.

build.gradle: adding the dependency

dependencies {
    compile "io.vertx:vertx-core:3.2.0"
    compile 'io.vertx:vertx-zookeeper:3.2.0-SNAPSHOT' //ADDED THIS
    testCompile group: 'junit', name: 'junit', version: '4.11'
}
Then we need a Gradle task that knows how to run the Vert.x shadow jar for this project with the ZooKeeper cluster manager installed.

build.gradle: running the shadow jar with ZookeeperClusterManager installed

task runShadowJar(dependsOn: shadowJar, description: 'Runs the system executing the shadow jar.') {
    def arguments = ["$buildDir/libs/$project.name-$version-fat.jar", "-cluster"]
    def jvmArguments = ["-Dvertx.cluster.managerClass=io.vertx.spi.cluster.impl.zookeeper.ZookeeperClusterManager"]

    // doLast defers execution until the task actually runs, after shadowJar has built the jar.
    doLast {
        javaexec {
            main = "-jar" // trick: makes javaexec launch "java -jar <fat jar> -cluster"
            jvmArgs jvmArguments
            args arguments
        }
    }
}
We also added a main method to MainService that configures the ZookeeperClusterManager so you can run this from your IDE.

Running from IDE

fun main(args: Array<String>) {

    val logger = LoggerFactory.getLogger(MainVerticle::class.java)
    val zkConfig = Properties()
    zkConfig.setProperty("hosts.zookeeper", "") // fill in your ZooKeeper host(s)
    zkConfig.setProperty("path.root", "io.vertx")
    zkConfig.setProperty("retry.initialSleepTime", "1000")
    zkConfig.setProperty("retry.intervalTimes", "3")

    val clusterManager = ZookeeperClusterManager(zkConfig)
    val options = VertxOptions().setClusterManager(clusterManager)

    Vertx.clusteredVertx(options) { response ->
        if (response.succeeded()) {
            val vertx = response.result() // the clustered instance; deploy verticles on it
            logger.info("Main is deployed")
        } else {
            logger.error("Issue deploying main", response.cause())
        }
    }
}

Later we will set up -Dvertx.zookeeper.conf=/somepath/zookeeper.properties so that we can specify how to contact our ZooKeeper cluster.
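As a rough sketch of what such a properties file could carry, the following stdlib-only Java reads the same four keys that the main method above sets by hand. The class ZkConfigSketch, its methods, and the 127.0.0.1 host are made up for illustration, and the defaults are simply the values from the listing above. How the cluster manager itself reads vertx.zookeeper.conf is not shown in this step, so treat this as an assumption about the file's shape and not as the library's loading code.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

class ZkConfigSketch {

    /** Fallback values, copied from the main method in this step. */
    static Properties defaults() {
        Properties defaults = new Properties();
        defaults.setProperty("path.root", "io.vertx");
        defaults.setProperty("retry.initialSleepTime", "1000");
        defaults.setProperty("retry.intervalTimes", "3");
        return defaults;
    }

    /** Parses properties-file text; keys that are missing fall back to defaults(). */
    static Properties fromText(String text) {
        Properties config = new Properties(defaults());
        try (Reader reader = new StringReader(text)) {
            config.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    /** Reads the file named by -Dvertx.zookeeper.conf, or returns only the defaults. */
    static Properties fromSystemProperty() {
        String path = System.getProperty("vertx.zookeeper.conf");
        if (path == null) {
            return new Properties(defaults());
        }
        try {
            byte[] bytes = Files.readAllBytes(Paths.get(path));
            return fromText(new String(bytes, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical file content; 127.0.0.1 is a placeholder host.
        Properties config = fromText("hosts.zookeeper=127.0.0.1\nretry.intervalTimes=5\n");
        System.out.println("hosts.zookeeper = " + config.getProperty("hosts.zookeeper"));
        System.out.println("path.root = " + config.getProperty("path.root"));
        System.out.println("retry.intervalTimes = " + config.getProperty("retry.intervalTimes"));
    }
}
```

Keeping the keys in a file means the same fat jar can point at a local ZooKeeper during development and at a real ensemble after deployment.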

Run the example

Run this once

$ gradle runShadowJar 

Run this again in a new terminal

$ gradle runShadowJar 

Call the endpoint that does the broadcast

$ curl http://localhost:8080/broadcast/

Output from both vertx instances
