Reliability of data processing

One of the USPs of Storm is guaranteed message processing, which makes it a very attractive solution. Having said that, we as programmers have to make certain design decisions about whether or not to use the reliability provided by Storm.

First of all, it's very important to understand what happens when a tuple is emitted into the topology and how its corresponding DAG is constructed. The following diagram captures a typical case in this context:

Reliability of data processing

Here, the function of the topology is very clear: every emitted tuple has to be filtered, calculated, and written to HDFS and the database. Now, let's look at the implications of this DAG with respect to a single tuple being emitted into the topology.

Every single tuple that is emitted into the topology moves as follows:

  • Spout A -> Bolt A -> Bolt D -> Database
  • Spout A -> Bolt B -> Bolt D -> Database
  • Spout A -> Bolt C -> HDFS

So, one tuple from Spout A is replicated at step 1 into three tuples that move to Bolt A, Bolt B, and Bolt C (three tuples). In the next step, there is no change in the number; each bolt passes a single tuple on to the next step (three more tuples, making the total 3 + 3 = six tuples). In the next step, at Bolt D, the two streams are joined, so it consumes two tuples but emits only one, making it 6 + 1 = 7 tuples.

So, for one event to be successfully processed, Storm has to internally generate, propagate, and manage multiple tuples based on the topology structure, parallelism, and grouping. In the previous example, we have assumed a parallelism of 1 throughout and shuffle grouping between the bolts and spouts. Thus, we kept it very simple for the sake of illustration.
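To make this concrete, here is a minimal sketch of how such a topology could be wired up. The spout and bolt class names (SpoutA, BoltA, and so on) are hypothetical placeholders standing in for the components in the diagram, not the actual code of this example; the package layout assumes a recent org.apache.storm release (older versions used backtype.storm):

import org.apache.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// A single spout feeding three bolts, each with a parallelism hint of 1
builder.setSpout("spout-a", new SpoutA(), 1);
builder.setBolt("bolt-a", new BoltA(), 1).shuffleGrouping("spout-a");
builder.setBolt("bolt-b", new BoltB(), 1).shuffleGrouping("spout-a");
builder.setBolt("bolt-c", new BoltC(), 1).shuffleGrouping("spout-a"); // writes to HDFS
// Bolt D joins the streams coming from Bolt A and Bolt B and writes to the database
builder.setBolt("bolt-d", new BoltD(), 1)
       .shuffleGrouping("bolt-a")
       .shuffleGrouping("bolt-b");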

Another noteworthy aspect is that to acknowledge an event as successfully processed, Storm expects an ack() from every node in the DAG through which the tuple is executed. All the ack() calls should arrive within the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configured in Storm; the default value of this attribute is 30 seconds. If the ack() doesn't happen within 30 seconds, Storm assumes a failure and re-emits the tuple into the topology.
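If the 30-second default is not suitable, the timeout can be changed on the topology configuration. The following is a minimal sketch assuming the org.apache.storm Config class (older releases use backtype.storm):

import org.apache.storm.Config;

Config conf = new Config();
// allow up to 60 seconds for the full tuple tree to be acked before replay
conf.setMessageTimeoutSecs(60);
// equivalent to: conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);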

The replay of failed messages depends on how we design our topologies. If we create reliable topologies with anchoring, then Storm replays all failed messages. In the case of unreliable topologies (as the name suggests), no replay happens.

The concept of anchoring and reliability

So far, we have understood very well the function of the nextTuple() method in the spout: it fetches the next event that becomes available at the source and emits it into the topology. The open() method of the spout holds the definition of the spout collector that is used to actually emit the tuple from the spout into the topology. Every tuple/event that is emitted by the spout into the topology is tagged with a message ID, which is the identifier for the tuple. Whenever the spout emits a message into the topology, it tags it with a messageId, as shown here:

_collector.emit(new Values(...),msgId);

This messageId tag is actually the identifier for the tuple. Storm tracks and tags the tuple using this messageId as it plays the tuple through the bolt tree.

In case the event is successfully played through the DAG, it's acknowledged. Please note that this acknowledgment of the tuple is done at the spout level, by the spout that emitted the tuple. If the tuple times out, then the originating spout executes the fail() method on the tuple.

Now, we need to understand a very important aspect of replay. The question is, "How does Storm replay the tuples that are already emitted?" The answer is that the spout reads a tuple from the queue, but the tuple remains in the queue until it's successfully played; only once ack() is received does Storm acknowledge the tuple to the queue and thus remove it. In the case of fail(), the message is queued back for consumption and replay:

The concept of anchoring and reliability

As depicted in the diagram, the failed tuples are propagated back to the spout and are queued back into the queue. When the messages are read from the queue by the spout, they are marked as un-acked. During this period, they are still in the queue but not available to be read by any other spout/topology/client. If such an un-acked message is played successfully through the topology, it's acked and removed from the queue. If the event fails to complete its execution through the topology, it's failed, re-queued, and made available for consumption by the spout again.
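The same cycle can be sketched at the spout level. The following is a minimal, illustrative spout assuming a hypothetical in-memory source queue (it is not taken from this example's code); it shows where the messageId, ack(), and fail() hooks fit and how a pending map supports replay:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableQueueSpout extends BaseRichSpout {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Map<Object, String> pending = new ConcurrentHashMap<>();
    private SpoutOutputCollector _collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void nextTuple() {
        String event = queue.poll();
        if (event == null) return;
        Object msgId = UUID.randomUUID();
        pending.put(msgId, event);           // keep the event around until it is acked
        _collector.emit(new Values(event), msgId);
    }

    public void ack(Object msgId) {
        pending.remove(msgId);               // fully processed, safe to discard
    }

    public void fail(Object msgId) {
        queue.offer(pending.remove(msgId));  // re-queue the event for replay
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}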

One of the most important points to note is anchoring. It is the tagging of all the edges of the DAG through which the tuple is played. Anchoring is a choice of the developer who wires up the topology, but Storm can replay messages only in anchored topologies. Now, the two logical questions to be asked are:

  • How do we anchor tuples along the DAG of the topology during execution?
  • Why would a developer create an un-anchored topology?

Let's answer these one by one. Anchoring is pretty simple, and we as developers have to be cognizant of it while emitting tuples from the bolts and spouts. Here are two code snippets that will make it very clear:

_collector.emit(new Values(word));

The above snippet is the un-anchored version used in unreliable topologies, where the tuples cannot be replayed in the event of failure.

//simple anchoring: the output tuple is anchored to the input tuple received in execute()
_collector.emit(tuple, new Values(1, 2, 3));
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
//multiple anchoring: the output tuple is anchored to two input tuples, viz tuple1 and tuple2
_collector.emit(anchors, new Values(1, 2, 3));

This second snippet captures the anchored version, where the output tuples are tied to their input tuples using the anchor list; in case of any failure, this reliable topology would be able to replay all the events.
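To put these snippets in context, the following is a minimal sketch (not taken from this example's code) of a reliable bolt, showing both the anchored emit and the explicit ack that lets the tuple tree eventually complete back at the spout:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ReliableFilterBolt extends BaseRichBolt {
    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        // anchored emit: the new tuple becomes part of the incoming tuple's tree
        _collector.emit(tuple, new Values(tuple.getString(0).toLowerCase()));
        // explicit ack so that this edge of the tuple tree is marked as done
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}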

Now, coming to our second question: having anchoring and a reliable topology is a little expensive in terms of the bookkeeping and the message size propagated through the topology. There are numerous scenarios where reliability is not required, and hence we have unreliable topologies where no anchoring is done.

Storm bolts can be broadly classified into two segments: the basic bolts, and the ones that do aggregation and joins. The basic bolts are plain and are clearly depicted in the following diagram; prepare() sets up the output collector, to which tuples are emitted immediately after they are processed. All the bolts following this general pattern are actually implemented using the IBasicBolt interface. The diagram also captures the second variety of not-so-simple Storm bolts, which perform tasks such as aggregation and joins. Here, the output tuples are anchored to multiple input tuples, and sometimes they are emitted after a preprogrammed delay, as in the case of aggregates:

The concept of anchoring and reliability
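For the first, simpler variety, the IBasicBolt family (commonly used via the BaseBasicBolt convenience class) takes care of anchoring and acking automatically. A minimal sketch, not taken from this example's code, could look like this:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BasicFilterBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // emits are anchored to the input tuple and acked automatically
        // once execute() returns without throwing an exception
        collector.emit(new Values(tuple.getString(0).toLowerCase()));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}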

Now that we have understood Storm reliability, let's touch upon the most important component that makes sure that all updates about ack or fail reach the spout successfully. They are acker processes.

The Storm acking framework

Acker processes are lightweight tasks that coexist with spouts and bolts, and are responsible for acking all successfully executed messages to the spout. Their number is controlled by a property in the Storm config called TOPOLOGY_ACKER_EXECUTORS; by default, it's equal to the number of workers defined in the topology. If the topology operates at a lower TPS, the number of ackers can be reduced. However, for high-TPS topologies, this number should be increased so that tuples are acknowledged at the rate at which they arrive and are processed.
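As a minimal sketch (assuming the org.apache.storm Config class; older releases use backtype.storm), the number of acker executors could be tuned like this:

import org.apache.storm.Config;

Config conf = new Config();
// run four acker executors for a high-TPS topology
conf.setNumAckers(4);
// equivalent to: conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 4);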

Ackers use the following algorithm to track the completion of the tuple tree for each tuple played through the topology:

  • Maintain a checksum hash for each tuple tree
  • XOR the value of every executed tuple into that checksum

If all the tuples in the tuple tree are acked, the checksum will be 0; otherwise, it will be a non-zero value, the latter denoting a failure in the topology. Its working is largely driven and controlled using tick tuples:

The Storm acking framework

The preceding diagram captures the flow of the acker process. Note that it has a bookkeeping component, a ledger or RotatingMap, and the entire flow of things is controlled on the basis of tick tuples.
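The XOR trick itself is easy to demonstrate outside Storm. The following stand-alone sketch (purely illustrative, not Storm code) shows why the checksum returns to zero only when every tuple in the tree has been acked:

import java.util.Random;

public class XorChecksumDemo {
    public static void main(String[] args) {
        Random random = new Random();
        // random 64-bit ids standing in for the three tuples of a small tuple tree
        long[] tupleIds = { random.nextLong(), random.nextLong(), random.nextLong() };

        long checksum = 0L;
        for (long id : tupleIds) {
            checksum ^= id;   // tuple created/anchored: XOR its id in
        }
        for (long id : tupleIds) {
            checksum ^= id;   // tuple acked: XOR its id in again
        }
        // every id was XORed an even number of times, so the checksum is back to 0
        System.out.println(checksum == 0);  // prints true
    }
}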
