Rick

Sunday, November 22, 2015

QBit, Microservices Lib, MDC - Logging Mapped Diagnostic Context, RequestContext and HttpContext

QBit, the microservices lib, now implements the Logging Mapped Diagnostic Context (MDC) to make debugging microservices easier. Before MDC integration, it was difficult to track request information in logs. With MDC integration, it is easy to track the request URI and more, even beyond the thread boundaries of internal services.
We added MDC support for logging details of the requests that come into the microservices you write, such as:
  • UserID
  • First name, last name
  • Request URI
  • remote address
  • browser/user-agent
For microservice-to-microservice (service-to-service) calls, we can track the request URI and the source IP of the calling system. You should also be able to track (or set) any extra headers that make sense to log in the context of the application.
This allows you to customize the pattern used for logging, and allows items to be properly pushed to Splunk, Graylog, or Logstash. The MDC fields can then be used for real-time operational intelligence with these tools. You can search and analyze the log streams from your microservices to learn how the services are used, which is critical for debugging. Distributed logging and distributed log analysis are more or less a given in a microservices architecture.
Modern logging systems are designed to audit and debug distributed applications. QBit, being a reactive microservices lib, allows the creation of in-proc services and remote services, i.e., distributed applications. The in-proc services run in one or more threads using an actor/service queue model. QBit's MDC support crosses the thread/queue/actor message boundary to facilitate debugging and to provide a complete async call stack, which is essential for debugging a message-passing system like QBit that relies on ServiceQueues.
Since QBit, the microservices lib, focuses on creating distributed systems, you have to deal with multiple clients simultaneously when logging and auditing. The concept of Mapped Diagnostic Contexts (MDC) was covered in "Patterns for Logging Diagnostic Messages" by Neil Harrison, in the book Pattern Languages of Program Design 3 (Addison-Wesley, 1997). QBit uses the SLF4J API for MDC.
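To see why crossing thread boundaries takes extra work, it helps to remember that an MDC is essentially a per-thread map. The sketch below is a simplified stand-in written with plain JDK classes; MiniMdc and its methods are hypothetical names for illustration and are not the SLF4J or QBit API. It shows that a value stored on one thread is not visible from another thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for a logging MDC: one key/value map per thread (hypothetical, not SLF4J). */
final class MiniMdc {

    /** Each thread lazily gets its own empty map. */
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    private MiniMdc() {
    }

    static void put(final String key, final String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(final String key) {
        return CONTEXT.get().get(key);
    }

    static void clear() {
        CONTEXT.get().clear();
    }

    /** Reads a key from a freshly started thread to show what that thread can see. */
    static String readFromOtherThread(final String key) {
        final String[] seen = new String[1];
        final Thread other = new Thread(() -> seen[0] = get(key));
        other.start();
        try {
            other.join(); // join makes the write to seen[0] visible to the caller
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Calling MiniMdc.put("requestUri", "/rest/mdc") and then MiniMdc.readFromOtherThread("requestUri") returns null, which is the gap that a message-passing system has to bridge explicitly when a call hops to another service queue.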

Examples of using MDC

QBit provides the class ManagedServiceBuilder, a utility class for running in a PaaS like Heroku or in Docker. It also allows you to share stats, health, and system manager setup. You can also enable stats (StatsD support is baked in) and install service discovery / distributed health (Consul, SkyDNS), etc.
To support MDC, the ManagedServiceBuilder now provides the method enableLoggingMappedDiagnosticContext(). This installs the correct QBit interceptors to provide MDC logging and to create an async service call stack.
Let's demonstrate how this works with a simple example RestService.

RestService example to demonstrate MDC

@RequestMapping ("rest")
public class RestService {

    private final Logger logger = LoggerFactory.getLogger(RestService.class);

...

    @RequestMapping ("mdc")
    public void mdc(final Callback<Map<String,String>> callback) {
        logger.info("CALLED MDC");
        callback.returnThis(MDC.getCopyOfContextMap());
    }
...
To make the MDC fields show up in the log output, we add a logback.xml file to the Java resources.

/resources/logback.xml

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n- %X{requestRemoteAddress} - %X{requestUri} - %X{requestHttpMethod}%n</pattern>
        </encoder>
    </appender>


    <logger name="io.advantageous.qbit" level="DEBUG"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
Notice the syntax %X{requestRemoteAddress}; this allows you to access the current HTTP request's remote address.
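As a rough mental model of what the %X{key} tokens do, the sketch below substitutes each token with the matching value from a context map and renders a missing key as an empty string. It is an illustration only: MdcPatternSketch is a hypothetical class, it handles nothing but %X{key}, and it is not how Logback is implemented.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative %X{key} substitution over a context map (hypothetical, not Logback code). */
final class MdcPatternSketch {

    /** Matches %X{someKey} and captures the key name. */
    private static final Pattern MDC_TOKEN = Pattern.compile("%X\\{([^}]+)\\}");

    private MdcPatternSketch() {
    }

    static String render(final String pattern, final Map<String, String> context) {
        final Matcher matcher = MDC_TOKEN.matcher(pattern);
        final StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = context.get(matcher.group(1));
            if (value == null) {
                value = ""; // a key that was never set renders as empty text
            }
            // quoteReplacement keeps '$' and '\' in values from being treated as group references
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

With a context holding requestRemoteAddress, requestUri, and requestHttpMethod, rendering "- %X{requestRemoteAddress} - %X{requestUri} - %X{requestHttpMethod}" yields the second line of each log entry in this example.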
When constructing this service, you have to call enableLoggingMappedDiagnosticContext.

Turning on MDC support for ManagedServiceBuilder

        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();
        managedServiceBuilder.setRootURI("/");
        managedServiceBuilder.enableLoggingMappedDiagnosticContext();

        final RestService restService = new RestService();

        managedServiceBuilder.addEndpointService(restService);


        managedServiceBuilder.getEndpointServerBuilder().build().startServer();
At this point you can access the service with curl as follows:

Accessing the service with curl to see mdc

$ curl http://localhost:8080/rest/mdc | jq .
{
  "requestRemoteAddress": "0:0:0:0:0:0:0:1:63772",
  "requestUri": "/rest/mdc",
  "requestHttpMethod": "GET"
}

Output of log

18:57:00.516 [QueueListener|Send Queue  rest] INFO  i.a.qbit.example.mdc.RestService - CALLED MDC
- 0:0:0:0:0:0:0:1:63797 - /rest/mdc - GET
Notice that requestRemoteAddress, requestUri, and requestHttpMethod were output to the log.
Note that enableLoggingMappedDiagnosticContext allows you to pass in any number of header names, which become part of the logging MDC. If you are using Splunk, Graylog, or Logstash, they become custom fields that you can parse. Once you have used Logstash, Splunk, or Graylog with custom fields for headers, requests, etc. for debugging and log analysis, you will wonder how you ever managed without them.
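The sketch below illustrates the idea of copying a chosen set of headers into the diagnostic context. It is a guess at the shape of the logic rather than QBit's code: the "requestHeader." prefix mirrors the REQUEST_HEADER_PREFIX constant in QBit's interceptor, but the exact key format, the choice of the first value for multi-valued headers, and the header name X-Request-Id used in the usage note are all assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative header-to-context extraction (hypothetical, not QBit's implementation). */
final class HeaderContextSketch {

    /** Same value as the prefix constant in QBit's interceptor; how QBit composes keys is assumed. */
    static final String REQUEST_HEADER_PREFIX = "requestHeader.";

    private HeaderContextSketch() {
    }

    /** Copies only the requested headers that are present, using the first value of each. */
    static Map<String, String> extract(final Map<String, List<String>> headers,
                                       final Set<String> headerNames) {
        final Map<String, String> context = new LinkedHashMap<>();
        for (final String name : headerNames) {
            final List<String> values = headers.get(name);
            if (values != null && !values.isEmpty()) {
                context.put(REQUEST_HEADER_PREFIX + name, values.get(0));
            }
        }
        return context;
    }
}
```

Under these assumptions, asking for X-Request-Id would produce a context entry keyed requestHeader.X-Request-Id, which a log pattern could reference as %X{requestHeader.X-Request-Id}.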
This is important and nice. But what if you have downstream services, i.e., a ServiceQueue service or a ServiceBundle service? These services run on other threads. Will QBit handle this case? Yes. Yes it will.

Downstream services example

To show QBit's ability to breach the thread chasm of other actor service queues, let's create some services.
In addition, let's show off RequestContext, which QBit uses to create this context, by building a service queue call stack.

Service to show capturing service call stack and MDC working N-levels deep

public interface InternalService {

    void getCallStack(Callback<List<String>> listCallback);
}

....


public class InternalServiceImpl {


    private final Logger logger = LoggerFactory.getLogger(InternalServiceImpl.class);
    private final RequestContext requestContext;

    public InternalServiceImpl(final RequestContext requestContext) {
        this.requestContext = requestContext;
    }

    public List<String> getCallStack() {

        logger.info("GET CallStack called");

        final Optional<MethodCall<Object>> currentMethodCall = requestContext.getMethodCall();

        if (!currentMethodCall.isPresent()) {
            logger.info("Method not found");
            return Arrays.asList("MethodCall Not Found");
        }

        final List<String> callStack = new ArrayList<>();
        MethodCall<Object> methodCall = currentMethodCall.get();
        callStack.add("Service Call(" + methodCall.objectName()
                + "." + methodCall.name() + ")");

        while (methodCall!=null) {

            final Request<Object> request = methodCall.originatingRequest();
            if (request ==null) {
                methodCall = null;
            } else if (request instanceof MethodCall) {
                methodCall = ((MethodCall<Object>) request);
                callStack.add("Service Call(" + methodCall.objectName()
                        + "." + methodCall.name() + ")");
            } else if (request instanceof HttpRequest) {
                final HttpRequest httpRequest = ((HttpRequest) request);

                callStack.add("REST Call(" + httpRequest.getRemoteAddress()
                        + "." + httpRequest.getUri() + ")");

                methodCall = null;
            } else {
                methodCall = null;
            }
        }

        return callStack;
    }

}
Now let's wire in this service twice: once used from a ServiceQueue (fastest) and once from a ServiceBundle. Notice that the code above calls logger.info("GET CallStack called") and then uses requestContext.getMethodCall() to get the current MethodCall in QBit. Note that a MethodCall is a Request in QBit, and an HttpRequest is a Request as well. You can now track the current MethodCall all the way back to the HttpRequest using the request.originatingRequest() method, as shown above. We use originatingRequest to find the original HttpRequest and all of the MethodCalls in between.
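Stripped of the QBit types, the traversal above is a walk along a linked chain in which each request points at the request that triggered it. The sketch below uses a hypothetical stand-in class, SketchRequest, instead of QBit's Request, MethodCall, and HttpRequest, purely to show the shape of the loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative walk over an originating-request chain (stand-in types, not QBit's). */
final class CallChainSketch {

    /** A request that only knows its description and what triggered it (null for the first request). */
    static final class SketchRequest {
        final String description;
        final SketchRequest origin;

        SketchRequest(final String description, final SketchRequest origin) {
            this.description = description;
            this.origin = origin;
        }
    }

    private CallChainSketch() {
    }

    /** Returns descriptions from the current call back to the request that started the chain. */
    static List<String> callStack(final SketchRequest current) {
        final List<String> stack = new ArrayList<>();
        for (SketchRequest request = current; request != null; request = request.origin) {
            stack.add(request.description);
        }
        return stack;
    }
}
```

Building a three-link chain (an HTTP request, the REST service method it invoked, and the internal service method that one invoked) and passing the last link to callStack returns the three descriptions innermost first, matching the order of the JSON arrays returned by the callstack endpoints.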

Wiring in InternalServiceImpl to show service queue call stack and logging mdc

    public static void main(String... args) throws Exception {
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();


        managedServiceBuilder.setRootURI("/");

        managedServiceBuilder.enableLoggingMappedDiagnosticContext();

        /** Create Service from Service Queue. */
        final InternalService internalServiceFromServiceQueue = getInternalServiceFromServiceQueue(managedServiceBuilder);


        /** Create Service from Service Bundle. */
        final InternalService internalServiceFromServiceBundle = getInternalServiceFromServiceBundle(managedServiceBuilder);


        final StatsCollector statsCollectorForRest = managedServiceBuilder.getStatServiceBuilder().buildStatsCollector();

        final RestService restService = new RestService(internalServiceFromServiceBundle,
                internalServiceFromServiceQueue,
                ReactorBuilder.reactorBuilder().build(), Timer.timer(), statsCollectorForRest);

        managedServiceBuilder.addEndpointService(restService);


        managedServiceBuilder.getEndpointServerBuilder().build().startServer();


    }

    private static InternalService getInternalServiceFromServiceQueue(ManagedServiceBuilder managedServiceBuilder) {
        final InternalServiceImpl internalServiceImpl = new InternalServiceImpl(new RequestContext());
        final ServiceBuilder serviceBuilderForServiceObject = managedServiceBuilder.createServiceBuilderForServiceObject(internalServiceImpl);
        final ServiceQueue serviceQueue = serviceBuilderForServiceObject.buildAndStartAll();
        return serviceQueue.createProxy(InternalService.class);
    }


    private static InternalService getInternalServiceFromServiceBundle(ManagedServiceBuilder managedServiceBuilder) {
        final InternalServiceImpl internalServiceImpl = new InternalServiceImpl(new RequestContext());
        final ServiceBundle serviceBundle = managedServiceBuilder.createServiceBundleBuilder().build().startServiceBundle();
        serviceBundle.addServiceObject("myService", internalServiceImpl);
        return serviceBundle.createLocalProxy(InternalService.class, "myService");
    }
Then we use these services from the RestService example that we created to show a call stack from a ServiceQueue and a call stack from a ServiceBundle.

Using services to show call stack from ServiceQueue and ServiceBundle.

@RequestMapping ("rest")
public class RestService extends BaseService {

    private final Logger logger = LoggerFactory.getLogger(RestService.class);
    private final InternalService internalServiceFromServiceQueue;
    private final InternalService internalServiceFromServiceBundle;

    public RestService(final InternalService internalServiceFromServiceBundle,
                       final InternalService internalServiceFromServiceQueue,
                       final Reactor reactor,
                       final Timer timer,
                       final StatsCollector statsCollector) {
        super(reactor, timer, statsCollector);
        this.internalServiceFromServiceBundle = internalServiceFromServiceBundle;
        this.internalServiceFromServiceQueue = internalServiceFromServiceQueue;
        reactor.addServiceToFlush(internalServiceFromServiceBundle);
        reactor.addServiceToFlush(internalServiceFromServiceQueue);
    }

    @RequestMapping ("callstack/queue")
    public void callStackFromQueue(final Callback<List<String>> callback) {
        logger.info("Logger {}", MDC.getCopyOfContextMap());
        internalServiceFromServiceQueue.getCallStack(callback);
    }

    @RequestMapping ("callstack/bundle")
    public void callStackFromBundle(final Callback<List<String>> callback) {
        logger.info("Logger {}", MDC.getCopyOfContextMap());
        internalServiceFromServiceBundle.getCallStack(callback);
    }
Now let's call this service with REST and see the results.

Calling REST service to see example service queue call stack

$ curl http://localhost:8080/rest/callstack/queue | jq .
[
  "Service Call(.getCallStack)",
  "Service Call(restservice.callStackFromQueue)",
  "REST Call(0:0:0:0:0:0:0:1:63881./rest/callstack/queue)"
]

Calling REST service to see example service bundle call stack

$ curl http://localhost:8080/rest/callstack/bundle | jq .
[
  "Service Call(myService.getCallStack)",
  "Service Call(restservice.callStackFromBundle)",
  "REST Call(0:0:0:0:0:0:0:1:63899./rest/callstack/bundle)"
]

Output

19:20:12.807 [QueueListener|Send Queue  rest] INFO  i.a.qbit.example.mdc.RestService - Logger {requestRemoteAddress=0:0:0:0:0:0:0:1:63909, requestUri=/rest/callstack/queue, requestHttpMethod=GET}
- 0:0:0:0:0:0:0:1:63909 - /rest/callstack/queue - GET

19:20:12.808 [QueueListener|Send Queue  internalserviceimpl] INFO  i.a.q.e.mdc.InternalServiceImpl - GET CallStack called
- 0:0:0:0:0:0:0:1:63909 - /rest/callstack/queue - GET

19:20:14.906 [QueueListener|Send Queue  rest] INFO  i.a.qbit.example.mdc.RestService - Logger {requestRemoteAddress=0:0:0:0:0:0:0:1:63910, requestUri=/rest/callstack/bundle, requestHttpMethod=GET}
- 0:0:0:0:0:0:0:1:63910 - /rest/callstack/bundle - GET

19:20:14.958 [QueueListener|Send Queue  /services/myService] INFO  i.a.q.e.mdc.InternalServiceImpl - GET CallStack called
- 0:0:0:0:0:0:0:1:63910 - /rest/callstack/bundle - GET
Think about this for a moment. We just passed our call context and we jumped the Thread call context chasm. Pretty cool?

Getting the current request context

You do not have to call ManagedServiceBuilder.enableLoggingMappedDiagnosticContext to get the request context. All you need to call is enableRequestChain, which enables the request chain. There is some slight overhead for this, but it allows REST and WebSocket services to pass the originating request, methodCall, etc. to downstream services, where it will be available via the RequestContext. Remember that MethodCall, HttpRequest, WebSocketMessage, and Event are all Request objects in QBit. Calling ManagedServiceBuilder.enableLoggingMappedDiagnosticContext also enables the request chain (ManagedServiceBuilder.enableRequestChain).
The class RequestContext allows you to access the current request or current method.

QBit's RequestContext class

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.Request;

import java.util.Optional;

/**
 * Holds the current request for the method call.
 */
public class RequestContext {
    /** Grab the current  request.
     *
     * @return Optional  request.
     */
    public Optional<Request<Object>> getRequest() {
        ...
    }

    /** Grab the current method call.
     *
     * @return Optional  method call.
     */
    public Optional<MethodCall<Object>> getMethodCall() {
       ... 
    }

...

}
In addition, QBit provides a way to access the current HttpRequest associated with a service call chain. This is done via HttpContext, which extends RequestContext.

QBit's HttpContext class

/**
 * Holds current information about the HttpRequest.
 */
public class HttpContext extends RequestContext {


    /** Grab the current http request.
     *
     * @return Optional http request.
     */
    public Optional<HttpRequest> getHttpRequest() {
        ...
    }
...
We can extend our example to capture the HttpContext. Let's add this to the RestService.
    @RequestMapping ("http-info")
    public String httpInfo() {

        final StringBuilder builder = new StringBuilder();
        final HttpContext httpContext = new HttpContext();
        final Optional<HttpRequest> httpRequest = httpContext.getHttpRequest();
        if (httpRequest.isPresent()) {
            builder.append("URI = ").append(httpRequest.get().getUri()).append("\n");
            builder.append("HTTP Method = ").append(httpRequest.get().getMethod()).append("\n");
            builder.append("USER AGENT = ").append(
                    httpRequest.get().getHeaders().getFirst(HttpHeaders.USER_AGENT)).append("\n");
        } else {
            builder.append("request not found");
        }


        final RequestContext requestContext = new RequestContext();

        if (requestContext.getMethodCall().isPresent()) {
            final MethodCall<Object> methodCall = requestContext.getMethodCall().get();
            builder.append("Object Name = ").append(methodCall.objectName()).append("\n");
            builder.append("Method Name = ").append(methodCall.name()).append("\n");
        }
        return builder.toString();
    }
The above shows how to use HttpContext, along with another example of RequestContext.
$ curl http://localhost:8080/rest/http-info | jq .
"URI = /rest/http-info\nHTTP Method = GET\nUSER AGENT = curl/7.43.0\nObject Name = restservice\nMethod Name = httpInfo\n"

Complete code for the example

package io.advantageous.qbit.example.mdc;

import io.advantageous.qbit.reactive.Callback;

import java.util.List;

public interface InternalService {

    void getCallStack(Callback<List<String>> listCallback);
}
...
package io.advantageous.qbit.example.mdc;


import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.Request;
import io.advantageous.qbit.service.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class InternalServiceImpl {


    private final Logger logger = LoggerFactory.getLogger(InternalServiceImpl.class);
    private final RequestContext requestContext;

    public InternalServiceImpl(final RequestContext requestContext) {
        this.requestContext = requestContext;
    }

    public List<String> getCallStack() {

        logger.info("GET CallStack called");

        final Optional<MethodCall<Object>> currentMethodCall = requestContext.getMethodCall();

        if (!currentMethodCall.isPresent()) {
            logger.info("Method not found");
            return Arrays.asList("MethodCall Not Found");
        }

        final List<String> callStack = new ArrayList<>();
        MethodCall<Object> methodCall = currentMethodCall.get();


        callStack.add("Service Call(" + methodCall.objectName()
                + "." + methodCall.name() + ")");

        while (methodCall!=null) {

            final Request<Object> request = methodCall.originatingRequest();
            if (request ==null) {
                methodCall = null;
            } else if (request instanceof MethodCall) {
                methodCall = ((MethodCall<Object>) request);
                callStack.add("Service Call(" + methodCall.objectName()
                        + "." + methodCall.name() + ")");
            } else if (request instanceof HttpRequest) {
                final HttpRequest httpRequest = ((HttpRequest) request);

                callStack.add("REST Call(" + httpRequest.getRemoteAddress()
                        + "." + httpRequest.getUri() + ")");

                methodCall = null;
            } else {
                methodCall = null;
            }
        }

        return callStack;
    }

}
...
package io.advantageous.qbit.example.mdc;

import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.http.HttpContext;
import io.advantageous.qbit.http.HttpHeaders;
import io.advantageous.qbit.http.request.HttpRequest;
import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.*;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.qbit.util.Timer;
import org.slf4j.MDC;

import org.slf4j.LoggerFactory;

import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Optional;


/**
 * curl http://localhost:8080/rest/mdc
 * curl http://localhost:8080/rest/callstack/queue
 * curl http://localhost:8080/rest/callstack/bundle
 *
 */
@RequestMapping ("rest")
public class RestService extends BaseService {

    private final Logger logger = LoggerFactory.getLogger(RestService.class);
    private final InternalService internalServiceFromServiceQueue;
    private final InternalService internalServiceFromServiceBundle;

    public RestService(final InternalService internalServiceFromServiceBundle,
                       final InternalService internalServiceFromServiceQueue,
                       final Reactor reactor,
                       final Timer timer,
                       final StatsCollector statsCollector) {
        super(reactor, timer, statsCollector);
        this.internalServiceFromServiceBundle = internalServiceFromServiceBundle;
        this.internalServiceFromServiceQueue = internalServiceFromServiceQueue;
        reactor.addServiceToFlush(internalServiceFromServiceBundle);
        reactor.addServiceToFlush(internalServiceFromServiceQueue);
    }

    @RequestMapping ("callstack/queue")
    public void callStackFromQueue(final Callback<List<String>> callback) {
        logger.info("Logger {}", MDC.getCopyOfContextMap());
        internalServiceFromServiceQueue.getCallStack(callback);
    }

    @RequestMapping ("callstack/bundle")
    public void callStackFromBundle(final Callback<List<String>> callback) {
        logger.info("Logger {}", MDC.getCopyOfContextMap());
        internalServiceFromServiceBundle.getCallStack(callback);
    }


    @RequestMapping ("mdc")
    public void mdc(final Callback<Map<String,String>> callback) {
        logger.info("CALLED MDC");
        callback.returnThis(MDC.getCopyOfContextMap());
    }

    @RequestMapping ("ping")
    public boolean ping() {
        return true;
    }


    @RequestMapping ("http-info")
    public String httpInfo() {

        final StringBuilder builder = new StringBuilder();
        final HttpContext httpContext = new HttpContext();
        final Optional<HttpRequest> httpRequest = httpContext.getHttpRequest();
        if (httpRequest.isPresent()) {
            builder.append("URI = ").append(httpRequest.get().getUri()).append("\n");
            builder.append("HTTP Method = ").append(httpRequest.get().getMethod()).append("\n");
            builder.append("USER AGENT = ").append(
                    httpRequest.get().getHeaders().getFirst(HttpHeaders.USER_AGENT)).append("\n");
        } else {
            builder.append("request not found");
        }


        final RequestContext requestContext = new RequestContext();

        if (requestContext.getMethodCall().isPresent()) {
            final MethodCall<Object> methodCall = requestContext.getMethodCall().get();
            builder.append("Object Name = ").append(methodCall.objectName()).append("\n");
            builder.append("Method Name = ").append(methodCall.name()).append("\n");
        }
        return builder.toString();
    }


    public static void main(String... args) throws Exception {
        final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();


        managedServiceBuilder.setRootURI("/");

        managedServiceBuilder.enableLoggingMappedDiagnosticContext();

        /** Create Service from Service Queue. */
        final InternalService internalServiceFromServiceQueue = getInternalServiceFromServiceQueue(managedServiceBuilder);


        /** Create Service from Service Bundle. */
        final InternalService internalServiceFromServiceBundle = getInternalServiceFromServiceBundle(managedServiceBuilder);


        final StatsCollector statsCollectorForRest = managedServiceBuilder.getStatServiceBuilder().buildStatsCollector();

        final RestService restService = new RestService(internalServiceFromServiceBundle,
                internalServiceFromServiceQueue,
                ReactorBuilder.reactorBuilder().build(), Timer.timer(), statsCollectorForRest);

        managedServiceBuilder.addEndpointService(restService);


        managedServiceBuilder.getEndpointServerBuilder().build().startServer();


    }

    private static InternalService getInternalServiceFromServiceQueue(ManagedServiceBuilder managedServiceBuilder) {
        final InternalServiceImpl internalServiceImpl = new InternalServiceImpl(new RequestContext());
        final ServiceBuilder serviceBuilderForServiceObject = managedServiceBuilder.createServiceBuilderForServiceObject(internalServiceImpl);
        final ServiceQueue serviceQueue = serviceBuilderForServiceObject.buildAndStartAll();
        return serviceQueue.createProxy(InternalService.class);
    }


    private static InternalService getInternalServiceFromServiceBundle(ManagedServiceBuilder managedServiceBuilder) {
        final InternalServiceImpl internalServiceImpl = new InternalServiceImpl(new RequestContext());
        final ServiceBundle serviceBundle = managedServiceBuilder.createServiceBundleBuilder().build().startServiceBundle();
        serviceBundle.addServiceObject("myService", internalServiceImpl);
        return serviceBundle.createLocalProxy(InternalService.class, "myService");
    }

}
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n- %X{requestRemoteAddress} - %X{requestUri} - %X{requestHttpMethod}%n</pattern>
        </encoder>
    </appender>


    <logger name="io.advantageous.qbit" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Internals of QBit MDC support.

To make this all happen we added some new guts to QBit. We added some AOP interceptors to intercept method calls and added a new type of interceptor to intercept when we are going to send a call.
QBit has three types of interceptors.

BeforeMethodSent (NEW added for MDC and RequestContext support!)

package io.advantageous.qbit.client;


import io.advantageous.qbit.message.MethodCallBuilder;

public interface BeforeMethodSent {

    default void beforeMethodSent(final MethodCallBuilder methodBuilder) {}
}

BeforeMethodSent gets called just before a method is sent to a service queue or a service bundle.
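To make the role of a before-send hook concrete, the sketch below shows one way context can survive a hop between threads: the sender snapshots its current context onto the outgoing message, and the receiving thread installs that snapshot before handling the message and removes it afterwards. All names here (SendHookSketch, Message, beforeSend, callAcrossQueue) are hypothetical, and the code uses only JDK classes; it illustrates the general technique and is not QBit's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative context hand-off across a queue (hypothetical, JDK-only stand-in). */
final class SendHookSketch {

    /** Per-thread "current request" context. */
    private static final ThreadLocal<Map<String, String>> CURRENT = new ThreadLocal<>();

    /** A queued call that carries the caller's context snapshot and a slot for the reply. */
    static final class Message {
        final Map<String, String> originContext;
        final CompletableFuture<Map<String, String>> reply = new CompletableFuture<>();

        Message(final Map<String, String> originContext) {
            this.originContext = originContext;
        }
    }

    private SendHookSketch() {
    }

    /** Runs on the calling thread just before enqueueing: copy the caller's context onto the message. */
    static Message beforeSend() {
        final Map<String, String> current = CURRENT.get();
        return new Message(current == null
                ? new HashMap<String, String>()
                : new HashMap<String, String>(current));
    }

    /** Sends one message to a worker thread and returns the context the worker observed. */
    static Map<String, String> callAcrossQueue(final Map<String, String> callerContext) {
        final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        new Thread(() -> serveOne(queue), "service-queue").start();

        CURRENT.set(callerContext);
        try {
            final Message message = beforeSend();
            queue.add(message);
            return message.reply.join(); // wait for the worker's answer
        } finally {
            CURRENT.remove();
        }
    }

    /** Worker side: install the carried context before the call, remove it after. */
    private static void serveOne(final BlockingQueue<Message> queue) {
        try {
            final Message message = queue.take();
            CURRENT.set(message.originContext);
            try {
                // The "service method": report what this thread sees as its current context.
                message.reply.complete(new HashMap<>(CURRENT.get()));
            } finally {
                CURRENT.remove();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The install-then-remove pair on the worker side plays the same role as a before-call and after-call interceptor pair, while beforeSend corresponds to the new hook on the sending side.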

BeforeMethodCall

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;

/**
 * Use this to register for before method calls for services.
 * <p>
 * created by Richard on 8/26/14.
 *
 * @author rhightower
 */
@SuppressWarnings("SameReturnValue")
public interface BeforeMethodCall {

    /**
     *
     * @param call method call
     * @return true if the method call should continue.
     */
    boolean before(MethodCall call);
}

AfterMethodCall


/**
 * Use this to register for after method calls for services.
 * created by Richard on 8/26/14.
 *
 * @author rhightower
 */
@SuppressWarnings({"BooleanMethodIsAlwaysInverted", "SameReturnValue"})
public interface AfterMethodCall {


    boolean after(MethodCall call, Response response);
}

We also provide interceptor chains to make it easier to wire these up.

Interceptor Chains

package io.advantageous.qbit.client;

import io.advantageous.qbit.message.MethodCallBuilder;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BeforeMethodSentChain implements BeforeMethodSent {


    private final List<BeforeMethodSent> beforeMethodCallSentList;

    public static BeforeMethodSentChain beforeMethodSentChain(BeforeMethodSent... beforeMethodSentCalls) {
        return new BeforeMethodSentChain(Arrays.asList(beforeMethodSentCalls));
    }

    public BeforeMethodSentChain(List<BeforeMethodSent> beforeMethodCallSentList) {
        this.beforeMethodCallSentList = Collections.unmodifiableList(beforeMethodCallSentList);
    }

    @Override
    public void beforeMethodSent(final MethodCallBuilder methodBuilder) {

        for (final BeforeMethodSent beforeMethodCallSent : beforeMethodCallSentList) {
            beforeMethodCallSent.beforeMethodSent(methodBuilder);
        }
    }

}

...

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.Response;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AfterMethodCallChain implements AfterMethodCall{

    public static AfterMethodCallChain afterMethodCallChain(final AfterMethodCall... calls) {
        return new AfterMethodCallChain(Arrays.asList(calls));

    }
    private final List<AfterMethodCall> afterMethodCallList;

    public AfterMethodCallChain(List<AfterMethodCall> afterMethodCallList) {
        this.afterMethodCallList = Collections.unmodifiableList(afterMethodCallList);
    }

    @Override
    public boolean after(final MethodCall call, final Response response) {

        for (final AfterMethodCall afterMethodCall : afterMethodCallList) {
            if (!afterMethodCall.after(call, response)) {
                return false;
            }
        }
        return true;
    }
}
...
package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BeforeMethodCallChain implements BeforeMethodCall {


    private final List<BeforeMethodCall> beforeMethodCallList;

    public static BeforeMethodCallChain beforeMethodCallChain(BeforeMethodCall... beforeMethodCalls) {
        return new BeforeMethodCallChain(Arrays.asList(beforeMethodCalls));
    }

    public BeforeMethodCallChain(List<BeforeMethodCall> beforeMethodCallList) {
        this.beforeMethodCallList = Collections.unmodifiableList(beforeMethodCallList);
    }


    @Override
    public boolean before(final MethodCall call) {

        for (final BeforeMethodCall beforeMethodCall : beforeMethodCallList) {
            if (!beforeMethodCall.before(call)) {
                return false;
            }
        }
        return true;
    }
}
The chains are all new, to make dealing with AOP a bit easier in QBit.
The builders for ServiceQueue, ServiceBundle, and ServiceEndpointServer allow you to pass a BeforeMethodCall, an AfterMethodCall, and a BeforeMethodSent, any of which could be one of the aforementioned chains.
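One property of the before and after chains worth spelling out is that they short-circuit: as soon as one interceptor returns false, the rest are skipped and the chain returns false. The sketch below reproduces that behavior with a hypothetical stand-in interface (Before, taking a method name instead of a MethodCall) so it runs with the JDK alone; the interceptor names audit, blockAdmin, and mdc are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative short-circuiting interceptor chain (stand-in types, not QBit's). */
final class InterceptorChainSketch {

    /** Stand-in for a before-call interceptor: return false to stop the call. */
    interface Before {
        boolean before(String methodName);
    }

    private InterceptorChainSketch() {
    }

    /** Combines interceptors into one; evaluation stops at the first false. */
    static Before chain(final Before... interceptors) {
        final List<Before> list = Arrays.asList(interceptors);
        return methodName -> {
            for (final Before interceptor : list) {
                if (!interceptor.before(methodName)) {
                    return false;
                }
            }
            return true;
        };
    }

    /** Runs a three-link chain and records which links were reached and the final decision. */
    static List<String> trace(final String methodName) {
        final List<String> reached = new ArrayList<>();
        final Before audit = name -> { reached.add("audit"); return true; };
        final Before blockAdmin = name -> { reached.add("blockAdmin"); return !name.startsWith("admin"); };
        final Before mdc = name -> { reached.add("mdc"); return true; };

        final boolean proceed = chain(audit, blockAdmin, mdc).before(methodName);
        reached.add(proceed ? "invoke" : "rejected");
        return reached;
    }
}
```

Because order matters, an interceptor that must always run (for example one that sets up logging context) would need to sit ahead of any interceptor that might reject the call.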
To set up MDC, we created an interceptor called SetupMdcForHttpRequestInterceptor, which is both a BeforeMethodCall and an AfterMethodCall interceptor.
/**
 * Provides MDC support for QBit REST services.
 * Intercepts method calls to a service.
 * Looks at the originatingRequest to see if HTTP request is the originating request.
 * If an HTTP request is the originating request then we decorate the Log with
 * MDC fields.
 *
 * Mapped Diagnostic Context: http://logback.qos.ch/manual/mdc.html
 *
 * You can specify the headers that you want extracted and placed inside
 * the Mapped Diagnostic Context as well.
 *
 */
public class SetupMdcForHttpRequestInterceptor implements BeforeMethodCall, AfterMethodCall {

    public static final String REQUEST_URI = "requestUri";
    public static final String REQUEST_REMOTE_ADDRESS = "requestRemoteAddress";
    public static final String REQUEST_HTTP_METHOD = "requestHttpMethod";
    public static final String REQUEST_HEADER_PREFIX = "requestHeader.";
    /**
     * Holds the headers that we want to extract from the request.
     */
    private final Set<String> headersToAddToLoggingMappingDiagnosticsContext;

    /**
     * Construct a SetupMdcForHttpRequestInterceptor
     * @param headersToAddToLoggingMappingDiagnosticsContext headers to add to the Logging Mapping Diagnostics Context.
     */
    public SetupMdcForHttpRequestInterceptor(Set<String> headersToAddToLoggingMappingDiagnosticsContext) {
        this.headersToAddToLoggingMappingDiagnosticsContext =
                Collections.unmodifiableSet(headersToAddToLoggingMappingDiagnosticsContext);
    }

    /**
     * Gets called before a method gets invoked on a service.
     * This adds request URI, remote address and request headers of the HttpRequest if found.
     * @param methodCall methodCall
     * @return true to continue, always true.
     */
    @Override
    public boolean before(final MethodCall methodCall) {

        final Optional<HttpRequest> httpRequest = findHttpRequest(methodCall);
        if (httpRequest.isPresent()) {
            extractRequestInfoAndPutItIntoMappedDiagnosticContext(httpRequest.get());
        }
        return true;
    }

    private Optional<HttpRequest> findHttpRequest(Request<Object> request) {

        if (request.originatingRequest() instanceof HttpRequest) {
            return Optional.of(((HttpRequest) request.originatingRequest()));
        } else if (request.originatingRequest()!=null) {
            return findHttpRequest(request.originatingRequest());
        } else {
            return Optional.empty();
        }
    }


    /**
     * Gets called after a method completes invocation on a service.
     * Used to clear the logging Mapped Diagnostic Context.
     * @param call method call
     * @param response response from method
     * @return always true
     */
    @Override
    public boolean after(final MethodCall call, final Response response) {
        MDC.clear();
        return true;
    }

    /**
     * Extract request data and put it into the logging Mapped Diagnostic Context.
     * @param httpRequest httpRequest
     */
    private void extractRequestInfoAndPutItIntoMappedDiagnosticContext(final HttpRequest httpRequest) {
        MDC.put(REQUEST_URI, httpRequest.getUri());
        MDC.put(REQUEST_REMOTE_ADDRESS, httpRequest.getRemoteAddress());
        MDC.put(REQUEST_HTTP_METHOD, httpRequest.getMethod());

        extractHeaders(httpRequest);

    }

    /**
     * Extract headersToAddToLoggingMappingDiagnosticsContext data and put them into the logging mapping diagnostics context.
     * @param httpRequest httpRequest
     */
    private void extractHeaders(final HttpRequest httpRequest) {
        if (headersToAddToLoggingMappingDiagnosticsContext.size() > 0) {
            final MultiMap<String, String> headers = httpRequest.getHeaders();
            headersToAddToLoggingMappingDiagnosticsContext.forEach(header -> {
                String value = headers.getFirst(header);
                if (!Str.isEmpty(value)) {
                    MDC.put(REQUEST_HEADER_PREFIX + header, value);
                }
            });
        }
    }

}
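The recursive findHttpRequest lookup is what lets a service several hops away from the REST endpoint still log the original URI. A minimal sketch of that walk follows, assuming a made-up `Req` type that stands in for QBit's Request (the class, field and request names are hypothetical):

```java
import java.util.Optional;

/** Stand-in sketch; Req is hypothetical and is not QBit's Request type. */
class OriginatingRequestSketch {

    /** A request that may point back at the request that caused it. */
    static final class Req {
        final String name;
        final boolean http;      // true if this models an HttpRequest
        final Req originating;   // null at the root of the chain

        Req(String name, boolean http, Req originating) {
            this.name = name;
            this.http = http;
            this.originating = originating;
        }
    }

    /** Same shape as findHttpRequest: climb originating requests until an HTTP one turns up. */
    static Optional<Req> findHttp(Req request) {
        final Req parent = request.originating;
        if (parent == null) {
            return Optional.empty();
        } else if (parent.http) {
            return Optional.of(parent);
        } else {
            return findHttp(parent);
        }
    }

    public static void main(String[] args) {
        Req http = new Req("GET /foo", true, null);
        Req first = new Req("restMethod", false, http);
        Req second = new Req("downstreamMethod", false, first);
        System.out.println(findHttp(second).map(r -> r.name).orElse("none")); // prints GET /foo
        System.out.println(findHttp(new Req("timerTick", false, null)).isPresent()); // prints false
    }
}
```

A call with no HTTP ancestor yields an empty Optional, which is why the interceptor above simply skips the MDC setup in that case.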
To send originating requests to downstream servers, we created the ForwardCallMethodInterceptor, which implements BeforeMethodSent.

Creates the call chain.

package io.advantageous.qbit.http.interceptor;

import io.advantageous.qbit.client.BeforeMethodSent;
import io.advantageous.qbit.message.MethodCallBuilder;
import io.advantageous.qbit.message.Request;
import io.advantageous.qbit.service.RequestContext;

import java.util.Optional;

/** This is used by proxies to find the parent request and forward it
 * to the service that the parent calls.
 */
public class ForwardCallMethodInterceptor implements BeforeMethodSent {

    /**
     * Holds the request context, which holds the active request.
     */
    private final RequestContext requestContext;

    /**
     *
     * @param requestContext request context
     */
    public ForwardCallMethodInterceptor(final RequestContext requestContext) {
        this.requestContext = requestContext;
    }

    /**
     * Intercept the call before it gets sent to the service queue.
     * @param methodBuilder methodBuilder
     */
    @Override
    public void beforeMethodSent(final MethodCallBuilder methodBuilder) {

        if (methodBuilder.getOriginatingRequest() == null) {
            final Optional<Request<Object>> request = requestContext.getRequest();
            if (request.isPresent()) {
                methodBuilder.setOriginatingRequest(request.get());
            }
        }
    }
}
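The guard in beforeMethodSent has three outcomes worth spelling out. This is a sketch with a made-up `CallBuilder` standing in for MethodCallBuilder and plain strings standing in for requests; none of these names come from QBit.

```java
import java.util.Optional;

/** Stand-in sketch; CallBuilder is hypothetical and is not QBit's MethodCallBuilder. */
class ForwardSketch {

    /** Carries only the one field that matters here. */
    static final class CallBuilder {
        String originating;
    }

    /** Same guard as beforeMethodSent: fill in the originating request only if none is set. */
    static void beforeSent(CallBuilder builder, Optional<String> currentRequest) {
        if (builder.originating == null && currentRequest.isPresent()) {
            builder.originating = currentRequest.get();
        }
    }

    public static void main(String[] args) {
        CallBuilder fresh = new CallBuilder();
        beforeSent(fresh, Optional.of("/foo"));
        System.out.println(fresh.originating); // prints /foo

        CallBuilder alreadySet = new CallBuilder();
        alreadySet.originating = "/bar";
        beforeSent(alreadySet, Optional.of("/foo"));
        System.out.println(alreadySet.originating); // prints /bar

        CallBuilder noContext = new CallBuilder();
        beforeSent(noContext, Optional.empty());
        System.out.println(noContext.originating); // prints null
    }
}
```

An originating request that the caller set explicitly is never overwritten, and a call made outside any request is sent unchanged.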
As you may have noticed, the RequestContext has to be populated before the ForwardCallMethodInterceptor can create the call chain. This is done by the CaptureRequestInterceptor.

CaptureRequestInterceptor

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.Response;

/**
 * Captures the Request if any present and puts it in the RequestContext.
 */
public class CaptureRequestInterceptor implements BeforeMethodCall, AfterMethodCall {


    /** Captures the current method call and if originating as an HttpRequest,
     * then we pass the HttpRequest into the RequestContext.
     * @param methodCall methodCall
     * @return always true which means continue.
     */
    @Override
    public boolean before(final MethodCall methodCall) {

        RequestContext.setRequest(methodCall);
        return true;
    }


    /**
     * Clear the request out of the context
     * @param methodCall methodCall
     * @param response response
     * @return always true
     */
    @Override
    public boolean after(final MethodCall methodCall, final Response response) {
        RequestContext.clear();
        return true;
    }

}
The rest of RequestContext which we introduced earlier is:

QBit's RequestContext

package io.advantageous.qbit.service;

import io.advantageous.qbit.message.MethodCall;
import io.advantageous.qbit.message.Request;

import java.util.Optional;

/**
 * Holds the current request for the method call.
 */
public class RequestContext {

    /** Current request. */
    private final static ThreadLocal<Request<Object>> requestThreadLocal = new ThreadLocal<>();


    /** Grab the current  request.
     *
     * @return Optional  request.
     */
    public Optional<Request<Object>> getRequest() {
        final Request<Object> request = requestThreadLocal.get();
        return Optional.ofNullable(request);

    }

    /** Grab the current request as a method call.
     *
     * @return Optional method call; empty if the current request is not a MethodCall.
     */
    public Optional<MethodCall<Object>> getMethodCall() {
        final Request<Object> request = requestThreadLocal.get();
        if (request instanceof MethodCall) {
            return Optional.of(((MethodCall<Object>) request));
        }
        return Optional.empty();

    }


    /**
     * Used from this package to populate request for this thread.
     * @param request request
     */
    static void setRequest(final Request<Object> request) {
        requestThreadLocal.set(request);
    }

    /**
     * Clear the request.
     */
    static void clear() {
        requestThreadLocal.set(null);
    }


}
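Since RequestContext is backed by a ThreadLocal, the captured request is visible only on the thread that handled the call. The JDK-only sketch below (class and variable names are made up for illustration) shows why the request has to be copied onto the outgoing message, as ForwardCallMethodInterceptor does, rather than looked up on the receiving thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** JDK-only sketch of ThreadLocal visibility; not QBit code. */
class ThreadLocalSketch {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Returns what another thread sees: first via the ThreadLocal, then via an explicit hand-off. */
    static String[] demo() {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            CURRENT.set("/foo");
            // The worker thread has its own ThreadLocal slot, so it reads null.
            Future<String> implicit = other.submit(() -> CURRENT.get());
            // Copying the value into the task plays the role of setOriginatingRequest on the outgoing call.
            final String captured = CURRENT.get();
            Future<String> explicit = other.submit(() -> captured);
            return new String[] { implicit.get(), explicit.get() };
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            CURRENT.remove();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println(seen[0] + " " + seen[1]); // prints null /foo
    }
}
```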
You don't need ManagedServiceBuilder to use these features. It is just that it makes the wiring a lot easier. You can wire up these interceptors yourself as this example from QBit's unit tests shows:

Wiring up QBit MDC without ManagedServiceBuilder

    @Test
    public void testIntegrationWithServiceBundle() throws Exception{


        mdcForHttpRequestInterceptor = new SetupMdcForHttpRequestInterceptor(Sets.set("foo"));

        final CaptureRequestInterceptor captureRequestInterceptor = new CaptureRequestInterceptor();
        captureRequestInterceptor.before(
           methodCallBuilder.setName("restMethod").setOriginatingRequest(httpRequest).build());


        final ServiceBundle serviceBundle = ServiceBundleBuilder.serviceBundleBuilder()
             .setBeforeMethodCallOnServiceQueue(
                 BeforeMethodCallChain.beforeMethodCallChain(captureRequestInterceptor,
                                                mdcForHttpRequestInterceptor))
             .setAfterMethodCallOnServiceQueue(
                  AfterMethodCallChain.afterMethodCallChain(captureRequestInterceptor, 
                                    mdcForHttpRequestInterceptor))
             .setBeforeMethodSent(
                   new ForwardCallMethodInterceptor(new RequestContext()))
             .build().startServiceBundle();


        serviceBundle.addServiceObject("my", new MyServiceImpl());


        final MyService localProxy = serviceBundle.createLocalProxy(MyService.class, "my");

        final AsyncFutureCallback<String> callback = AsyncFutureBuilder.asyncFutureBuilder().build(String.class);
        localProxy.getRequestURI(callback);

        localProxy.clientProxyFlush();

        assertEquals("/foo", callback.get());



        final AsyncFutureCallback<Map<String, String>> callbackMap = AsyncFutureBuilder.asyncFutureBuilder()
                .buildMap(String.class, String.class);

        localProxy.getMDC(callbackMap);

        localProxy.clientProxyFlush();

        validate(callbackMap.get());

        captureRequestInterceptor.after(null, null);

        serviceBundle.stop();


    }

That about covers it for the internals. The ManagedServiceBuilder just configures those interceptors for you.

ManagedServiceBuilder configuring AOP interceptors for QBit

    /**
     * Hold lists of interceptors.
     */
    private static class Interceptors {
        List<BeforeMethodCall> before = new ArrayList<>();
        List<AfterMethodCall> after = new ArrayList<>();
        List<BeforeMethodSent> beforeSent = new ArrayList<>();
    }

    /**
     * Configure a list of common interceptors.
     * @return the configured interceptors
     */
    private Interceptors configureInterceptors() {
        Interceptors interceptors = new Interceptors();
        SetupMdcForHttpRequestInterceptor setupMdcForHttpRequestInterceptor;
        if (enableLoggingMappedDiagnosticContext) {
            enableRequestChain = true;
            if (requestHeadersToTrackForMappedDiagnosticContext!=null &&
                    requestHeadersToTrackForMappedDiagnosticContext.size()>0) {
                setupMdcForHttpRequestInterceptor = 
                  new SetupMdcForHttpRequestInterceptor(requestHeadersToTrackForMappedDiagnosticContext);
            }else {
                setupMdcForHttpRequestInterceptor = 
                   new SetupMdcForHttpRequestInterceptor(Collections.emptySet());
            }
            interceptors.before.add(setupMdcForHttpRequestInterceptor);
            interceptors.after.add(setupMdcForHttpRequestInterceptor);
        }

        if (enableRequestChain) {
            final CaptureRequestInterceptor captureRequestInterceptor = new CaptureRequestInterceptor();
            interceptors.before.add(captureRequestInterceptor);
            interceptors.after.add(captureRequestInterceptor);
            interceptors.beforeSent.add(new ForwardCallMethodInterceptor(new RequestContext()));
        }
        return interceptors;
    }

...


    private void configureEndpointServerBuilderForInterceptors(final EndpointServerBuilder endpointServerBuilder) {

        final Interceptors interceptors = configureInterceptors();
        if (interceptors.before.size() > 0) {
            endpointServerBuilder.setBeforeMethodCallOnServiceQueue(new BeforeMethodCallChain(interceptors.before));
        }
        if (interceptors.after.size() > 0) {
            endpointServerBuilder.setAfterMethodCallOnServiceQueue(new AfterMethodCallChain(interceptors.after));
        }
        if (interceptors.beforeSent.size() > 0) {
            endpointServerBuilder.setBeforeMethodSent(new BeforeMethodSentChain(interceptors.beforeSent));
        }
    }


    private void configureServiceBundleBuilderForInterceptors(final ServiceBundleBuilder serviceBundleBuilder) {

        final Interceptors interceptors = configureInterceptors();
        if (interceptors.before.size() > 0) {
            serviceBundleBuilder.setBeforeMethodCallOnServiceQueue(new BeforeMethodCallChain(interceptors.before));
        }
        if (interceptors.after.size() > 0) {
            serviceBundleBuilder.setAfterMethodCallOnServiceQueue(new AfterMethodCallChain(interceptors.after));
        }
        if (interceptors.beforeSent.size() > 0) {
            serviceBundleBuilder.setBeforeMethodSent(new BeforeMethodSentChain(interceptors.beforeSent));
        }
    }


    private void configureServiceBuilderForInterceptors(final ServiceBuilder serviceBuilder) {

        final Interceptors interceptors = configureInterceptors();
        if (interceptors.before.size() > 0) {
            serviceBuilder.setBeforeMethodCall(new BeforeMethodCallChain(interceptors.before));
        }
        if (interceptors.after.size() > 0) {
            serviceBuilder.setAfterMethodCall(new AfterMethodCallChain(interceptors.after));
        }
        if (interceptors.beforeSent.size() > 0) {
            serviceBuilder.setBeforeMethodSent(new BeforeMethodSentChain(interceptors.beforeSent));
        }
    }


    public EndpointServerBuilder getEndpointServerBuilder() {
        if (endpointServerBuilder==null) {
            endpointServerBuilder = EndpointServerBuilder.endpointServerBuilder();
            endpointServerBuilder.setPort(this.getPort());
            ....



            configureEndpointServerBuilderForInterceptors(endpointServerBuilder);

...

//And so on

Conclusion

Distributed systems can be harder to debug. Tools like MDC, LogStash and Splunk can make it much easier to manage microservices. With QBit, MDC and distributed logging support are built in, so you can get things done when you are writing microservices.
Read more about QBit with these Wiki Pages or read the QBit Microservices tutorial.
