Storm persistence

Now that we know Storm and its internals well, let's wire persistence into Storm. Having done all the computation, it's important to store the computed results, or intermediate references to them, in a database or some other persistence store. You can write your own JDBC bolts, or you can use the implementation Storm provides for persistence.

Let's start by writing our own JDBC persistence. Once we have touched on the nitty-gritty, we can look at, and appreciate, what Storm provides out of the box. Let's say we are setting up a software system at toll gates that monitors the emission rates of vehicles and tracks the details of vehicles running at emissions beyond the prescribed limit.

(Figure: Storm persistence — the topology with a file reader spout, a parser bolt, and a database writer bolt)

Here, all the vehicle details and their emissions are captured in a file that is read by the file reader spout. The spout reads each record and feeds it into the topology, where it is consumed by the parser bolt, which converts the record into a POJO and hands it off to the database bolt. The database bolt checks the emission threshold and, if the reading is beyond the prescribed limit, persists the record to the database.
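As a reference point, here is a minimal sketch of what the file reader spout (MyFileSpout, which we instantiate later when wiring the topology) could look like. The configuration key emissions.file and the output field name record are assumptions for illustration, and the imports assume Storm 1.x package names (org.apache.storm); older releases use backtype.storm:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class MyFileSpout extends BaseRichSpout
{
    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
        try
        {
            // the input file path is read from the topology configuration
            reader = new BufferedReader(new FileReader((String) conf.get("emissions.file")));
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unable to open the input file", e);
        }
    }

    @Override
    public void nextTuple()
    {
        try
        {
            String line = reader.readLine();
            // emit each record as a raw line; skip the '#' header comment
            if (line != null && !line.startsWith("#"))
            {
                collector.emit(new Values(line));
            }
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unable to read the input file", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("record"));
    }
}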

The following snippet shows what the input to the topology would look like in the file:

#Vehicle Registration number, emission rate, state
UP14 123, 60, UP
DL12 2653, 70, DELHI
DL2C 2354, 40, DELHI
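Given this record format, here is a minimal sketch of the parser bolt. The output field names, and the choice to emit individual values rather than a serialized POJO, are illustrative assumptions rather than the book's exact code; the threshold check itself is left to the database bolt, as described above:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ParserBolt extends BaseBasicBolt
{
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        // split "UP14 123, 60, UP" into registration number, emission rate, and state
        String[] parts = tuple.getStringByField("record").split(",");
        String regNumber = parts[0].trim();
        int emissionRate = Integer.parseInt(parts[1].trim());
        String state = parts[2].trim();

        // hand the parsed fields to the database bolt, which applies the threshold check
        collector.emit(new Values(regNumber, emissionRate, state));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("regNumber", "emissionRate", "state"));
    }
}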

Here's the snippet that handles the database side of persistence by setting up the connection to the database. We use a MySQL database here for simplicity; Storm works equally well with other SQL stores such as Oracle and SQL Server, as well as NoSQL options such as Cassandra. The code is as follows:

try
{
    connection = DriverManager.getConnection(
        "jdbc:mysql://" + databaseIP + ":" + databasePort + "/" + databaseName,
        userName, pwd);
    connection.prepareStatement("DROP TABLE IF EXISTS " + tableName).execute();

    // Build the CREATE TABLE statement from the tuple's field metadata
    StringBuilder createQuery = new StringBuilder(
        "CREATE TABLE IF NOT EXISTS " + tableName + "(");
    for (Field fields : tupleInfo.getFieldList())
    {
        if (fields.getColumnType().equalsIgnoreCase("String"))
            createQuery.append(fields.getColumnName() + " VARCHAR(500),");
        // ... other column types (int, boolean, and so on) are handled similarly
    }
    …
    connection.prepareStatement(createQuery.toString()).execute();

    // Build the INSERT template from the same field list
    StringBuilder insertQuery = new StringBuilder("INSERT INTO " + tableName + "(");
    for (Field fields : tupleInfo.getFieldList())
    {
        insertQuery.append(fields.getColumnName() + ",");
    }
    …
    prepStatement = connection.prepareStatement(insertQuery.toString());
}

In the preceding snippet, we build the CREATE TABLE and INSERT statements dynamically from the tuple's field list, and then turn the insert template into a prepared statement.
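The elided portions still have to close both templates: the column list must be terminated and the insert statement needs one ? placeholder per column. As an assumption about what the elided code does (including an assumed timestamp column named ts, which the bolt fills in the next snippet), the insert template could be finished along these lines:

// the column loop left a trailing comma, so append the assumed
// timestamp column ("ts") and close the column list
insertQuery.append("ts) VALUES (");
// one placeholder per tuple field, plus one for the timestamp column
for (int i = 0; i < tupleInfo.getFieldList().size() + 1; i++)
{
    insertQuery.append("?,");
}
insertQuery.setLength(insertQuery.length() - 1);
insertQuery.append(")");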

The next snippet shows how we format and execute the insert query: the template is filled with actual values from the tuple, the events are accumulated into a batch, and the batch is written to the database once the configured batch size is reached. The code is as follows:

if (tuple != null)
{
    List<Object> inputTupleList = (List<Object>) tuple.getValues();
    int dbIndex = 0;
    for (int i = 0; i < tupleInfo.getFieldList().size(); i++)
    {
        Field field = tupleInfo.getFieldList().get(i);
        dbIndex = i + 1; // JDBC parameter indexes are 1-based
        …
        if (field.getColumnType().equalsIgnoreCase("String"))
            prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
        else if (field.getColumnType().equalsIgnoreCase("int"))
            prepStatement.setInt(dbIndex,
                Integer.parseInt(inputTupleList.get(i).toString()));
        …
        else if (field.getColumnType().equalsIgnoreCase("boolean"))
            prepStatement.setBoolean(dbIndex,
                Boolean.parseBoolean(inputTupleList.get(i).toString()));
        …
    }

    Date now = new Date();
    try
    {
        // the timestamp goes into the extra column after the tuple's fields
        prepStatement.setTimestamp(dbIndex + 1, new java.sql.Timestamp(now.getTime()));
        prepStatement.addBatch();
        counter.incrementAndGet();
        if (counter.get() == batchSize)
            executeBatch();
    }
    …
}

public void executeBatch() throws SQLException
{
    // flush the accumulated inserts to the database and reset the batch counter
    batchExecuted = true;
    prepStatement.executeBatch();
    counter = new AtomicInteger(0);
}
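One caveat with batching as implemented above: tuples that arrive after the last full batch sit in the prepared statement and never reach the database if the topology stops. A common safeguard, sketched here rather than taken from the original code, is to flush the remainder in the bolt's cleanup() hook:

@Override
public void cleanup()
{
    try
    {
        // flush whatever accumulated after the last full batch
        if (counter.get() > 0)
        {
            executeBatch();
        }
        prepStatement.close();
        connection.close();
    }
    catch (SQLException e)
    {
        throw new RuntimeException("Failed to flush the final batch", e);
    }
}

Keep in mind that Storm only reliably calls cleanup() in local mode; there is no such guarantee on a distributed cluster, so a time-based flush is a more robust complement.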

The next step is to wire all the spouts and bolts together so that we can see the topology in action:

MyFileSpout myFileSpout = new MyFileSpout();
ParserBolt parserBolt = new ParserBolt();
DBWriterBolt dbWriterBolt = new DBWriterBolt();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", myFileSpout, 1);
builder.setBolt("parserBolt", parserBolt, 1).shuffleGrouping("spout");
builder.setBolt("dbWriterBolt", dbWriterBolt, 1).shuffleGrouping("parserBolt");

The preceding snippets are intended to serve as a do-it-yourself template, so that readers can try executing this topology themselves.
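To try the template out, the topology can be submitted to an in-process cluster. The following is a minimal sketch, assuming Storm 1.x APIs (org.apache.storm.Config and org.apache.storm.LocalCluster) and an arbitrary topology name; the surrounding main method must declare throws Exception for the sleep:

Config config = new Config();
config.setDebug(true);

// run in-process; use StormSubmitter.submitTopology(...) for a real cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("emission-monitor", config, builder.createTopology());

// let the topology run for a minute, then tear it down
Thread.sleep(60000);
cluster.killTopology("emission-monitor");
cluster.shutdown();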

Storm's JDBC persistence framework

In the previous section, we wired in Storm persistence ourselves. We will now discuss Storm's JDBC persistence framework, which saves us, as developers, a lot of that work. Using this template-based framework, we can quickly wire persistence into our Storm topology.

Some of the key components of this framework are as follows:

  • The ConnectionProvider interface: This facilitates the use of user-defined connection pooling. Out of the box, the framework ships with a HikariCP-based implementation, HikariCPConnectionProvider (a bare-bones custom provider is sketched after the following snippet).
  • The JdbcMapper interface: This is the main component; it maps the tuple onto the columns of the table. SimpleJdbcMapper is an out-of-the-box implementation of it. The following snippet from the Storm samples shows both components in use:
    Map hikariConfigMap = Maps.newHashMap();
    hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    hikariConfigMap.put("dataSource.user","root");
    hikariConfigMap.put("dataSource.password","password");
    ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    String tableName = "user_details";
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
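As noted above, ConnectionProvider is the point at which you can plug in your own connection handling. Assuming the interface shape from the storm-jdbc module (prepare(), getConnection(), and cleanup()), a bare-bones, non-pooled provider could be sketched as follows; this is an illustration, not a production implementation:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.storm.jdbc.common.ConnectionProvider;

public class SimpleDriverManagerProvider implements ConnectionProvider
{
    private final String jdbcUrl;
    private final String user;
    private final String password;

    public SimpleDriverManagerProvider(String jdbcUrl, String user, String password)
    {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void prepare()
    {
        // a pooled implementation (such as HikariCP) would initialize its pool here
    }

    @Override
    public Connection getConnection()
    {
        try
        {
            // opens a fresh connection per call; fine for a demo, wasteful under load
            return DriverManager.getConnection(jdbcUrl, user, password);
        }
        catch (SQLException e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void cleanup()
    {
        // nothing to release for DriverManager-backed connections
    }
}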

Then, we have JdbcLookupBolt to read from the database and JdbcInsertBolt to insert data into it. Here's how it is all wired together:

JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(
    connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);

// must specify the column schema when providing a custom insert query
List<Column> schemaColumns = Lists.newArrayList(
    new Column("create_date", Types.DATE),
    new Column("dept_name", Types.VARCHAR),
    new Column("user_id", Types.INTEGER),
    new Column("user_name", Types.VARCHAR));
JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
    .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");

// userSpout ==> departmentLookupBolt ==> userPersistanceBolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(USER_SPOUT, this.userSpout, 1);
builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);

Here, we have provided a code snippet that simply uses the bolts supplied by the Storm persistence framework to read from and write to the database: the spout emits user tuples to the lookup bolt, which fetches the matching department values from the database; the persistence bolt then maps the tuple values into the fields of the insert query and writes the rows to the table.
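For completeness, the snippet above references SELECT_QUERY and this.jdbcLookupMapper without defining them. In the storm-jdbc sample this code is modeled on, they would look roughly like the following; the exact table and column names are assumptions about the sample schema:

String SELECT_QUERY = "select dept_name from department, user_department"
    + " where department.dept_id = user_department.dept_id"
    + " and user_department.user_id = ?";

// the first argument lists the fields the lookup bolt emits downstream;
// the second lists the columns that bind the query's '?' placeholders
JdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(
    new Fields("user_id", "user_name", "dept_name", "create_date"),
    Lists.newArrayList(new Column("user_id", Types.INTEGER)));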

Tip

Downloading the example code

You can download the example code files for this book from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

You can download the code files by following these steps:

• Log in or register to our website using your e-mail address and password.
• Hover the mouse pointer over the SUPPORT tab at the top.
• Click on Code Downloads & Errata.
• Enter the name of the book in the Search box.
• Select the book for which you're looking to download the code files.
• Choose from the drop-down menu where you purchased this book from.
• Click on Code Download.

Once the file is downloaded, please make sure that you unzip or extract the folder using the latest version of:

• WinRAR / 7-Zip for Windows
• Zipeg / iZip / UnRarX for Mac
• 7-Zip / PeaZip for Linux