Rick


Sunday, February 21, 2016

Step 2 Change Hello World to use EventBus (Microservices with Vertx)

Now we want to deploy two verticles running in the same JVM. The first verticle is an HTTP verticle, and it will send a message on the Vert.x EventBus; the second handles that message and replies.
To do this we are going to change things up a bit. We will have a main verticle called MainVerticle, which will start up two verticles, namely HelloWorldVerticle and WebVerticle.
You can find the source for this in this branch.
We also set up logging.
One of the biggest mistakes in an async program is not logging errors: a failure inside a callback that nobody logs is easy to lose entirely.
The HelloWorldVerticle handles events from the event bus and replies to them.
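
Before looking at the real verticles, it may help to see the request/reply shape in isolation. The sketch below is a toy, in-memory stand-in for an event bus written with plain Java only. It is not the Vert.x API and not how Vert.x is implemented; ToyEventBus, Result and ToyEventBusDemo are hypothetical names made up for this illustration, and unlike the real bus everything here runs synchronously on the calling thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Toy, in-memory stand-in for an event bus. Hypothetical; not the Vert.x API. */
class ToyEventBus {

    /** Outcome of a send: either a reply body or a failure cause. */
    static final class Result {
        final String body;
        final Throwable cause;

        Result(String body, Throwable cause) {
            this.body = body;
            this.cause = cause;
        }

        boolean succeeded() {
            return cause == null;
        }
    }

    /** One consumer per address; it gets the message body and a callback for its reply. */
    private final Map<String, BiConsumer<String, Consumer<String>>> consumers =
            new ConcurrentHashMap<>();

    void consumer(String address, BiConsumer<String, Consumer<String>> handler) {
        consumers.put(address, handler);
    }

    void send(String address, String body, Consumer<Result> replyHandler) {
        BiConsumer<String, Consumer<String>> handler = consumers.get(address);
        if (handler == null) {
            /* Nobody listens on this address, so the sender gets a failure. */
            replyHandler.accept(new Result(null,
                    new IllegalStateException("No handlers for address " + address)));
            return;
        }
        handler.accept(body, reply -> replyHandler.accept(new Result(reply, null)));
    }
}

class ToyEventBusDemo {

    /** Plays the web side: sends an operation and turns the outcome into a status line. */
    static String request(ToyEventBus bus, String address, String operation) {
        StringBuilder out = new StringBuilder();
        bus.send(address, operation, result -> {
            if (result.succeeded()) {
                out.append("200 ").append(result.body);
            } else {
                out.append("500 ").append(result.cause.getMessage());
            }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        /* Plays the hello world side: one consumer registered on one address. */
        bus.consumer("HELLO_WORLD", (body, reply) -> reply.accept("HELLO WORLD"));
        System.out.println(request(bus, "HELLO_WORLD", "SAY_HELLO_WORLD"));
        System.out.println(request(bus, "MISSING", "SAY_HELLO_WORLD"));
    }
}
```

The point to take away is the division of labor: one side only knows an address and an operation name, the other side only knows how to answer, and the sender has to handle both the success and the failure outcome.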

Change HelloWorldVerticle to use event bus instead of HTTP Server

package com.github.vertx.node.example;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldVerticle extends AbstractVerticle {

    private final Logger logger = LoggerFactory.getLogger(HelloWorldVerticle.class);

    @Override
    public void start() {
        /* Register a consumer on the HELLO_WORLD address; every message goes to dispatchMessage. */
        vertx.eventBus().consumer(Services.HELLO_WORLD.toString(), message -> {
            dispatchMessage(message);
        });
    }

    private void dispatchMessage(final Message<Object> message) {

        try {
            final HelloWorldOperations operation = HelloWorldOperations.valueOf(message.body().toString());
            switch (operation) {
                case SAY_HELLO_WORLD:
                    message.reply("HELLO WORLD");
                    break;
                default:
                    logger.error("Unable to handle operation {}", operation);
                    message.reply("Unsupported operation");
            }
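        } catch (final Exception ex) {
            logger.error("Unable to handle operation due to exception " + message.body(), ex);
            /* Fail the message so the sender gets an error instead of waiting for a reply timeout. */
            message.fail(500, "Unable to handle operation");
        }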
    }

}
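
One detail of the dispatch code is worth a closer look: an operation name that is not an enum constant never reaches the default branch, because Enum.valueOf throws IllegalArgumentException first. The default branch only catches constants that exist but have no case yet. Here is a minimal stand-alone sketch of that behavior, assuming a hypothetical Operation enum with an extra SAY_GOODBYE constant (not part of the example project) and a hypothetical OperationDispatch class that returns the reply as a String instead of calling message.reply.

```java
/** Hypothetical stand-alone enum so the sketch compiles on its own. */
enum Operation { SAY_HELLO_WORLD, SAY_GOODBYE }

class OperationDispatch {

    /** Maps a raw message body to a reply, mirroring the try/switch/catch shape. */
    static String dispatch(Object body) {
        Operation operation;
        try {
            operation = Operation.valueOf(String.valueOf(body));
        } catch (IllegalArgumentException ex) {
            /* valueOf throws for any name that is not an enum constant. */
            return "Unknown operation: " + body;
        }
        switch (operation) {
            case SAY_HELLO_WORLD:
                return "HELLO WORLD";
            default:
                /* Reached only for constants that exist but have no case yet. */
                return "Unsupported operation";
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("SAY_HELLO_WORLD"));
        System.out.println(dispatch("SAY_GOODBYE"));
        System.out.println(dispatch("say_hello"));
    }
}
```

This is why the catch block in the verticle matters: for a misspelled operation it is the only place that ever sees the problem.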

The WebVerticle handles all HTTP requests by delegating to the HelloWorldVerticle.

Create WebVerticle to send a hello world request message to the HelloWorldVerticle

package com.github.vertx.node.example;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServerRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebVerticle extends AbstractVerticle {


    private final Logger logger = LoggerFactory.getLogger(WebVerticle.class);

    @Override
    public void start() {
        vertx.createHttpServer()
                .requestHandler(httpRequest -> handleHttpRequest(httpRequest) )
                .listen(8080);
    }

    private void handleHttpRequest(final HttpServerRequest httpRequest) {

        /* Invoke using the event bus. */
        vertx.eventBus().send(Services.HELLO_WORLD.toString(),
                HelloWorldOperations.SAY_HELLO_WORLD.toString(), response -> {

            if (response.succeeded()) {
                /* Send the result from HelloWorldService to the http connection. */
                httpRequest.response().end(response.result().body().toString());
            } else {
                logger.error("Can't send message to hello service", response.cause());
                httpRequest.response().setStatusCode(500).end(response.cause().getMessage());
            }
        });
    }
}

We define some constants via enums for the service names, which double as event bus addresses, and for the operations each service supports.

Enum for service names and operations

package com.github.vertx.node.example;

public enum Services {

    HELLO_WORLD
}

package com.github.vertx.node.example;

public enum HelloWorldOperations {

    SAY_HELLO_WORLD
}

The MainVerticle starts up the other two verticles, and then uses a Vert.x timer to check, five seconds later, whether both deployments completed.
Note we added a main method so it would be easy to start up in our IDE.

Define a verticle to start the other two.

package com.github.vertx.node.example;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MainVerticle extends AbstractVerticle {



    private final Logger logger = LoggerFactory.getLogger(MainVerticle.class);

    @Override
    public void start() {

        /* Count of services that deployed successfully. */
        final AtomicInteger serviceCount = new AtomicInteger();

        /* List of verticles that we are starting. */
        final List<AbstractVerticle> verticles = Arrays.asList(new HelloWorldVerticle(), new WebVerticle());

        verticles.forEach(verticle -> vertx.deployVerticle(verticle, deployResponse -> {

            if (deployResponse.failed()) {
                logger.error("Unable to deploy verticle " + verticle.getClass().getSimpleName(),
                        deployResponse.cause());
            } else {
                logger.info(verticle.getClass().getSimpleName() + " deployed");
                serviceCount.incrementAndGet();
            }
        }));


        /* Wake up in five seconds and check whether everything deployed; if not, complain. */
        vertx.setTimer(TimeUnit.SECONDS.toMillis(5), event -> {

            if (serviceCount.get() != verticles.size()) {
                logger.error("Main Verticle was unable to start child verticles");
            } else {
                logger.info("Start up successful");
            }
        });

    }


    public static void main(final String... args) {
        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new MainVerticle());
    }
}
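
The timer in MainVerticle checks once, five seconds after startup, so a slow deployment looks the same as a failed one. One possible alternative is to react as soon as the last deployment reports back. The sketch below shows only that idea, using plain java.util.concurrent and no Vert.x API: each CompletableFuture stands in for one deployment callback, and DeploymentTracker and whenAllDeployed are hypothetical names invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DeploymentTracker {

    /**
     * Combines one future per deployment into a single future. It completes with
     * the number of deployments once every one has reported, and completes
     * exceptionally if any of them failed.
     */
    static CompletableFuture<Integer> whenAllDeployed(List<CompletableFuture<String>> deployments) {
        return CompletableFuture
                .allOf(deployments.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> deployments.size());
    }

    public static void main(String[] args) {
        /* Stand-ins for the two deployment callbacks. */
        CompletableFuture<String> hello = new CompletableFuture<>();
        CompletableFuture<String> web = new CompletableFuture<>();

        whenAllDeployed(Arrays.asList(hello, web)).whenComplete((count, error) -> {
            if (error != null) {
                System.out.println("Unable to start child verticles: " + error);
            } else {
                System.out.println("Start up successful, " + count + " verticles deployed");
            }
        });

        /* Simulate the deployments reporting success one after the other. */
        hello.complete("HelloWorldVerticle");
        web.complete("WebVerticle");
    }
}
```

In a real verticle the two futures would be completed from the deployVerticle callbacks. The trade-off is that this approach never reports anything if a deployment callback is never invoked, so a timeout can still be useful as a backstop.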

We added the logging dependencies to the Gradle build file and a logback.xml file to the test resources.
In addition to adding logging, we changed the main verticle from HelloWorldVerticle to MainVerticle.

Updated Gradle file

plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '1.2.2'
    id 'idea'
}

group 'rickhigh'
version '1.0-SNAPSHOT'


idea {
    project {
        languageLevel = '1.8'
    }
}
repositories {
    mavenCentral()
    maven {
        url = 'http://oss.sonatype.org/content/repositories/snapshots/'
    }
    mavenLocal() /* Just in case we want to use local artifacts that we build locally. */
}


sourceCompatibility = '1.8'
mainClassName = 'io.vertx.core.Launcher'

dependencies {
    compile "io.vertx:vertx-core:3.2.0"
    compile 'ch.qos.logback:logback-core:1.1.3'
    compile 'ch.qos.logback:logback-classic:1.1.3'
    compile 'org.slf4j:slf4j-api:1.7.12'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

/* used to create the fat jar files. */
shadowJar {
    classifier = 'fat'
    manifest {
        attributes 'Main-Verticle': 'com.github.vertx.node.example.MainVerticle'
    }
    mergeServiceFiles {
        include 'META-INF/services/io.vertx.core.spi.VerticleFactory'
    }
}

task wrapper(type: Wrapper) {
    gradleVersion = '2.9'
}

Notice the main verticle changed

shadowJar {
    classifier = 'fat'
    manifest {
        attributes 'Main-Verticle': 'com.github.vertx.node.example.MainVerticle'
    }
    mergeServiceFiles {
        include 'META-INF/services/io.vertx.core.spi.VerticleFactory'
    }
}