Saturday, February 27, 2016

Step 4: Call the Vertx app over the event bus from Node

You can find the steps for step 4 in this branch.

Install node

If you don't have Node set up yet, now is a good time to install it.
$ brew install node


==> Reinstalling node
==> Downloading https://homebrew.bintray.com/bottles/node-5.3.0.el_capitan.bottle.tar.gz
Already downloaded: /Library/Caches/Homebrew/node-5.3.0.el_capitan.bottle.tar.gz
==> Pouring node-5.3.0.el_capitan.bottle.tar.gz
==> Summary
🍺  /usr/local/Cellar/node/5.3.0: 2827 files, 37M


$ node -v

$ npm -v
You will probably see later versions of both tools.
Create a project folder and navigate to it. We put all node files in {root_project}/node and we put all vertx files in {root_project}/vertx.

Init project

$ npm init

Install http

$ npm install http --save


{
  "name": "node-2-vertx",
  "version": "1.0.0",
  "description": "Call Vertx",
  "main": "run.js",
  "scripts": {
    "test": "test"
  },
  "author": "Rick Hightower, Geoff Chandler",
  "license": "ISC",
  "dependencies": {
    "http": "0.0.0"
  }
}
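First, let's call the service over plain HTTP, just to see if we have everything set up. (Node also ships http as a core module, so require('http') works even without the npm dependency.)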

Call vertx service via HTTP

var http = require('http');

var options = {
  host: 'localhost',
  path: '/hello',
  port: 8080
};

var callback = function(response) {
  var str = '';

  //another chunk of data has been received, so append it to `str`
  response.on('data', function (chunk) {
    str += chunk;
  });

  //the whole response has been received, so we just print it out here
  response.on('end', function () {
    console.log("FROM SERVER " + str);
  });
};

http.request(options, callback).end();
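
One thing the script above does not handle is the Vertx app not running. In that case Node emits an 'error' event on the request, and without a listener the script dies with an uncaught exception. Here is a minimal sketch of the same call with an error handler. The names collectBody and getText and the done(err, body) callback shape are assumptions made for illustration; they are not part of the example project.

```javascript
var http = require('http');

/**
 * Hypothetical helper: gather every chunk of a response (or any stream-like
 * object that emits 'data' and 'end') and pass the whole body to done(err, body).
 */
function collectBody(response, done) {
  var str = '';
  response.on('data', function (chunk) {
    str += chunk;
  });
  response.on('end', function () {
    done(null, str);
  });
}

/**
 * Hypothetical helper: issue the request and report connection errors
 * (for example ECONNREFUSED) through the same callback.
 */
function getText(options, done) {
  var request = http.request(options, function (response) {
    collectBody(response, done);
  });
  request.on('error', function (error) {
    done(error);
  });
  request.end();
}
```

With the Vertx app running, getText({host: 'localhost', path: '/hello', port: 8080}, function (err, body) { ... }) should hand you the same text that the original script prints; with the app stopped, err is set instead of the process crashing.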

Next up, let's use the event bus through the vertx3-eventbus-client NPM module.

Adding dependency for vertx event bus

$ npm install vertx3-eventbus-client  --save
$ npm install sockjs-client  --save
This will add the dependencies to your package.json file in your node folder.
Let's review the Java code that calls the bus, and the code that installs the Vertx bridge. First we need to add vertx-web support, which also provides the SockJS bridge. We will also add the TCP event bus bridge so that we can call this microservice not only from Node but also from other Vertx/Java microservices.
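
The TCP event bus bridge is not used by the Node code in this step, but it helps to know what it expects on the wire. As I understand the vertx-tcp-eventbus-bridge documentation, each message is a JSON object preceded by a 4-byte big-endian length. The sketch below only builds and parses such frames. The function names encodeFrame and decodeFrame are made up for illustration, and whether a plain string body is accepted may depend on the bridge version, so check the bridge documentation before relying on it.

```javascript
/**
 * Hypothetical helper: wrap a JSON message in a length-prefixed frame.
 * Assumption: 4-byte big-endian length followed by the UTF-8 JSON payload.
 */
function encodeFrame(message) {
  var payload = Buffer.from(JSON.stringify(message), 'utf8');
  var header = Buffer.alloc(4);
  header.writeInt32BE(payload.length, 0);
  return Buffer.concat([header, payload]);
}

/** Hypothetical helper: read one complete frame back into an object. */
function decodeFrame(frame) {
  var length = frame.readInt32BE(0);
  return JSON.parse(frame.toString('utf8', 4, 4 + length));
}

// A "send" message addressed to the hello world service.
var frame = encodeFrame({
  type: 'send',
  address: 'HELLO_WORLD',
  body: 'SAY_HELLO_WORLD'
});
```

To receive an answer over the TCP bridge, the message would also need a replyAddress field, which is left out here to keep the sketch short.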

Gradle build script build.gradle

dependencies {
    compile "io.vertx:vertx-core:3.2.0"
    compile 'io.vertx:vertx-tcp-eventbus-bridge:3.2.0' // ** ADDED
    compile 'io.vertx:vertx-web:3.2.0'                 //** ADDED
    testCompile group: 'junit', name: 'junit', version: '4.11'
}
Next we need to configure the event bus bridge. We can do this in the WebVerticle as follows.

WebVerticle.java configure event bus sockJS bridge

package com.github.vertx.node.example;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This gets started by the MainVerticle.
 * It configures the event bus bridge and installs the REST routes.
 */
public class WebVerticle extends AbstractVerticle {

    private final Logger logger = LoggerFactory.getLogger(WebVerticle.class);

    @Override
    public void start() {

        /* Create vertx web router. */
        final Router router = Router.router(vertx);

        /* Install our original "REST" handler at the /hello/ uri. */
        router.route("/hello/*").handler(event -> handleHttpRequestToHelloWorld(event.request()));

        /* Allow Hello World service to be exposed to Node.js. */
        final BridgeOptions options = new BridgeOptions()
                .addInboundPermitted(
                        new PermittedOptions().setAddress(Services.HELLO_WORLD.toString()));

        /* Configure bridge at this HTTP/WebSocket URI. */
        router.route("/eventbus/*").handler(SockJSHandler.create(vertx).bridge(options));

        /* Install router into vertx. */
        vertx.createHttpServer()
                .requestHandler(router::accept)
                .listen(8080);
    }

    /** This REST endpoint is for hello.
     *  It invokes the hello world service via the event bus.
     * @param httpRequest HTTP request from vertx.
     */
    private void handleHttpRequestToHelloWorld(final HttpServerRequest httpRequest) {

        /* Invoke using the event bus. */
        vertx.eventBus().send(Services.HELLO_WORLD.toString(),
                HelloWorldOperations.SAY_HELLO_WORLD.toString(), response -> {

           /* If the response was successful, this means we were able to execute the operation on
              the HelloWorld service.
              Pass the results to the http request's response.
           */
            if (response.succeeded()) {
                /* Send the result to the http connection. */
                logger.debug("Successfully invoked HelloWorld service {}", response.result().body());
                httpRequest.response().end(response.result().body().toString());
            } else {
                logger.error("Can't send message to hello world service", response.cause());
                //noinspection ThrowableResultOfMethodCallIgnored
                httpRequest.response().setStatusCode(500).end(response.cause().getMessage());
            }
        });
    }
}
Notice that we had to permit the event bus address with new PermittedOptions().setAddress(Services.HELLO_WORLD.toString()) to expose it to the outside world (in this case, the Node.js world).
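
To make the idea of permitting an address concrete, here is a toy model in JavaScript of the decision the bridge makes for each inbound message. It is not Vertx code and not how the bridge is implemented. The function isInboundPermitted and the shape of the rule objects are hypothetical, and only exact-address and regex rules are modeled.

```javascript
/**
 * Toy model (not the Vertx implementation): returns true when `address`
 * matches at least one rule of the allow-list.
 */
function isInboundPermitted(address, permitted) {
  return permitted.some(function (rule) {
    if (rule.address !== undefined) {
      // Exact match, like setAddress(...) in the Java code.
      return rule.address === address;
    }
    if (rule.addressRegex !== undefined) {
      // The whole address has to match the pattern.
      return new RegExp('^(?:' + rule.addressRegex + ')$').test(address);
    }
    // Rules without an address or a regex are not modeled in this sketch.
    return false;
  });
}

// Mirrors the single address permitted in WebVerticle.
var permitted = [{ address: 'HELLO_WORLD' }];
```

In this model a message sent to HELLO_WORLD passes, while a message to any other address is rejected because no rule matches it.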
HelloWorldService.kt and MainService.kt (the Kotlin files) stay the same. We include them at the end with some extra comments. Now we can change our Node.js example to use the event bus bridge.

Change node to use event bus to call hello world service instead of REST

var EventBus = require('vertx3-eventbus-client');
var eventBus = new EventBus("http://localhost:8080/eventbus/");

/** Don't call until the event bus is open. */
function onopenEventBus() {

      //Call using event bus.
      eventBus.send("HELLO_WORLD",
              "SAY_HELLO_WORLD", function(response, json) {
              //Print the body of the reply.
              console.log(json.body);
      });
}

/** Get notified of errors. */
function onerrorEventBus(error) {
  console.log("Problem calling event bus " + error);
}

eventBus.onopen = onopenEventBus;
eventBus.onerror = onerrorEventBus;
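
If the Node side grows beyond a single call, it may be convenient to wrap the callback in a Promise. The following is only a sketch and not part of the example project. The helper name sendAsync is hypothetical, and it assumes that the first callback argument (named response in the snippet above) carries an error, if any, and that the second (named json) is the reply message with a body field, which is my reading of vertx3-eventbus-client.

```javascript
/**
 * Hypothetical helper: send a message and resolve with the body of the reply.
 * `bus` is anything that offers send(address, message, callback).
 */
function sendAsync(bus, address, message) {
  return new Promise(function (resolve, reject) {
    bus.send(address, message, function (error, reply) {
      if (error) {
        // Assumption: a failed send is reported through the first argument.
        reject(error);
      } else {
        resolve(reply.body);
      }
    });
  });
}
```

Inside onopenEventBus this could be used as sendAsync(eventBus, "HELLO_WORLD", "SAY_HELLO_WORLD").then(console.log, onerrorEventBus).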
For completeness, here are the Kotlin files up to this point:


package com.github.vertx.node.example

import io.vertx.core.AbstractVerticle
import io.vertx.core.Vertx
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.forEach

enum class Services {
    HELLO_WORLD
}

public class MainVerticle : AbstractVerticle() {

    private val logger = LoggerFactory.getLogger(MainVerticle::class.java)

    override fun start() {

        /** Count of services.  */
        val serviceCount = AtomicInteger()

        /** List of verticles that we are starting.  */
        val verticles = Arrays.asList(HelloWorldVerticle(), WebVerticle())

        verticles.forEach { verticle ->
            vertx.deployVerticle(verticle) { deployResponse ->

                if (deployResponse.failed()) {
                    logger.error("Unable to deploy verticle ${verticle.javaClass.simpleName}",
                            deployResponse.cause())
                } else {
                    logger.info("${verticle.javaClass.simpleName} deployed")
                    /* Count each successful deployment so the timer below can check it. */
                    serviceCount.incrementAndGet()
                }
            }
        }

        /** Wake up in five seconds and check to see if we are deployed if not complain.  */
        vertx.setTimer(TimeUnit.SECONDS.toMillis(5)) { event ->

            if (serviceCount.get() != verticles.size) {
                logger.error("Main Verticle was unable to start child verticles")
            } else {
                logger.info("Start up successful")
            }
        }
    }
}

fun main(args: Array<String>) {
    val vertx = Vertx.vertx()
    vertx.deployVerticle(MainVerticle())
}


package com.github.vertx.node.example

import io.vertx.core.AbstractVerticle
import io.vertx.core.eventbus.Message
import org.slf4j.LoggerFactory

enum class HelloWorldOperations {

    SAY_HELLO_WORLD
}

/** Hello World Verticle gets started by Main Verticle.
 * Listens to the event bus.
 */
class HelloWorldVerticle : AbstractVerticle() {

    private val logger = LoggerFactory.getLogger(HelloWorldVerticle::class.java)

    override fun start() {
        vertx.eventBus().consumer<Any>(Services.HELLO_WORLD.toString()) { message -> dispatchMessage(message) }
    }

    /**
     * Handles message from the event bus.
     */
    private fun dispatchMessage(message: Message<Any>) {

        try {
            val operation = HelloWorldOperations.valueOf(message.body().toString())

            /** Switch statement that handles various operations. */
            when (operation) {
                HelloWorldOperations.SAY_HELLO_WORLD -> message.reply("HELLO WORLD FROM KOTLIN")
                else -> {
                    logger.error("Unable to handle operation {}", operation)
                    message.reply("Unsupported operation")
                }
            }
        } catch (ex: Exception) {
            logger.error("Unable to handle operation due to exception " + message.body(), ex)
        }
    }
}
