Breeze binds Storm topology components to POJOs: write plain Spring beans and run them in a Storm cluster.
The SpringSpout and SpringBolt classes are configured with a Spring bean and a method signature. The compiler automatically orders the processing steps based on the field names.
Each topology gets a dedicated application context.
Breeze currently supports "none" grouping only.
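The field-name ordering described above can be illustrated with a small sketch. This is a hypothetical illustration, not Breeze's actual compiler: each `Step` (a made-up class) declares the fields its method reads and the fields it emits, and a step is only placed once every field it reads has been produced by an earlier step.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical processing step: the fields its method reads and the fields it emits.
final class Step {
    final String name;
    final Set<String> inputs;
    final Set<String> outputs;

    Step(String name, Set<String> inputs, Set<String> outputs) {
        this.name = name;
        this.inputs = inputs;
        this.outputs = outputs;
    }
}

final class FieldOrdering {
    // Repeatedly places every pending step whose inputs are all available;
    // fails when some input field is never produced by any step.
    static List<String> order(List<Step> steps) {
        Set<String> available = new HashSet<>();
        List<Step> pending = new ArrayList<>(steps);
        List<String> result = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<Step> ready = new ArrayList<>();
            for (Step s : pending) {
                if (available.containsAll(s.inputs)) {
                    ready.add(s);
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("unresolved input fields for " + pending.get(0).name);
            }
            for (Step s : ready) {
                result.add(s.name);
                available.addAll(s.outputs);
            }
            pending.removeAll(ready);
        }
        return result;
    }
}
```

Declaration order does not matter: a step reading `greeting` ends up after the step that emits it, wherever it was declared.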
The kickstarter project demonstrates how to define a topology with the Breeze namespace and regular bean definitions.
<dependency>
    <groupId>eu.icolumbo.breeze</groupId>
    <artifactId>breeze</artifactId>
    <version>1.2.2</version>
</dependency>
The default topology starter can be used for local testing.
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.1</version>
    <configuration>
        <mainClass>eu.icolumbo.breeze.namespace.TopologyStarter</mainClass>
        <arguments>
            <argument>demo</argument>
        </arguments>
        <systemProperties>
            <property>
                <key>localRun</key>
            </property>
        </systemProperties>
    </configuration>
</plugin>
For each read request on SpringSpout and for each execute request on SpringBolt, the bean's configured method is invoked.
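A rough sketch of what binding a bean to a method signature amounts to, assuming a signature such as `greet(s)` names a method plus the fields passed as its arguments. The hypothetical `SignatureInvoker` parses the signature, looks up each argument by field name, and invokes the matching public method by reflection. Breeze's own method resolution may well differ (for example in how it handles overloads and types); `DemoGreeter` is made up for the usage example.

```java
import java.lang.reflect.Method;
import java.util.Map;

final class SignatureInvoker {
    // Invokes e.g. "greet(s)" on the bean, passing the value of field "s".
    static Object invoke(Object bean, String signature, Map<String, Object> fields) throws Exception {
        int open = signature.indexOf('(');
        int close = signature.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("malformed signature: " + signature);
        }
        String methodName = signature.substring(0, open).trim();
        String argList = signature.substring(open + 1, close).trim();
        String[] argNames = argList.isEmpty() ? new String[0] : argList.split("\\s*,\\s*");

        // Collect the argument values by field name.
        Object[] args = new Object[argNames.length];
        for (int i = 0; i < argNames.length; i++) {
            if (!fields.containsKey(argNames[i])) {
                throw new IllegalArgumentException("no field named " + argNames[i]);
            }
            args[i] = fields.get(argNames[i]);
        }
        // Pick the first public method with a matching name and parameter count.
        for (Method m : bean.getClass().getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == args.length) {
                return m.invoke(bean, args);
            }
        }
        throw new NoSuchMethodException(methodName + " with " + args.length + " argument(s)");
    }
}

// Hypothetical bean used for the usage example.
class DemoGreeter {
    public String greet(String s) {
        return "Hello " + s;
    }
}
```

For instance, `SignatureInvoker.invoke(new DemoGreeter(), "greet(s)", Map.of("s", "World"))` returns `"Hello World"`.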
The scatter feature can split returned arrays and collections into multiple emissions. With scatter enabled, a null return means nothing is emitted, which lets bolts act as filters.
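The scatter rules can be summarized in a small sketch. The hypothetical `Scatter.emissions` helper turns a method's return value into the list of values that would be emitted with scatter enabled: arrays and collections yield one emission per element, null yields none, and any other value yields a single emission. It models the behavior described above and is not Breeze's code.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class Scatter {
    // Maps a return value to the values to emit, assuming scatter is enabled.
    static List<Object> emissions(Object returned) {
        if (returned == null) {
            return Collections.emptyList();                   // null: nothing emitted (filter)
        }
        if (returned instanceof Collection) {
            return new ArrayList<>((Collection<?>) returned); // one emission per element
        }
        if (returned.getClass().isArray()) {
            int n = Array.getLength(returned);                // also handles primitive arrays
            List<Object> out = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                out.add(Array.get(returned, i));
            }
            return out;
        }
        return Collections.singletonList(returned);           // any other value: one emission
    }
}
```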
When no output fields are defined, the return value is discarded. By default, a single output field takes the return value as is. With multiple output fields, each field is read from the return value as a property (through its getter). More complicated bindings may be defined with SpEL, as shown below.
<breeze:bolt beanType="com.example.EntityExtractor" signature="read(doc)"
        outputFields="nameCount names source">
    <breeze:field name="nameCount" expression="names.size()"/>
    <breeze:field name="source" expression="'x-0.9'"/>
</breeze:bolt>
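The default binding rules, without SpEL, can be sketched with plain reflection. The hypothetical `OutputBinding.values` maps a return value to tuple values for the declared output fields: no fields gives nothing, one field gives the return value unchanged, and several fields are each read through the matching getter (so a `names` field comes from `getNames()`, as in the configuration above). Boolean `is` getters are left out, and the `Extraction` result type is made up for the example.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OutputBinding {
    // Maps a return value to tuple values for the declared output fields.
    static List<Object> values(Object returned, List<String> outputFields) throws Exception {
        if (outputFields.isEmpty()) {
            return Collections.emptyList();             // no output fields: value discarded
        }
        if (outputFields.size() == 1) {
            return Collections.singletonList(returned); // one field: the return value as is
        }
        List<Object> values = new ArrayList<>();
        for (String field : outputFields) {
            // Several fields: read each through its getter, e.g. names -> getNames().
            String getter = "get" + Character.toUpperCase(field.charAt(0)) + field.substring(1);
            Method m = returned.getClass().getMethod(getter);
            values.add(m.invoke(returned));
        }
        return values;
    }
}

// Hypothetical return type of an extractor bean's read(doc) method.
class Extraction {
    private final List<String> names;
    private final String language;

    Extraction(List<String> names, String language) {
        this.names = names;
        this.language = language;
    }

    public List<String> getNames() {
        return names;
    }

    public String getLanguage() {
        return language;
    }
}
```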
Exceptions can be configured to cause a read delay.
<breeze:spout id="dumpFeed" beanType="com.example.DumpReader" signature="read()" outputFields="record">
    <breeze:exception type="java.nio.BufferUnderflowException" delay="500"/>
</breeze:spout>
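One way to picture the read delay, under two assumptions not stated here: the delay is in milliseconds, and an exception matches when it is an instance of the configured type. The hypothetical `ReadDelays` helper maps exception types to delays and, around a single read, pauses instead of propagating a configured exception. It is a sketch of the behavior, not Breeze's implementation.

```java
import java.nio.BufferUnderflowException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

final class ReadDelays {
    private final Map<Class<? extends Exception>, Long> delays = new LinkedHashMap<>();

    // Mirrors the configuration above: BufferUnderflowException delays the next read by 500.
    static ReadDelays example() {
        return new ReadDelays().on(BufferUnderflowException.class, 500);
    }

    // Registers a delay (assumed to be milliseconds) for an exception type.
    ReadDelays on(Class<? extends Exception> type, long delayMillis) {
        delays.put(type, delayMillis);
        return this;
    }

    // Configured delay for the thrown exception, or -1 when no entry matches.
    long delayFor(Exception thrown) {
        for (Map.Entry<Class<? extends Exception>, Long> entry : delays.entrySet()) {
            if (entry.getKey().isInstance(thrown)) {
                return entry.getValue();
            }
        }
        return -1;
    }

    // Invokes the read method once; on a configured exception it sleeps and
    // returns null to stand for "no emission" (an assumption of this sketch).
    Object read(Callable<?> readMethod) throws Exception {
        try {
            return readMethod.call();
        } catch (Exception ex) {
            long delay = delayFor(ex);
            if (delay < 0) {
                throw ex; // not configured: propagate unchanged
            }
            Thread.sleep(delay);
            return null;
        }
    }
}
```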
Storm supports guaranteed message processing. Breeze provides this functionality with an ack and/or fail method signature.
With the following example configuration, DumpReader#ok is called with the hash code on success, and failures are reported to DumpReader#bad with the value of getSerial() on the record.
<breeze:spout id="dumpFeed" beanType="com.example.DumpReader" signature="read()" outputFields="record">
    <breeze:field name="hash" expression="hashCode()"/>
    <breeze:transaction ack="ok(hash)" fail="bad(serial)"/>
</breeze:spout>
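A bean matching this configuration could look like the hypothetical `DumpReader` below (package declaration omitted). The parameter types are assumptions: `ok(hash)` receives the `int` produced by `hashCode()`, and `bad(serial)` receives whatever `getSerial()` returns on the record, a `long` here. An in-memory queue stands in for a real dump file, and the made-up `DumpRecord` type plays the role of the emitted record.

```java
import java.nio.BufferUnderflowException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical record type; the "serial" value comes from getSerial().
class DumpRecord {
    private final long serial;
    private final String line;

    DumpRecord(long serial, String line) {
        this.serial = serial;
        this.line = line;
    }

    public long getSerial() {
        return serial;
    }

    public String getLine() {
        return line;
    }
}

// Hypothetical bean for beanType="com.example.DumpReader".
class DumpReader {
    private final Deque<DumpRecord> queue = new ArrayDeque<>();
    final List<Integer> acked = new ArrayList<>();
    final List<Long> failed = new ArrayList<>();

    DumpReader(List<DumpRecord> records) {
        queue.addAll(records);
    }

    // signature="read()": the next record; throws BufferUnderflowException when
    // drained, the exception type given a read delay in the earlier configuration.
    public DumpRecord read() {
        DumpRecord next = queue.poll();
        if (next == null) {
            throw new BufferUnderflowException();
        }
        return next;
    }

    // ack="ok(hash)": receives the "hash" field, i.e. hashCode() of the record.
    public void ok(int hash) {
        acked.add(hash);
    }

    // fail="bad(serial)": receives getSerial() of the failed record.
    public void bad(long serial) {
        failed.add(serial);
    }
}
```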
Storm's Distributed RPC (DRPC) can also be configured with the Breeze beans XML extension.
<breeze:topology id="demo">
    <breeze:rpc signature="dgreet(s)" outputFields="greeting"/>
    <breeze:bolt beanType="com.example.Greeter" signature="greet(s)" outputFields="greeting"/>
</breeze:topology>
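The `com.example.Greeter` bean referenced in this topology can be an ordinary class. The sketch below is hypothetical (package declaration omitted, greeting format made up). Since the bolt's `greeting` output field matches the RPC's `outputFields`, its return value is presumably what the DRPC caller receives.

```java
// Hypothetical implementation of com.example.Greeter.
class Greeter {
    // signature="greet(s)": s carries the DRPC request argument (per rpc signature dgreet(s));
    // the return value fills the "greeting" output field.
    public String greet(String s) {
        return "Hello " + s + "!";
    }
}
```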
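A DRPC client can then invoke the dgreet function (pre-1.0 Storm client API):
import backtype.storm.utils.DRPCClient;

// Connect to a DRPC server; 3772 is Storm's default DRPC port.
DRPCClient client = new DRPCClient("storm1.example.com", 3772);
String result = client.execute("dgreet", "World");
System.out.println(result);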
- Pascal de Kloe @GitHub
- Jethro Bakker @GitHub
- Jasper van Veghel @GitHub
- Sonatype central repository support