Chapter 7. Using Non-JVM Languages with Storm

Sometimes you want to use languages that aren’t based on the JVM to implement a Storm project, either because you feel more comfortable with another language or because you want to use a library written in that language.

Storm is implemented in Java, and all the spouts and bolts that you’ve seen in this book were written in Java as well. So is it possible to use languages like Python, Ruby, or even JavaScript to write spouts and bolts? The answer is yes! It is possible using something called the multilang protocol.

The multilang protocol is a special protocol implemented in Storm that uses standard input and standard output as a channel of communication with a process that does the job of a spout or a bolt. Messages are passed through this channel encoded as JSON or as lines of plain text.

Let’s take a look at a simple example of a spout and a bolt in a non-JVM language. You’ll have a spout that generates numbers from 1 to 10,000 and a bolt that filters for prime numbers, both written in PHP.

Note

In this example, we check for prime numbers in a naive way. There are much better implementations, but they are also more complex and out of the scope of this example.
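
To give a rough idea of what a slightly less naive check could look like, here is a minimal sketch in Java that only tries odd divisors up to the square root of the number. The class name PrimeCheck is hypothetical, and this is just one simple improvement, not the approach used by the scripts in this chapter.

```java
final class PrimeCheck {
    private PrimeCheck() {}

    /** Returns true if n is prime, trying only odd divisors up to sqrt(n). */
    static boolean isPrime(long n) {
        if (n < 2) {
            return false;
        }
        if (n < 4) {
            return true; // 2 and 3 are prime
        }
        if (n % 2 == 0) {
            return false; // even numbers above 2 are composite
        }
        // "i <= n / i" is the same bound as "i * i <= n" without the overflow risk.
        for (long i = 3; i <= n / i; i += 2) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }
}
```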

There is an official implementation of a PHP DSL for Storm. In this chapter, we’ll show our implementation as an example. First of all, define the topology.

...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...

Note

There is a way to specify topologies in a non-JVM language. Since Storm topologies are just Thrift structures, and Nimbus is a Thrift daemon, you can create and submit topologies in any language you want. But this is out of the scope of this book.

Nothing new here. Let’s see the implementation of NumberGeneratorSpout.

public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

As you have probably noticed, this spout extends ShellSpout. This is a special class that comes with Storm and helps you run and control spouts written in other languages. In this case, it tells Storm how to execute your PHP script.

The spout generates sequential numbers, counting from the from parameter up to the to parameter passed to the constructor.

The NumberGeneratorSpout PHP script emits tuples to standard output and reads standard input to process acks or fails. We’ll go over its implementation after looking at how the multilang protocol works.

Next, look at PrimeNumbersFilterBolt. Like the spout, this class is a thin wrapper that tells Storm how to execute your PHP script. Storm provides a special class for this purpose called ShellBolt; the only things you have to do are indicate how to run the script and declare the fields that it emits.

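public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    // Required by IRichBolt; null means no component-specific configuration.
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}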

In the constructor, just tell Storm how to run the PHP script. This is the equivalent of the following bash command:

php -f PrimeNumbersFilterBolt.php
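
To picture what running such a command from the JVM involves, here is a minimal sketch that uses the standard ProcessBuilder class. It is not Storm’s own code; the class name ScriptLauncher and its methods are hypothetical, and the sketch only shows that a child process started this way has its standard input and output connected to the parent through pipes.

```java
import java.io.IOException;

final class ScriptLauncher {
    private ScriptLauncher() {}

    /** Describes the child process without starting it. */
    static ProcessBuilder prepare(String... command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // stdin and stdout are pipes by default, so the parent can write
        // messages to the child and read its replies. Keep stderr separate
        // so that stray error output cannot corrupt the message stream.
        pb.redirectErrorStream(false);
        return pb;
    }

    /** Starts the child; talk to it through getOutputStream() and getInputStream(). */
    static Process launch(String... command) throws IOException {
        return prepare(command).start();
    }
}
```

Calling ScriptLauncher.launch("php", "-f", "PrimeNumbersFilterBolt.php") would start the script, provided php is on the path and the file is in the working directory.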

The PrimeNumbersFilterBolt PHP script reads tuples from standard input, processes them, and emits, acks, or fails to standard output. Before going over the implementation of the PrimeNumbersFilterBolt.php script, let’s look in more detail at how the multilang protocol works.

The Multilang Protocol Specification

The protocol relies on standard input and standard output as a channel of communication between processes. A script needs to take the following steps in order to work:

  1. Initiate a handshake.

  2. Start looping.

  3. Read or write tuples.

Tip

There is a special way of logging from your script that uses Storm’s built-in logging mechanism, so you don’t need to implement your own logging system.

Let’s take a look at each of these steps in detail, and at how to implement them in a PHP script.

Initial Handshake

In order to control the process (to start and stop it), Storm needs to know the process ID (PID) of the script it is executing. According to the multilang protocol, the first thing that happens when your process starts is that Storm sends a JSON object with the Storm configuration, the topology context, and a PID directory to standard input. It will look something like the following code block:

{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}

The process must create an empty file at the path specified by pidDir, whose name is the process ID, and write the PID to standard out as a JSON object.

{"pid": 1234}

For example, if you receive /tmp/example as pidDir and the PID of your script is 123, you should create an empty file at /tmp/example/123 and print two lines to standard output: {"pid": 123} followed by end. This is how Storm keeps track of the PID and kills the process when it shuts down. Let’s see how to do it in PHP:

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

You’ve created a function called read_msg to handle reading messages from standard input. The multilang protocol states that messages can be either a single line or multiple lines encoded in JSON. A message is complete when Storm sends a single line with the word end. The storm_send and write_line helpers do the reverse for outgoing messages.

function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Standard input was closed, so there is nothing left to read.
            exit(0);
        }
        $line = substr($l,0,-1); // strip the trailing newline
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}

function write_line($line) {
    echo("$line\n");
}
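
The framing rule described above is independent of PHP. As an illustration, here is a minimal Java sketch of the same rule, of the kind you might use in a small test harness that plays Storm’s side of the pipe. The class MultilangFraming and its methods are hypothetical names, not part of Storm.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

final class MultilangFraming {
    static final String END = "end";

    private MultilangFraming() {}

    /** Frames a JSON payload: the payload, a newline, the word "end", a newline. */
    static String frame(String json) {
        return json + "\n" + END + "\n";
    }

    /**
     * Reads lines until one that contains only "end" and returns the lines
     * before it joined by newlines. Returns null if the stream ends first.
     */
    static String readMessage(BufferedReader in) {
        StringBuilder msg = new StringBuilder();
        boolean first = true;
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.equals(END)) {
                    return msg.toString();
                }
                if (!first) {
                    msg.append('\n');
                }
                msg.append(line);
                first = false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null; // the stream closed before a complete message arrived
    }
}
```

Returning null on a closed stream gives the caller a chance to stop cleanly instead of looping on an input that will never produce another message.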

Warning

The use of flush() is very important; there might be a buffer that won’t be flushed until a specific number of characters has accumulated. This means that your script can hang forever waiting for input from Storm, which it will never receive because Storm is in turn waiting for output from your script. So it’s important to make sure that whatever your script outputs gets flushed immediately.
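
The effect is easy to reproduce in any language that buffers its output. The following Java sketch writes a short message through a BufferedWriter into an in-memory sink that stands in for the pipe; FlushDemo and bytesDelivered are hypothetical names used only for this illustration.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FlushDemo {
    private FlushDemo() {}

    /** Writes one message through a buffered writer and reports how many bytes reached the sink. */
    static int bytesDelivered(String message, boolean flush) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the pipe
        BufferedWriter out = new BufferedWriter(
                new OutputStreamWriter(sink, StandardCharsets.UTF_8));
        try {
            out.write(message);
            if (flush) {
                out.flush(); // push the buffered characters down to the sink
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The writer is deliberately left open: closing it would flush as a
        // side effect and hide the difference this method is meant to show.
        return sink.size();
    }
}
```

Without the flush, a short message stays in the writer’s buffer and the reader on the other end sees nothing, which is the deadlock described above.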

Start Looping and Read or Write Tuples

This is the most important step, where all the work gets done. The implementation of this step depends on whether you are developing a spout or a bolt.

In the case of a spout, start emitting tuples. In the case of a bolt, loop reading tuples, process them, and emit, ack, or fail.

Let’s see the implementation of the spout that emits numbers.

$from = intval($argv[1]);
$to = intval($argv[2]);

while(true) {
    $msg = read_msg();

    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}

Get the from and to values from the command-line arguments and start iterating. Every time you get a next message from Storm, it means you can emit a new tuple.

Once you’ve sent all the numbers and you don’t have more tuples to send, just sleep for some time.

To make sure the script is ready for the next command, Storm waits for a sync message before sending the next one. To read a command, just call read_msg() and JSON-decode it.
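
The order of messages in this exchange can be modeled without any I/O. The Java sketch below is only a model of the spout’s side of the conversation: NumberSpoutModel is a hypothetical class, it assumes the upper bound is inclusive as the text describes, and it leaves out both the task IDs that Storm sends back after an emit and the sleep once the numbers run out.

```java
import java.util.ArrayList;
import java.util.List;

final class NumberSpoutModel {
    private long next;
    private final long to;

    NumberSpoutModel(long from, long to) {
        this.next = from;
        this.to = to;
    }

    /** Returns the messages the script would write in response to one "next" command. */
    List<String> onNext() {
        List<String> out = new ArrayList<>();
        if (next <= to) {
            // The number is emitted as a string, as in the PHP script.
            out.add("{\"command\": \"emit\", \"tuple\": [\"" + next + "\"]}");
            next++;
        }
        // Whether or not a tuple was emitted, the turn always ends with sync.
        out.add("{\"command\": \"sync\"}");
        return out;
    }
}
```

Each call answers one next command: at most one emit, and always a closing sync so that Storm knows it can send the next command.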

In the case of bolts, this is a little different.

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

Loop, reading tuples from standard input. As soon as you get a message, JSON-decode it. If it is a tuple, process it by checking whether it is a prime number.

In case it is a prime number, emit that number; otherwise just ignore it.

In any case, ack the tuple.

Tip

The use of JSON_BIGINT_AS_STRING in the json_decode function is a workaround for a conversion problem between Java and PHP. Java sends some very big numbers, and they are decoded with less precision in PHP, which can cause problems. To work around this problem, tell PHP to decode big numbers as strings and to avoid using double quotes when printing numbers in JSON messages. PHP 5.4.0 or higher is required for this parameter to work.
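
One general way this kind of precision loss arises is when a 64-bit integer passes through a double-precision floating-point value, which can represent every integer exactly only up to 2^53. The Java sketch below illustrates that effect and why keeping the value as text avoids it; it is not a reproduction of what PHP does internally, and BigIdPrecision is a hypothetical name.

```java
final class BigIdPrecision {
    private BigIdPrecision() {}

    /** Converts a 64-bit integer to a double and back, as a float-based decoder effectively would. */
    static long throughDouble(long value) {
        return (long) (double) value;
    }

    /** Keeps the value as text, which is the effect of decoding big integers as strings. */
    static String asText(long value) {
        return Long.toString(value);
    }
}
```

For a small value such as 123456789 the round trip through double is exact, but 9007199254740993 (2^53 + 1) comes back as 9007199254740992, so an ID decoded this way would no longer match the one Storm sent.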

Messages like emit, ack, fail, and log have the following structure:

Emit

{
    "command": "emit",
    "tuple": ["foo", "bar"]
}

Where the array has the values you are emitting for the tuple.

Ack

{
    "command": "ack",
    "id": 123456789
}

Where the id is the ID of the tuple you are processing.

Fail

{
    "command": "fail",
    "id": 123456789
}

Same as ack, the id is the ID of the tuple you are processing.

Log

{
    "command": "log",
    "msg": "some message to be logged by storm."
}

Putting it all together gives you the following PHP scripts.

For your spout:

<?php
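function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Standard input was closed, so there is nothing left to read.
            exit(0);
        }
        $line = substr($l,0,-1); // strip the trailing newline
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}

function write_line($line) {
    echo("$line\n");
}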

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}

function storm_sync() {
    storm_send(array("command" => "sync"));
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

$from = intval($argv[1]);
$to = intval($argv[2]);

while(true) {
    $msg = read_msg();

    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}
?>

And for your bolt:

<?php
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number==2) {
        return true;
    }
    for ($i=2; $i<=$number-1; $i++) {
        if ($number % $i == 0) {
            return false;
        }
    }
    return true;
}
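function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Standard input was closed, so there is nothing left to read.
            exit(0);
        }
        $line = substr($l,0,-1); // strip the trailing newline
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}

function write_line($line) {
    echo("$line\n");
}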

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}


function storm_ack($id) {
    storm_send(["command"=>"ack", "id"=>"$id"]);
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
?>

Warning

It is important to put all these scripts in a special folder called multilang/resources in your project directory. This folder gets included in the jar file that is sent to the workers. If you don’t put the scripts in that folder, Storm won’t be able to run them and will report an error.
