Chapter 23. Case Studies

Hive is in use at a multitude of companies and organizations around the world. This case studies chapter details interesting and unique use cases, the problems that were present, and how those issues were solved using Hive as a data warehousing tool for petabytes of data.

m6d.com (Media6Degrees)

Data Science at M6D Using Hive and R

by Ori Stitelman

In this case study we examine one of many approaches our data science team, here at m6d, takes toward synthesizing the immense amount of data that we are able to extract using Hive. m6d is a display advertising prospecting company. Our role is to create machine learning algorithms that are specifically tailored toward finding the best new prospects for an advertising campaign. These algorithms are layered on top of a delivery engine that is tied directly into a myriad of real-time bidding exchanges that provide a means to purchase locations on websites to display banner advertisements on behalf of our clients. The m6d display advertising engine is involved in billions of auctions and serves tens of millions of advertisements daily. Naturally, such a system produces an immense amount of data. A large portion of the records generated by our company's display advertising delivery system are housed in m6d's Hadoop cluster and, as a result, Hive is the primary tool our data science team uses to interact with these logs.

Hive gives our data science team a way to extract and manipulate large amounts of data. In fact, it allows us to extract samples and summarize data that, prior to Hive, could not be analyzed as efficiently, or at all, because of its immense size. Yet even though Hive gives us access to huge amounts of data at rates many times faster than before, most of the tools that we were previously familiar with as data scientists are not always able to analyze data samples of the size we can now produce. In summary, Hive provides us with a great tool to extract huge amounts of data; however, the toolbox of data science, or statistical learning, methods that we as data scientists are used to using cannot easily accommodate the new, larger data sets without substantial changes.

Many different software packages have been developed or are under development for both supervised and unsupervised learning on large data sets. Some of these are standalone software implementations, such as Vowpal Wabbit and BBR, while others are implementations within a larger infrastructure, such as Mahout for Hadoop or the multitude of "large data" packages for R. A portion of these algorithms take advantage of parallel programming approaches, while others rely on different methods to achieve scalability.

The primary tool for statistical learning for several of the data scientists in our team is R. It provides a large array of packages that are able to perform many statistical learning methods. More importantly, we have a lot of experience with it, know how its packages perform, understand their features, and are very familiar with its documentation. However, one major drawback of R is that by default it loads the entire data set into memory. This is a major limitation considering that the majority of the data sets that we extract from Hive and are able to analyze today are much larger than what can fit in memory. Moreover, once the data in R is larger than what is able to fit in memory, the system will start swapping, which leads to the system thrashing and massive decreases in processing speed.[26]

In no way are we advocating ignoring the new tools that are available. Obviously, it is important to take advantage of the best of these scalable technologies, but only so much time can be spent investigating and testing new technology. So we are left with a choice of either using the new tools that are available for large data sets or downsampling our data to fit into the tools that we are more familiar with. If we decide to use the new tools, we gain signal by letting our algorithms learn from more data, and as a result the variance in our estimates decreases. This is particularly appealing in situations where the outcome is very rare. However, learning these new tools takes time, and there is an opportunity cost of spending that time learning new tools rather than answering other questions that have particular value to the company.

Alternatively, we can downsample the data to obtain something that can fit in the old tools we have at our disposal, but must deal with a loss of signal and increased variance in our estimates. However, this allows us to deal with tools with which we are familiar and the features that they provide. Thus, we are able to retain the functionality of our current toolbox at the price of losing some signal. However, these are not the only two possible approaches. In this case study, we highlight a way that we can both retain the functionality of the current toolbox as well as gain signal, or decrease variance, by using a larger sample, or all, of the data available to us.

Figure 23-1 shows the probability of converting versus the score from an algorithm designed to rank prospects for an advertising campaign. Higher scores should indicate a higher probability of conversion. This plot clearly shows that the top individuals are converting at a lower rate than some of the lower-scoring browsers. That is, browsers with scores greater than 1 convert at a lower rate than individuals with scores between 0.5 and 1.0. Considering that some campaigns only target a very small percentage of the overall population, it is important that the best prospects are among the top scorers.

Figure 23-1. Probability of conversion versus score

The line that expresses the relationship between the score and the conversion probability seen in Figure 23-1 is generated using a generalized additive model (GAM) in the statistical programming package R.[27] The details about GAMs will not be presented here. For the purpose of this case study it can be thought of as a black box that produces for each score a prediction of the conversion rate. The browsers can then be re-ranked based on the predicted conversion rate; thus, the predicted conversion rate becomes the new score.

The new ranking can be generated in the following way. First, extract the scores for each individual browser and then follow them for some designated period of time, say five days, and record if they took the desired action, and thus converted. Consider a Hive table called scoretable that has the following information and is partitioned on date and subpartitioned by offer.

Name     Type     Description
score    double   The score generated by the initial algorithm, which does not necessarily rank order appropriately.
convert  int      A binary variable equal to one if the individual browser takes the desired action in the following five days and equal to zero if not.
date     int      The day that the browser was given the particular score.
offer    int      An ID of an offer.

The following query can then be used to extract a set of data from scoretable for use in R to estimate the GAM line that predicts conversion for different levels of score like in the preceding table:

SELECT score, convert
FROM scoretable
WHERE date >= [start.date] AND date <= [end.date]
AND offer = [chosen.offer];

1.2347 0
3.2322 1
0.0013 0
0.3441 0

This data is then loaded into R and the following code is used to create the predicted conversion probability versus score, as plotted in Figure 23-1:

library(mgcv)
g1=gam(convert~s(score),family=binomial,data=[data frame name])

The issue with this approach is that it can only be used for a limited number of days of data, because the data set gets too large and R begins thrashing for anything more than three days of data. Moreover, it takes approximately 10 minutes per campaign to do this for about three days of data. So, running this analysis for about 300 campaigns for a single scoring method took about 50 hours for three days of data.

By simply extracting the data from Hive in a slightly different way and making use of the feature of the gam function in mgcv that allows for frequency weights, the same analysis may be done using more data, and thus gaining more signal, at a much faster rate. This is done by selecting the data from Hive with the score rounded to the nearest hundredth and getting frequency weights for each (rounded score, convert) combination using a GROUP BY query. This is a very common approach for dealing with large data sets, and in the case of these scores there should be no loss of signal due to rounding, because there is no reason to believe that individuals with scores that differ by less than 0.01 are any different from each other. The following query would select such a data set:

SELECT round(score,2) AS score, convert, count(1) AS freq
FROM scoretable
WHERE date >= [start.date] AND date <= [end.date] AND offer = [chosen.offer]
GROUP BY round(score,2), convert;

1.23 0 500
3.23 1 22
0.00 0 127
0.34 0 36

The resulting data set is significantly smaller than the original approach presented that does not use frequency weights. In fact, the initial data set for each offer consisted of millions of records, and this new data set consists of approximately 6,500 rows per offer. The new data is then loaded into R and the following code may be used to generate the new GAM results:

library(mgcv)
g2=gam(convert~s(score),family=binomial,weights=freq,
 data=[frequency weight data frame name])

(We wrapped the line.)

The previously presented approach took 10 minutes per offer to create the GAM for only three days of data, compared to the frequency-weighted approach, which created the GAM based on seven days of data in approximately 10 seconds. Thus, by using frequency weights, the analysis for the 300 campaigns could be done in 50 minutes rather than 50 hours. This increase in speed was also realized while using more than twice the amount of data, resulting in more precise estimates of the predicted conversion probabilities. In summary, the frequency weights allowed for a more precise estimate of the GAM in significantly less time.

In the presented case study, we showed how, by rounding the continuous variables and grouping like values with frequency weights, we were able both to get more precise estimates by using more data and to use fewer computational resources, resulting in quicker estimates. The example shown was for a model with a single feature, score. In general, this approach works well for a small number of features or a larger number of sparse features. The approach may be extended to higher-dimensional problems as well using some other small tricks. One way this can be done for a larger number of variables is by bucketing the variables, or features, into binary variables and then using GROUP BY queries and frequency weights for those features (see the sketch below). However, as the number of features increases, and those features are not sparse, there is little value gained by such an approach and other alternative methods must be explored, or software designed for larger data sets must be embraced.
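To make the bucketing idea concrete, here is a minimal sketch in HiveQL. The feature columns recent_visit and prior_conversion are hypothetical illustrations, not columns of the actual m6d tables, and the thresholds are arbitrary:

SELECT round(score,2) AS score,
       IF(recent_visit > 0, 1, 0) AS recent_visit_flag,
       IF(prior_conversion > 0, 1, 0) AS prior_conversion_flag,
       convert,
       count(1) AS freq
FROM scoretable
WHERE date >= [start.date] AND date <= [end.date] AND offer = [chosen.offer]
GROUP BY round(score,2),
         IF(recent_visit > 0, 1, 0),
         IF(prior_conversion > 0, 1, 0),
         convert;

The resulting freq column can be passed to gam() as frequency weights exactly as before, with the binary flags included as additional model terms.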

M6D UDF Pseudorank

by David Ha and Rumit Patel

Sorting data and identifying the top N elements is straightforward. You order the whole data set by some criteria and limit the result set to N. But there are times when you need to group like elements together and find the top N elements within that group only. For example, identifying the top 10 requested songs for each recording artist or the top 100 best-selling items per product category and country. Several database platforms define a rank() function that can support these scenarios, but until Hive provides an implementation, we can create a user-defined function to produce the results we want. We will call this function p_rank() for psuedorank, leaving the name rank() for the Hive implementation.

Say we have the following product sales data and we want to see the top three items per category and country:

Category  Country  Product        Sales
movies    us       chewblanca       100
movies    us       war stars iv     150
movies    us       war stars iii    200
movies    us       star wreck       300
movies    gb       titanus          100
movies    gb       spiderella       150
movies    gb       war stars iii    200
movies    gb       war stars iv     300
office    us       red pens          30
office    us       blue pens         50
office    us       black pens        60
office    us       pencils           70
office    gb       rulers            30
office    gb       blue pens         40
office    gb       black pens        50
office    gb       binder clips      60

In most SQL systems:

SELECT
 category,country,product,sales,rank
FROM (
 SELECT
   category,country,product, sales,
   rank() over (PARTITION BY category, country ORDER BY sales DESC) rank
 FROM p_rank_demo) t
WHERE rank <= 3

To achieve the same result using HiveQL, the first step is partitioning the data into groups, which we can achieve using the DISTRIBUTE BY clause. We must ensure that all rows with the same category and country are sent to the same reducer:

DISTRIBUTE BY
 category,
 country

The next step is ordering the data in each group by descending sales using the SORT BY clause. While ORDER BY performs a total ordering across all the data, SORT BY orders the data within each reducer. You must repeat the partitioning columns named in the DISTRIBUTE BY clause in the SORT BY clause:

SORT BY
 category,
 country,
 sales DESC

Putting everything together, we have:

ADD JAR p-rank-demo.jar;
CREATE TEMPORARY FUNCTION p_rank AS 'demo.PsuedoRank';

SELECT
 category,country,product,sales,rank
FROM (
 SELECT
   category,country,product,sales,
   p_rank(category, country) rank
 FROM (
   SELECT
     category,country,product,
     sales
   FROM p_rank_demo
   DISTRIBUTE BY
     category,country
   SORT BY
     category,country,sales desc) t1) t2
WHERE rank <= 3

The subquery t1 organizes the data so that all data belonging to the same category and country are sorted by descending sales count. The next query t2 then uses p_rank() to assign a rank to each row within the group. The outermost query filters the rank to be in the top three:

Category  Country  Product        Sales  Rank
movies    gb       war stars iv     300     1
movies    gb       war stars iii    200     2
movies    gb       spiderella       150     3
movies    us       star wreck       300     1
movies    us       war stars iii    200     2
movies    us       war stars iv     150     3
office    gb       binder clips      60     1
office    gb       black pens        50     2
office    gb       blue pens         40     3
office    us       pencils           70     1
office    us       black pens        60     2
office    us       blue pens         50     3

p_rank() is implemented as a generic UDF whose parameters are all the identifying attributes of the group, which, in this case, are category and country. The function remembers the previous arguments, and so long as the successive arguments match, it increments and returns the rank. Whenever the arguments do not match, the function resets the rank back to 1 and starts over.

This is just one simple example of how p_rank() can be used. You can also find the 10th to 15th bestsellers by category and country. Or, if you precalculate the counts of products in each category and country, you can use p_rank() to calculate percentiles using a join. For example, if there were 1,000 products in the “movies” and “us” group, the 50th, 70th, and 95th quantiles would have rank 500, 700, and 950, respectively. Please know that p_rank() is not a direct substitute for rank() because there will be differences in some circumstances. rank() returns the same value when there are ties, but p_rank() will keep incrementing, so plan accordingly and test with your data.
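For instance, a minimal sketch of the percentile idea might look like the following, where ranked_products is a hypothetical table holding the output of the p_rank() query shown above:

SELECT
  r.category, r.country, r.product,
  r.rank / c.product_count AS percentile
FROM ranked_products r
JOIN (
  SELECT category, country, count(1) AS product_count
  FROM p_rank_demo
  GROUP BY category, country
) c
ON r.category = c.category AND r.country = c.country;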

Lastly, here is the implementation. It is public domain so feel free to use, improve, and modify it to suit your needs:

package demo;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.
  PrimitiveObjectInspectorFactory;

public class PsuedoRank extends GenericUDF {
 /**
  * The rank within the group. Resets whenever the group changes.
  */
 private long rank;

 /**
  * Key of the group that we are ranking. Use the string form
  * of the objects since deferred object and equals do not work
  * as expected even for equivalent values.
  */
 private String[] groupKey;

 @Override
 public ObjectInspector initialize(ObjectInspector[] oi)
     throws UDFArgumentException {
   return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
 }

 @Override
 public Object evaluate(DeferredObject[] currentKey) throws HiveException {
   if (!sameAsPreviousKey(currentKey)) {
     rank = 1;
   }
   return new Long(rank++);
 }

 /**
  * Returns true if the current key and the previous keys are the same.
  * If the keys are not the same, then sets {@link #groupKey} to the
  * current key.
  */
 private boolean sameAsPreviousKey(DeferredObject[] currentKey)
     throws HiveException {
   if (null == currentKey && null == groupKey) {
     return true;
   }
   String[] previousKey = groupKey;
   copy(currentKey);
   if (null == groupKey && null != previousKey) {
     return false;
   }
   if (null != groupKey && null == previousKey) {
     return false;
   }
   if (groupKey.length != previousKey.length) {
     return false;
   }
   for (int index = 0; index < previousKey.length; index++) {
     if (!groupKey[index].equals(previousKey[index])) {
       return false;
     }
   }
   return true;
 }

 /**
  * Copies the given key to {@link #groupKey} for future
  * comparisons.
  */
 private void copy(DeferredObject[] currentKey)
     throws HiveException {
   if (null == currentKey) {
     groupKey = null;
   } else {
     groupKey = new String[currentKey.length];
     for (int index = 0; index < currentKey.length; index++) {
       groupKey[index] = String.valueOf(currentKey[index].get());
     }
   }
 }

 @Override
 public String getDisplayString(String[] children) {
   StringBuilder sb = new StringBuilder();
   sb.append("PsuedoRank (");
   for (int i = 0; i < children.length; i++) {
     if (i > 0) {
       sb.append(", ");
     }
     sb.append(children[i]);
   }
   sb.append(")");
   return sb.toString();
 }
}

M6D Managing Hive Data Across Multiple MapReduce Clusters

Although Hadoop clusters are designed to scale from 10 to 10,000 nodes, sometimes deployment-specific requirements involve running more than one filesystem or JobTracker. At M6D, we have such requirements. For example, we have hourly and daily process reports built with Hadoop and Hive that are business critical and must complete in a timely manner, while our systems also support data scientists and sales engineers who periodically run ad hoc reports. While the fair share scheduler and capacity scheduler meet many of our requirements, we need more isolation than schedulers can provide. Also, because HDFS has no snapshot or incremental backup features, we require a solution that will prevent accidental delete or drop table operations from destroying data.

Our solution is to run two distinct Hadoop deployments. Data can have a replication factor of two or three on the primary deployment and additionally be replicated to the second deployment. This decision gives us guaranteed resources dedicated both to our time-sensitive production processes and to our ad hoc users. Additionally, we are protected against accidental drop table or data delete operations. This design does incur some overhead in having to administer two deployments and to set up and administer the replication processes, but that overhead is justified in our case.

Our two deployments are known as production and research. They each have their own dedicated Data Nodes and Task Trackers. Each NameNode and JobTracker is a failover setup using DRBD and Linux-HA. Both deployments are on the same switching network (Tables 23-1 and 23-2).

Table 23-1. Production

NameNode    hdfs.hadoop.pvt:54310
JobTracker  jt.hadoop.pvt:54311

Table 23-2. Research

NameNode    rs01.hadoop.pvt:34310
JobTracker  rjt.hadoop.pvt:34311

Cross deployment queries with Hive

A given table zz_mid_set exists on Production and we wish to be able to query it from Research without having to transfer the data between clusters using distcp. Generally, we try to avoid this because it breaks our isolation design but it is nice to know that this can be done.

Use the describe extended command to determine the columns of a table as well as its location:

hive> set fs.default.name;
fs.default.name=hdfs://hdfs.hadoop.pvt:54310
hive> set mapred.job.tracker;
mapred.job.tracker=jt.hadoop.pvt:54311
hive> describe extended zz_mid_set;
OK
adv_spend_id    int
transaction_id  bigint
time    string
client_id       bigint
visit_info      string
event_type      tinyint
level   int

location:hdfs://hdfs.hadoop.pvt:54310/user/hive/warehouse/zz_mid_set
Time taken: 0.063 seconds
hive> select count(1) from zz_mid_set;
1795928

On the second cluster, craft a second CREATE TABLE statement with the same columns. Create the second table as EXTERNAL; that way, if the table is dropped on the second cluster, the files are not deleted on the first cluster. Notice that for the location we specified a full URI. In fact, when you specify a location as a relative URI, Hive stores it as a full URI:

hive> set fs.default.name;
fs.default.name=hdfs://rs01.hadoop.pvt:34310
hive> set mapred.job.tracker;
mapred.job.tracker=rjt.hadoop.pvt:34311
hive> CREATE EXTERNAL TABLE table_in_another_cluster
( adv_spend_id int, transaction_id bigint, time string, client_id bigint,
visit_info string, event_type tinyint, level int)
LOCATION 'hdfs://hdfs.hadoop.pvt:54310/user/hive/warehouse/zz_mid_set';
hive> select count(*) FROM table_in_another_cluster;
1795928

It is important to note that this cross-deployment access works because both clusters have network access to each other. The TaskTrackers of the deployment we submit the job to have to be able to access the NameNode and DataNodes of the other deployment. Hadoop was designed to move processing closer to data by scheduling tasks to run on nodes where the data is located. In this scenario, TaskTrackers connect to another cluster's DataNodes, which means a general performance decrease and an increase in network usage.

Replicating Hive data between deployments

Replicating Hadoop and Hive data is easier than replicating a traditional database. Unlike a database running multiple transactions that change the underlying data frequently, Hadoop and Hive data is typically “write once.” Adding new partitions does not change the existing ones, and typically new partitions are added on time-based intervals.

Early iterations of replication systems were standalone systems that used distcp and generated Hive statements to add partitions on an interval. When we wanted to replicate a new table, we could copy an existing program and make changes for different tables and partitions. Over time we worked out a system that could do this in a more automated manner without having to design a new process for each table to replicate.

The process that creates the partition also creates an empty HDFS file named:

/replication/default.fracture_act/hit_date=20110304,mid=3000

The replication daemon constantly scans the replication hierarchy. If it finds a file, it looks up the table and partition in Hive’s metadata. It then uses the results to replicate the partition. On a successful replication the file is then deleted.

Below is the main loop of the program. First, we do some checking to make sure the table is defined in the source and destination metastores:

 public void run(){
    while (goOn){
      Path base = new Path(pathToConsume);
      FileStatus [] children = srcFs.listStatus(base);
      for (FileStatus child: children){
        try {
          openHiveService();
          String db = child.getPath().getName().split("\\.")[0];
          String hiveTable = child.getPath().getName().split("\\.")[1];
          Table table = srcHive.client.get_table(db, hiveTable);
          if (table == null){
            throw new RuntimeException(db+" "+hiveTable+
              " not found in source metastore");
          }
          Table tableR = destHive.client.get_table(db,hiveTable);
          if (tableR == null){
            throw new RuntimeException(db+" "+hiveTable+
              " not found in dest metastore");
          }

Using the database and table name we can look up the location information inside the metastore. We then do a sanity check to ensure the information does not already exist:

          URI localTable = new URI(tableR.getSd().getLocation());
          FileStatus [] partitions = srcFs.listStatus(child.getPath());
          for (FileStatus partition : partitions){
            try {
              String replaced = partition.getPath().getName()
                .replace(",", "/").replace("'","");
              Partition p = srcHive.client.get_partition_by_name(
                db, hiveTable, replaced);
              URI partUri = new URI(p.getSd().getLocation());
              String path = partUri.getPath();
              DistCp distCp = new DistCp(destConf.conf);
              String thdfile = "/tmp/replicator_distcp";
              Path tmpPath = new Path(thdfile);
              destFs.delete(tmpPath,true);
              if (destFs.exists( new Path(localTable.getScheme()+
                  "://"+localTable.getHost()+":"+localTable.getPort()+path) ) ){
                throw new RuntimeException("Target path already exists "
                  +localTable.getScheme()+"://"+localTable.getHost()+
                  ":"+localTable.getPort()+path );
              }

Hadoop DistCp is not necessarily made to be run programmatically. However, we can pass it a string array identical to the command-line arguments via ToolRunner. Afterward, we check that the returned result was 0:

              String [] dargs = new String [4];
              dargs[0]="-log";
              dargs[1]=localTable.getScheme()+"://"+localTable.getHost()+":"+
                localTable.getPort()+thdfile;
              dargs[2]=p.getSd().getLocation();
              dargs[3]=localTable.getScheme()+"://"+localTable.getHost()+":"+
                localTable.getPort()+path;
              int result =ToolRunner.run(distCp,dargs);
              if (result != 0){
                throw new RuntimeException("DistCP failed "+ dargs[2] +" "+dargs[3]);
              }

Finally, we re-create the ALTER TABLE statement that adds the partition:

              String HQL = "ALTER TABLE "+hiveTable+
                  " ADD PARTITION ("+partition.getPath().getName()
                +") LOCATION '"+path+"'";
              destHive.client.execute("SET hive.support.concurrency=false");
              destHive.client.execute("USE "+db);
              destHive.client.execute(HQL);
              String [] results=destHive.client.fetchAll();
              srcFs.delete(partition.getPath(),true);
            } catch (Exception ex){
              ex.printStackTrace();
            }
          } // for each partition
        } catch (Exception ex) {
          //error(ex);
          ex.printStackTrace();
        }
      } // for each table
      closeHiveService();
      Thread.sleep(60L*1000L);
    } // end run loop
  } // end run

Outbrain

by David Funk

Outbrain is the leading content-discovery platform.

In-Site Referrer Identification

Sometimes, when you’re trying to aggregate your traffic, it can be tricky to tell where it’s actually coming from, especially for traffic coming from elsewhere in your site. If you have a site with a lot of URLs with different structures, you can’t simply check that the referrer URLs match the landing page.

Cleaning up the URLs

What we want is to correctly group each referrer as either In-site, Direct, or Other. If it’s Other, we’ll just keep the actual URL. That way you can tell your internal traffic apart from Google searches to your site, and so on and so forth. If the referrer is blank or null, we’ll label it as Direct.

From here on out, we'll assume that all our URLs are already parsed down to the host or domain, whatever level of granularity you're aiming for. Personally, I like using the domain because it's a little simpler. That said, Hive only has a host function, not a domain function.

If you just have the raw URLs, there are a couple of options. The host, as given below, gives the full host, like news.google.com or www.google.com, whereas the domain would truncate it down to the lowest logical level, like google.com or google.co.uk.

host = PARSE_URL(my_url, 'HOST')

Or you could just use a UDF for it. Whatever, I don’t care. The important thing is that we’re going to be using these to look for matches, so just make your choice based on your own criteria.
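If you do want something closer to the domain without writing a UDF, one rough option is to keep just the last two labels of the host with regexp_extract(). This is only a sketch (the table name my_table is hypothetical, and the pattern is wrong for suffixes like .co.uk, which is exactly why a purpose-built UDF can be worth it):

SELECT PARSE_URL(my_url, 'HOST') AS host,
       regexp_extract(PARSE_URL(my_url, 'HOST'), '[^.]+\\.[^.]+$', 0) AS domain_approx
FROM my_table;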

Determining referrer type

So, back to the example. We have, let’s say, three sites: mysite1.com, mysite2.com, and mysite3.com. Now, we can convert each pageview’s URL to the appropriate class. Let’s imagine a table called referrer_identification:

ri_page_url STRING
ri_referrer_url STRING

Now, we can easily add in the referrer type with a query:

SELECT ri_page_url, ri_referrer_url,
  CASE
    WHEN ri_referrer_url IS NULL OR ri_referrer_url = '' THEN 'DIRECT'
    WHEN ri_referrer_url IN ('mysite1.com', 'mysite2.com', 'mysite3.com') THEN 'INSITE'
    ELSE ri_referrer_url
  END AS ri_referrer_url_classed
FROM
  referrer_identification;

Multiple URLs

This is all pretty simple. But what if we’re an ad network? What if we have hundreds of sites? What if each of the sites could have any number of URL structures?

If that’s the case, we probably also have a table that has each URL, as well as what site it belongs to. Let’s call it site_url, with a schema like:

su_site_id INT
su_url STRING

Let’s also add one more field to our earlier table, referrer_identification:

ri_site_id INT

Now we’re in business. What we want to do is go through each referrer URL and see if it matches with anything of the same site ID. If anything matches, it’s an In-site referrer. Otherwise, it’s something else. So, let’s query for that:

SELECT
  c.c_page_url as ri_page_url,
  c.c_site_id as ri_site_id,
  CASE
    WHEN c.c_referrer_url IS NULL OR c.c_referrer_url = '' THEN 'DIRECT'
    WHEN c.c_insite_referrer_flags > 0 THEN 'INSITE'
    ELSE c.c_referrer_url
  END as ri_referrer_url_classed
FROM
(SELECT
  a.a_page_url as c_page_url,
  a.a_referrer_url as c_referrer_url,
  a.a_site_id as c_site_id,
  SUM(IF(b.b_url <> '', 1, 0)) as c_insite_referrer_flags
FROM
(SELECT
  ri_page_url as a_page_url,
  ri_referrer_url as a_referrer_url,
  ri_site_id as a_site_id
FROM
  referrer_identification
) a
LEFT OUTER JOIN
(SELECT
  su_site_id as b_site_id,
  su_url as b_url
FROM
  site_url
) b
ON
  a.a_site_id = b.b_site_id and
  a.a_referrer_url = b.b_url
) c

A few small notes about this. We use the outer join in this case, because we expect there to be some external referrers that won’t match, and this will let them through. Then, we just catch any cases that did match, and if there were any, we know they came from somewhere in the site.

Counting Uniques

Let’s say you want to calculate the number of unique visitors you have to your site/network/whatever. We’ll use a ridiculously simple schema for our hypothetical table, daily_users:

du_user_id STRING
du_date STRING

However, if you have too many users and not enough machines in your cluster, it might begin to have trouble counting users over a month:

SELECT
 COUNT(DISTINCT du_user_id)
FROM
 daily_users
WHERE
  du_date >= '2012-03-01' and
  du_date <= '2012-03-31'

In all likelihood, your cluster is probably able to make it through the map phase without too many problems, but starts having issues around the reduce phase. The problem is that it's able to access all the records, but it can't count them all at once. Of course, you can't count them day by day, either, because there might be some redundancies.

Why this is a problem

Counting uniques is O(n), where n is the number of records, but it has a high constant factor. We could maybe come up with some clever way to cut that down a little bit, but it's much easier to cut down your n. While it's never good to have a high-order complexity, most of the real problems happen further along. If you have something that takes n^1.1 time to run, who cares whether you have n=2 versus n=1? It's slower, sure, but nowhere near the difference between n=1 and n=100.

So, if each day has m entries, with an average of x redundancies, our first query would have n = 31*m. We can reduce this to n = 31*(m - x) by building a temp table that stores a deduped version of each day.

Load a temp table

First, create the temp table:

CREATE TABLE daily_users_deduped (dud_user_id STRING)
PARTITIONED BY (dud_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

Then we write a template version of a query to run over each day and use it to populate our temp table. I like to refer to these as "metajobs," so let's call this mj_01.sql:

INSERT OVERWRITE TABLE daily_users_deduped
PARTITION (dud_date = :date:)

SELECT DISTINCT
  du_user_id
FROM
  daily_users
WHERE
  du_date = :date:

Next, we write a script that fills in this template, runs it, and repeats for every date in a range. For this, we have three functions: modify_temp_file, which replaces a variable name with a value; fire_query, which basically runs hive -f on a file; and a function that deletes the file:

start_date = '2012-03-01'
end_date = '2012-03-31'

for my_date in date_range(start_date, end_date):
  temp_file = modify_temp_file('mj_01.sql', {':date:': my_date})
  fire_query(temp_file)
  delete(temp_file)

Querying the temp table

Run the script, and you've got a table with n = 31*(m - x). Now, you can query the deduped table without as big a reduce step to get through.

SELECT
  COUNT(DISTINCT dud_user_id)
FROM
  daily_users_deduped

If that’s not enough, you can then dedupe sets of dates, maybe two at a time, whatever the interval that works for you. If you still have trouble, you could hash your user IDs into different classes, maybe based on the first character, to shrink n even further.

The basic idea remains: if you limit the size of your n, a high-order complexity isn't as big of a deal.
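As a sketch of that hashing suggestion: because classes based on the first character never share a user, you can count distincts per class and then sum the per-class counts, which keeps any single reduce group small:

SELECT SUM(class_uniques)
FROM (
  SELECT
    SUBSTR(dud_user_id, 1, 1) AS id_class,
    COUNT(DISTINCT dud_user_id) AS class_uniques
  FROM
    daily_users_deduped
  GROUP BY
    SUBSTR(dud_user_id, 1, 1)
) t;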

Sessionization

For analyzing web traffic, we often want to be able to measure engagement based on various criteria. One way is to break up user behavior into sessions, chunks of activity that represent a single “use.” A user might come to your site several times a day, a few days a month, but each visit is certainly not the same.

So, what is a session? One definition is a string of activity, not separated by more than 30 minutes. That is, if you go to your first page, wait five minutes, go to the second page, it’s the same session. Wait 30 minutes exactly until the third page, still the same session. Wait 31 minutes until that fourth page, and the session will be broken; rather than the fourth pageview, it would be the first page of the second session.

Once we’ve got these broken out, we can look at properties of the session to see what happened. The ubiquitous case is to compare referrers to your page by session length. So, we might want to find out if Google or Facebook give better engagement on your site, which we might measure by session length.

At first glance, this seems perfect for an iterative process. For each pageview, keep counting backwards until you find the page that was first. But Hive isn’t iterative.

You can, however, figure it out. I like to break this into four phases.

  1. Identify which pageviews are the session starters, or “origin” pages.

  2. For every pageview, bucket it in with the correct origin page.

  3. Aggregate all the pageviews for each origin page.

  4. Label each origin page, then calculate engagement for each session.

This leaves a table where each row represents a full session, which you can then query for whatever you want to find out.

Setting it up

Let’s define our table, session_test:

st_user_id STRING
st_pageview_id STRING
st_page_url STRING
st_referrer_url STRING
st_timestamp DOUBLE

Most of this is pretty straightforward, though I will mention that st_pageview_id is basically a unique ID to represent each transaction, in this case a pageview. Otherwise, it could be confusing if you happened to have multiple views of the same page. For the purposes of this example, the timestamp will be in terms of seconds.

Finding origin pageviews

All right, let's start with step one (shocking!). How do we find which pageviews are the session starters? Well, if we assume any break of more than 30 minutes implies a new session, then a session starter can't have any activity that precedes it by 30 minutes or less. This is a great case for conditional sums. What we want to do is count up, for each pageview, how many other pageviews precede it by 30 minutes or less. Then, anything with a count of zero must be an origin case.

In order to do this, we need to compare every pageview against every pageview that could precede it. This is a pretty expensive move, as it requires performing a cross-product. To prevent this from blowing up to an unmanageable size, we should group everything on criteria that limit it as much as possible. In this case, it's just the user ID, but if you have a large network of independent sites, you might also want to group based on each source as well:

CREATE TABLE sessionization_step_one_origins AS

SELECT
  c.c_user_id as ssoo_user_id,
  c.c_pageview_id as ssoo_pageview_id,
  c.c_timestamp as ssoo_timestamp
FROM
        (SELECT
    a.a_user_id as c_user_id,
    a.a_pageview_id as c_pageview_id,
    a.a_timestamp as c_timestamp,
    SUM(IF(a.a_timestamp + 1800 >= b.b_timestamp AND
        a.a_timestamp < b.b_timestamp,1,0)) AS c_nonorigin_flags
  FROM
    (SELECT
      st_user_id as a_user_id,
      st_pageview_id as a_pageview_id,
      st_timestamp as a_timestamp
    FROM
      session_test
    ) a
    JOIN
    (SELECT
      st_user_id as b_user_id,
      st_timestamp as b_timestamp
    FROM
      session_test
    ) b
    ON
      a.a_user_id = b.b_user_id
  GROUP BY
    a.a_user_id,
    a.a_pageview_id,
    a.a_timestamp
  ) c
WHERE
  c.c_nonorigin_flags = 0

That's a bit much, isn't it? The important part is to count the flags that mark a pageview as not being a session origin, which is what c_nonorigin_flags does: basically, counting up how many reasons there are why it isn't the session starter. That is, this line:

SUM(IF(a.a_timestamp + 1800 >= b.b_timestamp AND
    a.a_timestamp < b.b_timestamp,1,0)) as c_nonorigin_flags

Let's break this up, part by part. First, everything is in terms of subquery a. We only use b to qualify those candidates. So the first part, a.a_timestamp + 1800 >= b.b_timestamp, is just asking if the candidate timestamp is no more than 30 minutes prior to the qualifying timestamp. The second part, a.a_timestamp < b.b_timestamp, adds a check to make sure that the candidate is earlier; otherwise, every timestamp that occurred later than its qualifier would trigger a false positive. Plus, since this is a cross-product, it prevents a false positive from the candidate acting as its own qualifier.

Now, we’re left with a table, sessionization_step_one_origins, with a schema of:

ssoo_user_id STRING
ssoo_pageview_id STRING
ssoo_timestamp DOUBLE

Bucketing PVs to origins

That's a good reason to start on step two: finding which pageview belongs to which origin. It's pretty simple to do this; every pageview's origin must be the origin pageview immediately prior to it. For this, we take another big join to check for the minimum difference between a pageview's timestamp and all the potential origin pageviews:

CREATE TABLE sessionization_step_two_origin_identification AS

SELECT
  c.c_user_id as sstoi_user_id,
  c.c_pageview_id as sstoi_pageview_id,
  d.d_pageview_id as sstoi_origin_pageview_id
FROM
(SELECT
  a.a_user_id as c_user_id,
  a.a_pageview_id as c_pageview_id,
  MAX(IF(a.a_timestamp >= b.b_timestamp, b.b_timestamp, NULL)) as c_origin_timestamp
FROM
(SELECT
  st_user_id as a_user_id,
  st_pageview_id as a_pageview_id,
  st_timestamp as a_timestamp
FROM
  session_test
) a
JOIN
(SELECT
  ssoo_user_id as b_user_id,
  ssoo_timestamp as b_timestamp
FROM
  sessionization_step_one_origins
) b
ON
  a.a_user_id = b.b_user_id
GROUP BY
  a.a_user_id,
  a.a_pageview_id
) c
JOIN
(SELECT
  ssoo_user_id as d_user_id,
  ssoo_pageview_id as d_pageview_id,
  ssoo_timestamp as d_timestamp
FROM
  sessionization_step_one_origins
) d
ON
  c.c_user_id = d.d_user_id and
  c.c_origin_timestamp = d.d_timestamp

There’s a lot to mention here. First, let’s look at this line:

MAX(IF(a.a_timestamp >= b.b_timestamp, b.b_timestamp, NULL)) as c_origin_timestamp

Again, we use the idea of qualifiers and candidates; in this case, b holds the candidates for every qualifier a. An origin candidate can't come later than the pageview, so among every case like that, we want to find the absolute latest origin that meets that criterion. The NULL is irrelevant, because we are guaranteed at least one non-NULL value, since there is always at least one possible origin (even if it's the pageview itself). This doesn't give us the origin itself, but it gives us the timestamp, which we can use as a fingerprint for what the origin should be.

From here, it’s just a matter of matching up this timestamp with all the other potential origins, and we know which origin each pageview belongs to. We’re left with the table sessionization_step_two_origin_identification, with the following schema:

sstoi_user_id STRING
sstoi_pageview_id STRING
sstoi_origin_pageview_id STRING

It’s worth mentioning that this isn’t the only way to identify the origin pageviews. You could do it based on the referrer, labeling any external referrer, homepage URL, or blank referrer (indicating direct traffic) as a session origin. You could base it on an action, only measuring activity after a click. There are plenty of options, but the important thing is simply to identify what the session origins are.
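For example, a referrer-based version of step one might look something like the following sketch. It reuses the in-site URL list from the referrer-identification example earlier; that list, and the choice of host-level matching, are assumptions you would adapt to your own site:

CREATE TABLE sessionization_referrer_origins AS

SELECT
  st_user_id,
  st_pageview_id,
  st_timestamp
FROM
  session_test
WHERE
  st_referrer_url IS NULL OR
  st_referrer_url = '' OR
  PARSE_URL(st_referrer_url, 'HOST') NOT IN ('mysite1.com', 'mysite2.com', 'mysite3.com')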

Aggregating on origins

At this point, it’s all pretty easy. Step three, where we aggregate on origins, is really, really simple. For each origin, count up how many pageviews match to it:

CREATE TABLE sessionization_step_three_origin_aggregation AS

SELECT
  a.a_user_id as sstoa_user_id,
  a.a_origin_pageview_id as sstoa_origin_pageview_id,
  COUNT(1) as sstoa_pageview_count
FROM
  (SELECT
    ssoo_user_id as a_user_id,
    ssoo_pageview_id as a_origin_pageview_id
  FROM
    sessionization_step_one_origins
  ) a
  JOIN
  (SELECT
    sstoi_user_id as b_user_id,
    sstoi_origin_pageview_id as b_origin_pageview_id
  FROM
    sessionization_step_two_origin_identification
  ) b
ON
  a.a_user_id = b.b_user_id and
  a.a_origin_pageview_id = b.b_origin_pageview_id
GROUP BY
  a.a_user_id,
  a.a_origin_pageview_id

Aggregating on origin type

Now, this last step we could have avoided by keeping all the qualitative info about a pageview, particularly the origins, in one of the earlier steps. However, if you have a lot of details you want to pay attention to, it can sometimes be easier to add it in at the end. Which is step four:

CREATE TABLE sessionization_step_four_qualitative_labeling AS

SELECT
  a.a_user_id as ssfql_user_id,
  a.a_origin_pageview_id as ssfql_origin_pageview_id,
  b.b_timestamp as ssfql_timestamp,
  b.b_page_url as ssfql_page_url,
  b.b_referrer_url as ssfql_referrer_url,
  a.a_pageview_count as ssfql_pageview_count
FROM
(SELECT
  sstoa_user_id as a_user_id,
  sstoa_origin_pageview_id as a_origin_pageview_id,
  sstoa_pageview_count as a_pageview_count
FROM
  sessionization_step_three_origin_aggregation
) a
JOIN
(SELECT
  st_user_id as b_user_id,
  st_pageview_id as b_pageview_id,
  st_page_url as b_page_url,
  st_referrer_url as b_referrer_url,
  st_timestamp as b_timestamp
FROM
  session_test
) b
ON
  a.a_user_id = b.b_user_id and
  a.a_origin_pageview_id = b.b_pageview_id

Measure engagement

Now, with our final table, we can do whatever we want. Let’s say we want to check the number of sessions, average pageviews per session, weighted average pageviews per session, and the max or min. We could pick whatever criteria we want, or none at all, but in this case, let’s do it by referrer URL so we can find out the answer to which traffic source gives the best engagement. And, just for kicks, let’s also check who gives us the most unique users:

SELECT
  PARSE_URL(ssfql_referrer_url, 'HOST') as referrer_host,
  COUNT(1) as session_count,
  AVG(ssfql_pageview_count) as avg_pvs_per_session,
  SUM(ssfql_pageview_count)/COUNT(1) as weighted_avg_pvs_per_session,
  MAX(ssfql_pageview_count) as max_pvs_per_session,
  MIN(ssfql_pageview_count) as min_pvs_per_session,
  COUNT(DISTINCT ssfql_user_id) as unique_users
FROM
  sessionization_step_four_qualitative_labeling
GROUP BY
  PARSE_URL(ssfql_referrer_url, 'HOST')

And there we have it. We could check which page URL gives the best engagement, figure out who the power users are, whatever. Once we’ve got it all in a temp table, especially with a more complete set of qualitative attributes, we can answer all sorts of questions about user engagement.

NASA’s Jet Propulsion Laboratory

The Regional Climate Model Evaluation System

by Chris A. Mattmann, Paul Zimdars, Cameron Goodale, Andrew F. Hart, Jinwon Kim, Duane Waliser, Peter Lean

Since 2009, our team at NASA's Jet Propulsion Laboratory (JPL) has actively led the development of a Regional Climate Model Evaluation System (RCMES). The system, originally funded under the American Recovery and Reinvestment Act (ARRA), has the following goals:

  • Facilitate the evaluation and analysis of regional climate model simulation outputs via the availability of reference data sets of quality-controlled observations and assimilations (especially from spaceborne sensors), an efficient database structure, a collection of computational tools for calculating model evaluation metrics and diagnostics, and relocatable, user-friendly interfaces.

  • Easily bring together a number of complex and heterogeneous software tools and capabilities for data access, representation, regridding, reformatting, and visualization, so that an end product such as a bias plot can be easily delivered to the end user.

  • Support regional assessments of climate variability and impacts needed to inform decision makers (e.g., local governments, agriculture, state governments, hydrologists) so that they can make critical decisions with large financial and societal impact.

  • Overcome data format and metadata heterogeneity (e.g., NetCDF3/4, CF metadata conventions, HDF4/5, HDF-EOS metadata conventions).

  • Deal with spatial and temporal differences (e.g., line up the data along a 180/80 lat-lon grid, converting from, for example, a 360/360 lat-lon grid, and make sure that data that may originally be daily is properly comparable with monthly data).

  • Elastically scale up: perform a regional study that requires specific remote sensing data and climate model output data, perform a series of analyses, and then destroy that particular instance of the system. In other words, support transient analyses and rapid construction/deconstruction of RCMES instances.

Figure 23-2 shows the architecture and data flow of the Regional Climate Model Evaluation System.

Figure 23-2. JPL Architecture Diagram.

In support of these goals, we have constructed the multifaceted system shown in Figure 23-2. Reading the diagram from left to right, available reference data sets from observations and assimilations, especially from satellite-based remote sensing, enter the system according to the desired climate parameters useful for climate model evaluation. Those parameters are stored in various mission data sets, and those data sets are housed in several external repositories and eventually fed into the database component (RCMED: Regional Climate Model Evaluation Database) of RCMES.

As an example, AIRS is NASA’s Atmospheric Infrared Sounder and provides parameters including surface air temperature, temperature, and geopotential; MODIS is NASA’s Moderate Imaging Spectroradiometer and provides parameters including cloud fraction; and TRMM is NASA’s Tropical Rainfall Measurement Mission and provides parameters including monthly precipitation. This information is summarized in our RCMES system website parameter table, http://rcmes.jpl.nasa.gov/rcmed/parameters/, and shown in Figure 23-3.

Figure 23-3. JPL Physical Architecture Diagram

Data sets are loaded into the RCMED using the Apache OODT extractor framework; the desired parameters, their values, and their spatial and temporal constraints (and optionally height) are loaded and potentially transformed (e.g., normalized, put on the same coordinate system, converted from unit values) into a MySQL database. The data loaded into that MySQL database, RCMED, is exposed to external clients via a Space/Time query and subsetting web service, the description of which is a topic for a separate study. For all intents and purposes, it provides the same capabilities that the OPeNDAP technology does.

The right side of the diagram shows the Regional Climate Model Evaluation Toolkit (RCMET). It provides users with the ability to take in the reference data from RCMED and climate model output data produced elsewhere and to re-grid these datasets in order to match them spatially and temporally in preparation for the comparison of the reference and model data for the evaluation of model output against the user-selected reference data. At that point, the system allows for seasonal cycle compositing (e.g., all Januaries, or all summer months for N years), and for preparing the data for eventual metrics calculation, that is, comparison of the values of the model output against the remote sensing data observation parameters and their values. The system supports several metrics, such as bias computation, Root Mean Squared Error (RMSE), and the generation of relevant visualizations, including traditional plots and Taylor diagrams for science use/decision making.

Our Experience: Why Hive?

So, where does Hive come into play? After loading 6 billion rows of (latitude, longitude, time, data point value, height) tuples into MySQL, the system fell down and experienced data loss. This was probably due in part to our naïve strategy of storing all of the data points in a single table. Over time, we evolved this strategy to break tables down by data set and by parameter, which helped but added needless overhead that we didn't want to spend cycles engineering around.

Instead, we decided to experiment with the Apache Hive technology. We installed Hive 0.5+20 using CDHv3 and Apache Hadoop (0.20.2+320). CDHv3 came with a number of other relevant tools including Sqoop, and Hue, which we leveraged in our architecture, shown in the bottom portion of Figure 23-3.

We used Apache Sqoop to dump out the data into Hive, and then wrote an Apache OODT wrapper that queried Hive for the data by Space/Time and provided it back to the RCMET and other users (shown in the middle portion of Figure 23-2). The full architecture for the RCMES cluster is shown in Figure 23-3. We had five machines, including a master/slave configuration as shown in the diagram, connected by a private network running GigE.
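The space/time requests that this wrapper issues against Hive amount to range predicates over the data point tuples. A minimal sketch is shown below; the column names are assumptions based on the tuple description above (and on time being stored in a sortable string form), not the actual RCMED schema:

SELECT latitude, longitude, time, value, height
FROM dataPoint
WHERE latitude  BETWEEN 30.0 AND 45.0
  AND longitude BETWEEN -120.0 AND -100.0
  AND time >= '2002-01-01' AND time <= '2010-12-31';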

Some Challenges and How We Overcame Them

During the migration of data from MySQL to Hive, we experienced slow response times while doing simple tasks such as a count DB query (e.g., hive> select count(datapoint_id) from dataPoint;). We initially loaded up around 2.5 billion data points in a single table and noticed that on our machine configuration, Hive took approximately 5–6 minutes to do a count of these 2.5 billion records (15–17 minutes for the full 6.8 billion records). The reduce portion was fast (we were experiencing a single reduce phase since we were using a count * benchmark) but the map stage took the remainder of the time (~95%). Our system at the time consisted of six (4 x quad-core) systems with approximately 24 GB of RAM each (all of the machines shown in Figure 23-3, plus another “borrowed machine” of similar class from another cluster).

We attempted to add more nodes, increase the number of map TaskTrackers (we tried many different values), change the DFS block size (32 MB, 64 MB, 128 MB, 256 MB), leverage LZO compression, and alter many other configuration variables (io.sort.factor, io.sort.mb), without much success in lowering the time to complete the count. We did notice a high I/O wait on the nodes no matter how many task trackers we ran. The size of the database was approximately 200 GB, and with MySQL it took a few seconds to do both the 2.5 billion and the 6.7 billion row counts.

Members of the Hive community jumped in and provided us with insight, such as pointing out that HDFS read speed is about 60 MB/sec compared to about 1 GB/sec on local disk, depending of course on network speed and NameNode workload. Those numbers suggested that we needed roughly 16 mappers in the Hadoop job to match the I/O performance of a local, non-Hadoop task. In addition, Hive community members suggested that we increase the number of mappers (increase parallelism) by reducing the split size (input size) for each mapper, pointing us at the following parameters: mapred.min.split.size, mapred.max.split.size, mapred.min.split.size.per.rack, and mapred.min.split.size.per.node, and suggesting that they be set to 64 MB. Finally, the community suggested that we benchmark with count(1) instead of count(datapoint_id), since count(1) is faster: with no column reference there is no decompression and deserialization, e.g., if you store your table in RCFile format.
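Pulling those suggestions together, the kind of session-level tuning and benchmark the feedback points to looks roughly like the following sketch; the 64 MB value (67,108,864 bytes) is the community's suggested split size, and the best settings for a given cluster will differ:

set mapred.min.split.size=67108864;
set mapred.max.split.size=67108864;
set mapred.min.split.size.per.node=67108864;
set mapred.min.split.size.per.rack=67108864;

SELECT count(1) FROM dataPoint;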

Based on the above feedback, we were able to tune our Hive cluster for RCMES to respond to a count query benchmark, and to a space/time query from RCMET for billions of rows in under 15 seconds, using the above-mentioned resources, making Hive a viable and great choice for our system architecture.

Conclusion

We have described our use of Apache Hive in the JPL Regional Climate Model Evaluation System. We leveraged Hive during a case study wherein we wanted to explore cloud-based technology alternatives to MySQL, and configuration requirements needed to make it scale to the level of tens of billions of rows, and to elastically destroy and re-create the data stored within.

Hive did a great job of meeting our system needs and we are actively looking for more ways to closely integrate it into the RCMES system.

Photobucket

Photobucket is the largest dedicated photo-hosting service on the Internet. Started in 2003 by Alex Welch and Darren Crystal, Photobucket quickly became one of the most popular sites on the Internet, attracting over one hundred million users and billions of stored and shared media items. User and system data is spread across hundreds of MySQL instances, thousands of web servers, and petabytes of filesystem storage.

Big Data at Photobucket

Prior to 2008, Photobucket didn’t have a dedicated analytics system in-house. Questions from the business users were run across hundreds of MySQL instances and the results aggregated manually in Excel.

In 2008, Photobucket embarked on implementing its first data warehouse dedicated to answering the increasingly complex data questions being asked by a fast-growing company.

The first iteration of the data warehouse was built using an open source system with a Java SQL optimizer and a set of underlying PostgreSQL databases. That system worked well into 2009, but the shortcomings in the architecture quickly became evident. Working data sets became larger than the available memory, and that, coupled with the difficulty of repartitioning data across the PostgreSQL nodes, forced us to scale up when we really wanted to scale out.

In 2009, we started to investigate systems that would allow us to scale out, as the amount of data continued to grow and still meet our SLA with the business users. Hadoop quickly became the favorite for consuming and analyzing the terabytes of data generated daily by the system, but the difficulty of writing MapReduce programs for simple ad hoc questions became a negative factor for full implementation. Thankfully, Facebook open sourced Hive a few weeks later and the barriers to efficiently answering ad hoc business questions were quickly smashed.

Hive demonstrates many advantages over the previous warehouse implementation. Here are a few examples of why we chose Hadoop and Hive:

  1. Ability to handle structured and unstructured data

  2. Real-time streaming of data into HDFS via Flume, Scribe, or MountableHDFS

  3. Ability to extend functionality through UDFs

  4. A well-documented, SQL-like interface built specifically for OLAP rather than OLTP

What Hardware Do We Use for Hive?

Dell R410, 4 × 2 TB drives with 24 GB RAM for the data nodes, and Dell R610, 2 × 146 GB (RAID 10) drives with 24 GB RAM for the management hardware.

What’s in Hive?

The primary goal of Hive at Photobucket is to provide answers about business functions, system performance, and user activity. To meet these needs, we store nightly dumps of MySQL data sets from across hundreds of servers, as well as terabytes of logfiles from web servers and custom log formats ingested through Flume. This data supports many groups throughout the company, such as executive management, advertising, customer support, product development, and operations, just to name a few. For historical data, we keep the partition of all data created on the first day of the month for MySQL data and 30+ days of logfiles. Photobucket uses a custom ETL framework for migrating MySQL data into Hive. Logfile data is streamed into HDFS using Flume and picked up by scheduled Hive processes.
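As a rough illustration of that last step, a scheduled Hive process can register each day's Flume-delivered directory as a new partition of a log table. The sketch below uses a hypothetical web_logs table, columns, and paths, not Photobucket's actual schema:

-- Hypothetical external table over Flume-delivered web logs, partitioned by day.
CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
  ts STRING,
  user_id STRING,
  url STRING,
  referrer STRING)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/flume/web_logs';

-- Run once per day by the scheduler to pick up the newly delivered directory.
ALTER TABLE web_logs ADD IF NOT EXISTS PARTITION (dt='2012-05-30')
LOCATION '/data/flume/web_logs/2012-05-30';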

Who Does It Support?

Executive management relies on Hadoop to provide reports surrounding the general health of the business. Hive allows us to parse structured database data and unstructured click stream data and distill the data into a format requested by the business stakeholder.

Advertising operations uses Hive to sift through historical data to forecast and define quotas for ad targeting.

Product development is far and away the group generating the largest number of ad hoc queries. As with any user base, segments change and evolve over time. Hive is important because it allows us to run A/B tests across current and historical data to gauge relevancy of new products in a quickly changing user environment.

Providing our users with a best-in-class system is the most important goal at Photobucket. From an operations perspective, Hive is used to generate rollup data partitioned across multiple dimensions. Knowing the most popular media, users, and referring domains is important for many levels across the company. Controlling expenses is important to any organization; a single user can quickly consume large amounts of system resources, significantly increasing monthly expenditures. Hive is used to identify and analyze rogue users and to determine which ones are operating within our Terms of Service and which are not. Operations also uses Hive to run A/B tests defining new hardware requirements and generating ROI calculations. Hive’s ability to abstract users away from the underlying MapReduce code means questions can be answered in hours or days instead of weeks.
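To make the rollup idea concrete, here is a minimal sketch that aggregates the top referring domains for one day; it reuses the hypothetical web_logs table from the sketch above along with a hypothetical referrer_rollup table, and it only illustrates the pattern, not Photobucket's production job:

-- Hypothetical daily rollup table, partitioned by day.
CREATE TABLE IF NOT EXISTS referrer_rollup (
  referrer_domain STRING,
  hits BIGINT)
PARTITIONED BY (dt STRING);

-- Top 100 referring domains for one day, computed from the raw logs.
INSERT OVERWRITE TABLE referrer_rollup PARTITION (dt='2012-05-30')
SELECT parse_url(referrer, 'HOST') AS referrer_domain,
       count(1) AS hits
FROM web_logs
WHERE dt = '2012-05-30'
GROUP BY parse_url(referrer, 'HOST')
ORDER BY hits DESC
LIMIT 100;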

SimpleReach

by Eric Lubow

At SimpleReach, we use Cassandra to store the raw data from all of our social network polling. The row key is an account ID (a MongoDB ObjectId) and a content item ID (an MD5 hash of the URL of the content item being tracked), separated by an underscore that we split on to provide both values in the result set. The columns in the row are composite columns that look like the ones below:

4e87f81ca782f3404200000a_8c825814de0ac34bb9103e2193a5b824
=> (column=meta:published-at, value=1330979750000, timestamp=1338919372934628)
=> (column=hour:1338876000000_digg-diggs, value=84, timestamp=1338879756209142)
=> (column=hour:1338865200000_googleplus-total, value=12, timestamp=1338869007737888)

To query on composite columns, we need to know the hex encoding of the column name; in our case, that is the meta:published-at column. (In the composite-column encoding, each component is prefixed by its two-byte length and followed by a zero byte, which is why the hex string is longer than the raw text.)

The hex equivalent is below:

00046D65746100000C7075626C69736865642D617400 = meta:published-at

Once the column name is converted to hexadecimal format, Hive queries are run against it. The first part of the query is the LEFT SEMI JOIN, which is used to mimic a SQL subselect. All the references to SUBSTR and INSTR are there to handle the composite columns. Because we know in advance that the 13 characters starting at position 10 of the “hour:*” columns (i.e., SUBSTR(r.column_name, 10, 13)) are a timestamp, we can crop that value out and use it in the returned data or for matching. INSTR is used to match column names and to ensure the result set always has the same columns in the same place in the output. SUBSTR is used for matching as part of the Ruby function: it returns a timestamp (a long) in milliseconds since the epoch, and start_date and end_date are also timestamps in milliseconds since the epoch, so the passed-in values can be matched against that part of the column name.

The goal of this query is to export our data from Cassandra into a CSV file so we can give aggregated data dumps to our publishers. It is done via a Resque (offline) job that is kicked off through our Rails stack. Producing a complete CSV file means that every column in the header must be accounted for in the Hive query, so zeros are filled in wherever there is no data. We do that by pivoting our wide rows into fixed-column tables using the CASE statement.

Here is the HiveQL for the CSV file:

SELECT CAST(SUBSTR(r.column_name, 10, 13) AS BIGINT) AS epoch,
SPLIT(r.row_key, '_')[0] AS account_id,
SPLIT(r.row_key, '_')[1] AS id,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'pageviews-total') > 0
THEN r.value ELSE '0' END AS INT)) AS pageviews,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'digg-digg') > 0
THEN r.value ELSE '0' END AS INT)) AS digg,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'digg-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS digg_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'delicious-total') > 0
THEN r.value ELSE '0' END AS INT)) AS delicious,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'delicious-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS delicious_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'googleplus-total') > 0
THEN r.value ELSE '0' END AS INT)) AS google_plus,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'googleplus-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS google_plus_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'facebook-total') > 0
THEN r.value ELSE '0' END AS INT)) AS fb_total,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'facebook-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS fb_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'twitter-tweet') > 0
THEN r.value ELSE '0' END AS INT)) AS tweets,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'twitter-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS twitter_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'linkedin-share') > 0
THEN r.value ELSE '0' END AS INT)) AS linkedin,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'linkedin-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS linkedin_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'stumbleupon-total') > 0
THEN r.value ELSE '0' END AS INT)) AS stumble_total,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'stumbleupon-referrer') > 0
THEN r.value ELSE '0' END AS INT)) AS stumble_ref,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'social-actions') > 0
THEN r.value ELSE '0' END AS INT)) AS social_actions,
SUM(CAST(CASE WHEN INSTR(r.column_name, 'referrer-social') > 0
THEN r.value ELSE '0' END AS INT)) AS social_ref,
MAX(CAST(CASE WHEN INSTR(r.column_name, 'score-realtime') > 0
THEN r.value ELSE '0.0' END AS DOUBLE)) AS score_rt
FROM content_social_delta r
LEFT SEMI JOIN (SELECT row_key
FROM content
WHERE HEX(column_name) = '00046D65746100000C7075626C69736865642D617400'
AND CAST(value AS BIGINT) >= #{start_date}
AND CAST(value AS BIGINT) <= #{end_date}
) c ON c.row_key = SPLIT(r.row_key, '_')[1]
WHERE INSTR(r.column_name, 'hour') > 0
AND CAST(SUBSTR(r.column_name, 10, 13) AS BIGINT) >= #{start_date}
AND CAST(SUBSTR(r.column_name, 10, 13) AS BIGINT) <= #{end_date}
GROUP BY CAST(SUBSTR(r.column_name, 10, 13) AS BIGINT),
SPLIT(r.row_key, '_')[0],
SPLIT(r.row_key, '_')[1]

The output of the query is a comma-separated value (CSV) file, an example of which is below (wrapped for length with a blank line between each record for clarity):

epoch,account_id,id,pageviews,digg,digg_ref,delicious,delicious_ref,
google_plus,google_plus_ref,fb_total,fb_ref,tweets,twitter_ref,
linkedin,linkedin_ref,stumble_total,stumble_ref,social_actions,social_ref,score_rt

1337212800000,4eb331eea782f32acc000002,eaff81bd10a527f589f45c186662230e,
39,0,0,0,0,0,0,0,2,0,20,0,0,0,0,0,22,0

1337212800000,4f63ae61a782f327ce000007,940fd3e9d794b80012d3c7913b837dff,
101,0,0,0,0,0,0,44,63,11,16,0,0,0,0,55,79,69.64308064

1337212800000,4f6baedda782f325f4000010,e70f7d432ad252be439bc9cf1925ad7c,
260,0,0,0,0,0,0,8,25,15,34,0,0,0,0,23,59,57.23718477

1337216400000,4eb331eea782f32acc000002,eaff81bd10a527f589f45c186662230e,
280,0,0,0,0,0,0,37,162,23,15,0,0,0,2,56,179,72.45877173

1337216400000,4ebd76f7a782f30c9b000014,fb8935034e7d365e88dd5be1ed44b6dd,
11,0,0,0,0,0,0,0,1,1,4,0,0,0,0,0,5,29.74849901

Experiences and Needs from the Customer Trenches

A Karmasphere Perspective

By Nanda Vijaydev

Introduction

For over 18 months, Karmasphere has been engaged with a fast-growing number of companies that adopted Hadoop and immediately gravitated toward Hive as the optimal way for teams of analysts and business users to apply existing SQL skills to the Hadoop environment. The first part of this case study describes techniques that we have seen used repeatedly in customer environments to advance Hive-based analytics.

The use case examples we cover are:

  • Optimal data formatting for Hive

  • Partitions and performance

  • Text analytics with Hive functions including Regex, Explode and Ngram

As companies we’ve worked with plan for and move into production use of Hive, they look for incremental capabilities that make Hive-based access to Hadoop even easier to use, more productive, more powerful, and available to more people in their organization. When they wire Hadoop and Hive into their existing data architectures, they also want to enable results from Hive queries to be systematized, shared and integrated with other data stores, spreadsheets, BI tools, and reporting systems.

In particular, companies have asked for:

  • Easier ways to ingest data, detect raw formats, and create metadata

  • Collaborative work in an integrated, multi-user environment

  • Iterative data exploration and analysis

  • Preserved and reusable paths to insight

  • Finer-grain control over data, table, and column security, and compartmentalized access to different lines of business

  • Business user access to analytics without requiring SQL skills

  • Scheduling of queries for automated result generation and export to non-Hadoop data stores

  • Integration with Microsoft Excel, Tableau, Spotfire, and other spreadsheet, reporting, dashboard, and BI tools

  • Ability to manage Hive-based assets including queries, results, visualizations, and standard Hive components such as UDFs and SerDes

Use Case Examples from the Customer Trenches

Customer trenches #1: Optimal data formatting for Hive

One recurring question from many Hive users revolves around the format of their data and how to make that available in Hive.

Many data formats are supported out-of-the-box in Hive but some custom proprietary formats are not. And some formats that are supported raise questions for Hive users about how to extract individual components from within a row of data. Sometimes, writing a standard Hive SerDe that supports a custom data format is the optimal approach. In other cases, using existing Hive delimiters and exploiting Hive UDFs is the most convenient solution. One representative case we worked on was with a company using Hadoop and Hive to provide personalization services from the analysis of multiple input data streams. They were receiving logfiles from one of their data providers in a format that could not easily be split into columns. They were trying to figure out a way to parse the data and run queries without writing a custom SerDe.

The data had top-level header information and multiple detailed records. The detailed section was JSON nested within the top-level object, similar to the data set below:

{ "top" : [
{"table":"user",
  "data":{
    "name":"John Doe","userid":"2036586","age":"74","code":"297994","status":1}},
{"table":"user",
  "data":{
    "name":"Mary Ann","userid":"14294734","age":"64","code":"142798","status":1}},
{"table":"user",
  "data":{
    "name":"Carl Smith","userid":"13998600","age":"36","code":"32866","status":1}},
{"table":"user",
  "data":{
    "name":"Anil Kumar":"2614012","age":"69","code":"208672","status":1}},
{"table":"user",
  "data":{
    "name":"Kim Lee","userid":"10471190","age":"53","code":"79365","status":1}}
]}

After talking with the customer, we realized they were interested in splitting out individual columns of the detailed information tagged with the “data” identifier in the sample above.

To help them proceed, we used the existing Hive function get_json_object, as shown below.

The first step is to create a table over the sample data:

CREATE TABLE user (line string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://hostname/user/uname/tablefolder/';

Then, using get_json_object, we can reach the nested JSON element and parse out its individual fields:

SELECT get_json_object(col0, '$.name') as name,  get_json_object(col0, '$.userid') as uid,
get_json_object(col0, '$.age') as age, get_json_object(col0, '$.code') as code,  
    get_json_object(col0, '$.status') as status
FROM
  (SELECT get_json_object(user.line, '$.data') as col0
   FROM user
   WHERE get_json_object(user.line, '$.data') is not null) temp;

Query details include:

  • The inner query extracts the nested JSON object identified by data and aliases it as col0.

  • The outer query then splits that JSON object into the appropriate columns using the names in each name-value pair.

The results of the query, saved as a CSV file with a header row, are given below:

"name","uid","age","code","status"
"John Doe","2036586","74","297994","1"
"Mary Ann","14294734","64","142798","1"
"Carl Smith","13998600","36","32866","1"
"Kim Lee","10471190","53","79365","1"

Customer trenches #2: Partitions and performance

Using partitions with data being streamed or regularly added to Hadoop is a use case we see repeatedly, and a powerful and valuable way of harnessing Hadoop and Hive to analyze various kinds of rapidly additive data sets. Web, application, product, and sensor logs are just some of the types of data that Hive users often want to perform ad hoc, repeated, and scheduled queries on.

Hive partitions, when set up correctly, allow users to query data only in specific partitions and hence improve performance significantly. To set up partitions for a table, files should be located in directories as in this example:

hdfs://user/uname/folder/yr=2012/mon=01/day=01/file1, file2, file3
                        /yr=2012/mon=01/day=02/file4, file5
                        ...
                        /yr=2012/mon=05/day=30/file100, file101

With the above structure, tables can be partitioned by year, month, and day. Queries can then use yr, mon, and day as columns and restrict the data accessed to specific values at query time. Notice that the partitioned folder names carry the identifiers yr=, mon=, and day=.
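For illustration, a minimal sketch of such a partitioned table and a partition-pruned query might look like the following; the table name (events) and its single column are hypothetical placeholders, not part of the customer example:

-- Hypothetical external table over the directory layout shown above. String
-- partition columns preserve the zero-padded folder names (mon=01, day=02).
CREATE EXTERNAL TABLE events (col0 STRING)
PARTITIONED BY (yr STRING, mon STRING, day STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://hostname/user/uname/folder/';

-- Register one day's folder as a partition (repeated per folder, or via MSCK REPAIR TABLE).
ALTER TABLE events ADD PARTITION (yr='2012', mon='01', day='02');

-- Only the files under yr=2012/mon=01/day=02 are read; all other partitions are pruned.
SELECT count(1) FROM events WHERE yr='2012' AND mon='01' AND day='02';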

Working with one high tech company, we discovered that their folders did not have this explicit partition naming and they couldn’t change their existing directory structure. But they still wanted to benefit from having partitions. Their sample directory structure is given below:

hdfs://user/uname/folder/2012/01/01/file1, file2, file3
                        /2012/01/02/file4, file5
                        ...
                        /2012/05/30/file100, file101

In this case, we can still add partitions by explicitly specifying the absolute path of each directory in ALTER TABLE statements. A simple external script can walk the directory tree and, for each folder, emit an ALTER TABLE statement whose partition specification adds the literal yr=, mon=, and day= identifiers with values taken from the folder path (yr=2012, mon=01, ...). The output of the script is a set of HiveQL statements, generated from the existing directory structure and captured in a simple text file:

ALTER TABLE tablename
ADD PARTITION (yr=2012, mon=01, day=01) location '/user/uname/folder/2012/01/01/';

ALTER TABLE tablename
ADD PARTITION (yr=2012, mon=01, day=02) location '/user/uname/folder/2012/01/02/';

...
ALTER TABLE tablename
ADD PARTITION (yr=2012, mon=05, day=30) location '/user/uname/folder/2012/05/30/';

When these statements are executed in Hive, the data in the specified directories automatically becomes available under the logical partitions defined by the ALTER TABLE statements.

Note

You should make sure that your table is created with PARTITIONED BY columns for year, month, and day.

Customer trenches #3: Text analytics with Regex, Lateral View Explode, Ngram, and other UDFs

Many companies we work with have text analytics use cases that vary from simple to complex. Understanding and using Hive’s regular expression functions, n-gram functions, and other string functions can address a number of those use cases.

One large manufacturing customer we worked with had a lot of machine-generated, compressed text data being ingested into Hadoop. The format of this data was as follows:

  1. Multiple rows of data in each file and a number of such files in time-partitioned buckets.

  2. Within each row there were a number of segments separated by \r\n (carriage return and line feed).

  3. Each segment was in the form of a “name: value” pair.

The use case requirement was to:

  1. Read each row and separate individual segments as name-value pairs.

  2. Zero in on specific segments and look for word counts and word patterns for analyzing keywords and specific messages.

The sample below illustrates this customer’s data (text elided for space):

name:Mercury
description:Mercury is the god of commerce, ...
type:Rocky planet
name:Venus
description:Venus is the goddess of love...
type:Rocky planet
name:Earth
description:Earth is the only planet ...
type:Rocky planet
name:Mars
description: Mars is the god of War...
type:Rocky planet
name:Jupiter
description:Jupiter is the King of the Gods...
type:Gas planet
name:Saturn
description:Saturn is the god of agriculture...
type:Gas planet
name:Uranus
description:Uranus is the God of the Heavens...
type:Gas planet
name:Neptune
description:Neptune was the god of the Sea...
type:Gas planet

The data contains:

  1. Planet names and their description with type.

  2. Each row of data is separated by a delimiter.

  3. Within each row there are three subsections, “name,” “description,” and “type,” separated by \r\n.

  4. The description is a large block of text.

The first step is to create the initial table with this sample data:

CREATE TABLE planets (col0 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs://hostname/user/uname/planets/';

In the following, we run a series of queries, starting with a simple query and adding functions as we iterate. Note that the requirement can be met with queries written in several different ways. The purpose of the queries shown below is to demonstrate some of the key capabilities in Hive around text parsing.

First, we use a split function to separate each section of data into an array of individual elements:

SELECT split(col0, '(\\r\\n)') AS splits FROM planets;

Next, we explode the splits array into individual lines using the LATERAL VIEW EXPLODE function. The results of this query have the name-value pairs separated into individual rows. We select only those rows that start with description; the LTRIM function is also used to remove leading spaces.

SELECT ltrim(splits) AS pairs FROM planets
LATERAL VIEW EXPLODE(split(col0, '(\\r\\n)')) col0 AS splits
WHERE ltrim(splits) LIKE 'desc%';

Now we separate the description line into a name-value pair and select only the value. This can be done in different ways; here we split on ':' and choose the value part:

SELECT (split(pairs, ':'))[1] AS txtval FROM (
SELECT ltrim(splits) AS pairs FROM planets
LATERAL VIEW EXPLODE(split(col0, '(\\r\\n)')) col0 AS splits
WHERE ltrim(splits) LIKE 'desc%')tmp1;

Notice the use of the temporary identifier tmp1 for the inner query. This is required when you use the output of a subquery as the input to an outer query. At the end of this third step, we have the value of the description segment within each row.

In the next step, we use ngrams to show the top 10 bigrams (2-grams) from the planet descriptions. You could also use functions such as context_ngrams, find_in_set, regexp_replace, and others to perform various text-based analyses:

SELECT ngrams(sentences(lower(txtval)), 2, 10) AS bigrams FROM (
SELECT (split(pairs, ':'))[1] AS txtval FROM (
       SELECT ltrim(splits) AS pairs FROM planets
       LATERAL VIEW EXPLODE(split(col0, '(\\r\\n)')) col0 AS splits
       WHERE ltrim(splits) LIKE 'desc%') tmp1) tmp2;

Notice that we used the lower function to convert the text to lowercase and the sentences function to tokenize it into words.
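As a further illustration of the functions mentioned above, the following sketch uses context_ngrams to find the ten words that most often follow the phrase “is the” in the descriptions; it simply builds on the tmp1/tmp2 subqueries from the previous query and was not part of the customer’s original requirement:

-- Most frequent words following "is the" in the planet descriptions.
SELECT context_ngrams(sentences(lower(txtval)), array('is', 'the', null), 10) AS follows_is_the
FROM (
SELECT (split(pairs, ':'))[1] AS txtval FROM (
       SELECT ltrim(splits) AS pairs FROM planets
       LATERAL VIEW EXPLODE(split(col0, '(\\r\\n)')) col0 AS splits
       WHERE ltrim(splits) LIKE 'desc%') tmp1) tmp2;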

For additional information about the text analytics capabilities of Hive, see the functions listed in Chapter 3.

Apache Hive in production: Incremental needs and capabilities

Hive adoption continues to grow, as outlined by the use cases described above. Companies of various sizes across different industry segments have benefited immensely by leveraging Hive in their Hadoop environments. A strong and active community of contributors, along with significant investments in Hive R&D by leading Hadoop vendors, ensures that Hive, already the SQL-based standard for Hadoop, will become the SQL-based standard within organizations leveraging Hadoop for Big Data analysis.

As companies invest significant time and resources in understanding Hive and building Hive assets, in many cases we find they look for additional capabilities that let them build on their initial use of Hive and extend its reach faster and more broadly within their organizations. From working with these customers looking to take Hive to the next level, a common set of requirements has emerged.

These requirements include:

Collaborative multiuser environments

Hadoop enables new classes of analysis that were computationally and economically prohibitive with traditional RDBMS technologies. It empowers organizations to break down data and people silos, performing analysis on every byte of data they can get their hands on, in a way that lets them share their queries, results, and insights with other individuals, teams, and systems in the organization. This model implies that users with a deep understanding of these different data sets need to collaborate in discovery, in the sharing of insights, and in making all Hive-based analytic assets available across the organization.

Productivity enhancements

The current implementation of Hive offers a serial batch environment on Hadoop for running queries. This implies that once a user submits a query to the Hadoop cluster for execution, they must wait for it to complete before they can execute another query against the cluster. This can limit user productivity.

One major reason companies adopt Hive is that it enables their SQL-skilled data professionals to move faster and more easily to working with Hadoop. These users are usually familiar with graphical SQL editors in database tools and BI products, and they are looking for similar productivity enhancements, such as syntax highlighting and code completion.

Managing Hive assets

A recent McKinsey report predicted a significant shortage of the skilled workers organizations need in order to profit from their data. Technologies like Hive promise to help bridge that skills gap by allowing people with an SQL skill set to perform analysis on Hadoop. However, organizations are realizing that just having Hive available to their users is not enough. They need to be able to manage Hive assets, such as queries (history and versions), UDFs, and SerDes, for later sharing and reuse. Organizations would like to build a living knowledge repository of Hive assets that is easily searchable by users.

Extending Hive for advanced analytics

Many companies are looking to re-create in Hadoop the analysis they perform in the traditional RDBMS world. While not all capabilities of the SQL environment translate easily into Hive functions, due to inherent limitations in how the data is stored, some advanced analytics functions, such as ranking, are Hadoop-able. In addition, organizations have spent tremendous time and resources building analytical models using traditional tools like SAS and SPSS, and they would like the ability to score these models on Hadoop via Hive queries.

Extending Hive beyond the SQL skill set

As Hadoop gains momentum in organizations and becomes a key fabric of data processing and analytics within the IT infrastructure, it is gaining popularity among users with different skill sets and capabilities. While Hive is easily adopted by users with SQL skills, less SQL-savvy users are also looking for drag-and-drop capabilities, like those available in traditional BI tools, to perform analysis on Hadoop using Hive. The ability to support interactive forms on top of Hive, where a user is prompted to provide column values via simple web-based forms, is an often-requested capability.

Data exploration capabilities

Traditional database technologies provide data exploration capabilities; for example, a user can view the minimum and maximum values of an integer column. Users can also view visualizations of these columns to understand the data distribution before performing analysis. As Hadoop stores hundreds of terabytes of data, and often petabytes, customers are requesting similar capabilities for specific use cases.
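In plain Hive, this kind of basic column profiling can be approximated with aggregate queries; the sketch below uses a hypothetical customers table with an integer age column, purely for illustration:

-- Profile an integer column: range, average, cardinality, and null count.
SELECT min(age) AS min_age,
       max(age) AS max_age,
       avg(age) AS avg_age,
       count(DISTINCT age) AS distinct_ages,
       sum(CASE WHEN age IS NULL THEN 1 ELSE 0 END) AS null_ages
FROM customers;

-- A coarse view of the distribution using the built-in histogram_numeric UDAF (10 bins).
SELECT histogram_numeric(age, 10) AS age_histogram FROM customers;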

Schedule and operationalize Hive queries

As companies find insights using Hive on Hadoop, they are also looking to operationalize those insights and schedule the underlying queries to run at regular intervals. While open source alternatives are currently available, these sometimes fall short when companies also want to manage the output of Hive queries; for example, moving result sets into a traditional RDBMS or BI stack. To handle such use cases, companies often have to manually string together various open source tools or rely on poorly performing JDBC connectors.
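One common stopgap is to have the scheduled Hive query write its result set to a delimited directory, which a downstream loader then bulk-imports into the RDBMS or BI stack. A minimal sketch, using a hypothetical daily_summary table and export path:

-- Export one day's results as text files for a downstream bulk loader to pick up.
-- Fields are written with Hive's default delimiter (Ctrl-A), which the loader must expect.
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/daily_summary_export'
SELECT * FROM daily_summary WHERE dt = '2012-05-30';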

About Karmasphere

Karmasphere is a software company, based in Silicon Valley California, focused exclusively on bringing native Hadoop Big Data Analytics capabilities to teams of analysts and business users. Their flagship product, Karmasphere 2.0, is based on Apache Hive, extending it in a multi-user graphical workspace to enable:

  • Reuse of standard Hive-based tables, SerDes and UDFs

  • Social, project-based big data analytics for teams of analysts and business users

  • Easy data ingestion to the cluster

  • Heuristic-based recognition and table creation of many popular data formats

  • Visual and iterative data exploration and analysis

  • Graphical exploration of all Hive-based analytic assets

  • Sharing and scheduling of queries, results and visualizations

  • Easy integration with traditional spreadsheets, reporting, dashboard, and BI tools

Figure 23-4 shows a screenshot of Karmasphere 2.0’s Hive-based Big Data Analytics Environment.

Figure 23-4. Screenshot of Karmasphere 2.0

Hive features survey

We’d like to get feedback on the importance of these needs and share them back with the growing Hive community. If you are interested in seeing what others think and would like to participate, please visit:

http://karmasphere.com/hive-features-survey.html



[27] For more details on the use of generalized additive models (GAM), see Hastie et al. 2001. The R package used to implement the GAM for the purpose of the analysis presented here is the mgcv package available at http://cran.r-project.org/.
