Rick

Rick
Rick

Friday, February 18, 2011

Random thoughts on JGroups

I have been messing around with JGroups for a project.
I am impressed how easy it is to get started. How much power it has! And how quickly they answer questions on the mailing list (even-though all my messages were screened for being too big). (Although the IRC chat is dead.)

Some notes on JGroups (I took out the actual product names. I don't want to start a flame war.)


JGroups is a framework for Reliable Multicast Communication.

JGroups is used by JBoss clustering, JBoss fault tolerant JMS support, ehcache, OSCache, Apache Tomcat for clustering communication.
JGroups is the de facto standard for reliable multicast communication a.k.a cluster communication.
If there is a successful open source project that needs clustering, there is a good chance they are using JGroups.
I have personally worked on projects that used JGroup. It works and it is easy to use.
The API is similar to product X without all of the stupidity and needless abstractions that we don't need.
If you simplify the Product X API and boil it down to exactly what we needed and then made the API work on the mostly widely used toolkit for clustering communication you would have JGroups.
The classes we are using in Product X to do cluster communication are undocumented features. They are complicated to use.
Conversely, JGroups has tons of documentation and is open source so we can even look at the code if we need to. Also the community is active and vibrant. 
Steps for a JGroups push
  1. Some script or human invokes a startFlip method on the Revision Flipper
  2. Revision Flipper already knows about every service node in the cluster
  3. Revision flipper sends out a message on the "serviceACache" channel to repopulate the cache as part of this message is list of unique service names, e.g., serviceA1, serviceA2, etc.
  4. The individual service nodes get this message and populate their caches from the database if their service name is in the list (this prevents sending messages to services that were down)
  5. When the service nodes are done, they send a done message on the "serviceACache" channel
  6. If all service nodes that registered do not respond within a certain timeout, then the Revision Flipper files an bug report
  7. If any service nodes send an error message on the "serviceACache" channel then the Revision Flipper files a bug report with the details of the error
  8. Once all service are correlated by the Revision Flipper, it sends out a message to flip the caches on the "serviceACache" channel.
  9. The Revision Flipper does similar correlation of responses, error messages handling and timeout handling.
  10. When the Revision Flipper is finished with a particular service cache flipping, it sends out a message on the "serviceACache" channel that it is done. (It is then for services that were started in the middle of process to initialize themselves.)
  11. The Revision Flipper does a similar coordination with other service caches in the correct order based on configuration in the workflow engine
  12. If a service node comes up in the middle of flip, it waits until it gets the done message, it then populates it cache accordingly,



package com.foo;

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RevisionFlipper extends ReceiverAdapter implements RequestHandler {
    private JChannel channel;
    private MessageDispatcher messageDispatcher;
    private String clusterName;

    public RevisionFlipper(String clusterName) {
        this.clusterName = clusterName;
    }

    public void viewAccepted(View newView) {
        // this.view = newView;
    }

    public void receive(Message msg) {
        System.out.println(msg.getSrc() + ": " + msg.getObject());
    }

    private void start() throws Exception {
        channel = new JChannel();
        channel.setReceiver(this);
        channel.connect(clusterName);
        messageDispatcher = new MessageDispatcher(channel, this, this, this);
        eventLoop();

        channel.close();
    }

    private void eventLoop() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.print("> ");
            System.out.flush();
            String line = in.readLine();
            if (line.equals("startFlip")) {
                startFlip();
            } else {
                Message msg = new Message(null, null, line);
                channel.send(msg);
            }
        }
    }

    private void startFlip() {

        System.out.println("In start flip method");
        // 2. Revision Flipper already knows about every service node in the
        // cluster
        Message msg = new Message(null, null, "startFlip message");
        RequestOptions requestOptions = new RequestOptions(Request.GET_ALL,
                5000);
        // 3. Using castMessage will block until all respond
        messageDispatcher.castMessage(null, msg, requestOptions);
        System.out.println("done casting");
    }

    public static void main(String[] args) throws Exception {
        new RevisionFlipper("serviceACache").start();
    }

    @Override
    public Object handle(Message msg) {
        System.out.println("HANDLE " + msg);
        return "success";
    }
}


I think this RPC approach is more appealing than using castMessage...

package com.foo;    

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Util;

public class RpcTest {
        Channel            channel;
        RpcDispatcher      disp;
        Object            rsp_list;
        String             props; // set by application

        public int print(int number) throws Exception {
            return number * 2;
        }

        public void start() throws Throwable {
            channel=new JChannel(props);
            disp=new RpcDispatcher(channel, null, null, this);
            channel.connect("RpcDispatcherTestGroup");

            for(int i=0; i < 10; i++) {
                Util.sleep(100);
                
                RequestOptions requestOptions = new RequestOptions(Request.GET_ALL,
                        5000);

                
                rsp_list=disp.callRemoteMethods(null, "print", new Object[]{new Integer(i)}, new Class[]{int.class}, requestOptions);
                System.out.println("Responses: " +rsp_list);
            }
            channel.close();
            disp.stop();
         }

        public static void main(String[] args) throws Throwable {
            try {
                new RpcTest().start();
            }
            catch(Exception e) {
                System.err.println(e);
            }
        }
    }

Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training