Rick

Rick
Rick

Thursday, May 5, 2016

Reactive Java code examples using Reakt (with links to docs for further info)

This is a presentation of different Reakt features in the context of a real application. I have renamed the classnames and such, but this is from an in-progress microservice. It demonstrates where you would use the Reakt pieces to build a reactive Java application.
This covers usage of Reakt:
  • blocking promises
  • Promises.all
  • invokable promise
  • Expected values
  • Circuit breakers
  • Working with the Reactor
  • Working with streams
  • Using AsyncSuppliers to create downstream services
  • Reakt Guava integration
  • Using promise.thenMap

Blocking promise example

Let's say you have an async microservices application, and you want to write some unit and integration tests. You want the tests to wait until the system starts up. Rather you want the system to notify the test when it is done loading.
NOTE: The examples below are written in Kotlin which is a JVM language from Idea. I use Kotlin because the example are easier to read and take up less space. If you know Java 8, you should follow no problem. Think of it like pseudo code. Kotlin works well with Java classes and such.

Loading a system for testing.

import io.advantageous.qbit.util.PortUtils
import io.advantageous.reakt.AsyncSupplier
import io.advantageous.reakt.promise.Promises

object DFTestUtils {

    val adminPort : AtomicInteger = AtomicInteger(9090)
    val eventBusPort : AtomicInteger = AtomicInteger(8080)

    fun loadSystem(): ServicePlatform {
        /** Load System. */
        val loadPromise = Promises.blockingPromiseNotify(Duration.ofSeconds(4))
        val servicePlatform = servicePlatform().withNamespace(Constants.TODO_SERVICE)
                .setWaitForStartup(true).setAdminPort(PortUtils.findOpenPortStartAt(adminPort.andIncrement))
                .setEventBusPort(PortUtils.findOpenPortStartAt(eventBusPort.andIncrement))
        Main.run(servicePlatform).invokeWithPromise(loadPromise)

        loadPromise.get() //Wait for the system to load before we start.
        Assert.assertFalse("system loaded", loadPromise.failure())
        return servicePlatform
    }
Notice that we create a promising using Promises.blockingPromiseNotify(Duration.ofSeconds(4)). We call loadPromise.get() to wait until the system loads. Blocking promises are good for testing.
Now we can use the loadSystem in our tests. Here is a test that does a health check against a running server.

Using loadSystem / blocking promise in our test

    @Test
    @Throws(Exception::class)
    fun mainHealthCheckTest() {

        val servicePlatform = loadSystem()
        val httpTextResponse = HttpClientBuilder.httpClientBuilder().setPort(servicePlatform.adminPort)
                .buildAndStart().get("/__admin/ok")

        assertNotNull(httpTextResponse)
        assertEquals("true", httpTextResponse.body())
        assertEquals(200, httpTextResponse.code().toLong())

        shutdownSystem(servicePlatform)
    }

Using invokable promises to notify when the entire system is done loading

The actual code that loads our system uses an invokable promise.

Load system calls Main.run which returns an invokable promise

import io.advantageous.reakt.AsyncSupplier
import io.advantageous.reakt.Callback
import io.advantageous.reakt.Stream
import io.advantageous.reakt.promise.Promise
import io.advantageous.reakt.promise.Promises.*
object Main {

...
    var repoServiceQueue: ServiceQueue? = null
    private val logger = LoggerFactory.getLogger(Main::class.java)
...
    fun main(args: Array<String>) {
        run(servicePlatform().withNamespace(TODO_SERVICE)).invoke()
    }

...
    fun run(servicePlatform: ServicePlatform): Promise<Void> {
        return invokablePromise { donePromise ->

            val loadCollectionServicePromise = promiseBoolean()
            val loadTodoServicePromise = promiseBoolean()

            val loadPromise = all(loadCollectionServicePromise,
                    loadTodoServicePromise)
                    .thenPromise(donePromise)

            createCollectionService(servicePlatform, loadPromise)
            createTodoServiceService(servicePlatform, loadPromise)
            servicePlatform.start()
        }
    }

Working with Promises.all

There is a lot going on here. The run method is using Promises.invokablePromise. Then the run method uses Promises.all to chain the promise loadCollectionServicePromise and loadTodoServicePromise together. The method Promises.all is used when you want all of the promises to trigger then the all promise triggers. This way you are being notified when both the createCollectionService and the createTodoServiceService async reply. You don't want to start testing before the system is initialized.

Working with Reakt Streams

The project that I am working on uses leader election from elekt. Elekt, leadership API lib, uses Reakt streams. For testing, we just simulate the Elekt consul support.
The LeaderElector looks like this:

LeaderElector uses Reakt streams support

public interface LeaderElector {

    /**
     * Attempt to elect this service as leader.
     * Returns true if successful, and false if not successful.
     * @param endpoint endpoint describes the host and port of the leader.
     * @param callback callback
     */
    void selfElect(final Endpoint endpoint, final Callback<Boolean> callback);

    /**
     *
     * This will send leadership changes as the occur over the stream.
     *
     * @param callback callback returns new leader.
     */
    void leadershipChangeNotice(final Stream<Endpoint> callback);

    /**
     *
     * This will come back quickly with a new Leader.
     * If no Endpoint is returned in the callback then there is no leader.
     *
     * @param callback callback returns new leader.
     */
    void getLeader(final Callback<Endpoint> callback);


}
To simulate that for integration testing, we use this mock LeaderElector.

Using a test LeaderElector stream for integration testing

    private fun createLeaderElector(): Supplier<LeaderElector> {
        return Supplier {
            object : LeaderElector {
                override fun leadershipChangeNotice(stream: Stream<Endpoint>?) {
                    logger.info("Leader notice registered")
                    val idOfServer = Identity()
                    stream?.reply(Endpoint(idOfServer.host, idOfServer.servicePort))
                    Thread({
                        Thread.currentThread().isDaemon = true
                        while (true) {
                            Thread.sleep(1000 * 10)
                            stream?.reply(Endpoint(idOfServer.host, idOfServer.servicePort))
                        }
                    })
                }

                override fun selfElect(endpoint: Endpoint?, callback: Callback<Boolean>?) {
                    logger.info("Self elect was called")
                    callback?.resolve(true);
                }

                override fun getLeader(callback: Callback<Endpoint>?) {
                    logger.info("Self elect was called")
                    val idOfServer = Identity()
                    callback?.resolve(Endpoint(idOfServer.host, idOfServer.servicePort))
                }
            }
        }
    }
Notice the call to stream.reply to send a stream of leader elect notifications that this server has been elected the leader.

Reakt Expected and Circuit breakers

It is important to monitor the health of your system, and sometimes it is good not to beat a dead horse. If downstream services are broken there is no point in using them until the are fixed. In Reakt we use Circuit Breakers and Expected to handle when some service is support to be there or some value is expected.
Let's demonstrate this with MetricsCollectionServiceImpl.

MetricsCollectionServiceImpl

import io.advantageous.reakt.Breaker
import io.advantageous.reakt.Breaker.*
import io.advantageous.reakt.Expected
import java.util.function.Function
import java.util.function.Supplier

/**
 * Metrics Collection Service.
 * Manages finding leader with LeaderElector.
 */
class MetricsCollectionServiceImpl
/**
 * @param mgmt                            ServiceManagementBundle (from QBit)
 * *
 * @param leaderElectorSupplier           leaderElectorSupplier for connecting to the leader elector.
 * *
 * @param todoListServiceSupplier todoListServiceSupplier for connecting
 * *                                        to the todo service.
 * *
 * @param metricRepository                metric repo for storing metrics
 */
(
        /**
         * Service management bundle which includes stats collection, Reakt reactor, QBit health management, and more.
         */
        private val mgmt: ServiceManagementBundle,
        /**
         * Supplies a leader elector interface. LeaderElector is from Elekt.
         */
        private val leaderElectorSupplier: Supplier<LeaderElector>,
        /**
         * This is used to create a supplier.
         */
        private val todoListServiceSupplier: Function<Endpoint, TodoServiceClient>,
        /**
         * Metric repository for saving repositories.
         */
        private val metricRepository: MetricRepositoryClient)
: MetricsCollectionService {


    /**
     * The current leaderEndpoint which starts out empty.
     */
    private var leaderEndpoint = Expected.empty<Endpoint>()
    /**
     * The actual todoService wrapped in a Reakt Circuit breaker.
     */
    private var todoService = Breaker.opened<TodoServiceClient>()
    /**
     * The leader elector we are using, wrapped in Reakt Circuit breaker
     */
    private var leaderElectorBreaker: Breaker<LeaderElector> = Breaker.opened()
    /**
     * Call count per second.
     */
    private var callCount: Long = 0

    init {
        /*
         * Check circuit breaker health every 10 seconds.
         */
        mgmt.reactor().addRepeatingTask(seconds(10)) {
            healthCheck()
        }
        createLeaderElector()
        mgmt.reactor().addRepeatingTask(seconds(1)) { throughPut() }
        mgmt.reactor().addRepeatingTask(millis(100), { flushService(metricRepository) })
    }
Notice that we are using Reakt circuit breakers. Notice we are using Reakt reactor's Reactor.addRepeatingTask to periodically check the health of our repo. Reakt's Reactor is used to manage callbacks so they execute on this thread, callback timeouts, and repeating tasks.
Let's look at the healthCheck that runs every 10 seconds to see how circuit breakers work.

healthCheck that runs every 10 seconds via a Reakt Reactor task

    private fun healthCheck() {
        if (mgmt.isFailing) {
            logger.warn("CollectionService Health is suspect")
        } else {
            logger.debug("CollectionService is Healthy")
        }

        leaderElectorBreaker.ifBroken {
            createLeaderElector()
        }

         /* If the TODO service is broken, i.e. the circuit is open then do... */
        todoService.ifBroken {
            /* Check to see if we have a leaderEndpoint. */
            leaderEndpoint
                    .ifPresent { leaderEndpoint ->
                        this.handleNewLeader(leaderEndpoint)
                    }
                    /* If we don't have a leaderEndpoint, then look it up. */
                    .ifEmpty {
                        /* Look up the endpoint if the elector is not broken. */  
                        leaderElectorBreaker
                                .ifOk { elector ->
                                    this.lookupLeader(elector)
                                }
                                .ifBroken {
                                    logger.warn("We have no leader and the leader elector is down")
                                }
                    }
        }
    }
The leaderEndpoint is an expected value that might not exist. The methods ifOk and ifBroken are from circuit breaker. The ifOk means the fuse it not burned out. The ifBroken means the fuse blew. As you can see combining Expected values and services wrapped in Breakers allows us to simplify and reasoning on what to do if things go down.
When a fuse opens or breaks, then we can work around it. Here is how we mark a broken breaker.

Marking a service Breaker as broken (the fuse is open)

        try {
            leaderElectorBreaker = Breaker.operational(leaderElectorSupplier.get())
            leaderElectorBreaker
                    .ifOk { this.lookupLeader(it) }
                    .ifBroken {
                        logger.error("Unable to connect to leader supplier")
                    }

            if (leaderElectorBreaker.isOk)
                mgmt.increment("leader.elector.create.success")
            else
                mgmt.increment("leader.elector.create.fail")

            leaderElectorBreaker.ifOk {
                this.handleElectionStream(it)
            }
        } catch (ex: Exception) {
            mgmt.increment("leader.elector.create.fail.exception")
            logger.error("Unable to connect to leader supplier", ex)
            leaderElectorBreaker = Breaker.broken<LeaderElector>()
        }
Notice the use of Breaker.operational to denote that we have a new service to work with that should work. Then if the service fails, we mark it has broken with Breaker.broken.

Working with Reakt Streams

Here is us handling the election stream that we showed a mock-up of earlier.

Working with Reakt Streams

    private fun handleElectionStream(leaderElector: LeaderElector) {
        leaderElector.leadershipChangeNotice { result ->
            result
                    .catchError { error -> // Run on this service thread
                        mgmt.reactor()
                                .deferRun {
                                    logger.error("Error handling election stream")
                                    mgmt.increment("leader.stream.elect.error")
                                    this.leaderElectorBreaker = broken<LeaderElector>()
                                    createLeaderElector()
                                }
                    }
                    .then { endpoint -> // Run on this service thread
                        mgmt.reactor().deferRun {
                            mgmt.increment("leader.stream.elect.notify")
                            logger.info("New Leader Notify {} {}", endpoint.host, endpoint.port)
                            handleSuccessfulLeaderLookupOrStream(endpoint)
                        }
                    }
        }
    }
Notice that we use reactor.deferRun so we can handle this stream on this services thread.
Now let's show another example of Promises.all. We have a Cassandra service that wants to write a heap of records to the DB. It wants to write the records in parallel.
/**
 * Stores Metric data and results into Cassandra.
 */
internal class CassandraMetricRepository
/**
 * @param sessionAsyncSupplier supplier to supply Cassandra session.
 * @param serviceMgmt              serviceMgmt to manage callbacks and repeating tasks.
 * @param promise              returns when cassandra initializes.
 *
 */
(
        /**
         * Cassandra Session supplier.
         */
        private val sessionAsyncSupplier: AsyncSupplier<Session>,
        /**
         * QBit serviceMgmt for repeating tasks, stats, time and callbacks that execute on the caller's thread.
         */
        private val serviceMgmt: ServiceManagementBundle,
        promise: Promise<Boolean>) : MetricRepositoryService {
    /**
     * generate the sequence for backup.
     */
    private val sequenceGen = AtomicLong(2)
    /**
     * Reference to the cassandra session which get connected to async.
     */
    private var sessionBreaker = Breaker.opened<Session>()
    /**
     * Error counts from Cassandra driver for the last time period.
     */
    private val errorCount = AtomicLong()
...
Notice that we create our sessionBreaker, which is our reference to Cassandra as an opened Circuit. We define a sessionAsyncSupplier An AsyncSupplier is also from Reakt. It is like a regular Supplier except it is async.
We use the reactor to define a repeating task to check the health of the Cassandra connection.

Using the reactor

    init {

        /* Connect the Cassandra session. */
        connectSession(promise)

        /*
             This makes sure we are connected.
             It provides circuit breaker if sessionBreaker is down to auto reconnect.
         */
        serviceMgmt.reactor().addRepeatingTask(Duration.ofSeconds(5)) 
                          { this.cassandraCircuitBreaker() }
    }
There we check for the health of our Cassandra session and if it goes down, we try to reconnect just like before.
We use the circuit breaker to do alternative logic if our connection goes down.

using alternative Breaker logic

    override fun recordMetrics(callback: Callback<Boolean>, metrics: List<Metric>) {
        sessionBreaker()
                /* if we are not connected, fail fast. */
                .ifBroken { callback.reject("Not connected to Cassandra") }
                /* If we are connected then call cassandra. */
                .ifOk { session -> doStoreMetrics(session, callback, metrics) }
    }
Note the use of ifBroken and ifOk. This way we can control the reconnect.
The method doStoreMetrics stores many records to Cassandra asynchronously, and even though it saves records in parallel it does not let its caller know via a callback, unless all of the records were stored.

Using reactor.all to coordinate many async calls

    /**
     * Does the low level cassandra storage.
     */
    private fun doStoreMetrics(session: Session,
                                  callback: Callback<Boolean>,
                                  metrics: List<Metric>) {

        logger.debug("Storing metrics {}", metricss.size)
        /* Make many calls to cassandra using its async lib to recordMetrics
        each imprint. */
        val promises = metrics.map({ metric -> doStoreMetric(session, metric) }).toList()
        /*
         * Create a parent promise to contain all of the promises we
         * just created for each imprint.
         */
        serviceMgmt.reactor().all(promises)
                .then {
                    serviceMgmt.increment("bulk.store.success);
                    logger.info("metrics were stored {}", metrics.size)
                    callback.resolve(true)
                }
                .catchError { error ->
                    serviceMgmt.increment("bulk.store.error);
                    logger.error("Problem storing metrics ${metrics.size}", error)
                    callback.reject(error)
                }
    }
It does this call coordination by using reactor.all to create a promise that only replies if all of the other promise reply. The method doStoreMetric returns a single promise. We use Kotlin streams (just like Java streams but more concise) to turn the list of metrics into a list of calls to doStoreMetric into a list of Promises which we then pass to reactor.all to make all of those promises into a single promise.
The doStoreMetric uses Reakt Guava/Cassandra integration to turn a ListableFuture into a Reakt promise.

Working with Reakt Cassandra / Guava support, and using thenMap

import io.advantageous.reakt.guava.Guava.registerCallback

    private fun doStoreMetric(session: Session,
                                metric : Metric): Promise<Boolean> {
        val resultSetFuture = session.executeAsync(QueryBuilder.insertInto(METRICS_TABLE)
                .value("employeeId", metric.employeeId)
                .value("metricType", metric.metricType.name.toLowerCase())
                .value("metricName", metric.metricName)
                .value("provider", metric.provider)
                .value("externalId", metric.externalId)
                .value("value", metric.value)
                .value("surrogateKey", metric.surrogateKey)
                .value("created_at", metric.timestamp))
        return createPromiseFromResultSetFutureForStore(resultSetFuture, "Storing Metric")
    }


    private fun createPromiseFromResultSetFutureForStore(resultSetFuture: ResultSetFuture,
                                                         message: String): Promise<Boolean> {

        val resultSetPromise = serviceMgmt.reactor().promise<ResultSet>()

        val promise = resultSetPromise.thenMap({ it.wasApplied() }).catchError { error ->
            if (error is DriverException) {
                callback.ifPresent { callback1 -> callback1.reject(error.message, error) }
                logger.error("Error " + message, error)
                errorCount.incrementAndGet()
            }
        }
        registerCallback<ResultSet>(resultSetFuture, resultSetPromise)
        return promise
    }

Using thenMap to convert a promise into another type of Promise

Notice we use registerCallback from the Reakt Guava integration to convert the future into a promise. We also use promise.thenMap to convert a Promise into a Promise.

Using invokable Promises inside of an Actor or Managed event loop

Using invokeWithReactor

override fun collectTodo(callback: Callback<Boolean>,
                                todoList: List<Todo>) {
        callCount++
        todoRepo.recordTodoList(todoList)
                .then { ok ->
                    todoService
                            .ifOk { todoService1 ->
                                doCollectWithCallback(callback, todoList, todoService1)
                            }
                            .ifBroken {
                                mgmt.increment("collect.call.df.service.broken")
                                logger.error("Connection to todoService is down.")
                                mgmt.increment("collect.broken")
                            }
                }
                .catchError { error ->
                    mgmt.setFailing()
                    logger.error("Connection to cassandra is down.", error)
                    callback.reject("Connection to cassandra is down. " + error.message, error)
                }
                .invokeWithReactor(mgmt.reactor())
    }
You can invoke invokable promises in the context of a Reactor by using invokeWithReactor(mgmt.reactor()). This allows the callback handlers from the promises to run in the same thread as the service actor or event loop.

Conclusion

I hope you enjoyed this article. It links back to areas of the Reakt documentation where you can find more details. If you are new to Reakt and what to understand the question Why Reakt and What is Reakt I suggest reading this. Also this interview about Reakt might help.

No comments:

Post a Comment

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training