Building a Bi-Directional Event Bus with Google Guava « Web Age Dev Zone


Google Guava is an open-source project, released under the very permissive Apache License 2.0, that bundles several core Java libraries used by Google software engineers in their projects.  The Guava library is packed with powerful utilities and efficient helper classes that aim to make the lives of Java programmers easier and their work more productive.

Generally, Guava offers an easier way to interact with some of the existing Java language constructs and fills in a number of gaps that exist in the language.

For example, shown below is a piece of code that you need to write in order to create a dynamic proxy for some interface Foo using the regular Java constructs:

Foo foo = (Foo) Proxy.newProxyInstance(
Foo.class.getClassLoader(), new Class<?>[] {Foo.class}, handler);

Some developers are perfectly happy to write code like this, believing that “Where there’s muck there’s brass”. Others would argue that the code above carries a great deal of unnecessary noise: after all the pain of getting your ducks in a row for the newProxyInstance() call, you are still forced to downcast the result to Foo.

In the Hakuna Matata world of Guava, the ugly-looking legacy Java Proxy construct (circa Java 1.3) gets hidden under the generified user-friendly API:

Foo fu = Reflection.newProxy(Foo.class, handler);

This Guava construct also enforces type safety at compile time, making the cast superfluous: the return type of the newProxy() method is inferred from its first argument (Foo.class), so it already matches the declared type of the variable (fu).
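To make the point about the hidden cast concrete, here is a minimal sketch of how such a generified wrapper can be written on top of the JDK's own Proxy class. It is not Guava's source code; the Greeter interface and the ProxySketch class are hypothetical names used only for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical interface used only for this illustration.
interface Greeter {
    String greet(String name);
}

final class ProxySketch {

    // A generic wrapper around the JDK proxy factory: the cast happens once,
    // inside the helper, through Class.cast(), so call sites stay cast-free.
    static <T> T newProxy(Class<T> interfaceType, InvocationHandler handler) {
        Object proxy = Proxy.newProxyInstance(
                interfaceType.getClassLoader(),
                new Class<?>[] {interfaceType},
                handler);
        return interfaceType.cast(proxy);
    }

    static String demo() {
        // The handler answers every call itself with "<method name>:<first argument>".
        InvocationHandler handler =
                (proxy, method, args) -> method.getName() + ":" + args[0];
        Greeter greeter = newProxy(Greeter.class, handler); // no cast at the call site
        return greeter.greet("world");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "greet:world"
    }
}
```

The type parameter T ties the returned proxy to the Class token, which is why the compiler can check the assignment without any help from the caller.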

That’s what SpringSource has always been so good at: shielding developers from the internal complexity of various systems with their user-friendly API.  They treat developers as valuable clients of their APIs.


Slowly but surely, Java is getting better with every new release and some parts of the Guava project are becoming less relevant (e.g. its common.io package that has been filling the gaps in pre-JDK 7’s file APIs, or some functional programming constructs that will be retired after Java 8 introduces the Java community to the joys of closures).  Still, other parts of the Guava library are sure to stay around and be developed further.  One such vibrant part of Guava is the Event Bus system housed in the com.google.common.eventbus package.

Guava’s Event Bus is a message dispatching system designed to allow a publish-subscribe style of communication between components; it is no-nonsense, lightweight and very practical.

Note that there is no JMS or Message Oriented Middleware infrastructure involved and everything happens within the run-time boundaries of the same Java application.

Building an Event Bus

It takes three components to build an application’s Event Bus.

  1. Guava’s Event (Message) Bus itself – represented by the EventBus class, which provides methods for subscribers (event listeners) to register and unregister themselves with the Bus as well as a method for dispatching events (messages) to the target subscribers
  2. The event (message), which can be any Java object: a Date, a String, your POJO, anything that can be routed by the Bus to your subscriber
  3. The event subscriber (listener) – a Java class of arbitrary complexity that has a specially annotated method for handling events (messages); this method is a call-back that returns void and takes exactly one parameter of the same type as the corresponding event (a Date, a String, your POJO, etc.)

The Event Bus comes in two flavors:

  • Synchronous (backed by the EventBus class), and
  • Asynchronous (backed by the AsyncEventBus class, which extends EventBus)

Both classes are housed in the com.google.common.eventbus package and expose the following API operations:
void register(Object) – registers and caches subscribers for subsequent event handling
void unregister(Object) – undoes the register action
void post(Object) – posts an event (message) to all registered subscribers

Notice that the Event Bus API is totally permissive about the type of the subscribers and events.

So, how do you set the stage and go about event dispatching?

Event Dispatching on the Bus

Here is the sequence of simple steps to follow to make your Bus busy with event dispatching:

  1. Define your event (message) type; as we said earlier, it can be any Java object
  2. Create a subscriber to the event (message)
    In the subscriber class, designate and annotate one method with the com.google.common.eventbus.Subscribe annotation (which simply marks a method as an event handler).  The method’s single input parameter should match the type of the event this subscriber wants to receive from the Bus.  You can use any allowed identifier for the method’s name
    class MyStringEventSubscriber {
        . . .
        @Subscribe
        public void onEvent(String e) {
            // Handle the string passed on by the Event Bus
        }

        . . .
    }
  3. Instantiate the Event Bus
    The synchronous EventBus type gets instantiated as follows:
    EventBus eBus = new EventBus();
    The most common way to instantiate the asynchronous AsyncEventBus type is as follows:
    EventBus eBus = new AsyncEventBus(java.util.concurrent.Executors.newCachedThreadPool());
  4. Register your subscriber(s) with the Event Bus:
    eBus.register(new MyStringEventSubscriber());
    Note: At this point, the EventBus object inspects your subscriber’s class (via reflection) for methods annotated with @Subscribe and registers each one as the call-back handler for the type of event it accepts
  5. Create an instance of your event (message), set the payload as needed and post (send, fire) the event through the Bus:
    MyEvent me = new MyEvent("Some payload");
    eBus.post(me);

    or
    Date now = new Date();
    eBus.post(now);
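The note in step 4 about scanning the subscriber for annotated methods can be pictured with plain reflection. The following is a minimal sketch and not Guava's implementation: the Handles annotation and the TinyBus class are hypothetical stand-ins for @Subscribe and EventBus, dispatch is synchronous, and nothing is cached.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {}

// Hypothetical stand-in for EventBus: synchronous, no caching, no dead events.
final class TinyBus {
    private final List<Object> subscribers = new ArrayList<>();

    void register(Object subscriber) {
        subscribers.add(subscriber);
    }

    // Invokes every annotated one-argument method whose parameter type accepts
    // the event and returns the number of handlers that were called.
    int post(Object event) {
        int delivered = 0;
        for (Object subscriber : subscribers) {
            for (Method m : subscriber.getClass().getDeclaredMethods()) {
                if (m.isAnnotationPresent(Handles.class)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(event)) {
                    try {
                        m.setAccessible(true);
                        m.invoke(subscriber, event);
                        delivered++;
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
        return delivered;
    }
}

final class StringSubscriber {
    final List<String> received = new ArrayList<>();

    @Handles
    public void onEvent(String e) {
        received.add(e);
    }
}

final class TinyBusDemo {
    static String demo() {
        TinyBus bus = new TinyBus();
        StringSubscriber subscriber = new StringSubscriber();
        bus.register(subscriber);
        int strings = bus.post("hello"); // matches onEvent(String)
        int numbers = bus.post(42);      // no handler accepts an Integer
        return subscriber.received + " " + strings + " " + numbers;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[hello] 1 0"
    }
}
```

The parameter type of the annotated method is the only routing information, which is why the handler can carry any legal method name.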

For situations when no matching event handler is found (e.g. because the target subscriber was unregistered, or an unsupported event was posted), the Event Bus offers an elegant and effective solution: you simply create a “Dead Event” (or “Catch-All-That-Fell-Through-the-Cracks”) subscriber. It acts much like a dead-letter queue in conventional message queuing systems, intercepting messages that could not be delivered to any known subscriber (destination). Its handler method must take as its parameter the predefined com.google.common.eventbus.DeadEvent event, which acts as a wrapper for the actual undelivered event.

public class MyDeadEventsSubscriber {
    @Subscribe
    public void handleDeadEvent(DeadEvent deadEvent) {
        // The actual undelivered event is available via deadEvent.getEvent()
    }
}

You instantiate and register the Dead Event handler with the Event Bus in the usual way:
eBus.register(new MyDeadEventsSubscriber());
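The dead-letter idea itself can be sketched without Guava. In the sketch below, FallbackBus and Undelivered are hypothetical names standing in for the Bus and for DeadEvent; handlers are keyed by the exact event class, which is a simplification made for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical wrapper playing the role that DeadEvent plays in the text.
final class Undelivered {
    private final Object event;

    Undelivered(Object event) {
        this.event = event;
    }

    Object getEvent() {
        return event;
    }
}

// Hypothetical dispatcher keyed by exact event class; not Guava's EventBus.
final class FallbackBus {
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    void register(Class<?> eventType, Consumer<Object> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    void post(Object event) {
        List<Consumer<Object>> matching = handlers.get(event.getClass());
        if (matching != null) {
            matching.forEach(h -> h.accept(event));
        } else if (!(event instanceof Undelivered)) {
            // Nobody wanted the event: wrap it and offer it to the catch-all handlers.
            post(new Undelivered(event));
        }
        // An Undelivered event without a handler is dropped, which ends the recursion.
    }
}

final class FallbackBusDemo {
    static String demo() {
        List<Object> strings = new ArrayList<>();
        List<Object> dead = new ArrayList<>();
        FallbackBus bus = new FallbackBus();
        bus.register(String.class, strings::add);
        bus.register(Undelivered.class, e -> dead.add(((Undelivered) e).getEvent()));
        bus.post("a"); // delivered to the String handler
        bus.post(7);   // no Integer handler, so it arrives wrapped in Undelivered
        return strings + " " + dead;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a] [7]"
    }
}
```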

Synchronous event dispatching (supported by the EventBus class) is not that exciting: event processing is serialized, forcing the caller to wait until the target subscriber’s handler method returns before the next enqueued event is dispatched.  In most cases this is not what you want, because of the poor performance of this arrangement or other considerations.

You will recall that asynchronous processing is backed by the AsyncEventBus class, which we will use in our discussion below.

With the AsyncEventBus class, handlers are invoked on threads of the executor you supplied, so the same handler may be called from several threads at once.  By default the Bus guards against this by serializing calls to each handler method; decorating the method with the com.google.common.eventbus.AllowConcurrentEvents annotation declares it thread-safe and allows the Bus to invoke it concurrently.  So, a handler method that is safe for concurrent use should have the following signature:
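The practical difference between the two flavors comes down to which thread runs the handler. The sketch below illustrates this with the JDK's Executor abstraction only; DispatchModes is a hypothetical class, and the direct executor merely imitates synchronous dispatch rather than reproducing how EventBus is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DispatchModes {

    // Runs a handler through the given executor and reports which thread executed it.
    static String handlerThread(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }

    // Returns {handler ran on caller thread (direct), handler ran on caller thread (pool)}.
    static boolean[] demo() {
        String caller = Thread.currentThread().getName();
        Executor direct = Runnable::run; // synchronous: the caller runs the handler itself
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            boolean syncOnCaller = caller.equals(handlerThread(direct));
            boolean asyncOnCaller = caller.equals(handlerThread(pool));
            return new boolean[] {syncOnCaller, asyncOnCaller};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("direct on caller thread: " + result[0]);
        System.out.println("pooled on caller thread: " + result[1]);
    }
}
```

With the direct executor the caller cannot continue until the handler returns, whereas with the pool the caller gets control back as soon as the task is handed over.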

@Subscribe
@AllowConcurrentEvents
public void someMethod(YourEventBean e) {
......
}

Wrapping up the Event Bus API in a Singleton

One practical way to expose the Event Bus functionality to clients is to wrap a single instance of the AsyncEventBus class in a singleton. AsyncEventBus handles synchronization internally, so our singleton is thread-safe:


import java.util.concurrent.Executors;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;

public class EventBusService {

    // Eagerly created single instance
    private static final EventBusService instance = new EventBusService();

    // Accessor for the singleton; '$' is a legal (if unusual) Java identifier
    public static EventBusService $() {
        return instance;
    }

    private final EventBus eventBus;

    private EventBusService() {
        // Handlers run on threads taken from the cached thread pool
        eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
    }

    public void registerSubscriber(Object subscriber) {
        eventBus.register(subscriber);
    }

    public void unRegisterSubscriber(Object subscriber) {
        eventBus.unregister(subscriber);
    }

    public void postEvent(Object e) {
        eventBus.post(e);
    }
}

And our singleton is used by clients as follows:
EventBusService.$().registerSubscriber(new MyEventBusSubscriber());
MyMessage msg = new MyMessage();
msg.setPayLoad(....);
EventBusService.$().postEvent(msg);

Making the Event Bus Bi-Directional

You may have already noticed that the Event Bus only works one way: it sends events from the caller down to the subscribers and there is no directly supported mechanism for the system to notify the caller about the outcome of the event handling operation.

Wouldn’t it be great if you could fix this small but annoying shortcoming and make the Bus bi-directional?

And that’s what we are going to do right now.

Here are the technical requirements for the solution:

  1. Re-use the event bean (message) for passing back the subscriber’s response (the result of handling the event)
  2. Use the standard Java concurrency API to suspend the caller thread until either of the following occurs:
    • the caller thread times out after waiting for a specified period of time
    • the response is set on the original event and is ready to be fetched by the caller
  3. Encapsulate the processing logic in a helper class that can be leveraged in other event types through extension
  4. Provide compile-time type safety for the return type

Here is the prototype of the solution.

The Helper Class

The design of this class addresses all four of the above requirements.  The 4th requirement is met via the <T> generic type parameter.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* {@literal <<PROTOTYPE>>}
*
* @author     Mikhail Vladimirov
*/
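public class AsyncCallDataBean<T> {
    private T response = null;
    private final CountDownLatch latch = new CountDownLatch(1);

    // Blocks the caller until the response is set or the timeout expires
    public T getReponse(long timeoutMS) {
        try {
            latch.await(timeoutMS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption
            Thread.currentThread().interrupt();
        }
        return response;
    }

    // Called by the subscriber; stores the response and releases the waiting caller
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }
}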

The latch field serves two purposes:

  1. Suspend the caller’s thread when it asks for the response, for at most the duration passed in as the method parameter
  2. Wake up the caller’s thread when the response is set; the latch.countDown() call brings the latch count down from 1 to 0, which unblocks the waiting thread and lets the caller pick up the response
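Both behaviors of the latch can be observed in isolation with the JDK alone. The sketch below uses a hypothetical LatchSketch class; the timeout values are arbitrary and chosen only to keep the demonstration short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchSketch {

    // Waits on the latch and reports whether it opened before the timeout.
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Nobody counts down, so the wait ends by timing out.
    static boolean timesOut() {
        CountDownLatch latch = new CountDownLatch(1);
        return !awaitQuietly(latch, 50);
    }

    // Another thread counts down from 1 to 0, which releases the waiting caller.
    static boolean opens() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread responder = new Thread(latch::countDown);
        responder.start();
        return awaitQuietly(latch, 5000);
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOut());
        System.out.println("opened:    " + opens());
    }
}
```

The boolean returned by await() tells a timeout apart from a released latch. The helper class above ignores it and relies on a null response instead, which is simpler but cannot distinguish a timeout from a subscriber that deliberately answered with null.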

So, all you have to do is have your event bean extend the AsyncCallDataBean type.

/**
* {@literal <<EVENT>>}
*/
public class RequestResponseDataBean extends AsyncCallDataBean<String> {

    private final Object payLoad;

    public RequestResponseDataBean(Object payLoad) {
        this.payLoad = payLoad;
    }

    public Object getPayLoad() {
        return payLoad;
    }
}

The usage of the extension is as follows:

EventBusService.$().registerSubscriber(new MyEventBusSubscriber());
RequestResponseDataBean rrb = new RequestResponseDataBean("Some event payload");
EventBusService.$().postEvent(rrb);
long timeOut = 3000; // wait for 3 seconds before timing out
String result = rrb.getReponse (timeOut); // will unblock on time out or call to the rrb.setResponse()
...

// Somewhere on another thread … The handler method of MyEventBusSubscriber receives the
// RequestResponseDataBean event and sets its response value:
// rrb.setResponse("Some response");
So, the rrb.getReponse (timeOut); call will unblock either on a timeout (in which case the response will be null) or when the result is ready for pick-up, whichever occurs first. There are no conditions for a memory leak: all references to the event bean will eventually be released and it will be garbage-collected.
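The whole round trip, including the timeout case, can be tried out with the JDK alone. In this sketch a single-thread ExecutorService stands in for the asynchronous Bus, and ReplySlot is a compact, hypothetical equivalent of the helper class; neither name comes from Guava.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Compact, hypothetical equivalent of the helper class.
class ReplySlot<T> {
    private T response;
    private final CountDownLatch latch = new CountDownLatch(1);

    // Returns the response, or null if the timeout expires or the wait is interrupted.
    T await(long timeoutMs) {
        try {
            // The field is read only after the latch has opened.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS) ? response : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void reply(T value) {
        response = value;
        latch.countDown();
    }
}

final class RoundTripDemo {

    // Hands a request to a worker thread; the worker replies only if answer is true.
    static String roundTrip(boolean answer, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            ReplySlot<String> request = new ReplySlot<>();
            worker.execute(() -> {
                if (answer) {
                    request.reply("done");
                }
            });
            return request.await(timeoutMs);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(true, 5000)); // prints "done"
        System.out.println(roundTrip(false, 50));  // prints "null"
    }
}
```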

That’s it. We created a nice micro-architecture for a bi-directional Event Bus.
Happy Bus driving!