Chapter 2. Tools and Architecture for Big Data

In this chapter, Evan Chan performs a storage and query cost-analysis on various analytics applications, and describes how Apache Cassandra stacks up in terms of ad hoc, batch, and time-series analysis. Next, Federico Castanedo discusses how using distributed frameworks to scale R can help solve the problem of storing large and ever-growing data sets in RAM. Daniel Whitenack then explains how a new programming language from Google—Go—could help data science teams overcome common obstacles such as integrating data science in an engineering organization. Whitenack also details the many tools, packages, and resources that allow users to perform data cleansing, visualization, and even machine learning in Go. Finally, Nicolas Seyvet and Ignacio Mulas Viela describe how the telecom industry is navigating the current data analytics environment. In their use case, they apply both Kappa architecture and a Bayesian anomaly detection model to a high-volume data stream originating from a cloud monitoring system.

Apache Cassandra for Analytics: A Performance and Storage Analysis

You can read this post on oreilly.com here.

This post is about using Apache Cassandra for analytics. Think time series, IoT, data warehousing, writing, and querying large swaths of data—not so much transactions or shopping carts. Users thinking of Cassandra as an event store and source/sink for machine learning/modeling/classification would also benefit greatly from this post.

Two key questions when considering analytics systems are:

  1. How much storage do I need (to buy)?
  2. How fast can my questions get answered?

I conducted a performance study, comparing different storage layouts, caching, indexing, filtering, and other options in Cassandra (including FiloDB), plus Apache Parquet, the modern gold standard for analytics storage. All comparisons were done using Spark SQL. More important than the specific verdicts on data modeling, storage format, row caching, or DeflateCompressor, I hope this post gives you a useful framework for predicting storage cost and query speeds for your own applications.

I was initially going to title this post “Cassandra Versus Hadoop,” but honestly, this post is not about Hadoop or Parquet at all. Let me get this out of the way, however, because many people, in their evaluations of different technologies, are going to think about one technology stack versus another. Which is better for which use cases? Is it possible to lower total cost of ownership (TCO) by having just one stack for everything? Answering the storage and query cost questions is part of this analysis.

To be transparent, I am the author of FiloDB. While I have a vested interest in one side of this debate, I will focus on the analysis and let you draw your own conclusions. However, I hope you will realize that Cassandra is not just a key-value store; it can be—and is being—used for big data analytics, and it can be very competitive in both query speeds and storage costs.

Wide Spectrum of Storage Costs and Query Speeds

Figure 2-1 summarizes different Cassandra storage options, plus Parquet. Farther to the right denotes higher storage densities, and higher up the chart denotes faster query speeds. In general, you want to see something in the upper-right corner.

Figure 2-1. Storage costs versus query speed in Cassandra and Parquet. Credit: Evan Chan.

Here is a brief introduction to the different players used in the analysis:

  • Regular Cassandra version 2.x CQL tables, in both narrow (one record per partition) and wide (both partition and clustering keys, many records per partition) configurations
  • COMPACT STORAGE tables, the way all of us Cassandra old timers did it before CQL (0.6, baby!)
  • Caching Cassandra tables in Spark SQL
  • FiloDB, an analytical database built on C* and Spark
  • Parquet, the reference gold standard

What you see in Figure 2-1 is a wide spectrum of storage efficiency and query speed, from CQL tables at the bottom to FiloDB, which is up to 5x faster in scan speeds than Parquet and almost as efficient storage-wise. Keep in mind that the chart has a log scale on both axes. Also, while this article will go into the tradeoffs and details about different options in depth, we will not be covering the many other factors people choose CQL tables for, such as support for modeling maps, sets, lists, custom types, and many other things.

Summary of Methodology for Analysis

Query speed was computed by averaging the response times for three different queries:

df.select(count("numarticles")).show

SELECT Actor1Name, AVG(AvgTone) as tone FROM gdelt
  GROUP BY Actor1Name ORDER BY tone DESC

SELECT AVG(avgtone), MIN(avgtone), MAX(avgtone) FROM gdelt
  WHERE monthyear=198012

The first query is an all-table-scan simple count. The second query measures a grouping aggregation. And the third query is designed to test filtering performance with a record count of 43.4K items, or roughly 1% of the original data set. The data set used for each query is the GDELT public data set: 1979–1984, 57 columns x 4.16 million rows, recording geopolitical events worldwide. The source code for ingesting the Cassandra tables and instructions for reproducing the queries are available in my cassandra-gdelt repo.

The storage cost for Cassandra tables is computed by running compaction first, then taking the size of all SSTable files in the data folder of the tables.

To make the Cassandra CQL tables more performant, shorter column names were used (for example, a2code instead of Actor2Code).

All tests were run on my MacBook Pro 15-inch, mid-2015, SSD/16 GB. Specifics are as follows:

  • Cassandra 2.1.6, installed using CCM
  • Spark 1.4.0 except where noted, run with master = 'local[1]' and spark.sql.shuffle.partitions=4
  • Spark-Cassandra-Connector 1.4.0-M3

Running all the tests essentially single threaded was done partly out of simplicity and partly to form a basis for modeling performance behavior (see “A Formula for Modeling Query Performance”).

Scan Speeds Are Dominated by Storage Format

OK, let’s dive into details! The key to analytics query performance is the scan speed, or how many records you can scan per unit time. This is true for whole table scans, and it is true when you filter data, as we’ll see later. Figure 2-2 shows the data for all query times, which are whole table scans, with relative speed factors for easier digestion.

Figure 2-2. All query times with relative speed factors. All query times run on Spark 1.4/1.5 with local[1]; C* 2.1.6 with 512 MB row cache. Credit: Evan Chan.
Note

To get more accurate scan speeds, one needs to subtract the baseline latency in Spark, but this is left out for simplicity. This actually slightly disfavors the fastest contestants.

Cassandra’s COMPACT STORAGE gains an order-of-magnitude improvement in scan speeds simply due to more efficient storage. FiloDB and Parquet gain another order of magnitude due to a columnar layout, which allows reading only the columns needed for analysis, plus more efficient columnar blob compression. Thus, storage format makes the biggest difference in scan speeds. More details follow, but for regular CQL tables, the scan speed should be inversely proportional to the number of columns in each record, assuming simple data types (not collections).

Part of the speed advantage of FiloDB over Parquet has to do with the InMemory option. You could argue this is not fair; however, when you read Parquet files repeatedly, most of that file is most likely in the OS cache anyway. Yes, having in-memory data is a bigger advantage for networked reads from Cassandra, but I think part of the speed increase is because FiloDB’s columnar format is optimized more for CPU efficiency, rather than compact size. Also, when you cache Parquet files, you are caching an entire file or blocks thereof, compressed and encoded; FiloDB relies on small chunks, which can be much more efficiently cached (on a per-column basis, and allows for updates). Folks at Databricks have repeatedly told me that caching Parquet files in-memory did not result in significant speed gains, and this makes sense due to the format and compression.

Wide-row CQL tables are actually less efficient than narrow-row due to additional overhead of clustering column-name prefixing. Spark’s cacheTable should be nearly as efficient as the other fast solutions but suffers from partitioning issues.

Storage Efficiency Generally Correlates with Scan Speed

In Figure 2-2, you can see that these technologies list in the same order for storage efficiency as for scan speeds, and that’s not an accident. Storing tables as COMPACT STORAGE and FiloDB yields a roughly 7–8.5x improvement in storage efficiency over regular CQL tables for this data set. Less I/O = faster scans!

Cassandra CQL wide-row tables are less efficient, and you’ll see why in a minute. Moving from LZ4 to Deflate compression reduces storage footprint by 38% for FiloDB and 50% for the wide-row CQL tables, so it’s definitely worth considering. DeflateCompressor actually sped up wide-row CQL scans by 15%, but slowed down the single partition query slightly.

Why Cassandra CQL tables are inefficient

Let’s say a Cassandra CQL table has a primary key that looks like (pk, ck1, ck2, ck3) and other columns designated c1, c2, c3, c4 for creativity. This is what the physical layout looks like for one partition (“physical row”):

Column header   ck1:ck2:ck3a:c1   ck1:ck2:ck3a:c2   ck1:ck2:ck3a:c3   ck1:ck2:ck3a:c4
pk (values)     v1                v2                v3                v4

Cassandra offers ultimate flexibility in terms of updating any part of a record, as well as inserting into collections, but the price paid is that each column of every record is stored in its own cell, with a very lengthy column header consisting of the entire clustering key, plus the name of each column. If you have 100 columns in your table (very common for data warehouse fact tables), then the clustering key ck1:ck2:ck3 is repeated 100 times. It is true that compression helps a lot with this, but not enough. Cassandra 3.x has a new, trimmer storage engine that does away with many of these inefficiencies, at a reported space savings of up to 4x.
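
For concreteness, here is a minimal sketch of a CQL definition that produces a wide-row layout like the one above, issued from Scala through the DataStax Java driver. The keyspace, table, and column names are hypothetical, chosen only to mirror the (pk, ck1, ck2, ck3) example:

import com.datastax.driver.core.Cluster

object WideRowSchema {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect()
    // Hypothetical keyspace/table mirroring the (pk, ck1, ck2, ck3) + c1..c4 example.
    session.execute(
      "CREATE KEYSPACE IF NOT EXISTS demo " +
        "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
    session.execute(
      "CREATE TABLE IF NOT EXISTS demo.events (" +
        "  pk text, ck1 text, ck2 text, ck3 text, " +
        "  c1 text, c2 text, c3 text, c4 text, " +
        "  PRIMARY KEY (pk, ck1, ck2, ck3))")   // pk = partition key, ck1..ck3 = clustering keys
    cluster.close()
  }
}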

COMPACT STORAGE is the way that most of us who used Cassandra prior to CQL stored our data: as one blob per record. It is extremely efficient. That model looks like this:

Column header   ck1:ck2:ck3    ck1:ck2:ck3a
pk (values)     value1_blob    value2_blob

You lose features such as secondary indexing, but you can still model your data for efficient lookups by partition key and range scans of clustering keys.

FiloDB, on the other hand, stores data by grouping columns together, and then by clumping data from many rows into its own efficient blob format. The layout looks like this:

                Column 1             Column 2
pk              Chunk 1 | Chunk 2    Chunk 1 | Chunk 2

Columnar formats minimize I/O for analytical queries, which select a small subset of the original data. They also tend to remain compact, even in-memory. FiloDB’s internal format is designed for fast random access without the need to deserialize. On the other hand, Parquet is designed for very fast linear scans, but most encoding types require the entire page of data to be deserialized—thus, filtering will incur higher I/O costs.

A Formula for Modeling Query Performance

We can model the query time for a single query using a simple formula:

Predicted queryTime = Expected number of records / (# cores * scan speed)

Basically, the query time is proportional to how much data you are querying, and inversely proportional to your resources and raw scan speed. Note that the scan speed previously mentioned is single-core scan speed, such as was measured using my benchmarking methodology. Keep this model in mind when thinking about storage formats, data modeling, filtering, and other effects.
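
As a quick worked example (the numbers are made up but plausible), here is the formula applied to a single-core, full scan of a GDELT-sized data set:

// Hypothetical inputs: 4.16M records, 1 core, and an assumed single-core
// scan speed of 300,000 records/sec for a narrow CQL table.
val expectedRecords = 4.16e6
val cores           = 1
val scanSpeed       = 3.0e5                                  // records per second, per core

val predictedSeconds = expectedRecords / (cores * scanSpeed)
println(f"predicted query time: $predictedSeconds%.1f s")    // ~13.9 s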

Can Caching Help? A Little Bit.

If storage size leads partially to slow scan speeds, what about taking advantage of caching options to reduce I/O? Great idea. Let’s review the different options.

  • Cassandra row cache: I tried row cache of 512 MB for the narrow CQL table use case—512 MB was picked as it was a quarter of the size of the data set on disk. Most of the time, your data won’t fit in cache. This increased scan speed for the narrow CQL table by 29%. If you tend to access data at the beginning of your partitions, row cache could be a huge win. What I like best about this option is that it’s ridiculously simple to use, and it works with your changing data.
  • DSE has an in-memory tables feature. Think of it, basically, as keeping your SSTables in-memory instead of on disk. It seems to me to be slower than row cache (since you still have to decompress the tables), and I’ve been told it’s not useful for most people.
  • Finally, in Spark SQL you can cache your tables (CACHE TABLE in spark-sql, sqlContext.cacheTable in spark-shell) in an on-heap, in-memory columnar format; a sketch of this option follows this list. It is really fast (44x speedup over the base case above), but suffers from multiple problems: the entire table has to be cached, it cannot be updated, and it is not highly available (if any executor or the app dies, ka-boom!). Furthermore, you have to decide what to cache, and the initial read from Cassandra is still really slow.
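
Here is a minimal sketch of that last option as it might be run from spark-shell (Spark 1.4-era APIs, where sc and sqlContext are predefined); the keyspace and table names are hypothetical:

// Register a Cassandra table as a Spark SQL temp table, then cache it in
// Spark's on-heap, in-memory columnar store.
val gdelt = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "gdelt", "table" -> "gdelt_narrow"))
  .load()
gdelt.registerTempTable("gdelt")

sqlContext.cacheTable("gdelt")                       // the initial read from Cassandra is still slow
sqlContext.sql("SELECT COUNT(*) FROM gdelt").show()  // subsequent scans hit the in-memory columns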

None of these options is anywhere close to the wins that better storage format and effective data modeling will give you. As my analysis shows, FiloDB, without caching, is faster than all Cassandra caching options. Of course, if you are loading data from different data centers or constantly doing network shuffles, then caching can be a big boost, but most Spark on Cassandra setups are collocated.

The Future: Optimizing for CPU, Not I/O

For Spark queries over regular Cassandra tables, I/O dominates CPU due to the storage format. This is why the storage format makes such a big difference, and also why technologies like SSD have dramatically boosted Cassandra performance. Due to the dominance of I/O costs over CPU, it may be worth it to compress data more. For formats like Parquet and FiloDB, which are already optimized for fast scans and minimized I/O, it is the opposite—the CPU cost of querying data actually dominates over I/O. That’s why the Spark folks are working on code-gen and Project Tungsten.

If you look at the latest trends, memory is getting cheaper; NVRAM, 3DRAM, and very cheap, persistent DRAM technologies promise to make I/O bandwidth no longer an issue. This trend obliterates decades of database design based on the assumption that I/O is much, much slower than CPU, and instead favors CPU-efficient storage formats. With the increase in IOPs, optimizing for linear reads is no longer quite as important.

Filtering and Data Modeling

Remember our formula for predicting query performance:

Predicted queryTime = Expected number of records / (# cores * scan speed)

Correct data modeling in Cassandra deals with the first part of that equation—enabling fast lookups by reducing the number of records that need to be looked up. Denormalization, writing summaries instead of raw data, and being smart about data modeling all help reduce the number of records. Partition- and clustering-key filtering are definitely the most effective filtering mechanisms in Cassandra. Keep in mind, though, that scan speeds are still really important, even for filtered data—unless you are really only doing single-key lookups.

Look back at Figure 2-2. What do you see? Using partition-key filtering on wide-row CQL tables proved very effective—100x faster than scanning the whole wide-row table when touching 1% of the data (a direct plug-in to the formula: the number of records drops to 1% of the original). However, since wide rows are a bit inefficient compared to narrow tables, some speed is lost. You can also see in Figure 2-2 that scan speeds still matter. FiloDB’s in-memory execution of that same filtered query was still 100x faster than the Cassandra CQL table version—taking only 30 milliseconds as opposed to nearly three seconds. Will this matter? For serving concurrent, web-speed queries, it will certainly matter.
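
As a rough sketch of the kind of filtered query being measured here (again from spark-shell with the Spark 1.4-era DataFrame API; the keyspace and table names are hypothetical), when monthyear is the partition key, the Spark-Cassandra connector can push the predicate down so that only one partition is scanned:

import org.apache.spark.sql.functions.{avg, min, max}

val gdeltWide = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "gdelt", "table" -> "gdelt_wide"))
  .load()

// Equality on the partition key can be pushed down to Cassandra, so Spark
// only receives the ~1% of records that live in that partition.
gdeltWide
  .filter(gdeltWide("monthyear") === 198012)
  .agg(avg("avgtone"), min("avgtone"), max("avgtone"))
  .show()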

Note that I only modeled a very simple equals predicate, but in reality, many people need much more flexible predicate patterns. Due to the restrictive predicates available for partition keys (= only for all columns except last one, which can be IN), modeling with regular CQL tables will probably require multiple tables, one each to match different predicate patterns (this is being addressed in C* version 2.2 a bit, maybe more in version 3.x). This needs to be accounted for in the storage cost and TCO analysis. One way around this is to store custom index tables, which allows application-side custom scan patterns. FiloDB uses this technique to provide arbitrary filtering of partition keys.

Some notes on the filtering and data modeling aspect of my analysis:

  • The narrow rows layout in CQL is one record per partition key, thus partition-key filtering does not apply. See discussion of secondary indices in the following section.
  • Cached tables in Spark SQL, as of Spark version 1.5, only support whole-table scans. There might be some improvements coming, though—see SPARK-4849 in Spark version 1.6.
  • FiloDB has roughly the same filtering capabilities as Cassandra—by partition key and clustering key—but improvements to the partition-key filtering capabilities of C* are planned.
  • It is possible to partition your Parquet files and selectively read them, and it is supposedly possible to sort your files to take advantage of intra-file filtering. That takes extra effort, and since I haven’t heard of anyone doing the intra-file sort, I deemed it outside the scope of this study. Even if you were to do this, the filtering would not be anywhere near as granular as is possible with Cassandra and FiloDB—of course, your comments and enlightenment are welcome here.

Cassandra’s Secondary Indices Usually Not Worth It

How do secondary indices in Cassandra perform? Let’s test that with two count queries with a WHERE clause on Actor1CountryCode, a low cardinality field with a hugely varying number of records in our portion of the GDELT data set:

  • WHERE Actor1CountryCode = ‘USA’: 378k records (9.1% of records)
  • WHERE Actor1CountryCode = ‘ALB’: 5,005 records (0.1% of records)
                     Large country   Small country   2i scan rate
  Narrow CQL table   28s / 6.6x      0.7s / 264x     13.5k records/sec
  CQL wide rows      143s / 1.9x     2.7s / 103x     2,643 records/sec

If secondary indices were perfectly efficient, one would expect query times to reduce linearly with the drop in the number of records. Alas, this is not so. For the CountryCode = USA query, one would expect a speedup of around 11x, but secondary indices proved very inefficient, especially in the wide-rows case. Why is that? Because for wide rows, Cassandra has to do a lot of point lookups on the same partition, which is very inefficient and results in only a small drop in the I/O required (in fact, much more random I/O), compared to a full table scan.

Secondary indices work well only when the number of records is reduced to such a small amount that the inefficiencies do not matter and Cassandra can skip most partitions. There are also other operational issues with secondary indices, and they are not recommended for use when the cardinality goes above 50,000 items or so.

Predicting Your Own Data’s Query Performance

How should you measure the performance of your own data and hardware? It’s really simple, actually:

  1. Measure your scan speed for your base Cassandra CQL table. Number of records/time to query, single-threaded.
  2. Use the formula given earlier—Predicted queryTime = Expected number of records/(# cores * scan speed).
  3. Use relative speed factors for predictions.

The relative factors in the preceding table are based on the GDELT data set with 57 columns. The more columns you have (data warehousing applications commonly have hundreds of columns), the greater you can expect the scan speed boost for FiloDB and Parquet. (Again, this is because, unlike for regular CQL/row-oriented layouts, columnar layouts are generally insensitive to the number of columns.) It is true that concurrency (within a single query) leads to its own inefficiencies, but in my experience, that is more like a 2x slowdown, and not the order-of-magnitude differences we are modeling here.

User concurrency can be modeled by dividing the number of available cores by the number of users. You can easily see that in FAIR scheduling mode, Spark will actually schedule multiple queries at the same time (but be sure to modify fair-scheduler.xml appropriately). Thus, the formula becomes:

Predicted queryTime = Expected number of records * # users / (# cores * scan speed)

There is an important case where the formula needs to be modified, and that is for single-partition queries (for example, where you have a WHERE clause with an exact match for all partition keys, and Spark pushes down the predicate to Cassandra). The formula assumes that the queries are spread over the number of nodes you have, but this is not true for single-partition queries. In that case, there are two possibilities:

  1. The number of users is less than the number of available cores. Then, the query time = number_of_records/scan_speed.
  2. The number of users is >= the number of available cores. In that case, the work is divided among the cores, so the original query time formula works again. (Both cases are captured in the sketch that follows.)
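
Putting the pieces together, here is a small sketch of the full prediction. All inputs are your own estimates, and the numbers in the usage example are made up:

// Encodes the formulas above, including the single-partition special case.
def predictedQueryTime(
    expectedRecords: Double,   // records the query must scan
    users: Int,                // concurrent users
    cores: Int,                // total available cores
    scanSpeed: Double,         // single-core scan speed, records/sec
    singlePartition: Boolean = false): Double =
  if (singlePartition && users < cores)
    expectedRecords / scanSpeed                    // one partition, one core does the work
  else
    expectedRecords * users / (cores * scanSpeed)  // general formula

// Example: 4.16M records, 4 users, 8 cores, assumed 300k records/sec per core.
predictedQueryTime(4.16e6, users = 4, cores = 8, scanSpeed = 3.0e5)   // ~6.9 s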

Conclusions

Apache Cassandra is one of the most widely used, proven, and robust distributed databases in the modern big data era. The good news is that there are multiple options for using it in an efficient manner for ad hoc, batch, time-series analytics applications.

For (multiple) order-of-magnitude improvements in query and storage performance, consider the storage format carefully, and model your data to take advantage of partition and clustering key filtering/predicate pushdowns. Both effects can be combined for maximum advantage—using FiloDB plus filtering data improved a three-minute CQL table scan to response times less than 100 ms. Secondary indices are helpful only if they filter your data down to, say, 1% or less—and even then, consider them carefully. Row caching, compression, and other options offer smaller advantages up to about 2x.

If you need a lot of individual record updates or lookups by individual record but don’t mind creating your own blob format, the COMPACT STORAGE/single column approach could work really well. If you need fast analytical query speeds with updates, fine-grained filtering and a web-speed in-memory option, FiloDB could be a good bet. If the formula previously given shows that regular Cassandra tables, laid out with the best data-modeling techniques applied, are good enough for your use case, kudos to you!

Scalable Data Science with R

You can read this post on oreilly.com here.

R is among the top five data-science tools in use today, according to O’Reilly research; the latest KDnuggets survey puts it in first place; and IEEE Spectrum ranks it as the fifth most popular programming language.

The latest Rexer Data Science Survey revealed that in the past eight years, there has been a three-fold increase in the number of respondents using R, and a seven-fold increase in the number of analysts/scientists who say that R is their primary tool.

Despite its popularity, the main drawback of vanilla R is its inherently “single-threaded” nature and its need to fit all the data being processed in RAM. But nowadays, data sets are typically in the range of gigabytes, and they are growing quickly toward terabytes. In short, current growth in data volume and variety demands more efficient tools from data scientists.

Every data-science analysis starts with preparing, cleaning, and transforming the raw input data into some tabular data that can be further used in machine-learning models.

In the particular case of R, data size problems usually arise when the input data do not fit in the RAM of the machine and when data analysis takes a long time because parallelism does not happen automatically. Without making the data smaller (through sampling, for example), this problem can be solved in two different ways:

  1. Scaling up (vertically), by using a machine with more available RAM. For some data scientists leveraging cloud environments like AWS, this can be as easy as changing the instance type of the machine (for example, AWS recently introduced an instance with 2 TB of RAM). However, most companies today are using their internal data infrastructure that relies on commodity hardware to analyze data—they’ll have more difficulty increasing their available RAM.
  2. Scaling out horizontally: in this context, it is necessary to change the default R behavior of loading all required data in memory, and to access the data differently by using a distributed or parallel scheme with a divide-and-conquer (or in R terms, split-apply-combine) approach like MapReduce.

While the first approach is obvious and can use the same code to deal with different data sizes, it can only scale to the memory limits of the machine being used. The second approach, by contrast, is more powerful, but it is also more difficult to set up and adapt to existing legacy code.

There is a third approach. Scaling-out horizontally can be solved by using R as an interface to the most popular distributed paradigms:

  • Hadoop: through the set of libraries or packages known as RHadoop. These R packages allow users to analyze data with Hadoop through R code. They consist of rhdfs to interact with HDFS systems; rhbase to connect with HBase; plyrmr to perform common data transformation operations over large data sets; rmr2, which provides a MapReduce API; and ravro, which reads and writes Avro files.
  • Spark: with SparkR, it is possible to use Spark’s distributed computation engine to enable large-scale data analysis from the R shell. It provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large data sets.
  • Programming with Big Data in R (pbdR): based on MPI, it can be used on high-performance computing (HPC) systems, providing a true parallel programming environment in R.

Novel distributed platforms also combine batch and stream processing, providing a SQL-like expression language—for instance, Apache Flink. There are also higher levels of abstraction that allow you to create a data processing language, such as the recently open-sourced project Apache Beam from Google. However, these novel projects are still under development, and so far do not include R support.

After the data preparation step, the next common data science phase consists of training machine-learning models, which can be performed on a single machine or distributed among different machines. In the case of distributed machine-learning frameworks, the most popular approaches using R are the following:

  • Spark MLlib: through SparkR, some of the machine-learning functionalities of Spark are exported in the R package. In particular, the following machine-learning models are supported from R: generalized linear model (GLM), survival regression, naive Bayes, and k-means.
  • H2O framework: a Java-based framework that allows building scalable machine-learning models in R or Python. It can run as a standalone platform or with an existing Hadoop or Spark implementation. It provides a variety of supervised learning models, such as GLM, gradient boosting machine (GBM), deep learning, distributed random forest, and naive Bayes, as well as unsupervised learning implementations like PCA and k-means.

Sidestepping the coding and customization issues of these approaches, you can seek out a commercial solution that uses R to access data on the frontend but uses its own big-data-native processing under the hood:

  • Teradata Aster R is a massively parallel processing (MPP) analytic solution that facilitates the data preparation and modeling steps in a scalable way using R. It supports a variety of data sources (text, numerical, time series, graphs) and provides an R interface to Aster’s data science library that scales by using a distributed/parallel environment, avoiding the technical complexities to the user. Teradata also has a partnership with Revolution Analytics (now Microsoft R) where users can execute R code inside of Teradata’s platform.
  • HP Vertica is similar to Aster, but it provides On-Line Analytical Processing (OLAP) optimized for large fact tables, whereas Teradata provides On-Line Transaction Processing (OLTP) or OLAP that can handle big volumes of data. To scale out R applications, HP Vertica relies on the open source project Distributed R.
  • Oracle also includes an R interface in its advanced analytics solution, known as Oracle R Advanced Analytics for Hadoop (ORAAH), and it provides an interface to interact with HDFS and access to Spark MLlib algorithms.

Teradata has also released an open source package on CRAN called toaster that allows users to compute, analyze, and visualize data with (on top of) the Teradata Aster database. It allows computing data in Aster by taking advantage of Aster’s distributed and parallel engines, and then creating visualizations of the results directly in R. For example, it allows users to execute k-means or run several cross-validation iterations of a linear regression model in parallel.

Also related is MADlib, an open source library for scalable in-database analytics currently in incubator at Apache. There are other open source CRAN packages to deal with big data, such as biglm, bigpca, biganalytics, bigmemory, or pbdR—but they are focused on specific issues rather than addressing the data science pipeline in general.

Big data analysis presents many opportunities to extract hidden patterns, provided you use the right algorithms and underlying technology to gather insights. Connecting new scales of data with familiar tools is a challenge, but tools like Aster R offer a way to combine the elegance of the R language with a distributed environment that allows processing data at scale.

This post was a collaboration between O’Reilly Media and Teradata. View our statement of editorial independence.

Data Science Gophers

You can read this post on oreilly.com here.

If you follow the data science community, you have very likely seen something like “language wars” unfold between Python and R users. They seem to be the only choices. But there might be a somewhat surprising third option: Go, the open source programming language created at Google.

In this post, we are going to explore how the unique features of Go, along with the mindset of Go programmers, could help data scientists overcome common struggles. We are also going to peek into the world of Go-based data science to see what tools are available, and how an ever-growing group of data science gophers are already solving real-world data science problems with Go.

Go, a Cure for Common Data Science Pains

Data scientists are already working in Python and R. These languages are undoubtedly producing value, and it’s not necessary to rehearse their virtues here, but looking at the community of data scientists as a whole, certain struggles seem to surface quite frequently. The following pains commonly emerge as obstacles for data science teams working to provide value to a business:

  1. Difficulties building “production-ready” applications or services: Unfortunately, the very process of interactively exploring data and developing code in notebooks, along with the dynamically typed, single-threaded languages commonly used in data science, causes data scientists to produce code that is almost impossible to productionize. There could be a huge amount of effort in transitioning a model off of a data scientist’s laptop into an application that could actually be deployed, handle errors, be tested, and log properly. This barrier of effort often causes data scientists’ models to stay on their laptops or, possibly worse, be deployed to production without proper monitoring, testing, etc. Jeff Magnussen at Stitchfix and Robert Chang at Twitter have each discussed these sorts of cases.
  2. Applications or services that don’t behave as expected: Dynamic typing and convenient parsing functionality can be wonderful, but these features of languages like Python or R can turn their back on you in a hurry. Without a great deal of forethought into testing and edge cases, you can end up in a situation where your data science application is behaving in a way you did not expect and cannot explain (e.g., because the behavior is caused by errors that were unexpected and unhandled). This is dangerous for data science applications whose main purpose is to provide actionable insights within an organization. As soon as a data science application breaks down without explanation, people won’t trust it and thus will cease making data-driven decisions based on insights from the application. The Cookiecutter Data Science project is one notable effort at a “logical, reasonably standardized but flexible project structure for doing and sharing data science work” in Python—but the static typing and nudges toward clarity of Go make these workflows more likely.
  3. An inability to integrate data science development into an engineering organization: Often, data engineers, DevOps engineers, and others view data science development as a mysterious process that produces inefficient, unscalable, and hard-to-support applications. Thus, data science can produce what Josh Wills at Slack calls an “infinite loop of sadness” within an engineering organization.

Now, if we look at Go as a potential language for data science, we can see that, for many use cases, it alleviates these struggles:

  1. Go has a proven track record in production, with widespread adoption by DevOps engineers, as evidenced by game-changing tools like Docker, Kubernetes, and Consul being developed in Go. Go is just plain simple to deploy (via static binaries), and it allows developers to produce readable, efficient applications that fit within a modern microservices architecture. In contrast, heavyweight Python data science applications may need readability-killing packages like Twisted to fit into modern event-driven systems and will likely rely on an ecosystem of tooling that takes significant effort to deploy. Go itself also provides amazing tooling for testing, formatting, vetting, and linting (gofmt, go vet, etc.) that can easily be integrated in your workflow (see here for a starter guide with Vim). Combined, these features can help data scientists and engineers spend most of their time building interesting applications and services, without a huge barrier to deployment.
  2. Next, regarding expected behavior (especially with unexpected input) and errors, Go certainly takes a different approach, compared to Python and R. Go code uses error values to indicate an abnormal state, and the language’s design and conventions encourage you to explicitly check for errors where they occur. Some might take this as a negative (as it can introduce some verbosity and a different way of thinking). But for those using Go for data science work, handling errors in an idiomatic Go manner produces rock-solid applications with predictable behavior. Because Go is statically typed and because the Go community encourages and teaches handling errors gracefully, data scientists exploiting these features can have confidence in the applications and services they deploy. They can be sure that integrity is maintained over time, and they can be sure that, when something does behave in an unexpected way, there will be errors, logs, or other information helping them understand the issue. In the world of Python or R, errors may hide themselves behind convenience. For example, Python pandas will return a maximum value or a merged dataframe to you, even when the underlying data experiences a profound change (e.g., 99% of values are suddenly null, or the type of a column used for indexing is unexpectedly inferred as float). The point is not that there is no way to deal with issues (as readers will surely know). The point is that there seem to be a million of these ways to shoot yourself in the foot when the language does not force you to deal with errors or edge cases.
  3. Finally, engineers and DevOps developers already love Go. This is evidenced by the growing number of small and even large companies developing the bulk of their technology stack in Go. Go allows them to build easily deployable and maintainable services (see points 1 and 2 in this list) that can also be highly concurrent and scalable (important in modern microservices environments). By working in Go, data scientists can be unified with their engineering organization and produce data-driven applications that fit right in with the rest of their company’s architecture.

Note a few things here. The point is not that Go is perfect for every scenario imaginable, so data scientists should use Go, or that Go is fast and scalable (which it is), so data scientists should use Go. The point is that Go can help data scientists produce deliverables that are actually useful in an organization and that they will be able to support. Moreover, data scientists really should love Go, as it alleviates their main struggles while still providing them the tooling to be productive, as we will see next (with the added benefits of efficiency, scalability, and low memory usage).

The Go Data Science Ecosystem

OK, you might buy into the fact that Go is adored by engineers for its clarity, ease of deployment, low memory use, and scalability, but can people actually do data science with Go? Are there things like pandas, numpy, etc. in Go? What if I want to train a model—can I do that with Go?

Yes, yes, and yes! In fact, there are already a great number of open source tools, packages, and resources for doing data science in Go, and communities and organizations such as the high energy physics community and The Coral Project are actively using Go for data science. I will highlight some of this tooling shortly (and a more complete list can be found here). However, before I do that, let’s take a minute to think about what sort of tooling we actually need to be productive as data scientists.

Contrary to popular belief, and as evidenced by polls and experience (see here and here, for example), data scientists spend most of their time (around 90%) gathering data, organizing data, parsing values, and doing a lot of basic arithmetic and statistics. Sure, they get to train a machine-learning model on occasion, but there are a huge number of business problems that can be solved via some data gathering/organization/cleaning and aggregation/statistics. Thus, in order to be productive in Go, data scientists must be able to gather data, organize data, parse values, and do arithmetic and statistics.

Also, keep in mind that, as gophers, we want to produce clear code rather than clever code (a preference that also helps us as scientists or data scientists/engineers) and introduce a little copying rather than a little dependency. In some cases, writing a for loop may be preferable to importing a package just for one function. You might want to write your own function for a chi-square distance measure (or just copy that function into your code) rather than pulling in a whole package for it. This philosophy can greatly improve readability and give your colleagues a clear picture of what you are doing.

Nevertheless, there are occasions where importing a well-understood and well-maintained package saves considerable effort without unnecessarily reducing clarity. The following provides something of a “state of the ecosystem” for common data science/analytics activities. See here for a more complete list of active/maintained Go data science tools, packages, libraries, etc.

Data Gathering, Organization, and Parsing

Thankfully, Go has already proven itself useful at data gathering and organization, as evidenced by the number and variety of databases and datastores written in Go, including InfluxDB, Cayley, LedisDB, Tile38, Minio, Rend, and CockroachDB. Go also has libraries or APIs for all of the commonly used datastores (Mongo, Postgres, etc.).

However, regarding parsing and cleaning data, you might be surprised to find that Go has a lot to offer here as well. To highlight just a few:

  • GJSON—quick parsing of JSON values
  • ffjson—fast JSON serialization
  • gota—data frames
  • csvutil—registering a CSV file as a table and running SQL statements on the CSV file
  • scrape—web scraping
  • go-freeling—NLP

Arithmetic and Statistics

This is an area where Go has greatly improved over the last couple of years. The Gonum organization provides numerical functionality that can power a great number of common data-science-related computations. There is even a proposal to add multidimensional slices to the language itself. In general, the Go community is producing some great projects related to arithmetic, data analysis, and statistics. Here are just a few:

Exploratory Analysis and Visualization

Go is a compiled language, so you can’t do exploratory data analysis, right? Wrong. In fact, you don’t have to abandon certain things you hold dear like Jupyter when working with Go. Check out these projects:

In addition, it is worth noting that Go fits so well with web development that powering visualizations or web apps (e.g., utilizing D3) via custom APIs can be extremely successful.

Machine Learning

Even though the preceding tooling makes data scientists productive about 90% of the time, data scientists still need to be able to do some machine learning (and let’s face it, machine learning is awesome!). So when/if you need to scratch that itch, Go does not disappoint:

And, of course, you can integrate with any number of machine-learning frameworks and APIs (such as H2O or IBM Watson) to enable a whole host of machine-learning functionality. There is also a Go API for Tensorflow in the works.

Get Started with Go for Data Science

The Go community is extremely welcoming and helpful, so if you are curious about developing a data science application or service in Go, or if you just want to experiment with data science using Go, make sure you get plugged into community events and discussions. The easiest place to start is on Gophers Slack, the golang-nuts mailing list (focused generally on Go), or the gopherds mailing list (focused more specifically on data science). The #data-science channel is extremely active and welcoming, so be sure to introduce yourself, ask questions, and get involved. Many larger cities have Go meetups as well.

Thanks to Sebastien Binet for providing feedback on this post.

Applying the Kappa Architecture to the Telco Industry

You can read this post on oreilly.com here.

Ever-growing volumes of data, shorter time constraints, and an increasing need for accuracy are defining the new analytics environment. In the telecom industry, traditional user and network data coexists with machine-to-machine (M2M) traffic, media data, social activities, and so on. In terms of volume, this can be referred to as an “explosion” of data. This is a great business opportunity for telco operators and a key angle to take full advantage of current infrastructure investments (4G, LTE).

In this blog post, we will describe an approach to quickly ingest and analyze large volumes of streaming data, the Kappa architecture, as well as how to build a Bayesian online-learning model to detect novelties in a complex environment. Note that novelty does not necessarily imply an undesired situation; it indicates a change from previously known behaviors.

We apply both Kappa and the Bayesian model to a use case using a data stream originating from a telco cloud-monitoring system. The stream is composed of telemetry and log events. It is high volume, as many physical servers and virtual machines are monitored simultaneously.

The proposed method quickly detects anomalies with high accuracy while adapting (learning) over time to new system normals, making it a desirable tool for considerably reducing maintenance costs associated with the operability of large computing infrastructures.

What Is Kappa Architecture?

In a 2014 blog post, Jay Kreps accurately coined the term Kappa architecture by pointing out the pitfalls of the Lambda architecture and proposing a potential software evolution. To understand the differences between the two, let’s first observe what the Lambda architecture looks like, shown in Figure 2-3.

Figure 2-3. Lambda architecture. Credit: Ignacio Mulas Viela and Nicolas Seyvet.

As shown in Figure 2-3, the Lambda architecture is composed of three layers: a batch layer, a real-time (or streaming) layer, and a serving layer. Both the batch and real-time layers receive a copy of the event, in parallel. The serving layer then aggregates and merges computation results from both layers into a complete answer.

The batch layer (aka historical layer) has two major tasks: managing historical data and recomputing results such as machine-learning models. Computations are based on iterating over the entire historical data set. Since the data set can be large, this produces accurate results at the cost of high latency due to high computation time.

The real-time layer (speed layer, streaming layer) provides low-latency results in near real-time fashion. It performs updates using incremental algorithms, thus significantly reducing computation costs, often at the expense of accuracy.

The Kappa architecture simplifies the Lambda architecture by removing the batch layer and replacing it with a streaming layer. To understand how this is possible, one must first understand that a batch is a data set with a start and an end (bounded), while a stream has no start or end and is infinite (unbounded). Because a batch is a bounded stream, one can conclude that batch processing is a subset of stream processing. Hence, the Lambda batch layer results can also be obtained by using a streaming engine. This simplification reduces the architecture to a single streaming engine capable of ingesting the needed volumes of data to handle both batch and real-time processing. Overall system complexity significantly decreases with Kappa architecture. See Figure 2-4.

Figure 2-4. Kappa architecture. Credit: Ignacio Mulas Viela and Nicolas Seyvet.

Intrinsically, there are four main principles in the Kappa architecture:

  1. Everything is a stream: batch operations become a subset of streaming operations. Hence, everything can be treated as a stream.
  2. Immutable data sources: raw data (data source) is persisted and views are derived, but a state can always be recomputed, as the initial record is never changed.
  3. Single analytics framework: keep it short and simple (KISS) principle. A single analytics engine is required. Code, maintenance, and upgrades are considerably reduced.
  4. Replay functionality: computations and results can evolve by replaying the historical data from a stream.

In order to respect principle four, the data pipeline must guarantee that events stay in order from generation to ingestion. This is critical for consistency of results, because it makes computations deterministic: running the same data twice through a computation must produce the same result.

These four principles do, however, put constraints on building the analytics pipeline.

Building the Analytics Pipeline

Let’s start concretizing how we can build such a data pipeline and identify the sorts of components required.

The first component is a scalable, distributed messaging system with event ordering and at-least-once delivery guarantees. Kafka can connect the output of one process to the input of another via a publish-subscribe mechanism. Using it, we can build something similar to Unix pipes, where the output produced by one command is the input to the next.
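
As a minimal sketch of this first component (the Kafka Java client used from Scala; the broker address, topic name, and payload are hypothetical):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TelemetryPublisher {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Key by host so that events from the same machine stay in order on one partition.
    producer.send(new ProducerRecord("telemetry", "vm-01", """{"metric":"cpu","value":0.87}"""))
    producer.close()
  }
}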

The second component is a scalable stream analytics engine. Inspired by Google’s “Dataflow Model” paper, Flink is, at its core, a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. One of its most interesting API features allows usage of the event timestamp to build time windows for computations.
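
For example, here is a sketch of a Flink job (Flink 1.x-era Scala API) that uses event timestamps to count log events per host in one-minute windows; the event shape and the inline values are hypothetical:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object LogFrequencyJob {
  case class LogEvent(host: String, timestamp: Long)   // event time in epoch millis

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // In the real pipeline this stream would be a Kafka consumer; a few inline
    // elements keep the sketch self-contained.
    val events = env.fromElements(
      LogEvent("vm-01", 1000L), LogEvent("vm-01", 2000L), LogEvent("vm-02", 3000L))

    events
      .assignAscendingTimestamps(_.timestamp)   // window on event time, not arrival time
      .map(e => (e.host, 1L))
      .keyBy(_._1)
      .timeWindow(Time.minutes(1))
      .sum(1)                                   // (host, count) per one-minute window
      .print()

    env.execute("log-frequency-per-host")
  }
}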

The third and fourth components are a real-­time analytics store, Elasticsearch, and a powerful visualization tool, Kibana. Those two components are not critical, but they’re useful to store and display raw data and results.

Mapping the Kappa architecture to its implementation, Figure 2-5 illustrates the resulting data pipeline.

Figure 2-5. Kappa architecture reflected in a data pipeline. Credit: Ignacio Mulas Viela and Nicolas Seyvet.

This pipeline creates a composable environment where outputs of different jobs can be reused as inputs to another. Each job can thus be reduced to a simple, well-defined role. The composability allows for fast development of new features. In addition, data ordering and delivery are guaranteed, making results consistent. Finally, event timestamps can be used to build time windows for computations.

Applying the above to our telco use case, each physical host and virtual machine (VM) telemetry and log event is collected and sent to Kafka. We use collectd on the hosts and ceilometer on the VMs for telemetry, and logstash-forwarder for logs. Kafka then delivers this data to different Flink jobs that transform and process the data. This monitoring gives us both the physical and virtual resource views of the system.

With the data pipeline in place, let’s look at how a Bayesian model can be used to detect novelties in a telco cloud.

Incorporating a Bayesian Model to Do Advanced Analytics

To detect novelties, we use a Bayesian model. In this context, novelties are defined as unpredicted situations that differ from previous observations. The main idea behind Bayesian statistics is to compare statistical distributions and determine how similar or different they are. The goal here is to:

  1. Determine the distribution of parameters to detect an anomaly.
  2. Compare new samples for each parameter against calculated distributions and determine if the obtained value is expected or not.
  3. Combine all parameters to determine if there is an anomaly.

Let’s dive into the math to explain how we can perform this operation in our analytics framework. Let z be a new sample, θ the observed parameters, X the previous samples, P(θ|X) the probability distribution of the parameters given those samples, and A(z|θ) the probability that z is an anomaly under θ. The Bayesian Principal Anomaly can then be written as:

A(z | X) = ∫ A(z | θ) P(θ | X) dθ

A principal anomaly as defined is also valid for multivariate distributions. The approach taken evaluates the anomaly for each variable separately, and then combines them into a total anomaly value.

An anomaly detector that considers only a small part of the variables (typically a single variable with a simple distribution like Poisson or Gaussian) can be called a micromodel. A micromodel with a Gaussian distribution will look like Figure 2-6.

Figure 2-6. Micromodel with Gaussian distribution. Credit: Ignacio Mulas Viela and Nicolas Seyvet.

An array of micromodels can then be formed, with one micromodel per variable (or small set of variables). Such an array can be called a component. The anomaly values from the individual detectors then have to be combined into one anomaly value for the whole component. The combination depends on the use case. Since accuracy is important (avoiding false positives) and parameters can be assumed to be fairly independent of one another, the principal anomaly for the component can be calculated as the maximum of the micromodel anomalies, scaled down to meet the correct false-alarm rate (i.e., weighting the influence of components to improve the accuracy of the principal anomaly detection).

However, there may be many different “normal” situations. For example, the normal system behavior may vary by weekday or time of day. It may then be necessary to model this with several components, where each component learns the distribution of one cluster. When a new sample arrives, it is tested by each component. If it is considered anomalous by all components, it is considered anomalous. If any component finds the sample normal, then it is normal.
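
To make this concrete, here is a small sketch (in Scala, not the authors’ implementation) of a Gaussian micromodel that learns a running mean and variance online, plus the two combination rules just described: the maximum within a component, scaled down to control false alarms, and "normal if any component says normal" across components.

import scala.math.{abs, exp, sqrt}

// One Gaussian micromodel per variable: keeps running estimates of mean/variance
// (Welford's online algorithm) and scores how anomalous a new sample is.
final case class GaussianMicromodel(n: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {
  def update(x: Double): GaussianMicromodel = {
    val n1    = n + 1
    val d     = x - mean
    val mean1 = mean + d / n1
    GaussianMicromodel(n1, mean1, m2 + d * (x - mean1))
  }
  private def variance: Double = if (n > 1) m2 / (n - 1) else 1.0

  // Anomaly score in [0, 1]: probability that a sample from the learned Gaussian
  // lies closer to the mean than x (values near 1 flag an outlier).
  def anomaly(x: Double): Double = erf(abs(x - mean) / sqrt(2.0 * variance))

  private def erf(x: Double): Double = {              // Abramowitz-Stegun approximation
    val t = 1.0 / (1.0 + 0.3275911 * x)
    val p = ((((1.061405429 * t - 1.453152027) * t + 1.421413741) * t
              - 0.284496736) * t + 0.254829592) * t
    1.0 - p * exp(-x * x)
  }
}

// Within a component: max of the micromodel anomalies, scaled down to keep the
// false-alarm rate in check (the scale factor here is an assumption).
def componentAnomaly(scores: Seq[Double], scale: Double = 0.8): Double =
  if (scores.isEmpty) 0.0 else scores.max * scale

// Across components (one per "normal" situation/cluster): a sample is anomalous
// only if every component finds it anomalous, i.e., take the minimum.
def overallAnomaly(componentScores: Seq[Double]): Double =
  if (componentScores.isEmpty) 0.0 else componentScores.min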

Applying this to our use case, we used this detector to spot errors or deviations from normal operations in a telco cloud. Each parameter θ is any of the captured metrics or logs resulting in many micromodels. By keeping a history of past models and computing a principal anomaly for the component, we can find statistically relevant novelties. These novelties could come from configuration errors, a new error in the infrastructure, or simply a new state of the overall system (i.e., a new set of virtual machines).

The number of generated logs over time (the log frequency) appears to be the most significant feature for detecting novelties. By modeling the statistical distribution of log frequency, the model can spot errors or novelties accurately. For example, let’s consider the case where a database becomes unavailable. At that time, any applications depending on it start logging recurring errors (e.g., “Database X is unreachable...”). This raises the log frequency, which triggers a novelty in our model and detector.

The overall data pipeline, combining the transformations mentioned previously, will look like Figure 2-7.

Figure 2-7. Data pipeline with combination of analytics and Bayesian anomaly detector. Credit: Ignacio Mulas Viela and Nicolas Seyvet.

This data pipeline receives the raw data, extracts statistical information (such as log frequencies per machine), applies the Bayesian anomaly detector over the interesting features (statistical and raw), and outputs novelties whenever they are found.

Conclusion

In this blog post, we have presented an approach using the Kappa architecture and a self-training (online) Bayesian model to yield quick, accurate analytics.

The Kappa architecture allows us to develop a new generation of analytics systems. Remember, this architecture has four main principles: data is immutable, everything is a stream, a single stream engine is used, and data can be replayed. It simplifies both the software systems and the development and maintenance of machine-learning models. Those principles can easily be applied to most use cases.

The Bayesian model quickly detects novelties in our cloud. This type of online learning has the advantage of adapting over time to new situations, but one of its main challenges is a lack of ready-to-use algorithms. However, the analytics landscape is evolving quickly, and we are confident that a richer environment can be expected in the near future.
