Testing Storm Bolts and Spouts
Since version 0.8.1 Storm's unit testing facilities have been exposed via Java:
- http://storm-project.net/2012/09/06/storm081-released.html
For an example how to use this API have a look here:
- https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java
Our approach is to use constructor-injection of a serializable factory into the spout/bolt. The spout/bolt then consults the factory in its open/prepare method. The factory's single responsibility is to encapsulate obtaining the spout/bolt's dependencies in a serializable fashion. This design allows our unit tests to inject fake/test/mock factories which, when consulted, return mock services. In this way we can narrowly unit test the spout/bolts using mocks e.g. Mockito.
Below is a generic example of a bolt and a test for it. I have omitted the implementation of the factory UserNotificationFactory
because it depends on your application. You might use service locators to obtain the services, serialized configuration, HDFS-accessible configuration, or really any way at all to get the correct services, so long as the factory can do it after a serde cycle. You should cover serialization of that class.
Bolt
public class NotifyUserBolt extends BaseBasicBolt {
public static final String NAME = "NotifyUser";
private static final String USER_ID_FIELD_NAME = "userId";
private final UserNotifierFactory factory;
transient private UserNotifier notifier;
public NotifyUserBolt(UserNotifierFactory factory) {
checkNotNull(factory);
this.factory = factory;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
notifier = factory.createUserNotifier();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// This check ensures that the time-dependency imposed by Storm has been observed
checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");
long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
notifier.notifyUser(userId);
collector.emit(new Values(userId));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(USER_ID_FIELD_NAME));
}
}
Test
public class NotifyUserBoltTest {
private NotifyUserBolt bolt;
@Mock
private TopologyContext topologyContext;
@Mock
private UserNotifier notifier;
// This test implementation allows us to get the mock to the unit-under-test.
private class TestFactory implements UserNotifierFactory {
private final UserNotifier notifier;
private TestFactory(UserNotifier notifier) {
this.notifier = notifier;
}
@Override
public UserNotifier createUserNotifier() {
return notifier;
}
}
@Before
public void before() {
MockitoAnnotations.initMocks(this);
// The factory will return our mock `notifier`
bolt = new NotifyUserBolt(new TestFactory(notifier));
// Now the bolt is holding on to our mock and is under our control!
bolt.prepare(new Config(), topologyContext);
}
@Test
public void testExecute() {
long userId = 24;
Tuple tuple = mock(Tuple.class);
when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
BasicOutputCollector collector = mock(BasicOutputCollector.class);
bolt.execute(tuple, collector);
// Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
// the call to execute, too.
verify(notifier).notifyUser(userId);
verify(collector).emit(new Values(userId));
}
}
One approach we have taken is to move most of the application logic out of bolts and spouts and into objects that we use to do the heavy lifting by instantiating and using them via minimal interfaces. Then we do unit testing on those objects and integration testing, although this does leave a gap.
It turns out to be fairly easy to mock storm objects like OutputDeclarer, Tuple and OutputFieldsDeclarer. Of those, only OutputDeclarer ever sees any side effects so code the OutputDeclarer mock class to be able to answer any tuples and anchors emitted, for example. Your test class can then use instances of those mock classes to easily configure a bolt/spout instance, invoke it and validate the expected side effects.