In this chapter, we’ll create a Storm project and our first Storm topology.
The following assumes that you have at least version 1.6 of the Java Runtime Environment (JRE) installed. Our recommendation is to use the JRE provided by Oracle, which can be found at http://www.java.com/downloads/.
Before we start, it’s important to understand Storm operation modes. There are two ways to run Storm.
In Local Mode, Storm topologies run on the local machine in a single JVM. This mode is used for development, testing, and debugging because it’s the easiest way to see all topology components working together. In this mode, we can adjust parameters that enable us to see how our topology runs in different Storm configuration environments. To run topologies in Local Mode, we’ll need to download the Storm development dependencies, which are all the things that we need to develop and test our topologies. We’ll see how soon, when we create our first Storm project.
Running a topology in Local Mode is similar to running it in a Storm cluster. However, it’s important to make sure that all components are thread safe, because when they are deployed in Remote Mode they may run in different JVMs or on different physical machines, without direct communication or shared memory.
In all of the examples in this chapter, we’ll work in Local Mode.
In Remote Mode, we submit our topology to the Storm cluster, which is composed of many processes, usually running on different machines. Remote Mode doesn’t show debugging information, which is why it’s considered Production Mode. However, it is possible to create a Storm cluster on a single development machine, and it’s a good idea to do so before deploying to production, to make sure there won’t be any problems running the topology in a production environment.
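To make the two modes concrete, here is a minimal sketch of how the same topology would be submitted in each one (the class name and argument handling are illustrative; we’ll build a real topology later in this chapter):

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class SubmitModes {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ...set the spout and bolts here, as shown later in this chapter...
        Config conf = new Config();
        if (args.length > 0 && args[0].equals("local")) {
            // Local Mode: the whole topology runs inside this JVM
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("my-topology", conf, builder.createTopology());
        } else {
            // Remote Mode: the topology is submitted to a running Storm cluster
            StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
        }
    }
}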
You’ll learn more about Remote Mode in Chapter 6, and I’ll show how to install a cluster in Appendix B.
For this project, we’ll create a simple topology to count words. We can consider this the “Hello World” of Storm topologies. However, it’s a very powerful topology because it can scale to virtually infinite size, and with some small modifications we could even use it to create a statistical system. For example, we could modify the project to find trending topics on Twitter.
To create the topology, we’ll use a spout that will be responsible for reading words, a first bolt to normalize words, and a second bolt to count words, as we can see in Figure 2-1.
You can download the source code of the example as a ZIP file at https://github.com/storm-book/examples-ch02-getting_started/zipball/master.
If you use Git (a distributed revision control and source code management system), you can run git clone git@github.com:storm-book/examples-ch02-getting_started.git in the directory where you want to download the source code.
The first step to set up the environment is to check which version of Java you are running. Open a terminal window and run the command java -version. We should see something similar to the following:

java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)
If not, check your Java installation. (See http://www.java.com/download/.)
To start the project, create a folder in which to place the application (as you would for any Java application). This folder will contain the project source code.
Next we need to download the Storm dependencies: a set of jars that we’ll add to the application classpath. You can do so in one of two ways:
Download the dependencies, unpack them, and add them to the classpath
Use Apache Maven
Maven is a software project management and comprehension tool.
It can be used to manage several aspects of a project development
cycle, from dependencies to the release build process. In this book
we’ll use it extensively. To check whether Maven is installed, run the command mvn. If it isn’t, you can download it from http://maven.apache.org/download.html.
Although it’s not necessary to be a Maven expert to use Storm, it’s helpful to know the basics of how Maven works. You can find more information on the Apache Maven website (http://maven.apache.org/).
To define the project structure, we need to create a pom.xml (project object model) file, which describes dependencies, packaging, source code, and so on. We’ll use the dependencies and Maven repository set up by nathanmarz (https://github.com/nathanmarz/). These dependencies can be found at https://github.com/nathanmarz/storm/wiki/Maven.
The Storm Maven dependencies reference all the libraries required to run Storm in Local Mode.
Using these dependencies, we can write a pom.xml file with the basic components necessary to run our topology:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.book</groupId>
  <artifactId>Getting-Started</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <compilerVersion>1.6</compilerVersion>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <!-- Repository where we can find the Storm dependencies -->
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Storm dependency -->
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.6.0</version>
    </dependency>
  </dependencies>
</project>
The first few lines specify the project name and version. Then we add a compiler plug-in, which tells Maven that our code should be compiled with Java 1.6. Next we define the repositories (Maven supports multiple repositories for the same project). clojars is the repository where Storm dependencies are located. Maven will automatically download all subdependencies required by Storm to run in Local Mode.
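With the pom.xml in place, a quick way to confirm that Maven can resolve the Storm dependency and its subdependencies is to run a compile from the project folder (a standard Maven command, not specific to Storm):

mvn clean compile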
The application will have the following structure, typical of a Maven Java project:
our-application-folder/
├── pom.xml
└── src
    └── main
        ├── java
        │   ├── spouts
        │   └── bolts
        └── resources
The folders under java will contain our source code, and we’ll place the word files to be processed in the resources folder.
mkdir -p creates all required parent directories.
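For example, the whole structure above can be created in one shot (the folder name is just a placeholder):

mkdir -p our-application-folder/src/main/java/spouts \
         our-application-folder/src/main/java/bolts \
         our-application-folder/src/main/resources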
To build our first topology, we’ll create all classes required to run the word count. It’s possible that some parts of the example may not be clear at this stage, but we’ll explain them further in subsequent chapters.
The WordReader spout is a class that implements IRichSpout. We’ll see more detail in Chapter 4. WordReader will be responsible for reading the file and providing each line to a bolt.
A spout emits a list of defined fields. This architecture allows you to have different kinds of bolts reading the same spout stream, which can then define fields for other bolts to consume and so on.
Example 2-1 contains the complete code for the class (we’ll analyze each part of the code following the example).
Example 2-1. src/main/java/spouts/WordReader.java
package spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;

    public boolean isDistributed() {
        return false;
    }

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each line in the file.
     */
    public void nextTuple() {
        /**
         * nextTuple() is called in a loop forever, so once we have read
         * the whole file we sleep briefly and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                /**
                 * Emit a new value for each line, using the line itself
                 * as the message ID.
                 */
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Create the file reader and get the collector object.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException(
                    "Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
The first method called in any spout is public void open(Map conf, TopologyContext context, SpoutOutputCollector collector). The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the bolts. The following code block is the open method implementation:
public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    try {
        this.context = context;
        this.fileReader = new FileReader(conf.get("wordsFile").toString());
    } catch (FileNotFoundException e) {
        throw new RuntimeException(
                "Error reading file [" + conf.get("wordsFile") + "]");
    }
    this.collector = collector;
}
In this method we also create the reader, which is responsible for reading the file. Next we need to implement public void nextTuple(), from which we’ll emit values to be processed by the bolts. In our example, the method will read the file and emit a value per line.
public void nextTuple() {
    if (completed) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Do nothing
        }
        return;
    }
    String str;
    BufferedReader reader = new BufferedReader(fileReader);
    try {
        while ((str = reader.readLine()) != null) {
            this.collector.emit(new Values(str));
        }
    } catch (Exception e) {
        throw new RuntimeException("Error reading tuple", e);
    } finally {
        completed = true;
    }
}
Values is an implementation of ArrayList, where the elements of the list are passed to the constructor.
nextTuple()
is called
periodically from the same loop as the ack()
and fail()
methods. It must release control of the
thread when there is no work to do so that the other methods have a
chance to be called. So the first line of nextTuple checks to see if
processing has finished. If so, it should sleep for at least one
millisecond to reduce load on the processor before returning. If there
is work to be done, each line in the file is read into a value and
emitted.
A tuple is a named list of values, which can be of any type of Java object (as long as the object is serializable). By default, Storm can serialize common types like strings, byte arrays, ArrayList, HashMap, and HashSet.
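For example, nothing prevents a component from declaring and emitting tuples with more than one field, mixing types. The following is a minimal sketch (these field names are illustrative and not part of this chapter’s topology):

// In declareOutputFields(): declare two named fields for the output stream
declarer.declare(new Fields("word", "length"));

// In nextTuple() or execute(): emit values in the same order as the declared fields
collector.emit(new Values("storm", 5));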
We now have a spout that reads from a file and emits one
tuple per line. We need to create two bolts to
process these tuples (see Figure 2-1). The bolts
implement the backtype.storm.topology.IRichBolt
interface.
The most important method in the bolt is void execute(Tuple input), which is called once per tuple received. The bolt can emit several tuples for each tuple received.
A bolt or spout can emit as many tuples as needed. When the
nextTuple
or execute
methods are called, they may emit 0,
1, or many tuples. You’ll learn more about this in Chapter 5.
The first bolt, WordNormalizer
,
will be responsible for taking each line and
normalizing it. It will split the line into words,
convert all words to lowercase, and trim them.
First we need to declare the bolt’s output parameters:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
Here we declare that the bolt will emit one Field named word.
Next we implement the public void
execute(Tuple input)
method, where the input tuples are
processed:
public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for (String word : words) {
        word = word.trim();
        if (!word.isEmpty()) {
            word = word.toLowerCase();
            // Emit the word
            collector.emit(new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}
The first line reads the value from the tuple. The value can be
read by position or by name. The value is processed and then emitted
using the collector object. After each tuple is processed, the
collector’s ack()
method is called to
indicate that processing has completed successfully. If the tuple could
not be processed, the collector’s fail()
method should be called.
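As a minimal sketch of that pattern (reusing the fields and imports of the WordNormalizer class; the changes here are illustrative), an execute method that acks on success and fails on error could look like the following. Note that the value is read by field name here instead of by position:

public void execute(Tuple input) {
    try {
        // Read the value by the field name declared by the spout
        String sentence = input.getStringByField("line");
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                collector.emit(new Values(word.toLowerCase()));
            }
        }
        // Mark the tuple as successfully processed
        collector.ack(input);
    } catch (Exception e) {
        // Mark the tuple as failed so Storm can act on it
        collector.fail(input);
    }
}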
Example 2-2 contains the complete code for the class.
Example 2-2. src/main/java/bolts/WordNormalizer.java
package bolts;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {

    private OutputCollector collector;

    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and normalizes it:
     * it splits the line into words, trims them, and converts them
     * to lowercase.
     */
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Emit the word, anchored to the input tuple
                List a = new ArrayList();
                a.add(input);
                collector.emit(a, new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * The bolt will only emit the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
In this class, we see an example of emitting multiple tuples in
a single execute
call. If the
method receives the sentence This is the Storm
book, in a single execute
call, it will emit five new
tuples.
The next bolt, WordCounter
,
will be responsible for counting words. When the topology finishes (when
the cleanup()
method is called),
we’ll show the count for each word.
This is an example of a bolt that emits nothing. In this case, the data is added to a map, but in real life the bolt could store data to a database.
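As a rough sketch of that idea (everything here is hypothetical: the JDBC URL, the wordcounts table, and the helper class are not part of this chapter’s example), persisting counts with plain JDBC might look like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical helper a production bolt could use instead of an in-memory map
public class WordCountStore {
    private final Connection db;

    public WordCountStore(String jdbcUrl) throws Exception {
        this.db = DriverManager.getConnection(jdbcUrl);
    }

    public void save(String word, int count) throws Exception {
        PreparedStatement stmt =
            db.prepareStatement("INSERT INTO wordcounts (word, count) VALUES (?, ?)");
        stmt.setString(1, word);
        stmt.setInt(2, count);
        stmt.executeUpdate();
        stmt.close();
    }
}

The in-memory version used in this chapter follows: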
package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;
    private OutputCollector collector;

    /**
     * At the end of the topology run (when the cluster is shut down),
     * we show the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * Count each received word.
     */
    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        /**
         * If the word doesn't exist in the map, create an entry with
         * count 1; otherwise, increment the existing count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
The execute
method uses a
map to collect and count the words. When the
topology terminates, the cleanup()
method is called and prints out the counter map. (This is just an
example, but normally you should use the cleanup()
method to close active connections
and other resources when the topology shuts down.)
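For instance, a bolt that holds an external resource (the database connection below is hypothetical) would release it in cleanup():

private java.sql.Connection db; // hypothetical resource opened in prepare()

@Override
public void cleanup() {
    try {
        if (db != null) {
            db.close(); // release the connection when the topology shuts down
        }
    } catch (java.sql.SQLException e) {
        // Not much to do at shutdown beyond reporting the problem
        e.printStackTrace();
    }
}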
In the main class, you’ll create the topology and a LocalCluster
object, which enables you to test
and debug the topology locally. In conjunction with the Config
object, LocalCluster
allows you to try out different
cluster configurations. For example, if a global or class variable was
accidentally used, you would find the error when testing your topology
in configurations with a different number of workers. (You’ll see more
on config objects in Chapter 3.)
All topology nodes should be able to run independently with no shared data between processes (i.e., no global or class variables) because when the topology runs in a real cluster, these processes may run on different machines.
You’ll create the topology using a TopologyBuilder
, which tells Storm how the
nodes are arranged and how they exchange data.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
The spout and the bolts are connected using
shuffleGrouping
s. This type of grouping tells Storm
to send messages from the source node to target nodes in randomly
distributed fashion.
Next, create a Config
object containing the topology
configuration, which is merged with the cluster configuration at run
time and sent to all nodes with the prepare
method.
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
Set the property wordsFile
to
the name of the file to be read by the spout, and the property debug
to true
because
you’re in development. When debug is true
, Storm prints all messages exchanged
between nodes, and other debug data useful for understanding how the
topology is running.
As explained earlier, you’ll use a LocalCluster
to run the topology. In a
production environment, the topology runs continuously, but for this
example you’ll just run the topology for a few seconds so you can see
the results.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
Create and run the topology using createTopology
and submitTopology
, sleep for two seconds (the
topology runs in a different thread), and then stop the topology by
shutting down the cluster.
See Example 2-3 to put it all together.
Example 2-3. src/main/java/TopologyMain.java
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
}
You’re ready to run your first topology! If you create a file at
src/main/resources/words.txt
with one
word per line, you can run the topology with this command:
mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"
For example, if you use the following words.txt file:
Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great
In the logs, you should see something like the following:
is: 2
application: 1
but: 1
great: 2
test: 1
simple: 1
Storm: 3
really: 1
are: 1
an: 1
powerful: 1
very: 1
In this example, you’re only using a single instance of each node.
But what if you have a very large log file? You can easily change the
number of nodes in the system to parallelize the work. In this case,
you’ll create two instances of WordCounter
:
builder.setBolt("word-counter", new WordCounter(), 2)
       .shuffleGrouping("word-normalizer");
If you rerun the program, you’ll see:
-- Word Counter [word-counter-2] --
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
-- Word Counter [word-counter-3] --
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1
Awesome! It’s so easy to change the level of parallelism (in real
life, of course, each instance would run on a separate machine). But
there seems to be a problem: the words is and
great have been counted once in each instance of
WordCounter
. Why? When you use
shuffleGrouping
, you are telling
Storm to send each message to an instance of your bolt in randomly
distributed fashion. In this example, it’d be ideal to always send the
same word to the same WordCounter
. To
do so, you can change shuffleGrouping("word-normalizer")
to fieldsGrouping("word-normalizer",new
Fields("word"))
. Try it out and rerun the program to confirm
the results. You’ll see more about groupings and message flow in later
chapters.
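In code, the only change is the grouping used when registering the word-counter bolt (this is exactly what Example 2-3 already does):

builder.setBolt("word-counter", new WordCounter(), 2)
       .fieldsGrouping("word-normalizer", new Fields("word"));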