Sometimes you want to use languages that aren’t based on the JVM to implement a Storm project, either because you feel more comfortable with another language or you want to use a library written in that language.
Storm is implemented in Java, and all the spouts and bolts that you’ve seen in this book were written in Java as well. So is it possible to use languages like Python, Ruby, or even JavaScript to write spouts and bolts? The answer is yes! It is possible using something called the multilang protocol.
The multilang protocol is a special protocol implemented in Storm that uses standard input and standard output as a channel of communication with a process that does the job of a spout or a bolt. Messages are passed through this channel encoded as JSON or as lines of plain text.
Let’s take a look at a simple example of a spout and a bolt in a non-JVM language. You’ll have a spout that generates numbers from 1 to 10,000 and a bolt that filters for prime numbers, both written in PHP.
In this example, we check for prime numbers in a naive way. There are much better implementations, but they are also more complex and out of the scope of this example.
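For comparison, here is a minimal Java sketch of a slightly less naive check: trial division that stops at the square root of the candidate and skips even divisors. The class PrimeCheck and its methods are hypothetical and not part of the example project. Sieves and probabilistic tests are faster still, and remain out of scope.

```java
/** Hypothetical helper: trial-division primality test bounded by sqrt(n). */
final class PrimeCheck {
    private PrimeCheck() {}

    static boolean isPrime(long n) {
        if (n < 2) return false;          // 0, 1 and negative numbers are not prime
        if (n < 4) return true;           // 2 and 3
        if (n % 2 == 0) return false;     // even numbers above 2
        // A composite n always has a divisor d with d * d <= n, so stop there.
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    /** Counts the primes in [from, to], i.e. what a correct filter lets through. */
    static int countPrimes(long from, long to) {
        int count = 0;
        for (long n = from; n <= to; n++) {
            if (isPrime(n)) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countPrimes(1, 10_000)); // prints 1229
    }
}
```

Counting over the range used in this chapter shows that a correct filter should let 1,229 of the numbers from 1 to 10,000 through.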
There is no official implementation of a PHP DSL for Storm, so in this chapter we’ll show our own implementation as an example. First of all, define the topology.
```java
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt())
        .shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...
```
You can also specify topologies in a non-JVM language: since Storm topologies are just Thrift structures, and Nimbus is a Thrift daemon, you can create and submit topologies in any language you want. But this is out of the scope of this book.
Nothing new here. Let’s see the implementation of NumberGeneratorSpout.
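```java
// Storm 0.x package names; Storm 1.0 and later use org.apache.storm.* instead.
import backtype.storm.spout.ShellSpout;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;

public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        // Equivalent to running: php -f NumberGeneratorSpout.php <from> <to>
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
```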
As you have probably noticed, this spout extends ShellSpout, a special class that comes with Storm and helps you run and control spouts written in other languages. In this case, it tells Storm how to execute your PHP script. The spout generates sequential numbers, counting from the from parameter up to the to parameter; both are passed to the constructor and forwarded to the script as command-line arguments.
The NumberGeneratorSpout.php script emits tuples to standard output and reads standard input to process acks or fails. Before going over its implementation, we’ll look in more detail at how the multilang protocol works.
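To see how the constructor arguments reach the script, here is a hypothetical Java sketch (SpoutCommandLine is not part of Storm) that builds the same command NumberGeneratorSpout passes to ShellSpout and shows which part of it the script receives. It assumes the usual PHP CLI convention that, for php -f script.php args..., $argv[0] is the script name, so $argv[1] and $argv[2] hold from and to. The ProcessBuilder is only configured, never started.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration: the command NumberGeneratorSpout hands to its
 * super constructor, and the slice of it that PHP exposes as $argv.
 */
final class SpoutCommandLine {
    private SpoutCommandLine() {}

    /** Same arguments the NumberGeneratorSpout constructor passes to super(...). */
    static List<String> command(int from, int to) {
        List<String> cmd = new ArrayList<>();
        cmd.add("php");
        cmd.add("-f");
        cmd.add("NumberGeneratorSpout.php");
        cmd.add(Integer.toString(from));
        cmd.add(Integer.toString(to));
        return cmd;
    }

    /** Assumed PHP CLI behavior: $argv starts at the script name. */
    static List<String> phpArgv(List<String> command) {
        return command.subList(2, command.size());
    }

    public static void main(String[] args) {
        List<String> cmd = command(1, 10_000);
        ProcessBuilder pb = new ProcessBuilder(cmd); // configured only, not started
        System.out.println(String.join(" ", pb.command())); // php -f NumberGeneratorSpout.php 1 10000
        System.out.println(phpArgv(cmd));                    // [NumberGeneratorSpout.php, 1, 10000]
    }
}
```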
Next, look at PrimeNumbersFilterBolt. Like the spout, this class is a thin Java wrapper around a PHP script: it tells Storm how to execute that script. Storm provides a special class for this purpose called ShellBolt, where the only thing you have to do is indicate how to run the script and declare the fields that it emits.
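```java
// Storm 0.x package names; Storm 1.0 and later use org.apache.storm.* instead.
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;

public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    // Required by IRichBolt, just as in the spout.
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
```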
In the constructor, just tell Storm how to run the PHP script. This is the equivalent of the following bash command:
```bash
php -f PrimeNumbersFilterBolt.php
```
The PrimeNumbersFilterBolt PHP script reads tuples from standard input, processes them, and writes emit, ack, or fail messages to standard output. Before going over the implementation of the PrimeNumbersFilterBolt.php script, let’s look in more detail at how the multilang protocol works.
The protocol relies on standard input and standard output as a channel of communication between processes. A script needs to take the following steps in order to work:
Initiate a handshake.
Start looping.
Read or write tuples.
There is a special way of logging from your script that uses Storm’s built-in logging mechanism, so you don’t need to implement your own logging system.
Let’s look at each of these steps in detail, and at how to implement them in a PHP script.
In order to control the process (to start and stop it), Storm needs to know the process ID (PID) of the script it is executing. According to the multilang protocol, the first thing that happens when your process starts is that Storm sends a JSON object with the Storm configuration, the topology context, and a PID directory to standard input. It looks something like this:
```json
{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}
```
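Java has no JSON parser in its standard library, so this minimal sketch pulls pidDir and taskid out of the handshake with regular expressions. The class name Handshake is hypothetical, and the sketch assumes simple values (no escaped quotes or backslashes in the path); a real adapter should use a proper JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch: extracts pidDir and taskid from the handshake message.
 * Regex extraction only handles simple values; use a JSON library in real code.
 */
final class Handshake {
    // "pidDir" : "<value without quotes or backslashes>"
    private static final Pattern PID_DIR =
            Pattern.compile("\"pidDir\"\\s*:\\s*\"([^\"\\\\]*)\"");
    // "taskid" : <digits>
    private static final Pattern TASK_ID =
            Pattern.compile("\"taskid\"\\s*:\\s*(\\d+)");

    private Handshake() {}

    static Optional<String> pidDir(String json) {
        Matcher m = PID_DIR.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    static Optional<Integer> taskId(String json) {
        Matcher m = TASK_ID.matcher(json);
        return m.find() ? Optional.of(Integer.parseInt(m.group(1))) : Optional.empty();
    }

    public static void main(String[] args) {
        String msg = "{\"conf\": {\"topology.message.timeout.secs\": 3},"
                + " \"context\": {\"task->component\": {\"3\": \"example-bolt\"}, \"taskid\": 3},"
                + " \"pidDir\": \"/tmp/example\"}";
        System.out.println(pidDir(msg).orElse("?")); // /tmp/example
        System.out.println(taskId(msg).orElse(-1));  // 3
    }
}
```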
The process must create an empty file in the directory specified by pidDir, whose name is the process ID, and write the PID to standard output as a JSON object:
```json
{"pid": 1234}
```
For example, if you receive /tmp/example and the PID of your script is 123, you should create an empty file at /tmp/example/123 and print the line {"pid": 123} followed by a line containing end to standard output. This is how Storm keeps track of the PID and kills the process when it shuts down. Let’s see how to do it in PHP:
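```php
// Read the handshake message and take the PID directory from it.
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

// Create an empty file named after our PID, then report the PID to Storm.
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid" => $pid]);
flush();
```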
You’ve created a function called read_msg to handle reading messages from standard input. The multilang protocol states that messages can be either a single line or multiple lines encoded in JSON. A message is complete when Storm sends a single line with the word end. Outgoing messages are framed the same way: storm_send writes the JSON-encoded message on one line, followed by a line containing end.
```php
// Read lines until a line containing only "end"; return them joined by newlines.
function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        $line = substr($l, 0, -1);   // strip the trailing newline
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);      // drop the final newline
}

// Send a message: the JSON text on one line, then "end".
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}

function write_line($line) {
    echo("$line\n");
}
```
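The same framing can be sketched in Java: collect lines until one that is exactly end, then join them with newlines, as read_msg does. The class MultilangReader is hypothetical; unlike the PHP function, it throws an exception if the stream ends before end arrives.

```java
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.StringReader;
import java.util.StringJoiner;

/** Hypothetical sketch of the read side of the multilang framing. */
final class MultilangReader {
    private final BufferedReader in;

    MultilangReader(BufferedReader in) {
        this.in = in;
    }

    /** Reads one message: every line up to (not including) a line "end". */
    String readMsg() throws IOException {
        StringJoiner msg = new StringJoiner("\n");
        while (true) {
            String line = in.readLine();   // readLine() strips the line terminator
            if (line == null) {
                throw new EOFException("stream closed before \"end\"");
            }
            if (line.equals("end")) {
                return msg.toString();
            }
            msg.add(line);
        }
    }

    public static void main(String[] args) throws IOException {
        String stdin = "{\"command\": \"next\"}\nend\n{\"a\": 1,\n\"b\": 2}\nend\n";
        MultilangReader r = new MultilangReader(new BufferedReader(new StringReader(stdin)));
        System.out.println(r.readMsg()); // {"command": "next"}
        System.out.println(r.readMsg()); // a two-line message
    }
}
```

For the real channel, the reader would wrap System.in instead of a StringReader.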
The use of flush() is very important: there might be a buffer that won’t be flushed until a certain number of characters has accumulated. This means that your script can hang forever waiting for input from Storm, which it will never receive because Storm is in turn waiting for output from your script. So it’s important to make sure that whenever your script outputs something, it gets flushed immediately.
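On the writing side, a Java sketch of the same rules might look like this: each message is the JSON text, a line containing end, and an explicit flush. The class MultilangWriter and its announcePid method are hypothetical; announcePid mirrors the PHP handshake by creating an empty file named after the PID inside pidDir and then reporting the PID.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of the write side of the multilang framing. */
final class MultilangWriter {
    private final PrintWriter out;

    MultilangWriter(Writer out) {
        this.out = new PrintWriter(out);   // no autoflush: send() flushes explicitly
    }

    /** Writes one message followed by "end", then flushes so Storm is not left waiting. */
    void send(String json) {
        out.print(json);
        out.print('\n');                   // explicit "\n" instead of the platform line separator
        out.print("end\n");
        out.flush();
    }

    /** Handshake reply: create an empty pidDir/<pid> file and report the PID. */
    void announcePid(Path pidDir, long pid) throws IOException {
        // Like fopen(..., "w"): creates the file, or truncates it if it exists.
        Files.write(pidDir.resolve(Long.toString(pid)), new byte[0]);
        send("{\"pid\": " + pid + "}");
    }

    public static void main(String[] args) throws IOException {
        StringWriter buffer = new StringWriter();
        MultilangWriter w = new MultilangWriter(buffer);
        Path dir = Files.createTempDirectory("pids");
        w.announcePid(dir, ProcessHandle.current().pid()); // Java 9+
        System.out.print(buffer);
    }
}
```

In a real script, you would wrap System.out instead of a StringWriter, for example with new OutputStreamWriter(System.out, StandardCharsets.UTF_8).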
The looping step, where you read or write tuples, is the most important one: this is where all the work gets done. Its implementation depends on whether you are developing a spout or a bolt.
In the case of a spout, you should start emitting tuples. In the case of a bolt, loop: read tuples, process them, and emit, ack, or fail.
Let’s see the implementation of the spout that emits numbers.
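```php
// The range to emit, passed on the command line by NumberGeneratorSpout.
$from = intval($argv[1]);
$to = intval($argv[2]);

while (true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command'] == 'next') {
        if ($from <= $to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();   // Storm's reply to the emit (task IDs)
            $from++;
        } else {
            sleep(1);                 // nothing left to emit
        }
    }
    storm_sync();                     // tell Storm we are ready for the next command
}
```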
Get the from and to values from the command-line arguments and start iterating. Every time you get a next message from Storm, it means Storm is ready for you to emit a new tuple. Once you’ve sent all the numbers and have no more tuples to send, just sleep for some time.
To make sure the script is ready for the next tuple, Storm waits for a sync message before sending the next command, which is why the loop calls storm_sync() after handling every command. To read a command, just call read_msg() and JSON-decode it.
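The spout’s decision logic can be sketched in Java, separated from the I/O so it is easy to follow. NumberSpoutLogic is a hypothetical class: for each command Storm sends, it returns the messages the script would write back (each then framed with end). It leaves out reading the task-ID reply after an emit and the one-second sleep once the range is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the number spout's command handling, without I/O. */
final class NumberSpoutLogic {
    private long next;       // next number to emit
    private final long to;   // last number to emit (inclusive)

    NumberSpoutLogic(long from, long to) {
        this.next = from;
        this.to = to;
    }

    /** Returns the JSON messages to write for one command from Storm. */
    List<String> onCommand(String command) {
        List<String> replies = new ArrayList<>();
        if (command.equals("next") && next <= to) {
            // Emit the number as a string, like "$from" in the PHP script.
            replies.add("{\"command\":\"emit\",\"tuple\":[\"" + next + "\"]}");
            next++;
        }
        // Every command (ack, fail, or a "next" with nothing left) ends with sync.
        replies.add("{\"command\":\"sync\"}");
        return replies;
    }

    public static void main(String[] args) {
        NumberSpoutLogic spout = new NumberSpoutLogic(1, 2);
        for (String cmd : new String[] {"next", "next", "next", "ack"}) {
            System.out.println(cmd + " -> " + spout.onCommand(cmd));
        }
    }
}
```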
In the case of bolts, this is a little different.
```php
while (true) {
    $msg = read_msg();
    // Decode big integers (such as tuple IDs) as strings to keep their precision.
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
```
Loop, reading tuples from standard input. As soon as you get a message, JSON-decode it. If it is a tuple (that is, it has an id), process it by checking whether it is a prime number.
If it is a prime number, emit it; otherwise, just ignore it.
In either case, ack the tuple.
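The bolt’s per-tuple logic can be sketched the same way in Java. PrimeFilterLogic is a hypothetical class that returns the messages to write for one incoming tuple: an emit only for primes, then an ack in every case. Like the PHP storm_ack function, it sends the ID back as a JSON string.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the prime filter bolt's handling of one tuple. */
final class PrimeFilterLogic {
    private PrimeFilterLogic() {}

    // Trial division up to the square root; enough for this demonstration.
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long d = 2; d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    /** Messages to write for a tuple with the given ID and single value. */
    static List<String> onTuple(String id, String value) {
        List<String> replies = new ArrayList<>();
        if (isPrime(Long.parseLong(value))) {
            replies.add("{\"command\":\"emit\",\"tuple\":[\"" + value + "\"]}");
        }
        // Ack whether or not the number was emitted, as the PHP loop does.
        replies.add("{\"command\":\"ack\",\"id\":\"" + id + "\"}");
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(onTuple("42", "7")); // emit, then ack
        System.out.println(onTuple("43", "8")); // ack only
    }
}
```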
The use of JSON_BIGINT_AS_STRING in the json_decode function is a workaround for a conversion problem between Java and PHP. Java sends some very big numbers, and they are decoded with less precision in PHP, which can cause problems. To work around this problem, tell PHP to decode big numbers as strings and to avoid using double quotes when printing numbers in JSON messages. PHP 5.4.0 or higher is required for this parameter to work.
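The precision problem is easy to reproduce in Java, since Java’s double, like PHP’s float on common platforms, is an IEEE 754 double with a 53-bit significand. PHP’s json_decode turns integers larger than PHP_INT_MAX into floats, so an ID above 2^53 can lose its low digits; keeping it as a string avoids that. IdPrecision is a hypothetical class used only for this demonstration.

```java
/** Hypothetical demonstration of ID precision loss through floating point. */
final class IdPrecision {
    private IdPrecision() {}

    /** Simulates an ID that is stored in a double along the way. */
    static long viaDouble(long id) {
        double d = id;          // long -> double may round (53-bit significand)
        return (long) d;
    }

    /** Keeping the ID as text, as JSON_BIGINT_AS_STRING does, is lossless. */
    static long viaString(long id) {
        return Long.parseLong(Long.toString(id));
    }

    public static void main(String[] args) {
        long id = 9_007_199_254_740_993L;   // 2^53 + 1
        System.out.println(viaDouble(id));  // 9007199254740992
        System.out.println(viaString(id));  // 9007199254740993
    }
}
```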
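Messages such as emit, ack, fail, and log have the following structure. An emit message looks like this:
```json
{
    "command": "emit",
    "tuple": ["foo", "bar"]
}
```
The tuple array holds the values you are emitting.
A fail message identifies the tuple by its ID:
```json
{
    "command": "fail",
    "id": 123456789
}
```
Here, id is the ID of the tuple you are processing. An ack message has the same structure, with "command": "ack" instead of "fail".
A log message carries the text for Storm to log:
```json
{
    "command": "log",
    "msg": "some message to be logged by storm."
}
```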
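These messages can be built with a few helper functions. The Java class Messages below is a hypothetical sketch, not part of Storm; it escapes only what JSON requires (quotes, backslashes, and control characters), and it sends tuple IDs as strings, as storm_ack does in the PHP bolt.

```java
/** Hypothetical helpers that build emit, ack, fail, and log messages. */
final class Messages {
    private Messages() {}

    /** Encodes s as a JSON string literal, escaping what JSON requires. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    static String emit(String... values) {
        StringBuilder tuple = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) tuple.append(',');
            tuple.append(quote(values[i]));
        }
        return "{\"command\":\"emit\",\"tuple\":[" + tuple + "]}";
    }

    static String ack(String id)  { return "{\"command\":\"ack\",\"id\":" + quote(id) + "}"; }
    static String fail(String id) { return "{\"command\":\"fail\",\"id\":" + quote(id) + "}"; }
    static String log(String msg) { return "{\"command\":\"log\",\"msg\":" + quote(msg) + "}"; }

    public static void main(String[] args) {
        System.out.println(emit("foo", "bar"));  // {"command":"emit","tuple":["foo","bar"]}
        System.out.println(fail("123456789"));   // {"command":"fail","id":"123456789"}
    }
}
```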
Putting it all together gives you the following PHP scripts.
For your spout:
```php
<?php
// Read lines until a line containing only "end"; return them joined by newlines.
function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        $line = substr($l, 0, -1);
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}

function write_line($line) {
    echo("$line\n");
}

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}

function storm_sync() {
    storm_send(array("command" => "sync"));
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}

// Handshake: read the configuration, create the PID file, report the PID.
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid" => $pid]);
flush();

// The range to emit, passed on the command line by NumberGeneratorSpout.
$from = intval($argv[1]);
$to = intval($argv[2]);

while (true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command'] == 'next') {
        if ($from <= $to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}
?>
```
And for your bolt:
```php
<?php
// Naive primality check: try every divisor from 2 to $number - 1.
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number == 2) {
        return true;
    }
    for ($i = 2; $i <= $number - 1; $i++) {
        if ($number % $i == 0) {
            return false;
        }
    }
    return true;
}

// Read lines until a line containing only "end"; return them joined by newlines.
function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        $line = substr($l, 0, -1);
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}

function write_line($line) {
    echo("$line\n");
}

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}

function storm_ack($id) {
    storm_send(["command" => "ack", "id" => "$id"]);
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}

// Handshake: read the configuration, create the PID file, report the PID.
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid" => $pid]);
flush();

// Main loop: read a tuple, emit it if it is prime, and always ack it.
while (true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
?>
```
It is important to put all these scripts in a special folder called multilang/resources in your project directory. This folder gets included in the jar file that is sent to the workers. If you don’t put the scripts in that folder, Storm won’t be able to run them and will report an error.