Tuesday, June 5, 2012

Using Guava EventBus with Spring

Recently, I started working on VeloxMUD again. The time away has really given me a new perspective on the code. Usually, that perspective goes like this:
  • What the heck was I thinking?
The first thing I looked at was the code that handled connections for the MUD. There is a thread that waits on socket.accept() and then passes the Socket objects off to a handler service (injected by Spring). Although this isn't terrible, I thought I could improve it by using a Publish-Subscribe pattern to decouple the two.

I started my search with Observable and found that it wasn't going to fit. Although the Observer interface isn't bad, the need to derive from Observable wasn't going to work for me. My publisher (the "observable" object) already derived from Thread. And it seemed there had to be a better way.

I stumbled upon Guava's EventBus. Ah ha! This is what I need! But wait, where are all the publish-subscribe guts that I need to implement? Gone... Publishers post events of any arbitrary type, subscribers add a method (with any name) and a @Subscribe annotation. Done. Wow! This is observable done right!

Oops... one small problem. Most of the stuff that needs to register is created and managed by Spring. How do I register my subscribers without tackling all sorts of nasty lifecycle problems?

BeanPostProcessor comes to the rescue! We can provide custom bean-processing code to Spring. With the right piece of code, Spring can do all the necessary registration for us.

/*
 * EventBusPostProcessor.java
 * Author: Patrick Meade
 *
 * EventBusPostProcessor.java is hereby placed into the public domain.
 * Use it as you see fit, and entirely at your own risk.
 */

package com.example.spring.guava;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;

/**
 * EventBusPostProcessor registers Spring beans with EventBus. All beans
 * containing Guava's @Subscribe annotation are registered.
 * @author pmeade
 */
public class EventBusPostProcessor implements BeanPostProcessor
{
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
            throws BeansException
    {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName)
            throws BeansException
    {
        // for each method in the bean
        Method[] methods = bean.getClass().getMethods();
        for(Method method : methods)
        {
            // check the annotations on that method
            Annotation[] annotations = method.getAnnotations();
            for(Annotation annotation : annotations)
            {
                // if it contains the Subscribe annotation
                if(annotation.annotationType().equals(Subscribe.class))
                {
                    // register it with the event bus
                    eventBus.register(bean);
                    log.trace("Bean {} containing method {} was subscribed to {}",
                        new Object[] {
                            beanName, method.getName(),
                            EventBus.class.getCanonicalName()
                        });
                    // we only need to register once
                    return bean;
                }
            }
        }
        
        return bean;
    }
    
    @Autowired
    private EventBus eventBus;
}

We add the following XML stanzas to Spring's application context:

<bean id="eventBus"
      class="com.google.common.eventbus.EventBus"/>
<bean id="eventBusPostProcessor"
      class="com.example.spring.guava.EventBusPostProcessor"/>

And it works! Spring registers every bean with a @Subscribe annotation.

2012-06-05 00:50:09,084 TRACE [main] com.example.spring.guava.EventBusPostProcessor (EventBusPostProcessor.java:64) - Bean connectionService containing method handleConnection was subscribed to com.google.common.eventbus.EventBus

8 comments:

  1. Great article and great code. There seems to be very little written about using EventBus w/ Spring. Unfortunately, I think the license for your code really limits it's use, although it's so simple it's a no brainer to implement it independently. Thought about using a different license?

    ReplyDelete
    Replies
    1. I agree with you.

      Honestly, the license was carried over as part of a copy and paste from my VeloxMUD project. VeloxMUD uses the GNU Affero license, but the small snippet here on my blog does not need such a restrictive license. (For reasons you have pointed out.)

      I have modified the post, and placed the code into the public domain.

      Thank you for pointing out the licensing problem; I appreciate it.

      Delete
  2. You mentioned Spring registers every bean with a @Scheduled annotation. Did you mean @Subscribe? Great article, really appreciate your examples and thought process, thanks!

    ReplyDelete
    Replies
    1. You are correct. I must have had "@Scheduled" on the brain when I wrote "And it works! Spring registers every bean with a @Scheduled annotation." I have updated the post to reflect your correction.

      Thanks! :-)

      Delete
  3. Thanks for the hint.
    I think you can reduce the postProcessAfterInitialization to just the register call. Guava will figure out whether there are any @Subscribe annotations. Also, the logic guava applies to find annotated methods is more complex than the suggested. It handles the case where a method is overridden and the annotation is only on the superclass.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
    eventBus.register(bean);
    }

    ReplyDelete
  4. Are you aware that guava's event bus uses strong references to the registered listeners? Your code potentially results in a memory leak. Scoped beans (view or request scoped for example in web environments) would not get garbage collected. You miss a way to unregister the listeners. In Spring environment this is not as easy as one might think. I had this situation and could not use guava for that reason. I created my own solution that uses weak references. Its even faster than the guava one and works the same.
    You can check it out on github if you're interested

    https://github.com/bennidi/mbassador

    ReplyDelete
    Replies
    1. Thank you for this! I hadn't realized that my solution came with a free memory leak. I've posted an updated version of the class in a new post.

      Also, I just started looking at your mbassador library. I haven't had a chance to try it out in practice yet, but hope to do so later this week. Two things I'd like to point out for anybody interested in your library:

      1. Friendly MIT license
      2. Available in the maven central repository:

      <dependency>
        <groupId>net.engio</groupId>
        <artifactId>mbassador</artifactId>
        <version>1.1.1</version>
      </dependency>

      Thanks again! :-)

      Delete
  5. Thanks for sharing. I want warning it use because it is not working with JDK dinamyc proxy (http://docs.oracle.com/javase/7/docs/api/java/lang/reflect/Proxy.html). There some workarounds, for example use in postProcessBeforeInitialization() instead of the method that you use.
    Another problem that I faced, if in the handler I have a problem and it is in the context of a transaction, the transaction is not mark to rollback. So, to solve this problems I chosen spring event listener and finally it works well.

    Thanks for share your experience.

    ReplyDelete