As you have seen, bolts are key components in a Storm cluster. In this chapter, you’ll look at a bolt’s life cycle, some strategies for bolt design, and some examples of how to implement them.
A bolt is a component that takes tuples as input and produces tuples as output. When writing a bolt, you will usually implement the IRichBolt interface. Bolts are created on the client machine, serialized into the topology, and submitted to the master machine of the cluster. The cluster launches workers that deserialize the bolt, call prepare on it, and then start processing tuples.
To customize a bolt, you should set parameters in its constructor and save them as instance variables so they will be serialized when submitting the bolt to the cluster.
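As a sketch of this pattern (the PrefixBolt name and its prefix parameter are hypothetical, used only for illustration), a bolt that takes a configuration value through its constructor might look like this:

```java
// Hypothetical example: a bolt configured through its constructor.
// The prefix field is a plain instance variable, so it is serialized
// along with the bolt when the topology is submitted to the cluster.
class PrefixBolt implements IRichBolt {
    private String prefix;            // set on the client, serialized to workers
    private OutputCollector collector;

    public PrefixBolt(String prefix) {
        this.prefix = prefix;
    }

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;   // not serialized; obtained at prepare time
    }

    public void execute(Tuple tuple) {
        collector.emit(new Values(prefix + tuple.getString(0)));
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("prefixed"));
    }
}
```

Note that the collector is saved in prepare rather than in the constructor: the OutputCollector is not serializable and only becomes available once the worker has deserialized the bolt and is ready to run it.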
Bolts have the following methods:
declareOutputFields(OutputFieldsDeclarer declarer)
    Declare the output schema for this bolt.
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    Called just before the bolt starts processing tuples.
execute(Tuple input)
    Process a single tuple of input.
cleanup()
    Called when a bolt is going to shut down.
Take a look at an example of a bolt that will split sentences into words:
class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
As you can see, this bolt is very straightforward. It’s worth mentioning that in this example there is no message guarantee. This means that if the bolt discards a message for some reason—either because it goes down or because it was deliberately discarded programmatically—the spout that generated the message will never be notified, and neither will any of the bolts and spouts in between.
In many cases, you’ll want to guarantee message processing through the entire topology.
As mentioned before, Storm can guarantee that each message sent by a spout is fully processed by all bolts. This is a design consideration, meaning that you will need to decide whether your bolts guarantee message processing.
A topology is a tree of nodes in which messages (tuples) travel along one or more branches. Each node will ack(tuple) or fail(tuple), so that Storm knows when a message fails and can notify the spout or spouts that produced it. Since a Storm topology runs in a highly parallelized environment, the best way to keep track of the original spout instance is to include a reference to the originating spout in the message tuple. This technique is called anchoring. Let's change the SplitSentence bolt you just saw so that it guarantees message processing.
class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The exact line where the anchoring happens is the collector.emit() statement. As mentioned earlier, passing along the input tuple enables Storm to keep track of the originating spouts. collector.ack(tuple) and collector.fail(tuple) tell the spout what happened to each message. Storm considers a tuple coming off a spout fully processed when every message in its tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a configurable timeout. The default is 30 seconds. You can change this timeout with the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration on the topology.
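For example, when submitting the topology you could raise the timeout like this (the 60-second value is arbitrary):

```java
Config conf = new Config();
// Fail tuples that are not fully processed within 60 seconds
// (this sets Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).
conf.setMessageTimeoutSecs(60);
```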
Of course, the spout needs to take care of the case when a message fails and retry or discard the message accordingly.
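As a sketch of one common approach (the pending map, the readNextMessage helper, and the message-ID scheme here are illustrative, not a fixed Storm API), a spout can keep each in-flight message keyed by its message ID and use the ack and fail callbacks to discard or retry it:

```java
// Illustrative fragment of a reliable spout: messages stay in a
// pending map until acked, and are re-emitted when they fail.
private Map<UUID, Values> pending = new HashMap<UUID, Values>();

public void nextTuple() {
    Values values = readNextMessage();         // hypothetical data source
    if (values != null) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, values);
        collector.emit(values, msgId);         // emit with a message ID
    }
}

public void ack(Object msgId) {
    pending.remove(msgId);                     // fully processed; forget it
}

public void fail(Object msgId) {
    collector.emit(pending.get(msgId), msgId); // retry the failed message
}
```

A real spout would also cap the number of retries or the size of the pending map, so a poison message cannot be retried forever.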
Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don’t ack/fail every tuple, the task will eventually run out of memory.
A bolt can emit tuples to multiple streams using emit(streamId, tuple), where streamId is a string that identifies the stream. Then, in the TopologyBuilder, you can decide which stream to subscribe to.
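As a sketch (the stream and component names here are made up), the bolt declares a named stream in addition to the default one, emits to it by name, and the TopologyBuilder subscribes a downstream bolt to that specific stream:

```java
// In the bolt: declare a second, named stream alongside the default one.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));                  // default stream
    declarer.declareStream("errors", new Fields("error")); // named stream
}

// In execute(), emit to the named stream:
//   collector.emit("errors", new Values(errorMessage));

// When building the topology, subscribe to that stream by name:
//   builder.setBolt("error-handler", new ErrorBolt())
//          .shuffleGrouping("split-sentence", "errors");
```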
To use a bolt to join or aggregate streams, you'll need to buffer tuples in memory. To guarantee message processing in this scenario, you have to anchor the emitted tuple to more than one input tuple. This is done by calling emit with a List of tuples.
...
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, values);
...
That way, any time a bolt acks or fails, it notifies the tree, and because the stream is anchored to more than one tuple, all spouts involved are notified.
As you probably noticed, there are lots of use cases in which you need message guarantees. To make things easier, Storm provides another interface for bolts called IBasicBolt, which encapsulates the pattern of calling ack right after the execute method. An implementation of this interface, BaseBasicBolt, is used to do the acking automatically.
class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple.
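To put the bolt in context, a minimal topology wiring might look like this (the component names and the SentenceSpout class are hypothetical; the spout is assumed to emit one-field "sentence" tuples):

```java
// Hypothetical wiring of the SplitSentence bolt into a topology.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout());
builder.setBolt("split-sentence", new SplitSentence(), 4)  // 4 parallel tasks
       .shuffleGrouping("sentence-spout");
```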