Chapter 5. Bolts

As you have seen, bolts are key components in a Storm cluster. In this chapter, you’ll look at a bolt’s life cycle, some strategies for bolt design, and some examples of how to implement them.

Bolt Lifecycle

A bolt is a component that takes tuples as input and produces tuples as output. When writing a bolt, you will usually implement the IRichBolt interface. Bolts are created on the client machine, serialized into the topology, and submitted to the master machine of the cluster. The cluster launches workers that deserialize the bolt, call prepare on it, and then start processing tuples.

Tip

To customize a bolt, you should set parameters in its constructor and save them as instance variables so they will be serialized when submitting the bolt to the cluster.
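The effect of this tip can be seen without a cluster. The following is a minimal sketch, assuming bolts are shipped with standard Java serialization; ConfiguredSplitter, its delimiter field, and the roundTrip helper are hypothetical names that stand in for a real bolt and do not use the Storm API. It shows that a value stored in the constructor survives the round trip, while transient runtime state does not and has to be rebuilt in a prepare-style method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a bolt; it does not use the Storm API.
class ConfiguredSplitter implements Serializable {
    private static final long serialVersionUID = 1L;

    // Set in the constructor on the client, so it travels with the object.
    private final String delimiter;

    // Runtime-only state: skipped by serialization, rebuilt in prepare().
    private transient StringBuilder buffer;

    ConfiguredSplitter(String delimiter) {
        this.delimiter = delimiter;
    }

    // Plays the role of a bolt's prepare(): called after deserialization.
    void prepare() {
        this.buffer = new StringBuilder();
    }

    String getDelimiter() {
        return delimiter;
    }

    boolean isPrepared() {
        return buffer != null;
    }

    // Serializes and deserializes in memory, a rough analogue of submitting to a cluster.
    static ConfiguredSplitter roundTrip(ConfiguredSplitter original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredSplitter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After roundTrip, getDelimiter() still returns the constructor argument, but isPrepared() is false until prepare() runs again, which is why collectors and connections belong in prepare rather than in the constructor.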

Bolt Structure

Bolts have the following methods:

declareOutputFields(OutputFieldsDeclarer declarer)

Declare the output schema for this bolt

prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)

Called just before the bolt starts processing tuples

execute(Tuple input)

Process a single tuple of input

cleanup()

Called when a bolt is going to shut down

Take a look at an example of a bolt that will split sentences into words:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

As you can see, this bolt is very straightforward. It’s worth mentioning that in this example there is no message guarantee. This means that if the bolt discards a message for some reason—either because it goes down or because it was deliberately discarded programmatically—the spout that generated the message will never be notified, and neither will any of the bolts and spouts in between.

In many cases, you’ll want to guarantee message processing through the entire topology.

Reliable versus Unreliable Bolts

As mentioned earlier, Storm can guarantee that each message sent by a spout is fully processed by all bolts. This guarantee is a design decision: you need to decide whether your bolts guarantee message processing.

A topology is a tree of nodes in which messages (tuples) travel along one or more branches. Each node calls ack(tuple) or fail(tuple) so that Storm knows when a message fails and can notify the spout or spouts that produced it. Since a Storm topology runs in a highly parallelized environment, the best way to keep track of the original spout instance is to include a reference to the originating spout in the message tuple. This technique is called anchoring. Change the SplitSentence bolt that you just saw so that it guarantees message processing:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The anchoring happens at the collector.emit() statement. As mentioned earlier, passing along the tuple enables Storm to keep track of the originating spouts. collector.ack(tuple) and collector.fail(tuple) tell a spout what happened to each message. Storm considers a tuple coming off a spout fully processed when every message in the tree has been processed. A tuple is considered failed when its tree of messages fails to be fully processed within a configurable timeout. The default is 30 seconds.
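The idea of a tuple tree being "fully processed" can be pictured with a small model. The sketch below assumes the XOR-based bookkeeping described in Storm's documentation: every tuple id is XOR-ed into a running value once when the tuple is anchored and once when it is acked, so the value returns to zero only when every tuple in the tree has been acked. AckTrackerSketch and its method names are hypothetical, the ids are fixed for readability (Storm uses random 64-bit ids), and this is a simplified illustration rather than Storm's own code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of ack bookkeeping; not Storm's actual code.
class AckTrackerSketch {
    // One running XOR value per spout tuple, keyed by a root id.
    private final Map<Long, Long> pending = new HashMap<>();

    // A spout emits a new root tuple: start tracking its tree.
    void spoutEmitted(long rootId, long tupleId) {
        pending.put(rootId, tupleId);
    }

    // A bolt acks one tuple after emitting zero or more tuples anchored to it.
    // Returns true when the whole tree for rootId has been processed.
    boolean boltAcked(long rootId, long ackedId, long... anchoredIds) {
        Long current = pending.get(rootId);
        if (current == null) {
            throw new IllegalArgumentException("unknown root " + rootId);
        }
        long value = current ^ ackedId;
        for (long id : anchoredIds) {
            value ^= id;
        }
        if (value == 0L) {
            pending.remove(rootId); // nothing left to wait for
            return true;
        }
        pending.put(rootId, value);
        return false;
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch();
        tracker.spoutEmitted(1L, 0xA1L); // the sentence tuple
        // SplitSentence emits two anchored words, then acks the sentence.
        System.out.println(tracker.boltAcked(1L, 0xA1L, 0xB1L, 0xB2L)); // false
        System.out.println(tracker.boltAcked(1L, 0xB1L)); // false
        System.out.println(tracker.boltAcked(1L, 0xB2L)); // true
    }
}
```

In this model an unanchored emit would never enter the running value, which mirrors why the first SplitSentence version gives no guarantee for the words it emits.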

Tip

You can change this timeout by changing the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration on the topology.
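As a configuration fragment, raising the timeout might look like the following. This is a sketch that assumes the org.apache.storm package names used since Storm 1.0 (earlier releases use backtype.storm); the class name TimeoutConfigSketch and the value of 60 seconds are placeholders.

```java
import org.apache.storm.Config; // backtype.storm.Config in releases before 1.0

// Hypothetical helper that builds a topology configuration with a longer timeout.
class TimeoutConfigSketch {
    static Config withLongerTimeout() {
        Config conf = new Config();
        // Sets Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS to 60 seconds.
        conf.setMessageTimeoutSecs(60);
        return conf;
    }
}
```

The resulting Config object is the one you pass along when submitting the topology.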

Of course, the spout needs to take care of the case when a message fails and retry or discard the message accordingly.

Note

Every tuple you process must be acked or failed. Storm uses memory to track each tuple, so if you don’t ack/fail every tuple, the task will eventually run out of memory.

Multiple Streams

A bolt can emit tuples to multiple streams using emit(streamId, tuple), where streamId is a string that identifies the stream. Then, in the TopologyBuilder, you can decide which stream to subscribe to.
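A sketch of how the two halves fit together is shown below. It assumes the org.apache.storm package names used since Storm 1.0 and the BaseRichBolt convenience class; WordRouter, the stream ids "short" and "long", and the component ids are hypothetical. The bolt declares each stream, emits to one of them by id, and the wiring code subscribes downstream bolts to a specific stream.

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt: routes short and long words to two named streams.
class WordRouter extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        String streamId = word.length() <= 4 ? "short" : "long";
        // Anchored emit to a named stream, followed by the ack.
        collector.emit(streamId, tuple, new Values(word));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Every stream the bolt emits to must be declared with its schema.
        declarer.declareStream("short", new Fields("word"));
        declarer.declareStream("long", new Fields("word"));
    }
}

class MultiStreamWiring {
    // shortWords and longWords are placeholders for whatever bolts consume each stream;
    // "splitter" is assumed to be a component already registered on the builder.
    static void wire(TopologyBuilder builder, IRichBolt shortWords, IRichBolt longWords) {
        builder.setBolt("router", new WordRouter()).shuffleGrouping("splitter");
        builder.setBolt("short-words", shortWords).shuffleGrouping("router", "short");
        builder.setBolt("long-words", longWords).shuffleGrouping("router", "long");
    }
}
```

A grouping call without a stream id subscribes to the component's default stream, so the second argument is what selects a named stream.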

Multiple Anchoring

To use a bolt to join or aggregate streams, you’ll need to buffer tuples in memory. To guarantee message processing in this scenario, you have to anchor the output to more than one tuple. This is done by calling emit with a List of tuples.

...
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, values);
...

That way, any time a bolt acks or fails, it notifies the tree, and because the stream is anchored to more than one tuple, all spouts involved are notified.
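The way multiple anchors connect one output tuple to several spout tuples can be modeled in plain Java. The sketch below is an illustration only: AnchorModel, its methods, and the string ids are hypothetical and do not use the Storm API. Each tuple records the spout tuples (roots) it descends from, and a tuple emitted with a list of anchors inherits the roots of all of them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of anchoring; it does not use the Storm API.
class AnchorModel {
    // For every tuple id, the spout tuples (roots) it descends from.
    private final Map<String, Set<String>> rootsOf = new HashMap<>();

    // A spout emits a root tuple; it is its own root.
    void spoutEmit(String id) {
        Set<String> self = new TreeSet<>();
        self.add(id);
        rootsOf.put(id, self);
    }

    // A bolt emits a tuple anchored to one or more input tuples:
    // the new tuple inherits the roots of every anchor.
    void emit(List<String> anchors, String id) {
        Set<String> roots = new TreeSet<>();
        for (String anchor : anchors) {
            roots.addAll(rootsOf.getOrDefault(anchor, Collections.<String>emptySet()));
        }
        rootsOf.put(id, roots);
    }

    // Roots, and therefore spouts, that must hear about an ack or fail of this tuple.
    Set<String> rootsToNotify(String id) {
        return rootsOf.getOrDefault(id, Collections.<String>emptySet());
    }
}
```

For example, after spoutEmit("a"), spoutEmit("b"), and emit(Arrays.asList("a", "b"), "joined"), a failure of "joined" concerns both "a" and "b", whereas a tuple anchored only to "a" concerns "a" alone.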

Using IBasicBolt to Ack Automatically

As you probably noticed, there are lots of use cases in which you need message guarantees. To make things easier, Storm provides another interface for bolts called IBasicBolt, which encapsulates the pattern of calling ack right after the execute method. An implementation of this interface, BaseBasicBolt, is used to do the acking automatically.

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Note

Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple.
