Chapter 6. A Real-Life Example

The idea of this chapter is to illustrate a typical web analytics solution, a problem that is often solved using a Hadoop batch job. Unlike a Hadoop implementation, a Storm-based solution will show results that are refreshed in real time.

Our example has three main components (see Figure 6-1):

  • A Node.js web application, to test the system

  • A Redis server, to persist the data

  • A Storm topology, for real-time distributed data processing

Architecture overview

Figure 6-1. Architecture overview

Tip

If you want to go through this chapter while playing with the example, you should first read Appendix C.

The Node.js Web Application

We have mocked up a simple e-commerce website with three pages: a home page, a product page, and a product statistics page. The application is implemented with the Express framework and uses Socket.io to push updates to the browser. Its purpose is to let you play with the cluster and see the results, but it’s not the focus of this book, so we won’t go into any more detail than a description of its pages.

The Home Page

This page provides links to all the products available on the platform to ease navigation between them. It reads the list of items from the Redis server. The URL for this page is http://localhost:3000/. (See Figure 6-2.)

Home page

Figure 6-2. Home page

The Product Page

The Product Page shows information related to a specific product, such as price, title, and category. The URL for this page is http://localhost:3000/product/:id. (See Figure 6-3.)

Product page

Figure 6-3. Product page

The Product Stats Page

This page shows the information computed by the Storm cluster, which is collected as users navigate the website. It can be summarized as follows: users that viewed this product also looked at products in these categories n times. The URL for this page is http://localhost:3000/product/:id/stats. (See Figure 6-4.)

Product stats view

Figure 6-4. Product stats view

Starting the Node.js Web Application

After starting the Redis server, start the web application by running the following command on the project’s path:

node webapp/app.js

The web application will automatically populate Redis with some sample products for you to play with.

The Storm Topology

The goal of the Storm topology in this system is to update the product stats in real time while users navigate the website. The Product Stats Page shows a list of categories with an associated counter, indicating how many times users that viewed this product also visited products in each category. This helps sellers understand their customers’ needs. The topology receives a navigation log and updates the product stats as shown in Figure 6-5.

Storm topology inputs and outputs

Figure 6-5. Storm topology inputs and outputs

Our Storm Topology has five components: one spout to feed it and four bolts to get the job done.

UsersNavigationSpout

Reads from the users navigation queue and feeds the topology

GetCategoryBolt

Reads the product information from the Redis Server and adds its category to the stream

UserHistoryBolt

Reads the products previously navigated by the user and emits Product:Category pairs to update the counters in the next step

ProductCategoriesCounterBolt

Keeps track of the number of times that users viewed a product of a specific category

NewsNotifierBolt

Tells the web application to update the user interface immediately

Here’s how the topology is created (see Figure 6-6):

package storm.analytics;
...
public class TopologyStarter {
    public static void main(String[] args) {
        Logger.getRootLogger().removeAllAppenders();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("read-feed", new UsersNavigationSpout(), 3);

        builder.setBolt("get-categ", new GetCategoryBolt(), 3)
                        .shuffleGrouping("read-feed");

        builder.setBolt("user-history", new UserHistoryBolt(), 5)
               .fieldsGrouping("get-categ", new Fields("user"));

        builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(), 5)
                        .fieldsGrouping("user-history", new Fields("product"));

        builder.setBolt("news-notifier", new NewsNotifierBolt(), 5)
                        .shuffleGrouping("product-categ-counter");

        Config conf = new Config();
        conf.setDebug(true);

        conf.put("redis-host", REDIS_HOST);
        conf.put("redis-port", REDIS_PORT);
        conf.put("webserver", WEBSERVER);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("analytics", conf, builder.createTopology());
    }
}
Storm topology

Figure 6-6. Storm topology

UsersNavigationSpout

The UsersNavigationSpout is in charge of feeding the topology with navigation entries. Each navigation entry is a reference to a product page viewed by one user. They are stored in the Redis Server by the web application. We’ll go into more detail on that in a moment.

To read entries from the Redis server, you’ll use Jedis (https://github.com/xetorthio/jedis), a blazingly small and simple Redis client for Java.
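
The connection setup is omitted from the listing below. As a hedged sketch, an open method that builds the Jedis client from the topology configuration might look like the following; the redis-host and redis-port keys match the TopologyStarter configuration, but the field names, the fallback behavior, and the backtype.storm import paths (which correspond to the Storm version used in this book and differ in newer releases) are assumptions, not the book’s actual source.

package storm.analytics;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import redis.clients.jedis.Jedis;

public class UsersNavigationSpout extends BaseRichSpout {
    SpoutOutputCollector collector;
    Jedis jedis;

    @Override
    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        // Keep the collector so nextTuple() can emit tuples later.
        this.collector = collector;
        // Connect to the Redis server configured in TopologyStarter
        // ("redis-host" and "redis-port" are the keys set there).
        String host = conf.get("redis-host").toString();
        int port = Integer.valueOf(conf.get("redis-port").toString());
        this.jedis = new Jedis(host, port);
    }

    // nextTuple() and declareOutputFields() are shown in the listing below.
}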

Note

Only the relevant part of the code is shown in the following code block.

package storm.analytics;

public class UsersNavigationSpout extends BaseRichSpout {
    Jedis jedis;

    ...

    @Override
    public void nextTuple() {
        String content = jedis.rpop("navigation");
        if(content==null || "nil".equals(content)) {
            try { Thread.sleep(300); } catch (InterruptedException e) {}
        } else {
            JSONObject obj=(JSONObject)JSONValue.parse(content);
            String user = obj.get("user").toString();
            String product = obj.get("product").toString();
            String type = obj.get("type").toString();
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("product", product);
            NavigationEntry entry = new NavigationEntry(user, type, map);
            collector.emit(new Values(user, entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "otherdata"));
    }
}

First, the spout calls jedis.rpop("navigation") to remove and return the right-most element of the “navigation” list on the Redis server. If the list is empty, the spout sleeps for 0.3 seconds so as not to block the server with a busy-wait loop. If an entry is found, the spout parses the content (which is JSON) and maps it to a NavigationEntry object, a POJO containing the entry information:

  • The user that was navigating.

  • The type of page that the user browsed.

  • Additional page information that depends on the type. The “PRODUCT” page type has an entry for the product ID being browsed.

The spout emits a tuple containing this information by calling collector.emit(new Values(user, entry)). The content of this tuple is the input to the next bolt in the topology: the GetCategoryBolt.
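
NavigationEntry itself is not listed in this chapter. A minimal sketch of the POJO, consistent with the constructor used by the spout and the getters used by GetCategoryBolt (the field names and the Serializable choice are assumptions, not the book’s actual source), might look like this:

package storm.analytics;

import java.io.Serializable;
import java.util.Map;

public class NavigationEntry implements Serializable {
    private String userId;
    private String pageType;
    private Map<String, String> otherData;

    public NavigationEntry(String userId, String pageType,
                           Map<String, String> otherData) {
        this.userId = userId;
        this.pageType = pageType;
        this.otherData = otherData;
    }

    // Getters used by the downstream bolts.
    public String getUserId() { return userId; }
    public String getPageType() { return pageType; }

    // Extra page information; for "PRODUCT" pages it holds the product ID.
    public Map<String, String> getOtherData() { return otherData; }
}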

GetCategoryBolt

This is a very simple bolt. Its sole responsibility is to deserialize the content of the tuple emitted by the spout. If the entry is about a product page, it loads the product information from the Redis server using the ProductsReader helper class. Then, for each input tuple, it emits a new tuple with further product-specific information:

  • The user

  • The product

  • The category of the product

package storm.analytics;

public class GetCategoryBolt extends BaseBasicBolt {
    private ProductsReader reader;

    ...
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        NavigationEntry entry = (NavigationEntry)input.getValue(1);
        if("PRODUCT".equals(entry.getPageType())){
            try {
                String product = (String)entry.getOtherData().get("product");

                // Call the items API to get item information
                Product itm = reader.readItem(product);
                if(itm ==null)
                    return ;

                String categ = itm.getCategory();

                collector.emit(new Values(entry.getUserId(), product, categ));

            } catch (Exception ex) {
                System.err.println("Error processing PRODUCT tuple"+ ex);
                ex.printStackTrace();
            }
        }
    }
    ...
}
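
The listing omits the bolt’s declareOutputFields method. A hedged sketch is shown below; the field names are assumptions chosen to be consistent with the fieldsGrouping("get-categ", new Fields("user")) call in TopologyStarter and with the tuple positions read by UserHistoryBolt.

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Position 0 = user, 1 = product, 2 = category,
    // matching input.getString(0..2) in UserHistoryBolt.
    declarer.declare(new Fields("user", "product", "categ"));
}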

As mentioned earlier, use the ProductsReader helper class to read the product-specific information.

package storm.analytics.utilities;
...
public class ProductsReader {
    ...
    public Product readItem(String id) throws Exception{
        String content= jedis.get(id);
        if(content == null  || ("nil".equals(content)))
            return null;
        Object obj=JSONValue.parse(content);
        JSONObject product=(JSONObject)obj;
        Product i= new Product((Long)product.get("id"),
                               (String)product.get("title"),
                               (Long)product.get("price"),
                               (String)product.get("category"));
        return i;
    }
    ...
}

UserHistoryBolt

The UserHistoryBolt is the core of the application. It’s responsible for keeping track of the products navigated by each user and determining the result pairs that should be incremented.

You’ll use the Redis server to store the product history by user, and you’ll also keep a local copy for performance reasons. The data access details are hidden in the methods getUserNavigationHistory(user) and addProductToHistory(user, prodKey), for read and write access, respectively.

package storm.analytics;
...
public class UserHistoryBolt extends BaseRichBolt{
    @Override
    public void execute(Tuple input) {
        String user = input.getString(0);
        String prod1 = input.getString(1);
        String cat1 = input.getString(2);

        // Product key will have category information embedded.
        String prodKey = prod1+":"+cat1;

        Set<String> productsNavigated = getUserNavigationHistory(user);

        // Process the entry only if the user hasn't already navigated this product.
        if(!productsNavigated.contains(prodKey)) {

            // Update the related product:category counters.
            for (String other : productsNavigated) {
                String [] ot = other.split(":");
                String prod2 = ot[0];
                String cat2 = ot[1];
                collector.emit(new Values(prod1, cat2));
                collector.emit(new Values(prod2, cat1));
            }
            addProductToHistory(user, prodKey);
        }
    }
}

Note that the output of this bolt is the set of product-category pairs whose relation counters should be incremented.

Take a look at the source code. The bolt keeps a set of the products navigated by each user. Note that the set contains product:category pairs rather than just products. That’s because you’ll need the category information in later calls, and it performs better if you don’t have to fetch it from the database each time. This is possible because each product has exactly one category, which doesn’t change during the product’s lifetime.

After reading the set of the user’s previously navigated products (with their categories), check if the current product has been visited previously. If so, the entry is ignored. If this is the first time the user has visited this product, iterate through the user’s history and emit a tuple for the product being navigated and the categories of all the products in the history with collector.emit(new Values(prod1, cat2)), and a second tuple for the other products and the category of the product being navigated with collector.emit(new Values(prod2, cat1)). Finally, add the product and its category to the set.

For example, assume that the user John has the following navigation history:

User    #     Category
John    0     Players
John    2     Players
John    17    TVs
John    21    Mounts

And the following navigation entry needs to be processed:

User    #     Category
John    8     Phones

The user hasn’t yet looked at product 8, so you need to process it.

Therefore, the emitted tuples will be:

#     Category
8     Players
8     Players
8     TVs
8     Mounts
0     Phones
2     Phones
17    Phones
21    Phones

Note that the relation between each product on the left and the category on the right should be incremented by one.

Now, let’s explore the persistence used by the bolt.

public class UserHistoryBolt extends BaseRichBolt{
    ...
    private Set<String> getUserNavigationHistory(String user) {
        Set<String> userHistory = usersNavigatedItems.get(user);
        if(userHistory == null) {
            userHistory = jedis.smembers(buildKey(user));
            if(userHistory == null)
                userHistory = new HashSet<String>();
            usersNavigatedItems.put(user, userHistory);
        }
        return userHistory;
    }

    private void addProductToHistory(String user, String product) {
        Set<String> userHistory = getUserNavigationHistory(user);
        userHistory.add(product);
        jedis.sadd(buildKey(user), product);
    }
    ...
}

The getUserNavigationHistory method returns the set of products that the user has visited. First, attempt to get the user’s history from local memory with usersNavigatedItems.get(user), but if it’s not there, read from the Redis server using jedis.smembers(buildKey(user)) and add the entry to the memory structure usersNavigatedItems.

When the user navigates to a new product, call addProductToHistory to update both the memory structure with userHistory.add(product) and the Redis server structure with jedis.sadd(buildKey(user), product).

Note that because the bolt keeps per-user information in memory, it is very important that when you parallelize it you use fieldsGrouping on the user field; otherwise, different copies of the user history will get out of sync.

ProductCategoriesCounterBolt

The ProductCategoriesCounterBolt class is in charge of keeping track of all the product-category relationships. It receives the product-category pairs emitted by the UserHistoryBolt and updates the counters.

The information about the number of occurrences of each pair is stored on the Redis server. A local cache for reads and a write buffer are used for performance reasons. The information is sent to Redis in a background thread.

This bolt also emits a tuple with the updated counter for the input pair to feed the next bolt in the topology, the NewsNotifierBolt, which is in charge of broadcasting the news to the final users for real-time updates.

public class ProductCategoriesCounterBolt extends BaseRichBolt {
    ...
    @Override
    public void execute(Tuple input) {
        String product = input.getString(0);
        String categ = input.getString(1);
        int total = count(product, categ);
        collector.emit(new Values(product, categ, total));
    }
    ...
    private int count(String product, String categ) {
        int count = getProductCategoryCount(categ, product);
        count ++;
        storeProductCategoryCount(categ, product, count);
        return count;
    }
    ...
}

Persistence in this bolt is hidden in the getProductCategoryCount and storeProductCategoryCount methods. Let’s take a look inside them:

package storm.analytics;
...
public class ProductCategoriesCounterBolt extends BaseRichBolt {
    // ITEM:CATEGORY -> COUNT
    HashMap<String, Integer> counter = new HashMap<String, Integer>();

    // ITEM:CATEGORY -> COUNT
    HashMap<String, Integer> pendingToSave = new HashMap<String, Integer>();

    ...
    public int getProductCategoryCount(String categ, String product) {
        Integer count = counter.get(buildLocalKey(categ, product));
        if(count == null) {
            String sCount = jedis.hget(buildRedisKey(product), categ);
            if(sCount == null || "nil".equals(sCount)) {
                count = 0;
            } else {
                count = Integer.valueOf(sCount);
            }
        }
        return count;
    }
    ...
    private void storeProductCategoryCount(String categ, String product, int count) {
        String key = buildLocalKey(categ, product);
        counter.put(key , count);
        synchronized (pendingToSave) {
            pendingToSave.put(key, count);
        }
    }
    ...
}

The getProductCategoryCount method first looks in the in-memory counter cache. If the information is not available there, it gets it from the Redis server.

The storeProductCategoryCount method updates the counter cache and the pendingToSave buffer. The buffer is persisted by the following background thread:

package storm.analytics;

public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
    private void startDownloaderThread() {
        TimerTask t = new TimerTask() {
            @Override
            public void run() {
                HashMap<String, Integer> pendings;
                synchronized (pendingToSave) {
                    pendings = pendingToSave;
                    pendingToSave = new HashMap<String, Integer>();
                }

                for (String key : pendings.keySet()) {
                    String[] keys = key.split(":");
                    String product = keys[0];
                    String categ = keys[1];
                    Integer count = pendings.get(key);
                    jedis.hset(buildRedisKey(product), categ, count.toString());
                }
            }
        };
        timer = new Timer("Item categories downloader");
        timer.scheduleAtFixedRate(t, downloadTime, downloadTime);
    }
...
}

The download thread locks pendingToSave and creates a new, empty buffer for the other threads to use while it sends the old one to Redis. This code block runs every downloadTime milliseconds and is configurable through the download-time topology configuration parameter. The longer download-time is, the fewer writes to Redis are performed, because consecutive increments of the same pair are written only once.
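
As a rough sketch, the bolt’s prepare method might read the download-time parameter before starting the background thread; the fallback value of 1000 ms and the exact field names here are assumptions, not the book’s actual source.

public class ProductCategoriesCounterBolt extends BaseRichBolt {
    ...
    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        // Same Redis configuration keys as the rest of the topology.
        jedis = new Jedis(stormConf.get("redis-host").toString(),
                          Integer.valueOf(stormConf.get("redis-port").toString()));
        // Flush interval for the pendingToSave buffer, in milliseconds.
        Object dt = stormConf.get("download-time");
        downloadTime = (dt == null) ? 1000 : Long.valueOf(dt.toString());
        startDownloaderThread();
    }
    ...
}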

Keep in mind that, as with the previous bolt, it is extremely important to apply the correct fields grouping when assigning sources to this bolt, in this case grouping by product. Because the bolt keeps in-memory copies of the information keyed by product, having several copies of the cache and the buffer for the same product would lead to inconsistencies.

NewsNotifierBolt

The NewsNotifierBolt is in charge of notifying the web application of changes in the statistics, so that users can view changes in real time. The notification is made by an HTTP POST using Apache HttpClient to the URL configured in the webserver parameter of the topology configuration. The POST body is encoded in JSON.

This bolt is removed from the topology when testing.

package storm.analytics;
...
public class NewsNotifierBolt extends BaseRichBolt {
    ...
    @Override
    public void execute(Tuple input) {
        String product = input.getString(0);
        String categ = input.getString(1);
        int visits = input.getInteger(2);

        String content = "{ \"product\": \""+product+"\", \"categ\":\""+categ+"\", \"visits\":"+visits+" }";

        HttpPost post = new HttpPost(webserver);
        try {
            post.setEntity(new StringEntity(content));
            HttpResponse response = client.execute(post);
            org.apache.http.util.EntityUtils.consume(response.getEntity());
        } catch (Exception e) {
            e.printStackTrace();
            reconnect();
        }
    }
    ...
}
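
The test initialization later in this chapter sets TopologyStarter.testing = true. One way the starter might use that flag to drop this bolt during tests is sketched below; this is an assumption for illustration, not the actual TopologyStarter source.

// Inside TopologyStarter.main(), after the product-categ-counter bolt is wired:
if(!testing) {
    // In tests, assertions read the counters directly from Redis,
    // so there is no web application to notify.
    builder.setBolt("news-notifier", new NewsNotifierBolt(), 5)
           .shuffleGrouping("product-categ-counter");
}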

The Redis Server

Redis is an advanced in-memory key-value store with support for persistence (see http://redis.io). Use it to store the following information:

  • The product information, used to serve the website.

  • The User Navigation Queue, used to feed the Storm Topology.

  • The Storm Topology Intermediate Data, used by the Topology to recover from failures.

  • The Storm Topology Results, used to store the desired results.

Product Information

The Redis Server stores the products using the product ID for the key and a JSON object containing all the product information as the value.

> redis-cli
redis 127.0.0.1:6379> get 15
"{"title":"Kids smartphone cover","category":"Covers","price":30,"id":15}"

User Navigation Queue

The user navigation queue is stored in a Redis list named navigation and organized as a first-in, first-out (FIFO) queue. The server adds an entry to the left side of the list each time a user visits a product page, indicating which user viewed which product. The Storm cluster constantly removes elements from the right side of the list to process the information.

redis 127.0.0.1:6379> llen navigation
(integer) 5
redis 127.0.0.1:6379> lrange navigation 0 4
1) "{"user":"59c34159-0ecb-4ef3-a56b-99150346f8d5","product":"1","type":"PRODUCT"}"
2) "{"user":"59c34159-0ecb-4ef3-a56b-99150346f8d5","product":"1","type":"PRODUCT"}"
3) "{"user":"59c34159-0ecb-4ef3-a56b-99150346f8d5","product":"2","type":"PRODUCT"}"
4) "{"user":"59c34159-0ecb-4ef3-a56b-99150346f8d5","product":"3","type":"PRODUCT"}"
5) "{"user":"59c34159-0ecb-4ef3-a56b-99150346f8d5","product":"5","type":"PRODUCT"}"

Intermediate Data

The cluster needs to store the history of each user separately. In order to do so, it saves a set in the Redis server with all the products and their categories that were navigated by each user.

redis 127.0.0.1:6379> smembers history:59c34159-0ecb-4ef3-a56b-99150346f8d5
1) "1:Players"
2) "5:Cameras"
3) "2:Players"
4) "3:Cameras"

Results

The cluster generates useful data about the customers viewing a specific product and stores it in a Redis hash named “prodcnt:” followed by the product ID.

redis 127.0.0.1:6379> hgetall prodcnt:2
1) "Players"
2) "1"
3) "Cameras"
4) "2"

Testing the Topology

In order to test the topology, use the provided LocalCluster and a local Redis server (see Figure 6-7). You’ll populate the products database on initialization and mock the insertion of navigation logs into the Redis server. The assertions are performed by reading the topology outputs from the Redis server. Tests are written in Java and Groovy.

The testing architecture

Figure 6-7. The testing architecture

Test Initialization

Initialization consists of three steps:

The first step is to start the LocalCluster and submit the topology. Initialization is implemented in the AbstractAnalyticsTest class, which is extended by all tests. A static flag called topologyStarted is used to avoid initializing more than once when multiple AbstractAnalyticsTest subclasses are instantiated.

Note that the sleep is there to allow the LocalCluster to start correctly before attempting to retrieve results from it.

public abstract class AbstractAnalyticsTest extends Assert {
    def jedis
    static topologyStarted = false
    static sync= new Object()

    private void reconnect() {
        jedis = new Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT)
    }

    @Before
    public void startTopology(){
        synchronized(sync){
            reconnect()
            if(!topologyStarted){
                jedis.flushAll()
                populateProducts()
                TopologyStarter.testing = true
                TopologyStarter.main(null)
                topologyStarted = true
                sleep 1000
            }
        }
    }

    ...
    public void populateProducts() {
        def testProducts = [
            [id: 0, title:"Dvd player with surround sound system",
                 category:"Players", price: 100],
            [id: 1, title:"Full HD Bluray and DVD player",
                 category:"Players", price:130],
            [id: 2, title:"Media player with USB 2.0 input",
                 category:"Players", price:70],
            ...
            [id: 21, title:"TV Wall mount bracket 50-55 Inches",
                 category:"Mounts", price:80]
        ]

        testProducts.each() { product ->
            def val =
    "{ "title": "${product.title}" , "category": "${product.category}"," +
    " "price": ${product.price}, "id": ${product.id} }"
            println val
            jedis.set(product.id.toString(), val.toString())
        }
    }
    ...
}

The second step is to implement a method called navigate in the AbstractAnalyticsTest class. So that the different tests have a way to emulate the behavior of a user navigating the website, this method inserts navigation entries into the navigation queue on the Redis server.

public abstract class AbstractAnalyticsTest extends Assert {
    ...
    public void navigate(user, product) {
        String nav =
 "{"user": "${user}", "product": "${product}", "type": "PRODUCT"}".toString()
        println "Pushing navigation: ${nav}"
        jedis.lpush('navigation', nav)
    }
    ...
}

The third step is to provide a method called getProductCategoryStats in the AbstractAnalyticsTest class that reads a specific relation from the Redis server. The different tests will need it to assert against the statistics results and check whether the topology is behaving as expected.

public abstract class AbstractAnalyticsTest extends Assert {
    ...
    public int getProductCategoryStats(String product, String categ) {
        String count = jedis.hget("prodcnt:${product}", categ)
        if(count == null || "nil".equals(count))
            return 0
        return Integer.valueOf(count)
    }
    ...
}

A Test Example

In the next snippet, you’ll emulate a few product navigations of user “1”, then check the results. Note that you wait two seconds before asserting to be sure that the results have been stored to Redis. (Remember that the ProductCategoriesCounterBolt has an in-memory copy of the counters and sends them to Redis in the background.)

package functional

class StatsTest extends AbstractAnalyticsTest {
    @Test
    public void testNoDuplication(){
        navigate("1", "0") // Players
        navigate("1", "1") // Players
        navigate("1", "2") // Players
        navigate("1", "3") // Cameras

        Thread.sleep(2000) // Give two seconds for the system to process the data.

        assertEquals 1, getProductCategoryStats("0", "Cameras")
        assertEquals 1, getProductCategoryStats("1", "Cameras")
        assertEquals 1, getProductCategoryStats("2", "Cameras")
        assertEquals 2, getProductCategoryStats("0", "Players")
        assertEquals 3, getProductCategoryStats("3", "Players")
    }
}

Notes on Scalability and Availability

The architecture of this solution has been simplified to fit into a single chapter of the book. For that reason, you avoided some complexity that would be necessary for this solution to scale and have high availability. There are a couple of major issues with this approach.

The Redis server in this architecture is not only a single point of failure but also a bottleneck. You’ll be able to receive only as much traffic as the Redis server can handle. The Redis layer can be scaled by using sharding, and its availability can be improved by using a Master/Slave configuration, which would require changes to the sources of both the topology and the web application.
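
As a rough illustration of the kind of change that sharding implies, the sketch below partitions keys across several Jedis connections by hashing the key. The class, its name, and the hosts it would wrap are assumptions for illustration only; a sharding-aware client library could serve the same purpose. Both the topology and the web application would have to route their reads and writes through a layer like this, which is why sharding requires changes on both sides.

package storm.analytics.utilities;

import java.util.List;

import redis.clients.jedis.Jedis;

public class ShardedRedis {
    private final List<Jedis> shards;

    public ShardedRedis(List<Jedis> shards) {
        this.shards = shards;
    }

    // Always map the same key to the same Redis instance so that
    // reads and writes for a given product or user stay consistent.
    private Jedis shardFor(String key) {
        return shards.get((key.hashCode() & 0x7fffffff) % shards.size());
    }

    public String get(String key) {
        return shardFor(key).get(key);
    }

    public void set(String key, String value) {
        shardFor(key).set(key, value);
    }
}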

Another weakness is that the web application does not scale proportionally by adding servers in a round-robin fashion, because it needs to be notified when a product statistic changes so it can notify all interested browsers. This “notification to browser” bridge is implemented with Socket.io, which requires that the listener and the notifier be hosted on the same web server. That is achievable only if you shard both the GET /product/:id/stats traffic and the POST /news traffic with the same criteria, ensuring that requests referencing the same product end up on the same server.
