Rick

Friday, April 1, 2016

New release. QBit Microservices Lib now supports SockJS and Kafka integration (You can invoke QBit service methods over Kafka/SockJS)


There is a new release of the QBit microservices lib. It has the bug fixes and features that some of our clients requested, plus some new bridge tech. (JS and Java examples of using the new bridge tech are included.)
New version just released.
compile 'io.advantageous.qbit:qbit-admin:0.9.3-M2'
compile 'io.advantageous.qbit:qbit-vertx:0.9.3-M2'
One more thing: release 0.9.3-M4 or 0.9.3 (the one after this one, which will be out in less than a week) will have a nice new feature for integration with things like Kafka and SockJS. That release has a Vertx EventBus connector (the code is written and tested). It allows any app that can use the Vertx EventBus to invoke QBit service methods over the event bus. There are connectors that connect Kafka to the Vertx EventBus, along with other connectors, so this effectively allows QBit service methods to be invoked from Kafka and SockJS. (An example is included.) :)
More importantly, we improved the service queue dispatch and callback mechanism to make writing bridges to other forms of IO and messaging a lot easier. This gives more options when streaming in method calls from reactive streams, which gives even more microservices cred to QBit, and it allows easier integration with other languages.
With this new approach, you can now invoke QBit service methods from Node or the browser using the Vertx EventBus bridge as well. Included are a JavaScript/NPM client that I worked on with some other folks and some Java code to show how to do this.
You just need to send a header named method that carries the method name (headers are part of the event bus call). The EventBus address is the QBit object address. The body of the message is a list of the method's params (in JSON). We support generic arguments (for example, collections of Employees).
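
To make the message shape concrete, here is a minimal sketch of how one call maps onto those three parts. The helper name buildQBitCall is hypothetical (it is not part of QBit or the Vertx client), and defaulting missing params to an empty list is an assumption made for the sketch. It only assembles the same arguments that the ServiceClient listing in this post hands to eventBus.send.

```javascript
// Hypothetical helper (not a QBit or Vertx API): assembles the three parts
// of an EventBus message that targets a QBit service method.
function buildQBitCall(serviceAddress, method, params) {
  return {
    // The EventBus address is the QBit object address, e.g. '/es/1.0'.
    address: serviceAddress,
    // The method name travels in a header named "method".
    headers: { method: method },
    // The arguments travel as a JSON list; an empty list is assumed
    // here when the caller passes no params.
    body: JSON.stringify(params || [])
  };
}

// Example: the twoArg(Employee, boolean) method from the Java service.
const call = buildQBitCall('/es/1.0', 'twoArg', [{ id: 'rick', firstName: 'Rick' }, true]);
console.log(call.address, call.headers, call.body);
```

With a connected event bus, the three fields would be passed along as eventBus.send(call.address, call.body, call.headers, replyHandler), which is the same argument order the client lib uses.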

JavaScript example that talks to QBit over Vertx Event bus

It should be easy to adapt this to any programming language that can use the Vertx EventBus TCP bridge or the Vertx EventBus SockJS bridge.
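
For the TCP route, here is a rough sketch of what a client in another language would have to produce. It assumes the framing commonly described for the Vertx TCP EventBus bridge (a 4-byte big-endian length followed by a UTF-8 JSON envelope with type, address, headers, body, and replyAddress fields); please check that against the bridge docs for your Vertx version. The function names encodeFrame and decodeFrame and the reply address value are made up for the example, and sending the body as a JSON string simply mirrors what the JavaScript client in this post does.

```javascript
// Sketch only, under the framing assumption stated above.
// Frames a QBit method call as: [4-byte big-endian length][UTF-8 JSON envelope].
function encodeFrame(address, method, params, replyAddress) {
  const envelope = {
    type: 'send',
    address: address,              // QBit object address
    headers: { method: method },   // QBit method name
    body: JSON.stringify(params),  // argument list as JSON text
    replyAddress: replyAddress     // where the bridge should send the reply
  };
  const payload = Buffer.from(JSON.stringify(envelope), 'utf8');
  const length = Buffer.alloc(4);
  length.writeInt32BE(payload.length, 0);
  return Buffer.concat([length, payload]);
}

// Reads one complete frame back into its envelope (handy for tests).
function decodeFrame(frame) {
  const size = frame.readInt32BE(0);
  return JSON.parse(frame.slice(4, 4 + size).toString('utf8'));
}

const frame = encodeFrame('/es/1.0', 'getEmployee', ['5'], 'reply-1');
console.log(frame.length, decodeFrame(frame).headers.method);
```

The resulting buffer is what would be written to the TCP socket; a real client would also have to read length-prefixed frames back off the socket and match replies to the reply address.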

Java service we are going to call from JavaScript/ES6

@RequestMapping("/es/1.0")
public class EmployeeService {

    @Bridge("/employee/")
    public boolean addEmployee(final Employee employee) {
        System.out.println(employee);
        return true;
    }

    @Bridge("/employee/err/")
    public boolean addEmployeeError(final Employee employee) {
        throw new IllegalStateException("Employee can't be added");
    }

    @Bridge
    public void getEmployee(final Callback<Employee> callback, final String id) {
        callback.returnThis(new Employee(id, "Bob", "Jingles", 1962, 999999999));
    }


    @GET("/employee/")
    public void getEmployeeWithParam(final Callback<Employee> callback, 
                                     @RequestParam(value = "id", required = true) final String id) {
        callback.returnThis(new Employee(id, "Bob", "Jingles", 1962, 999999999));
    }

    @Bridge
    public Employee singleton(final Employee employee) {
        return employee;
    }

    @Bridge
    public boolean twoArg(final Employee employee, boolean flag) {

        return employee.getId().equals("rick") && flag;
    }

    @Bridge
    public List<Employee> list(final List<Employee> employees) {
        return employees;
    }

}

serviceClient.js - NPM client lib to talk to QBit over Vertx/EventBus (next version of QBit, which will be out in a few days)

"use strict";
import logger from "winston";

export default class ServiceClient {

  constructor(asyncEventbusProvider, serviceAddress) {
    logger.verbose('constructing a service client for ' + serviceAddress);
    this._asyncEventbusProvider = asyncEventbusProvider;
    this._serviceAddress = serviceAddress;
  }

  get connected() {
    return this._connected;
  }

  connect() {
    const client = this;
    return new Promise((resolve, reject) => {
      if (this._connected) {
        logger.verbose('already connected.');
        resolve();
      } else {
        try {
          client._asyncEventbusProvider((error, eventBus) => {
            if (error) {
              reject(error);
              return;
            }
            client._eventbus = eventBus;
            logger.info('Connecting to vertx');
            client._eventbus.onopen = () => {
              logger.verbose('established connection to eventbus');
              client._connected = true;
              resolve();
            };
            client._eventbus.onclose = () => {
              logger.warn('Connection closed');
              client._connected = false;
              reject('connection to the eventbus has been closed');
            };
            client._eventbus.onerror = (error) => {
              logger.error('There was an error: ', error);
              reject(error);
            };
          });
        } catch (e) {
          logger.error('Exception encountered connecting to eventbus', e);
          reject(e);
        }
      }
    });
  }

  //TODO: it would be cool to use a proxy here... but sadly it's not supported in our version of node. someday.
  invoke(method, params) {
    if (this._connected) {
      return this._doInvoke(method, params);
    }
    // Connect lazily, then chain the call; rejections propagate through the chain.
    return this.connect().then(() => this._doInvoke(method, params));
  }

  _doInvoke(method, params) {
    const client = this;
    return new Promise((resolve, reject) => {
      try {
        client._eventbus.send(this._serviceAddress, JSON.stringify(params),
          {'method':method},
          (error, result) => {
            logger.debug('Vertx error result', error, result);
            if (error) {
              logger.error('error message from vertx or socketJS ', error);
              reject(error.body);
            } else if (result.failureCode) {
              logger.error('error from app  ', result);
              reject(result.message);
            } else {
              const body = JSON.parse(result.body);
              logger.debug('Success ', body);
              resolve(body);
            }
          });
      } catch (e) {
        logger.error('exception invoking method ' + method, e);
        reject(e);
      }
    });
  }

}

index.js for NPM

import ServiceClient from "./serviceClient";

export default ServiceClient

mockEventBus.js for unit testing NPM

import logger from "winston";
import {testEmployee, employees} from "./serviceClientSpec";

class EventBus {

  constructor(url) {
    const self = this;
    logger.debug('initializing mock eventbus for ' + url);
    setTimeout(() => {
      self.onopen && self.onopen();
    }, 5);
  }

  send(address, message, headers, callback) {

    const method = headers['method'];
    logger.debug(`calling ${method} on ${address}`);
    if ('addEmployee' === method) {
      callback(null, {body: true});
    } else if ('getEmployee' === method) {
      callback(null, {body: JSON.stringify(testEmployee)});
    } else if ('twoArg' === method) {
      callback(null, {body: true});
    } else if ('list' === method) {
      callback(null, {body: JSON.stringify(employees)});
    } else {
      callback({body: '[Unable to find handler]'});
    }
  }
}

module.exports = EventBus;

serviceClientSpec.js client spec for testing NPM QBit client, shows you how to use the client lib

"use strict";
import ServiceClient from "lib/index";
import logger from "winston";
import should from "should";

logger.level = 'debug';

export const testEmployee = {
  "id": "5",
  "firstName": "Bob",
  "lastName": "Jingles",
  "birthYear": 1962,
  "socialSecurityNumber": 999999999
};

export const testEmployee2 = {
  "id": "rick",
  "firstName": "Rick",
  "lastName": "Jingles",
  "birthYear": 1962,
  "socialSecurityNumber": 999999999
};

export const employees = [testEmployee2, testEmployee];

describe('ServiceClient', () => {

  let EventBus;

  before(() => {
    EventBus = require(process.env.INTEGRATION_TESTS ? "vertx3-eventbus-client" : "./mockEventBus");
  });

  const provider = (callback) => {
    callback(null, new EventBus('http://localhost:8080/eventbus/'));
  };

  it('should be able to connect', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.connect().then(done);
    }
  );

  it('should invoke a service call when already connected', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.connect().then(() => {
        serviceClient.invoke('addEmployee', [testEmployee]).then(
          (result) => {
            result.should.be.equal(true);
            logger.info("result", result);
            done();
          });
      });
    }
  );

  it('should invoke a service call when not already connected', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.invoke('addEmployee', [testEmployee]).then(
        (result) => {
          result.should.be.equal(true);
          logger.info("result", result);
          done();
        });
    }
  );

  it('should handle list of employees', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.invoke('list', [employees]).then(
        (result) => {
          logger.info("result", result);
          done();
        });
    }
  );

  it('should handle two args', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.invoke('twoArg', [testEmployee2, true]).then(
        (result) => {
          logger.info("result", result);
          done();
        });
    }
  );

  it('should invoke a service call that returns JSON', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.invoke('getEmployee', ["5"]).then(
        (result) => {
          should.exist(result);
          result.id.should.be.equal('5');
          logger.info("result", result);
          done();
        });
    }
  );

  it('should fail on an unknown method', (done) => {
      const serviceClient = new ServiceClient(provider, '/es/1.0');
      serviceClient.invoke('bogus').then().catch(
        (error) => {
          logger.info("error ", error);
          should.exist(error);
          error.should.be.equal('[Unable to find handler]');
          done();
        });
    }
  );

});

package.json

{
  "name": "qbit-service-client",
  "version": "1.1.3",
  "description": "Service Client for backend services using Vertx Eventbus.",
  "main": "distribution/index.js",
  "publishConfig": {
    "registry": "REDACTED"
  },
  "scripts": {
    "test": "NODE_PATH=./ mocha --require babel-core/register --require babel-polyfill --recursive --reporter spec",
    "build": "babel lib --out-dir distribution"
  },
  "dependencies": {
    "winston": "^2.2.0"
  },
  "devDependencies": {
    "babel-cli": "^6.6.5",
    "babel-core": "^6.7.2",
    "babel-plugin-transform-regenerator": "^6.6.5",
    "babel-polyfill": "^6.7.2",
    "babel-preset-es2015": "^6.6.0",
    "mocha": "^2.4.5",
    "should": "^8.2.2",
    "vertx3-eventbus-client": "^3.2.1"
  },
  "license": "MIT"
}

Java

Employee

package io.advantageous.qbit.example.vertx.eventbus.bridge;

public class Employee {


    private final String id;
    private final String firstName;
    private final String lastName;
    private final int birthYear;
    private final long socialSecurityNumber;

    public Employee(String id, String firstName, String lastName, int birthYear, long socialSecurityNumber) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
        this.birthYear = birthYear;
        this.socialSecurityNumber = socialSecurityNumber;
    }


    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public int getBirthYear() {
        return birthYear;
    }

    public long getSocialSecurityNumber() {
        return socialSecurityNumber;
    }

    public String getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "id='" + id + '\'' +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", birthYear=" + birthYear +
                ", socialSecurityNumber=" + socialSecurityNumber +
                '}';
    }
}

Employee Service

package io.advantageous.qbit.example.vertx.eventbus.bridge;

import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.RequestParam;
import io.advantageous.qbit.annotation.http.Bridge;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.reactive.Callback;

import java.util.List;

@RequestMapping("/es/1.0")
public class EmployeeService {

    @Bridge("/employee/")
    public boolean addEmployee(final Employee employee) {
        System.out.println(employee);
        return true;
    }

    @Bridge("/employee/err/")
    public boolean addEmployeeError(final Employee employee) {
        throw new IllegalStateException("Employee can't be added");
    }

    @Bridge
    public void getEmployee(final Callback<Employee> callback, final String id) {
        callback.returnThis(new Employee(id, "Bob", "Jingles", 1962, 999999999));
    }


    @GET("/employee/")
    public void getEmployeeWithParam(final Callback<Employee> callback, 
                                     @RequestParam(value = "id", required = true) final String id) {
        callback.returnThis(new Employee(id, "Bob", "Jingles", 1962, 999999999));
    }

    @Bridge
    public Employee singleton(final Employee employee) {
        return employee;
    }

    @Bridge
    public boolean twoArg(final Employee employee, boolean flag) {

        return employee.getId().equals("rick") && flag;
    }

    @Bridge
    public List<Employee> list(final List<Employee> employees) {
        return employees;
    }

}

EventBusBridgeExample

package io.advantageous.qbit.example.vertx.eventbus.bridge;

import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.http.server.HttpServer;
import io.advantageous.qbit.server.ServiceEndpointServer;
import io.advantageous.qbit.vertx.eventbus.bridge.VertxEventBusBridgeBuilder;
import io.advantageous.qbit.vertx.http.VertxHttpServerBuilder;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

/**
 * Send JSON POST.
 * <code>
 *     curl -X POST -H "Content-Type: application/json" \
 *     http://localhost:8080/es/1.0/employee/ \
 *     -d '{"id":"5","firstName":"Bob","lastName":"Jingles","birthYear":1962,"socialSecurityNumber":999999999}'
 * </code>
 *
 * Get JSON
 *
 * <code>
 *      curl http://localhost:8080/es/1.0/employee/?id=5
 * </code>
 */
public class EventBusBridgeExample extends AbstractVerticle {

    @Override
    public void start() throws Exception {


        final String address = "/es/1.0";
        final EmployeeService employeeService = new EmployeeService();
        final VertxEventBusBridgeBuilder vertxEventBusBridgeBuilder = VertxEventBusBridgeBuilder
                .vertxEventBusBridgeBuilder()
                .setVertx(vertx);
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();
        final Router router = Router.router(vertx);


        managedServiceBuilder.setRootURI("/");
        vertxEventBusBridgeBuilder.addBridgeAddress(address, EmployeeService.class);
        /* Route everything under address to QBit http server. */
        router.route().path(address + "/*");
        /* Configure bridge at this HTTP/WebSocket URI. */
        router.route("/eventbus/*").handler(SockJSHandler.create(vertx).bridge(
                new BridgeOptions()
                        .addInboundPermitted(new PermittedOptions().setAddress(address))
                        .addOutboundPermitted(new PermittedOptions().setAddress(address))
        ));

        final io.vertx.core.http.HttpServer vertxHttpServer = vertx.createHttpServer();
        /*
         * Use the VertxHttpServerBuilder which is a special builder for Vertx/Qbit integration.
         */
        final HttpServer httpServer = VertxHttpServerBuilder.vertxHttpServerBuilder()
                .setRouter(router)
                .setHttpServer(vertxHttpServer)
                .setVertx(vertx)
                .build();


        final ServiceEndpointServer endpointServer = managedServiceBuilder.getEndpointServerBuilder()
                .setHttpServer(httpServer)
                .addService(employeeService)
                .build();
        vertxEventBusBridgeBuilder.setServiceBundle(endpointServer.serviceBundle()).build();
        endpointServer.startServer();
        vertxHttpServer.requestHandler(router::accept).listen(8080);


    }


    public static void main(String... args) throws Exception {
        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new EventBusBridgeExample());
    }
}
