Rick

Rick
Rick

Friday, February 18, 2011

Random thoughts on JGroups

I have been messing around with JGroups for a project.
I am impressed how easy it is to get started. How much power it has! And how quickly they answer questions on the mailing list (even-though all my messages were screened for being too big). (Although the IRC chat is dead.)

Some notes on JGroups (I took out the actual product names. I don't want to start a flame war.)


JGroups is a framework for Reliable Multicast Communication.

JGroups is used by JBoss clustering, JBoss fault tolerant JMS support, ehcache, OSCache, Apache Tomcat for clustering communication.
JGroups is the de facto standard for reliable multicast communication a.k.a cluster communication.
If there is a successful open source project that needs clustering, there is a good chance they are using JGroups.
I have personally worked on projects that used JGroup. It works and it is easy to use.
The API is similar to product X without all of the stupidity and needless abstractions that we don't need.
If you simplify the Product X API and boil it down to exactly what we needed and then made the API work on the mostly widely used toolkit for clustering communication you would have JGroups.
The classes we are using in Product X to do cluster communication are undocumented features. They are complicated to use.
Conversely, JGroups has tons of documentation and is open source so we can even look at the code if we need to. Also the community is active and vibrant. 
Steps for a JGroups push
  1. Some script or human invokes a startFlip method on the Revision Flipper
  2. Revision Flipper already knows about every service node in the cluster
  3. Revision flipper sends out a message on the "serviceACache" channel to repopulate the cache as part of this message is list of unique service names, e.g., serviceA1, serviceA2, etc.
  4. The individual service nodes get this message and populate their caches from the database if their service name is in the list (this prevents sending messages to services that were down)
  5. When the service nodes are done, they send a done message on the "serviceACache" channel
  6. If all service nodes that registered do not respond within a certain timeout, then the Revision Flipper files an bug report
  7. If any service nodes send an error message on the "serviceACache" channel then the Revision Flipper files a bug report with the details of the error
  8. Once all service are correlated by the Revision Flipper, it sends out a message to flip the caches on the "serviceACache" channel.
  9. The Revision Flipper does similar correlation of responses, error messages handling and timeout handling.
  10. When the Revision Flipper is finished with a particular service cache flipping, it sends out a message on the "serviceACache" channel that it is done. (It is then for services that were started in the middle of process to initialize themselves.)
  11. The Revision Flipper does a similar coordination with other service caches in the correct order based on configuration in the workflow engine
  12. If a service node comes up in the middle of flip, it waits until it gets the done message, it then populates it cache accordingly,



package com.foo;

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RevisionFlipper extends ReceiverAdapter implements RequestHandler {
    private JChannel channel;
    private MessageDispatcher messageDispatcher;
    private String clusterName;

    public RevisionFlipper(String clusterName) {
        this.clusterName = clusterName;
    }

    public void viewAccepted(View newView) {
        // this.view = newView;
    }

    public void receive(Message msg) {
        System.out.println(msg.getSrc() + ": " + msg.getObject());
    }

    private void start() throws Exception {
        channel = new JChannel();
        channel.setReceiver(this);
        channel.connect(clusterName);
        messageDispatcher = new MessageDispatcher(channel, this, this, this);
        eventLoop();

        channel.close();
    }

    private void eventLoop() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.print("> ");
            System.out.flush();
            String line = in.readLine();
            if (line.equals("startFlip")) {
                startFlip();
            } else {
                Message msg = new Message(null, null, line);
                channel.send(msg);
            }
        }
    }

    private void startFlip() {

        System.out.println("In start flip method");
        // 2. Revision Flipper already knows about every service node in the
        // cluster
        Message msg = new Message(null, null, "startFlip message");
        RequestOptions requestOptions = new RequestOptions(Request.GET_ALL,
                5000);
        // 3. Using castMessage will block until all respond
        messageDispatcher.castMessage(null, msg, requestOptions);
        System.out.println("done casting");
    }

    public static void main(String[] args) throws Exception {
        new RevisionFlipper("serviceACache").start();
    }

    @Override
    public Object handle(Message msg) {
        System.out.println("HANDLE " + msg);
        return "success";
    }
}


I think this RPC approach is more appealing than using castMessage...

package com.foo;    

import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Util;

public class RpcTest {
        Channel            channel;
        RpcDispatcher      disp;
        Object            rsp_list;
        String             props; // set by application

        public int print(int number) throws Exception {
            return number * 2;
        }

        public void start() throws Throwable {
            channel=new JChannel(props);
            disp=new RpcDispatcher(channel, null, null, this);
            channel.connect("RpcDispatcherTestGroup");

            for(int i=0; i < 10; i++) {
                Util.sleep(100);
                
                RequestOptions requestOptions = new RequestOptions(Request.GET_ALL,
                        5000);

                
                rsp_list=disp.callRemoteMethods(null, "print", new Object[]{new Integer(i)}, new Class[]{int.class}, requestOptions);
                System.out.println("Responses: " +rsp_list);
            }
            channel.close();
            disp.stop();
         }

        public static void main(String[] args) throws Throwable {
            try {
                new RpcTest().start();
            }
            catch(Exception e) {
                System.err.println(e);
            }
        }
    }

Thursday, February 3, 2011

Thoughts on Activiti deployments and usage


I am trying to get some ideas on how people deploy Activiti. Do you use it embedded? Did  you just extend the REST-API war project? Do you actually deploy jar files to the REST-API war project? etc.
We have a project that is going to manage a series of batch jobs. Described here under problem domain. The first phase of the project will require no human interaction (HI), but future phases will.
I don't want to include HI, but I don't want to preclude it either.
In order to use the tools for Activiti with all of its tooling, tracking, auditing and process viewing, we need to deploy our jars under the activiti-rest webapp. Essentially the tools (modeler, probe, explorer, etc.) use the REST API for Activiti (I think).
I think we want to reduce this type of deployment as much as possible, i.e., we don't want to deploy just jar files that have tasks.
I assert: If we want custom tasks and want to use the Activiti tooling (likely as per last meeting at work) then we will have to deploy Java classes (jar files) into that webapp or create a webapp that encompasses our tasks and the REST API. I don't like this approach. I don't think we would be able to get this past SCM group.
The other option is to get a process id for the workflow send this around and have the clients of the workflow mark the current task as done (which will advance it to the next task or branch decision). This approach relies heavily on the API and in my opinion violates separating areas of concern.
Our original idea (Ian and I's) was to use a message bus (Spring Integration) to mark the task done and then each task sends out a message where one or more batch processes (living in another JVM) will handle that step and send a done message when they are done. 
The other possibility is to run embedded and not use the tools (no chance of HI in the future and we loose a lot of the tracking ability of the workflow). 
I can send out some code examples and diagrams with pros and cons of each approach.
Options:
  • Option 1: Run embedded to avoid deployment of custom tasks 
    • Can we stil use the Activiti tooling? Perhaps just to visualize, but can't run the process remotely, must run locally as custom tasks will not be deployed.
    • Custom tasks are embedded with custom nodes
    • If processing nodes exist on two different boxes, how do we send the id around
    • If it all exists on one box, then it will call into our existing custom service bus
  • Option 2: Send process id around, have each node be responsible for marking its task done
    • We could still use the tooling
    • This does spread the code around, we could wrap in a facade/lib
    • How do the nodes get the process id?
    • Seems like we need messaging
  • Option 3: Have the workflow be the message coordinator
    • Create one custom task that fires a start process message via Spring Integration (and passes name/value pairs of the execution context)
    • This custom task waits until it gets a message back that says that step is done, then it marks the task done
    • Use the complete tooling of Activiti to see processes and manage them
    • The message listeners can be Spring Managed Message Driven Beans, we can use JMS and Spring Integration for the message delivery
  • Option 4: Create custom war file based on activiti REST API war file
    • Allows use of Activiti tooling
    • We need to carry the source or break it up into jar files
    • Deploying new tasks is just a matter of rebuilding our war file
    • Calls out to services in our custom service bus
To me the only option that make sense for our goals are Option 3 with some Option 2 (mostly Option 3 with a little Option 2 where needed) or Option 4.
I don't like Option 1 because we lose a lot of the tooling, auditing and future human interaction.
If I had to pick, I would pick Option 3. It divorces us from Activiti a lot. We just create one custom task to send a message and wait until it gets a continue or fail message. We can integrate with all other tasks easily including HI.
None of our code (processing nodes) will rely on a workflow engine. Also there is some work already done to integrate Spring Integration and Activiti so we might not even have to create any custom tasks just use the integration as is.
Based on internal meetings we have had it sounds like we are going to do either Option 1 or Option 4.

10 minute guide Activiti improvements

Technology adoption is a tricky business. I think Hibernate and Spring were successful in a large part due to their abundance of documentation and example code.

Activiti has fairly decent documentation at least from a reference standpoint. It could use some more high-level descriptions (like where the different pieces run). I was able to figure some of this out through trial and error and from a variety of resources on the web (I plan on documenting them in a future blog post 0-60 undestanding what Activiti is or something of the ilk.)

One thing that I feel Activiti lacks is a good getting started tutorial that covers some basic usages. They have a 10 minute guide that drops the user in the middle of using Activiti. It shows them using classes that they did not instantiate that somehow magically show up. Keep in mind that this 10 minute guide is on page 20 of a 93 page document (as of 2/3/2011). It is an exercise for the user to go through the previous 19 pages and guess which combination of options will get them a working example.

My point being that the tutorial should be closer to the front of the document and it should describe in detail how to get things running. Don't show references to classes we did not instantiate.

At this point, you may think, why didn't you just look at the sample code. Well the sample code was all based on unit tests. Also, the sample code was more like a test suite than true getting started examples. It is good to look at test suites when you are trying to learn things or get to the next level, but it does not make sense for samples when you are just starting out.

I say this because I think Activiti is a good project and it deserves to be successful. This is not criticism out of hatred but criticism out of fondness and a wish for continued success and wider adoption.

I was able to get things working by posting on the forums (and getting answers), reading blogs and watching slide decks and videos. The information is out there, but it really belongs in the Activiti user manual.

I wanted the 10 minute guide to show me how to run a workflow standalone and as part of the Activiti system, i.e., toolable via the activiti-modeler, activiti-probe, activiti-cycle, etc.

To create a working pom file, I used one of the example pom files as a basis. Later, I found that I could also use the Activiti Eclipse Plugin to generate an activiti project, then you just run mvn eclipse:eclipse to configure the .classpath and .project for Eclipse. (I will show the pom at the end.)

Here is the complete Java source for a working example based on the one in the 10 minute guide.

package com.demo;
package com.demo;

import java.util.List;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.task.Task;

public class Demo {

    public static void main(String[] args) {
        // Process engine that is standalone in memory. Good for testing and
        // trying out examples.
        // ProcessEngine processEngine =
        // ProcessEngineConfiguration.createStandaloneInMemProcessEngineConfiguration()
        // .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE)
        // .setJdbcUrl("jdbc:h2:mem:my-own-db;DB_CLOSE_DELAY=1000")
        // .setDatabaseSchemaUpdate("create-drop")
        // .setJobExecutorActivate(true)
        // .buildProcessEngine();

        // Process engine that will work with the way they deploy Activiti with
        // the demo
        ProcessEngine processEngine = ProcessEngineConfiguration
                .createStandaloneProcessEngineConfiguration()
                .buildProcessEngine();

        // Deploy the xml file that contains the process flow.
        processEngine.getRepositoryService().createDeployment()
                .addClasspathResource("com/demo/demo-flow.bpmn20.xml").deploy();

        // Start the process by id.
        processEngine.getRuntimeService().startProcessInstanceByKey(
                "financialReport2");

        List tasks = processEngine.getTaskService().createTaskQuery()
                .taskCandidateUser("fozzie").list();

        System.out.println("Got some tasks for fozzie" + tasks + "\n");
        for (Task task : tasks) {
            System.out.println(task.getId());
        }

        Task task = tasks.get(0);
        System.out.println(task.getName());

        processEngine.getTaskService().claim(task.getId(), "fozzie");
        // do some work
        processEngine.getTaskService().complete(task.getId());
        System.out.println("Done");

        tasks = processEngine.getTaskService().createTaskQuery()
                .taskCandidateUser("fozzie").list();

        System.out.println("Looking for some tasks for kermit" + tasks + "\n");
        tasks = processEngine.getTaskService().createTaskQuery()
                .taskCandidateUser("kermit").list();

        System.out.println("Got some tasks for kermit" + tasks + "\n");
        for (Task t : tasks) {
            System.out.println(t.getId());
            System.out.println(t.getName());
            // claim it and complete it
            processEngine.getTaskService().claim(t.getId(), "kermit");
            processEngine.getTaskService().complete(t.getId());
        }

    }

}

I believe the 10 minute guide does a fairly good explanation of what the various steps do.

The working process XML ("com/demo/demo-flow.bpmn20.xml") is as follows:

NOTE: The actual XML is slightly different than what I have below. The syntax highlighter chews it up. To see the actual XML do a view source on this page and search for XXJJ.





 

  
  
    
  
    
  
    
      Write monthly financial report for publication to shareholders.
    
    
      
        accountancy
      
    
  
    
  
      
  
    
      Verify monthly financial report composed by the accountancy department.
      This financial report is going to be sent to all the company shareholders.  
    
    
      
        management
      
    
  
    
  
      
  
      




The XML in the guide was short a few elements. This could be mitigated if they ship with the 10 minute guide example.

Lastly, here is the pom.xml file (if you don't know what a pom.xml file is, then read a getting started with maven2 guide).


  4.0.0

  Activiti Engine Examples
  BPM and workflow engine
  org.activiti
  activiti-engine-examples

  jar
  1.0

  
  
    
      org.activiti
      activiti-engine
      5.1
    
    
    
      junit
      junit
      4.8.1
      test
    
    
      com.h2database
      h2
      1.2.132
      test
    
    
    
      org.subethamail
      subethasmtp-wiser
      1.2
      test
    
    
  

  
    
      alfresco
      http://maven.alfresco.com/nexus/content/groups/public
    
    
        Activiti third party
        http://maven.alfresco.com/nexus/content/repositories/activiti-thirdparty/
     
    
      spring-extensions-milestone
      Spring Extensions Milestone Repository
      http://extensions.springframework.org/milestone
    
  

  
    
      maven2.java.net
      Java.net Repository for Maven 2
      http://download.java.net/maven/2/
    
  

  
    
      src/test/resources
      src/main/process
    
    
      
        maven-compiler-plugin
        
          1.5
          1.5
          true
          true
          true
        
      
      
        maven-surefire-plugin
        
          false
          false
          true
          
            **/*TestCase.java
          
        
      
    
  

  
    
      Apache v2
      http://www.apache.org/licenses/LICENSE-2.0.html
    
  




Happy Hunting!

Tuesday, February 1, 2011

#activiti getting something to work standalone

I went through the tutorial for the eclipse plugin and it worked.
Well, the unit test worked in the unit test only.

package org.activiti.designer.test;



import java.util.HashMap;
import java.util.Map;

import org.activiti.engine.RuntimeService;
import org.activiti.engine.test.Deployment;
import org.activiti.engine.test.ActivitiRule;
import org.junit.Rule;
import org.junit.Test;

public class ProcessTestHelloworld {

   @Rule
   public ActivitiRule activitiRule = new ActivitiRule();

   @Test
   @Deployment(resources="diagrams/my_bpmn2_diagram.activiti.bpmn20.xml")
   public void startProcess() {
      RuntimeService runtimeService = activitiRule.getRuntimeService();
      Map variableMap = new HashMap();
      variableMap.put("name", "Activiti");
      variableMap.put("color", "BLUE");
      runtimeService.startProcessInstanceByKey("helloworld", variableMap);
   }
}


However, I don't know how to run it standalone.

I tried writing a main method that loads the same process.

Code:

package com.demo;

import java.util.List;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.task.Task;

public class Demo {
   
   public static void main (String [] args) {
      ProcessEngine processEngine = ProcessEngineConfiguration.createStandaloneInMemProcessEngineConfiguration()
           .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE)
           .setJdbcUrl("jdbc:h2:mem:my-own-db;DB_CLOSE_DELAY=1000")
           .setDatabaseSchemaUpdate("create-drop")
           .setJobExecutorActivate(true)
           .buildProcessEngine();
      
      
      /* Deploy the xml file. */
      processEngine.getRepositoryService()
      .createDeployment().addClasspathResource("diagrams/my_bpmn2_diagram.activiti.bpmn20.xml").deploy();
      
      /* Start the process by id. */
      processEngine.getRuntimeService().startProcessInstanceById("helloworld");
            
      List tasks = processEngine.getTaskService()
            .createTaskQuery().taskCandidateUser("fozzie").list();
      
      System.out.println("Got some tasks " + tasks);
      for (Task task : tasks) {
         System.out.println(task.getId());
      }
   }


}


I get the same exception I was getting before (well similar).


Code:

Feb 1, 2011 5:08:47 PM org.activiti.engine.impl.ProcessEngineImpl 
INFO: ProcessEngine default created
Feb 1, 2011 5:08:47 PM org.activiti.engine.impl.jobexecutor.JobAcquisitionThread run
INFO: JobAcquisitionThread starting to acquire jobs
Feb 1, 2011 5:08:47 PM org.activiti.engine.impl.bpmn.deployer.BpmnDeployer deploy
INFO: Processing resource diagrams/my_bpmn2_diagram.activiti.bpmn20.xml
Feb 1, 2011 5:08:47 PM org.activiti.engine.impl.bpmn.parser.BpmnParse parseDefinitionsAttributes
INFO: XMLSchema currently not supported as typeLanguage
Feb 1, 2011 5:08:47 PM org.activiti.engine.impl.bpmn.parser.BpmnParse parseDefinitionsAttributes
INFO: XPath currently not supported as expressionLanguage
Feb 1, 2011 5:08:49 PM org.activiti.engine.impl.interceptor.CommandContext close
SEVERE: Error while closing command context
org.activiti.engine.ActivitiException: no deployed process definition found with id 'helloworld'
   at org.activiti.engine.impl.db.DbRepositorySession.findDeployedProcessDefinitionById(DbRepositorySession.java:217)
   at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:47)
   at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:29)
   at org.activiti.engine.impl.interceptor.CommandExecutorImpl.execute(CommandExecutorImpl.java:22)
   at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:37)
   at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
   at org.activiti.engine.impl.RuntimeServiceImpl.startProcessInstanceById(RuntimeServiceImpl.java:57)
   at com.demo.Demo.main(Demo.java:25)
Exception in thread "main" org.activiti.engine.ActivitiException: no deployed process definition found with id 'helloworld'
   at org.activiti.engine.impl.db.DbRepositorySession.findDeployedProcessDefinitionById(DbRepositorySession.java:217)
   at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:47)
   at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:29)
   at org.activiti.engine.impl.interceptor.CommandExecutorImpl.execute(CommandExecutorImpl.java:22)
   at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:37)
   at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
   at org.activiti.engine.impl.RuntimeServiceImpl.startProcessInstanceById(RuntimeServiceImpl.java:57)
   at com.demo.Demo.main(Demo.java:25)


I want just a simple standalone example (that is not a unit test) that works.

BTW I did explore and try a few things with the unit test so I am not completely stuck.
Also the screenshots and generated code for the tutorial do not match the current plugin.

10 minute guide missing pieces.
Working tutorial with old screenshots.

#activiti 10 minute guide example, deployment fails

I am trying to get the 10 minute guide example to work in a standalone application.

I am getting the following error:

// Comment
SEVERE: Error while closing command context
org.activiti.engine.ActivitiException: no deployed process definition found with id 'financialReport'
 at org.activiti.engine.impl.db.DbRepositorySession.findDeployedProcessDefinitionById(DbRepositorySession.java:217)
 at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:47)
 at org.activiti.engine.impl.cmd.StartProcessInstanceCmd.execute(StartProcessInstanceCmd.java:29)
 at org.activiti.engine.impl.interceptor.CommandExecutorImpl.execute(CommandExecutorImpl.java:22)
 at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:37)
 at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
 at org.activiti.engine.impl.RuntimeServiceImpl.startProcessInstanceById(RuntimeServiceImpl.java:57)
 at com.demo.Demo.main(Demo.java:41)

I am running demo demo.start and the unit tests in the examples work.
I took the pom.xml from activiti-engine.examples and repurposed it for this example.

package com.demo;

import java.util.List;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.repository.Deployment;
import org.activiti.engine.repository.DeploymentQuery;
import org.activiti.engine.task.Task;

public class Demo {
 
 public static void main (String [] args) {
  ProcessEngine processEngine = ProcessEngineConfiguration.createStandaloneInMemProcessEngineConfiguration()
     .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE)
     .setJdbcUrl("jdbc:h2:mem:my-own-db;DB_CLOSE_DELAY=1000")
     .setDatabaseSchemaUpdate("create-drop")
     .setJobExecutorActivate(true)
     .buildProcessEngine();
  
  
  /* Deploy the xml file. */
  processEngine.getRepositoryService()
  .createDeployment().addClasspathResource("com/demo/demo-flow.bpmn20.xml").deploy();
  
  
  /* Start the process by id. */
  processEngine.getRuntimeService().startProcessInstanceById("financialReport");
    
  //fails with the above call with the above exception
 }


}


The above uses this demo-flow.bpmn20.xml:


 

  
  
    
  
    
  
    
      Write monthly financial report for publication to shareholders.
    
    
      
        accountancy
      
    
  
    
  
      
  
    
      Verify monthly financial report composed by the accountancy department.
      This financial report is going to be sent to all the company shareholders.  
    
    
      
        management
      
    
  
    
  
      
  
      




I have written some other code that works in the same project as follows:

package com.demo;

import org.activiti.engine.impl.pvm.ProcessDefinitionBuilder;
import org.activiti.engine.impl.pvm.PvmActivity;
import org.activiti.engine.impl.pvm.PvmExecution;
import org.activiti.engine.impl.pvm.PvmProcessDefinition;
import org.activiti.engine.impl.pvm.PvmProcessInstance;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.impl.pvm.delegate.SignallableActivityBehavior;


public class DemoUsingFluentAPI {
 
 static PvmProcessDefinition processDefinition = new ProcessDefinitionBuilder()
   .createActivity("find bacon")
     .initial()
     .behavior(new MyActivityBehavior())
     .transition("find eggs")
   .endActivity()
   .createActivity("find eggs")
     .behavior(new MyActivityBehavior2())
     .transition("find orange juice")
   .endActivity()
   .createActivity("find orange juice")
     .behavior(new MyActivityBehavior3())
   .endActivity()
   .buildProcessDefinition();

 
 public static class MyActivityBehavior implements SignallableActivityBehavior {

  public void execute(ActivityExecution execution) throws Exception {
   System.out.println("Here ");
   execution.take(execution.getActivity().getOutgoingTransitions().get(0));
  }

  public void signal(ActivityExecution arg0, String arg1, Object arg2)
    throws Exception {
   System.out.println("Here signal");   
  }
  
 }
 
 public static class MyActivityBehavior2 implements SignallableActivityBehavior {

  public void execute(ActivityExecution execution) throws Exception {
   System.out.println("Here 2");
   execution.take(execution.getActivity().getOutgoingTransitions().get(0));
  }

  public void signal(ActivityExecution arg0, String arg1, Object arg2)
    throws Exception {
   System.out.println("Here signal 2");
   
  }
  
 }
 
 public static class MyActivityBehavior3 implements SignallableActivityBehavior {

  public void execute(ActivityExecution execution) throws Exception {
   System.out.println("Here 3");
   execution.end();
  }

  public void signal(ActivityExecution arg0, String arg1, Object arg2)
    throws Exception {
   System.out.println("Here signal 3");
  }
  
 }


 public static void main (String [] args) {
  
  
  PvmProcessInstance processInstance = processDefinition.createProcessInstance();
  processInstance.start();
  
  PvmActivity activity = processDefinition.findActivity("find bacon");
  activity.findOutgoingTransition("find eggs");
  
  PvmExecution activityInstance = processInstance.findExecution("find bacon");
  assert activityInstance!=null : "Not Null";
    
  
  
 }

}


Can I get some ideas on how to debug this? I will blog about the help I got and what worked.
Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training