Chapter 2. Getting Started

In this chapter, we’ll create a Storm project and our first Storm topology.

Tip

The following assumes that you have at least version 1.6 of the Java Runtime Environment (JRE) installed. Our recommendation is to use the JRE provided by Oracle, which can be found at http://www.java.com/downloads/.

Operation Modes

Before we start, it’s important to understand Storm operation modes. There are two ways to run Storm.

Local Mode

In Local Mode, Storm topologies run on the local machine in a single JVM. This mode is used for development, testing, and debugging because it’s the easiest way to see all topology components working together. In this mode, we can adjust parameters that enable us to see how our topology runs in different Storm configuration environments. To run topologies in Local Mode, we’ll need to download the Storm development dependencies, which are all the things that we need to develop and test our topologies. We’ll see how soon, when we create our first Storm project.

Tip

Running a topology in Local Mode is similar to running it in a Storm cluster. However, it’s important to make sure that all components are thread safe, because when they are deployed in Remote Mode they may run in different JVMs or on different physical machines without direct communication or shared memory.

In all of the examples in this chapter, we’ll work in Local Mode.

Remote Mode

In Remote Mode, we submit our topology to the Storm cluster, which is composed of many processes, usually running on different machines. Remote Mode doesn’t show debugging information, which is why it’s considered Production Mode. However, it is possible to create a Storm cluster on a single development machine, and it’s a good idea to do so before deploying to production, to make sure there won’t be any problems running the topology in a production environment.

You’ll learn more about Remote Mode in Chapter 6, and I’ll show how to install a cluster in Appendix B.

Hello World Storm

For this project, we’ll create a simple topology to count words. We can consider this the “Hello World” of Storm topologies. However, it’s a very powerful topology because it can scale to virtually infinite size, and with some small modifications we could even use it to create a statistical system. For example, we could modify the project to find trending topics on Twitter.

To create the topology, we’ll use a spout that will be responsible for reading words, a first bolt to normalize words, and a second bolt to count words, as we can see in Figure 2-1.

Figure 2-1. Getting started topology

You can download the source code of the example as a ZIP file at https://github.com/storm-book/examples-ch02-getting_started/zipball/master.

Tip

If you use git (a distributed revision control and source code management system), you can run git clone :storm-book/examples-ch02-getting_started.git in the directory where you want to download the source code.

Checking Java Installation

The first step to set up the environment is to check which version of Java you are running. Open a terminal window and run the command java -version. We should see something similar to the following:

java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment (build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)

If not, check your Java installation. (See http://www.java.com/download/.)
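The same check can also be made from inside a Java program. The following is a minimal sketch, assuming a hypothetical helper class named JavaVersionCheck that is not part of the example project; it only reads the standard java.version and java.vendor system properties.

```java
// Hypothetical helper, not part of the book's example project.
class JavaVersionCheck {

    /** Returns the version string of the running JVM, for example "1.6.0_26". */
    static String runtimeVersion() {
        return System.getProperty("java.version");
    }

    public static void main(String[] args) {
        System.out.println("java version \"" + runtimeVersion() + "\"");
        System.out.println("vendor: " + System.getProperty("java.vendor"));
    }
}
```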

Creating the Project

To start the project, create a folder in which to place the application (as you would for any Java application). This folder will contain the project source code.

Next we need to download the Storm dependencies: a set of jars that we’ll add to the application classpath. You can do so in one of two ways:

  • Download the dependencies, unpack them, and add them to the classpath

  • Use Apache Maven

Tip

Maven is a software project management and comprehension tool. It can be used to manage several aspects of a project development cycle, from dependencies to the release build process. In this book we’ll use it extensively. To check whether Maven is installed, run the command mvn. If it is not installed, you can download it from http://maven.apache.org/download.html.

Although it is not necessary to be a Maven expert to use Storm, it’s helpful to know the basics of how Maven works. You can find more information on the Apache Maven website (http://maven.apache.org/).

To define the project structure, we need to create a pom.xml (project object model) file, which describes dependencies, packaging, source code, and so on. We’ll use the dependencies and Maven repository set up by nathanmarz (https://github.com/nathanmarz/). These dependencies can be found at https://github.com/nathanmarz/storm/wiki/Maven.

Tip

The Storm Maven dependencies reference all the libraries required to run Storm in Local Mode.

Using these dependencies, we can write a pom.xml file with the basic components necessary to run our topology:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.book</groupId>
  <artifactId>Getting-Started</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <compilerVersion>1.6</compilerVersion>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <repositories>

    <!-- Repository where we can find the Storm dependencies -->
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>

  </repositories>

  <dependencies>

    <!-- Storm dependency -->
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.6.0</version>
    </dependency>

  </dependencies>

</project>

The first few lines specify the project name and version. Then we add a compiler plug-in, which tells Maven that our code should be compiled with Java 1.6. Next we define the repositories (Maven supports multiple repositories for the same project). clojars is the repository where Storm dependencies are located. Maven will automatically download all subdependencies required by Storm to run in Local Mode.

The application will have the following structure, typical of a Maven Java project:

our-application-folder/
        ├── pom.xml
        └── src
            └── main
                ├── java
                │   ├── spouts
                │   └── bolts
                └── resources

The folders under java will contain our source code, and we’ll put the words files to be processed into the resources folder.

Tip

mkdir -p creates all required parent directories.
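If you prefer to create the layout shown above from Java rather than from the shell, the following minimal sketch does the same job. The class name ProjectLayout and its method are hypothetical and only illustrate the idea; File.mkdirs() plays the role of mkdir -p here.

```java
import java.io.File;

// Hypothetical helper that creates the project layout described in this section.
class ProjectLayout {

    /** Creates src/main/java/{spouts,bolts} and src/main/resources under root. */
    static void create(File root) {
        String[] dirs = {
            "src/main/java/spouts",
            "src/main/java/bolts",
            "src/main/resources"
        };
        for (String dir : dirs) {
            File target = new File(root, dir);
            // mkdirs() also creates missing parent directories, like mkdir -p.
            // It returns false when nothing was created, so we accept an
            // already existing directory as success.
            if (!target.mkdirs() && !target.isDirectory()) {
                throw new IllegalStateException("Could not create " + target);
            }
        }
    }

    public static void main(String[] args) {
        create(new File(args.length > 0 ? args[0] : "our-application-folder"));
    }
}
```

Running the helper a second time on the same folder is harmless, because existing directories are left untouched.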

Creating Our First Topology

To build our first topology, we’ll create all classes required to run the word count. It’s possible that some parts of the example may not be clear at this stage, but we’ll explain them further in subsequent chapters.

Spout

The WordReader spout is a class that implements IRichSpout. We’ll see more detail in Chapter 4. WordReader will be responsible for reading the file and providing each line to a bolt.

Tip

A spout emits a list of defined fields. This architecture allows you to have different kinds of bolts reading the same spout stream, which can then define fields for other bolts to consume and so on.

Example 2-1 contains the complete code for the class (we’ll analyze each part of the code following the example).

Example 2-1. src/main/java/spouts/WordReader.java

package spouts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

        private SpoutOutputCollector collector;
        private FileReader fileReader;
        private boolean completed = false;
        private TopologyContext context;
        public boolean isDistributed() {return false;}
        public void ack(Object msgId) {
                System.out.println("OK:"+msgId);
        }
        public void close() {}
        public void fail(Object msgId) {
                System.out.println("FAIL:"+msgId);
        }

        /**
         * The only thing this method does is emit each line
         * of the file
         */
        public void nextTuple() {
                /**
                 * nextTuple is called forever, so once the file has been read
                 * we wait for a while and then return
                 */
                if(completed){
                        try {
                                Thread.sleep(1000);
                        } catch (InterruptedException e) {
                                //Do nothing
                        }
                        return;
                }
                String str;
                //Open the reader
                BufferedReader reader = new BufferedReader(fileReader);
                try{
                        //Read all lines
                        while((str = reader.readLine()) != null){
                                /**
                                 * For each line, emit a new value, using the line
                                 * itself as the message ID
                                 */
                                this.collector.emit(new Values(str),str);
                        }
                }catch(Exception e){
                        throw new RuntimeException("Error reading tuple",e);
                }finally{
                        completed = true;
                }
        }

        /**
         * Open the file and keep a reference to the collector object
         */
        public void open(Map conf, TopologyContext context,
                        SpoutOutputCollector collector) {
                try {
                        this.context = context;
                        this.fileReader = new FileReader(conf.get("wordsFile").toString());
                } catch (FileNotFoundException e) {
                        throw new RuntimeException("Error reading file ["+conf.get("wordsFile")+"]");
                }
                this.collector = collector;
        }

        /**
         * Declare the output field "line"
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
        }
}

The first method called in any spout is public void open(Map conf, TopologyContext context, SpoutOutputCollector collector). The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the bolts. The following code block is the open method implementation:

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordsFile")+"]");
        }
        this.collector = collector;
    }

In this method we also create the reader, which is responsible for reading the file. Next we need to implement public void nextTuple(), from which we’ll emit values to be processed by the bolts. In our example, the method will read the file and emit a value per line.

public void nextTuple() {
    if(completed){
        try {
                Thread.sleep(1);
        } catch (InterruptedException e) {
                //Do nothing
        }
        return;
    }
    String str;
    BufferedReader reader = new BufferedReader(fileReader);
    try{
        while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
        }
    }catch(Exception e){
        throw new RuntimeException("Error reading tuple",e);
    }finally{
        completed = true;
    }
}

Tip

Values is a subclass of ArrayList; the elements of the list are passed to the constructor.

nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do so that the other methods have a chance to be called. So the first line of nextTuple checks to see if processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning. If there is work to be done, each line in the file is read into a value and emitted.
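The calling pattern described above can be imitated without Storm. The following is a simplified sketch, not Storm code: the class PollingSourceSketch and its methods are hypothetical stand-ins for a spout, and a plain loop plays the role of the thread that calls nextTuple() repeatedly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for a spout; it does not use the Storm API.
class PollingSourceSketch {

    private final Iterator<String> lines;
    private final List<String> emitted = new ArrayList<String>();
    private boolean completed = false;
    private int idleCalls = 0;

    PollingSourceSketch(List<String> lines) {
        this.lines = lines.iterator();
    }

    /** Same shape as nextTuple(): emit every line once, then back off. */
    void nextTuple() {
        if (completed) {
            idleCalls++;
            try {
                Thread.sleep(1); // release the thread while there is no work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag
            }
            return;
        }
        while (lines.hasNext()) {
            emitted.add(lines.next());
        }
        completed = true;
    }

    List<String> emitted() { return emitted; }

    int idleCalls() { return idleCalls; }

    public static void main(String[] args) {
        List<String> input = new ArrayList<String>();
        input.add("Storm is great");
        input.add("really great");
        PollingSourceSketch source = new PollingSourceSketch(input);
        // The caller invokes nextTuple() over and over from a single thread.
        for (int i = 0; i < 5; i++) {
            source.nextTuple();
        }
        System.out.println(source.emitted().size() + " lines emitted, "
                + source.idleCalls() + " idle calls");
    }
}
```

With five calls, the first one emits both lines and the remaining four return after a short sleep, so the program prints "2 lines emitted, 4 idle calls".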

Tip

A tuple is a named list of values, which can be of any type of Java object (as long as the object is serializable). By default, Storm can serialize common types like strings, byte arrays, ArrayList, HashMap, and HashSet.
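To make the idea of a named list of values concrete, here is a minimal sketch in plain Java. NamedTupleSketch is a hypothetical class written only for illustration; Storm’s own Tuple and Fields classes offer more than this, and their internals are not shown here.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration only; this is not Storm's Tuple class.
class NamedTupleSketch {

    private final List<String> fields;
    private final List<Object> values;

    NamedTupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("Each field needs exactly one value");
        }
        this.fields = fields;
        this.values = values;
    }

    /** Reads a value by its position in the list. */
    Object getValue(int index) {
        return values.get(index);
    }

    /** Reads a value by the name declared for its position. */
    Object getValueByField(String name) {
        int index = fields.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTupleSketch tuple = new NamedTupleSketch(
                Arrays.asList("line"),
                Arrays.<Object>asList("Storm is great"));
        System.out.println(tuple.getValue(0));
        System.out.println(tuple.getValueByField("line"));
    }
}
```

Both calls in main return the same string, because the field named line is the first and only position in the list.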

Bolts

We now have a spout that reads from a file and emits one tuple per line. We need to create two bolts to process these tuples (see Figure 2-1). The bolts implement the backtype.storm.topology.IRichBolt interface.

The most important method in the bolt is void execute(Tuple input), which is called once per tuple received. A bolt may emit several tuples for each tuple it receives.

Tip

A bolt or spout can emit as many tuples as needed. When the nextTuple or execute methods are called, they may emit 0, 1, or many tuples. You’ll learn more about this in Chapter 5.

The first bolt, WordNormalizer, will be responsible for taking each line and normalizing it. It will split the line into words, convert all words to lowercase, and trim them.

First we need to declare the bolt’s output parameters:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

Here we declare that the bolt will emit one Field named word.

Next we implement the public void execute(Tuple input) method, where the input tuples are processed:

public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for(String word : words){
        word = word.trim();
        if(!word.isEmpty()){
            word = word.toLowerCase();
            //Emit the word
            collector.emit(new Values(word));
        }
    }
    // Acknowledge the tuple
    collector.ack(input);
}

The first line reads the value from the tuple. The value can be read by position or by name. The value is processed and then emitted using the collector object. After each tuple is processed, the collector’s ack() method is called to indicate that processing has completed successfully. If the tuple could not be processed, the collector’s fail() method should be called.

Example 2-2 contains the complete code for the class.

Example 2-2. src/main/java/bolts/WordNormalizer.java

package bolts;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {

        private OutputCollector collector;

        public void cleanup() {}

        /**
         * The bolt receives a line from the words file and
         * normalizes it.
         *
         * Normalizing means splitting the line into words,
         * trimming each word, and converting it to lowercase
         */
        public void execute(Tuple input) {
                String sentence = input.getString(0);
                String[] words = sentence.split(" ");
                for(String word : words){
                        word = word.trim();
                        if(!word.isEmpty()){
                                word = word.toLowerCase();
                                //Emit the word, anchored to the input tuple
                                List<Tuple> anchors = new ArrayList<Tuple>();
                                anchors.add(input);
                                collector.emit(anchors,new Values(word));
                        }
                }
                // Acknowledge the tuple
                collector.ack(input);
        }
        public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
                this.collector = collector;
        }

        /**
         * The bolt will only emit the field "word"
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
        }

}

Tip

In this class, we see an example of emitting multiple tuples in a single execute call. If the method receives the sentence This is the Storm book, in a single execute call, it will emit five new tuples.
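The normalization logic can be tried on its own, outside a topology. The sketch below copies the split, trim, and lowercase steps from the execute method into a hypothetical class named NormalizeSketch, and returns the words in a list instead of emitting them.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone copy of the normalization steps; it does not use the Storm API.
class NormalizeSketch {

    /** Splits a line on spaces, trims each word, and lowercases it. */
    static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<String>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Prints [this, is, the, storm, book]
        System.out.println(normalize("This is the Storm book"));
    }
}
```

The five entries in the returned list correspond to the five tuples that the bolt would emit for that sentence.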

The next bolt, WordCounter, will be responsible for counting words. When the topology finishes (when the cleanup() method is called), we’ll show the count for each word.

Tip

This is an example of a bolt that emits nothing. In this case, the data is added to a map, but in real life the bolt could store data to a database.

package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {

        Integer id;
        String name;
        Map<String, Integer> counters;
        private OutputCollector collector;

        /**
         * When the bolt is shut down (that is, when the cluster
         * is shut down), print the word counters
         */
        @Override
        public void cleanup() {
                System.out.println("-- Word Counter ["+name+"-"+id+"] --");
                for(Map.Entry<String, Integer> entry : counters.entrySet()){
                        System.out.println(entry.getKey()+": "+entry.getValue());
                }
        }

        /**
         * Count each word received
         */
        @Override
        public void execute(Tuple input) {
                String str = input.getString(0);
                /**
                 * If the word doesn't exist in the map yet, add it
                 * with a count of 1; otherwise increment its count
                 */
                if(!counters.containsKey(str)){
                        counters.put(str, 1);
                }else{
                        Integer c = counters.get(str) + 1;
                        counters.put(str, c);
                }
                //Acknowledge the tuple
                collector.ack(input);
        }

        /**
         * Initialize the bolt's state when it is created
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
                this.counters = new HashMap<String, Integer>();
                this.collector = collector;
                this.name = context.getThisComponentId();
                this.id = context.getThisTaskId();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

The execute method uses a map to collect and count the words. When the topology terminates, the cleanup() method is called and prints out the counter map. (This is just an example, but normally you should use the cleanup() method to close active connections and other resources when the topology shuts down.)

The Main Class

In the main class, you’ll create the topology and a LocalCluster object, which enables you to test and debug the topology locally. In conjunction with the Config object, LocalCluster allows you to try out different cluster configurations. For example, if a global or class variable was accidentally used, you would find the error when testing your topology in configurations with a different number of workers. (You’ll see more on config objects in Chapter 3.)

Tip

All topology nodes should be able to run independently with no shared data between processes (i.e., no global or class variables) because when the topology runs in a real cluster, these processes may run on different machines.
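The following sketch shows why a class variable is risky. It is plain Java with hypothetical names (SharedStateSketch, SHARED) and does not use Storm: inside one JVM, as in Local Mode, every instance sees the same class variable, whereas instances running in separate processes would each see their own copy.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only; the class and field names are hypothetical.
class SharedStateSketch {

    // Risky: a class variable is shared by all instances, but only inside one JVM.
    static final Map<String, Integer> SHARED = new HashMap<String, Integer>();

    // Safer: every instance keeps its own state.
    private final Map<String, Integer> own = new HashMap<String, Integer>();

    void count(String word) {
        increment(SHARED, word);
        increment(own, word);
    }

    int ownCount(String word) {
        Integer value = own.get(word);
        return value == null ? 0 : value;
    }

    private static void increment(Map<String, Integer> map, String word) {
        Integer current = map.get(word);
        map.put(word, current == null ? 1 : current + 1);
    }

    public static void main(String[] args) {
        SharedStateSketch first = new SharedStateSketch();
        SharedStateSketch second = new SharedStateSketch();
        first.count("storm");
        second.count("storm");
        // In a single JVM the class variable sees both updates...
        System.out.println("shared: " + SHARED.get("storm"));
        // ...while each instance sees only its own.
        System.out.println("first: " + first.ownCount("storm"));
    }
}
```

Code that depends on the shared total of 2 would appear to work in a single JVM and then produce different results once the instances are spread over several processes.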

You’ll create the topology using a TopologyBuilder, which tells Storm how the nodes are arranged and how they exchange data.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

The spout and the bolts are connected using shuffleGroupings. This type of grouping tells Storm to send messages from the source node to target nodes in randomly distributed fashion.

Next, create a Config object containing the topology configuration, which is merged with the cluster configuration at run time and sent to all nodes with the prepare method.

Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);

Set the property wordsFile to the name of the file to be read by the spout, and the property debug to true because you’re in development. When debug is true, Storm prints all messages exchanged between nodes, and other debug data useful for understanding how the topology is running.
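Because the configuration behaves like a map of keys to values, the merge with the cluster configuration can be pictured as follows. This is only a sketch under stated assumptions: the class ConfigMergeSketch is hypothetical, the key topology.debug and the default values are chosen for illustration, and the rule that topology settings override cluster settings is an assumption rather than something this chapter specifies.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java picture of merging two configurations; not Storm's implementation.
class ConfigMergeSketch {

    /** Returns a new map with cluster settings overlaid by topology settings. */
    static Map<String, Object> merge(Map<String, Object> clusterConf,
                                     Map<String, Object> topologyConf) {
        Map<String, Object> merged = new HashMap<String, Object>(clusterConf);
        // Assumed precedence: a topology setting replaces the cluster setting.
        merged.putAll(topologyConf);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> clusterConf = new HashMap<String, Object>();
        clusterConf.put("topology.debug", Boolean.FALSE); // illustrative default

        Map<String, Object> topologyConf = new HashMap<String, Object>();
        topologyConf.put("wordsFile", "src/main/resources/words.txt");
        topologyConf.put("topology.debug", Boolean.TRUE);

        Map<String, Object> merged = merge(clusterConf, topologyConf);
        System.out.println("wordsFile = " + merged.get("wordsFile"));
        System.out.println("topology.debug = " + merged.get("topology.debug"));
    }
}
```

The spout’s open method reads wordsFile from the merged map in the same way, with conf.get("wordsFile").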

As explained earlier, you’ll use a LocalCluster to run the topology. In a production environment, the topology runs continuously, but for this example you’ll just run the topology for a few seconds so you can see the results.

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();

Create and run the topology using createTopology and submitTopology, sleep for two seconds (the topology runs in a different thread), and then stop the topology by shutting down the cluster.

See Example 2-3 to put it all together.

Example 2-3. src/main/java/TopologyMain.java

import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;


public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {

                //Topology definition
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("word-reader",new WordReader());
                builder.setBolt("word-normalizer", new WordNormalizer())
                        .shuffleGrouping("word-reader");
                builder.setBolt("word-counter", new WordCounter(),2)
                        .fieldsGrouping("word-normalizer", new Fields("word"));

                //Configuration
                Config conf = new Config();
                conf.put("wordsFile", args[0]);
                conf.setDebug(false);
                conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

                //Topology run
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Getting-Started-Toplogie", conf,
                        builder.createTopology());
                Thread.sleep(1000);
                cluster.shutdown();
        }
}

See It In Action

You’re ready to run your first topology! If you create a file at src/main/resources/words.txt with one word per line, you can run the topology with this command:

mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"

For example, if you use the following words.txt file:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

In the logs, you should see something like the following:

is: 2
application: 1
but: 1
great: 2
test: 1
simple: 1
storm: 3
really: 1
are: 1
an: 1
powerful: 1
very: 1

In this example, you’re only using a single instance of each node. But what if you have a very large log file? You can easily change the number of nodes in the system to parallelize the work. In this case, you’ll create two instances of WordCounter:

builder.setBolt("word-counter", new WordCounter(),2)
            .shuffleGrouping("word-normalizer");

If you rerun the program, you’ll see:

-- Word Counter [word-counter-2] --
application: 1
is: 1
great: 1
are: 1
powerful: 1
storm: 3
-- Word Counter [word-counter-3] --
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

Awesome! It’s so easy to change the level of parallelism (in real life, of course, each instance would run on a separate machine). But there seems to be a problem: the words is and great have been counted once in each instance of WordCounter. Why? When you use shuffleGrouping, you are telling Storm to send each message to an instance of your bolt in randomly distributed fashion. In this example, it’d be ideal to always send the same word to the same WordCounter. To do so, you can change shuffleGrouping("word-normalizer") to fieldsGrouping("word-normalizer",new Fields("word")). Try it out and rerun the program to confirm the results. You’ll see more about groupings and message flow in later chapters.
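To see why grouping by field fixes the split counts, consider the following simplified model. It is a sketch, not Storm’s routing code: the class FieldsGroupingSketch is hypothetical, and the rule used here (hash the word, then take the result modulo the number of tasks) is an assumption chosen because it guarantees that equal words always reach the same counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified routing model; Storm's own implementation may differ.
class FieldsGroupingSketch {

    /** Assumed rule: hash the word and take it modulo the number of tasks. */
    static int taskFor(String word, int numTasks) {
        // Clear the sign bit so the index is never negative.
        return (word.hashCode() & Integer.MAX_VALUE) % numTasks;
    }

    /** Routes each word to one task and counts it there. */
    static List<Map<String, Integer>> count(List<String> words, int numTasks) {
        List<Map<String, Integer>> counters = new ArrayList<Map<String, Integer>>();
        for (int i = 0; i < numTasks; i++) {
            counters.add(new HashMap<String, Integer>());
        }
        for (String word : words) {
            Map<String, Integer> counter = counters.get(taskFor(word, numTasks));
            Integer current = counter.get(word);
            counter.put(word, current == null ? 1 : current + 1);
        }
        return counters;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "storm", "is", "great", "storm", "is", "great", "storm");
        List<Map<String, Integer>> counters = count(words, 2);
        for (int i = 0; i < counters.size(); i++) {
            System.out.println("task " + i + ": " + counters.get(i));
        }
    }
}
```

Whichever task a word lands on, it lands there every time, so each word appears in exactly one counter with its full total. A random choice of task, as in shuffle grouping, gives no such guarantee.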

Conclusion

We’ve discussed the difference between Storm’s Local and Remote operation modes, and the power and ease of development with Storm. You also learned more about some basic Storm concepts, which we’ll explain in depth in the following chapters.
