Rick

Rick
Rick

Wednesday, August 19, 2015

Request filtering based on headers in QBit - Filtering requests with HttpRequest shouldContinue predicate

We added support for doing things that you would normally do in a ServletFilter or its ilk. We had the hook there already, and the Predicate already allowed you to chain predicates, but this did not address the fact that we had started to use predicates to wire in health-check and stat-check endpoints. So we added a mechanism to create chains of predicates: as soon as one predicate returns false, the chain stops processing.
The HTTP server allows you to pass a predicate.
setShouldContinueHttpRequest(Predicate<HttpRequest> predicate) 
Predicate<HttpRequest> predicate.
The predicate allows for things like security interception. Look for an auth header. Reject request if auth header is not in place.
Predicates are nest-able.
It is often the case that you will want to run more than one predicate.
To support this, we added addShouldContinueHttpRequestPredicate(final Predicate<HttpRequest> predicate) to the HttpServerBuilder.
The HttpServerBuilder will keep a list of predicates, and register them with the HttpServer when it builds the http server.
You can add your own predicates or replace the default predicate mechanism.

HttpServerBuilder

    /** Chain of should-continue predicates; stays unset until it is first requested or explicitly assigned. */
    private RequestContinuePredicate requestContinuePredicate;

    /**
     * Returns the predicate chain held by this builder, creating an empty
     * chain the first time it is asked for and none has been set.
     *
     * @return the current (never null) predicate chain
     */
    public RequestContinuePredicate getRequestContinuePredicate() {
        if (requestContinuePredicate != null) {
            return requestContinuePredicate;
        }
        /* Nothing configured yet: fall back to a fresh, empty chain and remember it. */
        requestContinuePredicate = new RequestContinuePredicate();
        return requestContinuePredicate;
    }

    /**
     * Replaces the whole predicate chain held by this builder.
     *
     * @param requestContinuePredicate the chain to use from now on
     * @return this builder, so calls can be chained
     */
    public HttpServerBuilder setRequestContinuePredicate(final RequestContinuePredicate requestContinuePredicate) {
        this.requestContinuePredicate = requestContinuePredicate;
        return this;
    }

    /**
     * Appends one predicate to this builder's predicate chain, creating the
     * chain first if none exists yet.
     *
     * @param predicate predicate to append
     * @return this builder, so calls can be chained
     */
    public HttpServerBuilder addShouldContinueHttpRequestPredicate(final Predicate<HttpRequest> predicate) {
        final RequestContinuePredicate chain = getRequestContinuePredicate();
        chain.add(predicate);
        return this;
    }



/**
 * A predicate made of other predicates. A request passes only when every
 * registered predicate passes; evaluation stops at the first one that
 * returns false.
 */
public class RequestContinuePredicate implements Predicate<HttpRequest>{

    /** Registered predicates, in the order they were added. */
    private final CopyOnWriteArrayList <Predicate<HttpRequest>> predicates = new CopyOnWriteArrayList<>();

    /**
     * Appends a predicate to the end of the chain.
     *
     * @param predicate predicate to append
     * @return this chain, so calls can be chained
     */
    public RequestContinuePredicate add(Predicate<HttpRequest> predicate) {
        predicates.add(predicate);
        return this;
    }

    /**
     * Runs the registered predicates in order against the request.
     *
     * @param httpRequest request being examined
     * @return false as soon as one predicate returns false; true when all
     *         pass or when no predicate is registered
     */
    @Override
    public boolean test(final HttpRequest httpRequest) {
        /* allMatch short-circuits on the first false and is true for an empty chain. */
        return predicates.stream().allMatch(candidate -> candidate.test(httpRequest));
    }
}
We added a bunch of unit tests to make sure this actually works. :)
We created an example to show how this works.
package com.mammatustech;


import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.http.server.HttpServerBuilder;

import java.util.function.Predicate;

/**
 * Default port for admin is 7777.
 * Default port for main endpoint is 8080.
 *
 * <pre>
 * <code>
 *
 *     Access the service:
 *
 *    $ curl http://localhost:8080/root/hello/hello
 *
 *    This above will respond "shove off".
 *
 *    $ curl --header "X-SECURITY-TOKEN: shibboleth" http://localhost:8080/root/hello/hello
 *
 *    This will get your hello message.
 *
 *     To see swagger file for this service:
 *
 *    $ curl http://localhost:7777/__admin/meta/
 *
 *     To see health for this service:
 *
 *    $ curl http://localhost:8080/__health
 *     Returns "ok" if all registered health systems are healthy.
 *
 *     OR if same port endpoint health is disabled then:
 *
 *    $ curl http://localhost:7777/__admin/ok
 *     Returns "true" if all registered health systems are healthy.
 *
 *
 *     A node is a service, service bundle, queue, or server endpoint that is being monitored.
 *
 *     List all service nodes or endpoints
 *
 *    $ curl http://localhost:7777/__admin/all-nodes/
 *
 *
 *      List healthy nodes by name:
 *
 *    $ curl http://localhost:7777/__admin/healthy-nodes/
 *
 *      List complete node information:
 *
 *    $ curl http://localhost:7777/__admin/load-nodes/
 *
 *
 *      Show service stats and metrics
 *
 *    $ curl http://localhost:8080/__stats/instance
 * </code>
 * </pre>
 */
@RequestMapping("/hello")
public class HelloWorldService {

    /**
     * Example endpoint.
     *
     * @return the text "hello " followed by the current time in milliseconds
     */
    @RequestMapping("/hello")
    public String hello() {
        final long now = System.currentTimeMillis();
        return "hello " + now;
    }

    /**
     * Wires the security check into the HTTP server builder, then starts the
     * service endpoint server and the admin server.
     *
     * @param args command line arguments (not used)
     */
    public static void main(final String... args) {
        final ManagedServiceBuilder builder =
                ManagedServiceBuilder.managedServiceBuilder().setRootURI("/root");

        /* Register the security token checker as a should-continue predicate. */
        final HttpServerBuilder serverBuilder = builder.getHttpServerBuilder();
        serverBuilder.addShouldContinueHttpRequestPredicate(HelloWorldService::checkAuth);

        /* Bring up the service endpoint. */
        builder.addEndpointService(new HelloWorldService())
                .getEndpointServerBuilder()
                .build().startServer();

        /* Bring up the admin server, which exposes health end-points and meta data. */
        builder.getAdminBuilder().build().startServer();

        System.out.println("Servers started");
    }

    /**
     * Decides whether a request may proceed, based on the
     * <code>X-SECURITY-TOKEN</code> header. Only URIs starting with
     * <code>/root/hello</code> are checked; for those the header must equal
     * "shibboleth", otherwise a 401 response is sent.
     *
     * @param httpRequest http request
     * @return true if processing should continue, false if the request was rejected
     */
    private static boolean checkAuth(final HttpRequest httpRequest) {

        /* Requests outside /root/hello are not subject to this check. */
        if (!httpRequest.getUri().startsWith("/root/hello")) {
            return true;
        }

        final String token = httpRequest.headers().getFirst("X-SECURITY-TOKEN");

        /* Wrong or missing token: answer with a 401 and stop processing. */
        if (!"shibboleth".equals(token)) {
            httpRequest.getReceiver().response(401, "application/json", "\"shove off\"");
            return false;
        }

        /* Token matched, let the request through. */
        return true;
    }
}
To exercise this and show that it is working, let's use curl.

Pass the header token

$ curl --header "X-SECURITY-TOKEN: shibboleth" http://localhost:8080/root/hello/hello
"hello 1440012093122"

No header token

$ curl http://localhost:8080/root/hello/hello
"shove off"

Health request

$ curl http://localhost:8080/__health
"ok"

You may wonder why/how health comes up in this conversation. It is simple, really: EndpointServerBuilder and ManagedServiceBuilder also configure the health system as a should-continue predicate.

EndpointServerBuilder

public class EndpointServerBuilder {
    /**
     * Registers an {@code EndPointHealthPredicate} on the given HTTP server
     * builder when the health endpoint, the stat endpoint, or both are enabled.
     * Does nothing when both are disabled.
     *
     * @param httpServerBuilder builder that receives the predicate
     * @return this builder, so calls can be chained
     */
    public EndpointServerBuilder setupHealthAndStats(final HttpServerBuilder httpServerBuilder) {

        /* Neither endpoint is wanted: nothing to register. */
        if (!isEnableStatEndpoint() && !isEnableHealthEndpoint()) {
            return this;
        }

        final boolean healthOn = isEnableHealthEndpoint();
        final boolean statsOn = isEnableStatEndpoint();

        /* Only look up the collaborators that the enabled endpoints need. */
        final HealthServiceAsync healthService;
        if (healthOn) {
            healthService = getHealthService();
        } else {
            healthService = null;
        }

        final StatCollection stats;
        if (statsOn) {
            stats = getStatsCollection();
        } else {
            stats = null;
        }

        final EndPointHealthPredicate healthPredicate =
                new EndPointHealthPredicate(healthOn, statsOn, healthService, stats);
        httpServerBuilder.addShouldContinueHttpRequestPredicate(healthPredicate);

        return this;
    }

package io.advantageous.qbit.server;

import io.advantageous.boon.json.JsonFactory;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.service.health.HealthServiceAsync;
import io.advantageous.qbit.service.stats.StatCollection;

import java.util.function.Predicate;

/**
 * Should-continue predicate that answers health and stats requests itself.
 * Requests whose URI starts with <code>/__health</code> (when health is
 * enabled) or <code>/__stats</code> (when stats are enabled) get a response
 * here and are not processed further; every other request is let through.
 */
public class EndPointHealthPredicate  implements Predicate<HttpRequest>  {

    private final boolean healthEnabled;
    private final boolean statsEnabled;
    private final HealthServiceAsync healthServiceAsync;
    private final StatCollection statCollection;


    /**
     * @param healthEnabled      whether <code>/__health</code> requests are handled here
     * @param statsEnabled       whether <code>/__stats</code> requests are handled here
     * @param healthServiceAsync health service to query; may be null, in which
     *                           case health requests get an error response
     * @param statCollection     stats source; may be null, in which case
     *                           instance stats requests get an error response
     */
    public EndPointHealthPredicate(boolean healthEnabled, boolean statsEnabled,
                                   HealthServiceAsync healthServiceAsync, StatCollection statCollection
                                   ) {
        this.healthEnabled = healthEnabled;
        this.statsEnabled = statsEnabled;
        this.healthServiceAsync = healthServiceAsync;
        this.statCollection = statCollection;
    }

    /**
     * Intercepts health and stats requests.
     *
     * @param httpRequest request being examined
     * @return false when the request was a health or stats request handled
     *         here, true when processing should continue
     */
    @Override
    public boolean test(final HttpRequest httpRequest) {

        /* Read the URI once instead of on every comparison. */
        final String uri = httpRequest.getUri();

        if (healthEnabled && uri.startsWith("/__health")) {
            respondToHealth(httpRequest);
            return false;
        }

        if (statsEnabled && uri.startsWith("/__stats")) {
            respondToStats(httpRequest, uri);
            return false;
        }

        return true;
    }

    /** Answers a health request with "ok" or "fail" based on the health service. */
    private void respondToHealth(final HttpRequest httpRequest) {
        /*
         * Mirror the null handling of the stats branch: without this guard a
         * missing health service caused a NullPointerException instead of a response.
         */
        if (healthServiceAsync == null) {
            httpRequest.getReceiver().error("\"failed to load health service\"");
            return;
        }

        healthServiceAsync.ok(ok -> {
            if (ok) {
                httpRequest.getReceiver().respondOK("\"ok\"");
            } else {
                httpRequest.getReceiver().error("\"fail\"");
            }
        });
    }

    /** Answers a stats request: instance stats as JSON, a stub for global stats, not-found otherwise. */
    private void respondToStats(final HttpRequest httpRequest, final String uri) {
        if (uri.equals("/__stats/instance")) {
            if (statCollection != null) {
                statCollection.collect(stats -> {
                    String json = JsonFactory.toJson(stats);
                    httpRequest.getReceiver().respondOK(json);
                });
            } else {
                httpRequest.getReceiver().error("\"failed to load stats collector\"");
            }
        } else if (uri.equals("/__stats/global")) {
            /* We don't support global stats, yet. */
            httpRequest.getReceiver().respondOK("{\"version\":1}");
        } else {
            httpRequest.getReceiver().notFound();
        }
    }
}

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training