Appendix. Capturing data on the web

As you’ve learned in this book, intelligent applications are those that can change their behavior based on information. It follows, then, that we must have a mechanism for the capture and access of data. Because we’re talking about web-scale processing, it stands to reason that we may need a system designed with the following in mind:

  • Volume— Our system should be capable of dealing with web-scale data.
  • Scalability— Our system should be configurable with changing load.
  • Durability— Outages or network blips shouldn’t affect the eventual consistent state of data.
  • Latency— We shouldn’t expect to wait long periods of time between data being generated and data being processed.
  • Flexibility— Access to the data should be flexible, allowing multiple services to read and write from the data platform, each at different states of progress.

Typically, in the internet industry, these are issues relating to logging in the sense that when an event occurs, it has traditionally been written down in a log, or log file. In the coming sections, we’ll discuss in detail the implications of the log file before providing an alternative, which we can assess against the previous points. To set the scene, and to provide an illustrative example to refer to throughout the remainder of this appendix, we introduce a use case from the world of online advertising.

A motivating example: showing ads online

Although many may despair at the prevalence of ads when browsing the web, nobody can deny that ads are here to stay! With revenues in the billions of dollars,[1] these figures look set to increase steadily as the number of people using electronic devices to consume media continues to grow.

1

Internet Advertising Bureau, “Digital Ad Revenues Surge 19%, Climbing to $27.5 Billion in First Half of 2015,” October 21, 2015, http://mng.bz/Ud70.

At its core, the concept of online advertising is simple, but behind this simplicity hides significant complexity. In its simplest form, advertisers either pay a publisher to show an ad or pay only when a shown ad is interacted with. There are more complicated models, but we don’t want to confuse matters here. The first case is paid for on a cost per mille (CPM) basis; that is, for every thousand ads shown, an amount is paid from the advertiser to the publisher. The latter is charged on a cost per click (CPC) basis, where payment is exchanged every time a consumer sees and subsequently clicks an ad.

The complexity arises in the process by which this all occurs. Rather than advertisers working with publishers directly, these interactions can occur on an open market, or exchange. In these cases, the ad opportunity is commoditized, encouraging competition. This in turn drives more relevant ads and an arguably better experience for the web user.

You may have noticed that in the CPC case, there exists an opportunity for arbitrage. Operating on the demand (advertiser’s) side, you could run a service that uses the data collected from users and their interaction habits to target ad opportunities in order to improve the interaction or click-through rate (CTR). Such parties can buy ad opportunities cheaply on a CPM basis and use their data about web and ad interaction to deliver above-average performance. These ad placements can then be sold on a CPC basis at a profit. Figure A.1 illustrates this process. What we’ve just described is a simplified version of a demand-side platform (DSP) operating an intelligent algorithm, discussed in chapter 5. This example will also serve us well in demonstrating the world of data collection.

Figure A.1. Overview of a demand-side platform. DSPs use their knowledge of the user to provide better interactions. Consequently, they can arbitrage content bought on a CPM basis, selling this on a CPC basis.

Data available for online advertising

As you know, intelligent algorithms run on data, so let’s delve a little deeper into the data that can be collected for ad targeting. Every time our fictional DSP interacts with an exchange, it performs cookie synchronization. That is, either the exchange passes the DSP a user ID already known to the DSP, or the exchange passes its own ID and the DSP has to look up the corresponding ID on its side. The reasons for this have to do with the security of the page and the scope of the identifiers; an exact explanation lies outside the scope of this example. Suffice it to say that the DSP can obtain an ID in its own format with which to look up information regarding user behavior.

What information does the DSP store? Essentially, any interaction with associated users, sites, and ads. To illustrate, let’s work through an example. Say you visit the Adidas site while Adidas is working with our imaginary DSP. You become associated with the DSP in a rather complicated way. All interactions with that site and any partner sites become accessible to the DSP. So if you look at the running-shoe collection, a message is sent back to the DSP informing it of this. If you also look at the casual footwear selection, similar information is transmitted. If you’re shown an ad by the DSP, or if you click one of its ads, the DSP is similarly notified. Where it gets interesting is that these messages aren’t limited in scope to Adidas alone. If you visit another of the DSP’s partners—say, Walmart—this information is also sent back to the DSP and stored against your identifier. We talk more about how this information is used to perform segmentation in chapter 5; for now, all you need to know is that for every interaction on the web, a lot of information is being stored about you. For a DSP, this presents a significant challenge in collection and storage.

Data collection: a naïve approach

As always, there’s a really simple approach we could use to collect and process data, but as you may expect, it’s suboptimal. To understand what’s happening under the hood, let’s look at the hypothetical naïve data-processing architecture of a simple DSP. Figure A.2 shows a visual overview.

Figure A.2. Overview of the log-processing architecture of a typical DSP. Web page interactions are recorded by the browser and sent back to the DSP’s server(s) via HTTP. Depending on the interaction type, these will be written to different files and stored in different locations. Over time, these files are collated and imported to a database for downstream processing.

Interactions are recorded by the browser and sent back to the DSP’s servers for processing. As shown, the receiving server will have many concurrent connections and will have to be configured to deal with a high connection load. Once interactions are received, a number of things need to happen. First, as you can see, depending on the interaction type, the event is logged into different folders. The reason has to do with priority. Click logs equate to money, so minimizing the latency of this data makes good business sense.

For the purposes of log shipping, where log files are sent on to the database, it makes sense to periodically finalize files and start new ones so that we can send the data as soon as possible. Consequently, files need to be continually created, finalized, and recycled while data is entering and flowing through the system.

At this stage, finalized data is shipped to the database for import. Here we must deal gracefully with several possible failure cases: corrupt files must be ring-fenced for manual intervention, processed files must be set aside for archiving, and failed transmissions must be retried. It’s imperative that data be received by the database in a consistent state, because without this, billing and reporting can’t happen effectively.

Managing data collection at scale

What we’ve just described is an extremely simple and hypothetical example of a DSP log-processing pipeline. In reality, significant complexity is added in several areas. Let’s return to our required properties as stated at the start of this appendix and investigate the suitability of this solution.

To achieve high volume in such a system, the connection thresholds of the receiving machine need to be maximized; but beyond this, we must find a way to parallelize log processing at the receiver. There are many patterns by which this can be achieved: processing can be performed by machines with local affinity (to the site or to the user), or requests can be balanced randomly. Although this allows for horizontal scaling, files must be recombined at some point, either during an intermediate stage or at the database after shipping. The problem with such an approach is that it introduces several points of failure. It also introduces a layer of coordination between the nodes and the database. For example, the database may need to wait for a failed node to come back up before files can be combined and imported.

Although such a system is scalable, how this is achieved is closely tied to the chosen architecture. For example, it’s much easier to add a processing node to a system that’s randomly balanced, rather than one that has been carefully built to manage a particular load profile (for example, where site locality is considered). Scalability certainly isn’t guaranteed or automatic.

Durability has to be designed into the system. We’ve already mentioned transmission failure and corruption, and as you can see, this requires a high degree of logic and coordination between different stages of the processing pipeline. Although not impossible to get correct, the problem is complex enough that it requires some care in implementation.

Latency in this system is closely related to several aspects of its design. In order for an event to be shipped to the database, it must first have reached the file writer and been written to a file, and that file must be closed. When this happens will depend on the load and queuing capability of the receiving server as well as the maximum size of a log file. After shipping, the file must wait again to be imported by the database application.

In terms of flexibility, there’s only a single consumer of the data: the database. Multiple consumers would result in an increase in transmission bandwidth, because log files would have to be sent to all consumers. This would have an associated impact on the logic required to keep the data consistent. If consumers were allowed to be at different states of progress, the logic enabling this would have to be borne by the log-shipping code.

Hopefully, this simple example is illustrative enough to convince you that data capture at web scale isn’t easy! This, of course, all depends on your application and its requirements, but striving to achieve the properties listed at the start of this appendix will lay the groundwork for the rest of your application. Each property has an associated knock-on effect on your intelligent algorithm and its deployment lifecycle. For example, if your training data reaches your algorithm with low latency, you can train and deploy more-relevant models more quickly. If access is flexible, multiple algorithms can consume the data in different ways in parallel. Wouldn’t it be nice if there were a system designed and built for exactly this purpose?

Introducing Kafka

As it happens, there is a system built for this purpose! Apache Kafka is a distributed log-processing platform for dealing with high volumes at low latency. At its core, Kafka is a distributed cluster of brokers. Producers of data can publish messages to a topic (a stream of messages of a given type) that are stored at a set of brokers. Consumers can then subscribe to one or more topics and consume data directly from the brokers. Before delving further into the details of this powerful framework, let’s get our hands dirty—it’s time for some code!

First, you’ll need to get the latest version of Apache Kafka up and running on your system. At the time of writing, version 0.10.0.0 is the current stable release and can be downloaded directly from the Kafka project home page (http://kafka.apache.org/). Be sure you download the binary version built for the version of Scala you have installed. For this appendix, we’ve tested against Scala 2.10.4. Once you’ve downloaded the file, unzip it to a convenient location and navigate to the root of the archive. You can then start Kafka using the default configuration with the following two commands:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
./bin/kafka-server-start.sh ./config/server.properties

This will start up Kafka and will by default bind to port 9092, as specified in the server.properties file. You’ll also notice that something called Zookeeper is started and bound to port 2181. This is a centralized service that’s used to share Kafka configuration—more on this later. To communicate with Kafka, you’ll need bindings in your particular language of choice. Because we’re using Python throughout this book, we’ll use David Arthur’s kafka-python[2] to connect to our Kafka instance (v0.9.5). In order to run the examples in this appendix, we’re going to assume that you’ve made this available to your favorite Python environment already. More details on installation can be found in the book’s associated requirements document, included with the code available at the book’s website (www.manning.com/books/algorithms-of-the-intelligent-web-second-edition). The following listing provides the Python code to send your first set of messages to your Kafka instance.

2

David Arthur, kafka-python, https://github.com/dpkp/kafka-python.git.

Listing A.1. Kafka simple publish
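
As a rough sketch, the publish code might look something like the following, assuming the kafka-python 0.9.x API (KafkaClient and SimpleProducer) described in the text; the message strings here are purely illustrative.

from kafka import KafkaClient, SimpleProducer

# Connect to the broker started earlier (default port 9092).
kafka = KafkaClient("localhost:9092")

# A SimpleProducer publishes messages without our having to choose a partition.
producer = SimpleProducer(kafka)

# Each call returns a list of ProduceResponse objects, one per message sent.
print(producer.send_messages("test", "Hello Kafka!"))
print(producer.send_messages("test", "This is a second message"))
print(producer.send_messages("test", "This is a third message"))

kafka.close()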

Run each line of listing A.1 in a Python prompt. All being well, you should see output that resembles the following:

[ProduceResponse(topic='test', partition=0, error=0, offset=1)]
[ProduceResponse(topic='test', partition=0, error=0, offset=2)]
[ProduceResponse(topic='test', partition=0, error=0, offset=3)]

So, what just happened? Let’s go through this line by line. After initial imports from the kafka-python module, a KafkaClient object is created. We then instantiate a SimpleProducer using this and send three messages to the “test” topic before exiting.

Let’s investigate this a little further. The send_messages method returns a list of ProduceResponse objects, one for each message submitted by the method. Each object has topic, partition, error, and offset attributes. We’ve already mentioned topic, but what about the other attributes? Figure A.3 shows the relationship among topics, partitions, and offsets.

Figure A.3. Anatomy of a topic, taken from the Kafka documentation.[3] Topics consist of a partitioned log, which can only be appended to. The partitions are immutable: they can’t be changed once committed.

3

Apache Kafka Documentation, Apache Projects, http://kafka.apache.org/documentation.html.

Topics comprise multiple partitions. Each partition must be held in its entirety on a single broker, but the partitions of a single topic can be spread over multiple brokers. This serves several purposes. First, it allows topics to scale beyond the size of a single server: although each partition must physically fit on the machine that hosts it, the logical unit—the topic—can grow to be much larger than a single machine. It also provides a level of redundancy and parallelism, because replicas of a partition can be held, and different partitions can be read from and serviced by different machines. The numbers shown inside each partition in figure A.3 are offsets. An offset uniquely identifies an element in the partition; hence the immutability constraint.

The cluster retains all published messages for a length of time that’s configurable per topic, after which logs are deleted to free up space. This means any consumer can go back in time to data that has already been processed. This is possible because the state of log processing is held by the consumer itself: it’s simply the consumer’s offset in each partition.

Returning to our response from the send_messages method, we can now make sense of these attributes. The topic and partition attributes are the topic and partition written to, and the offset gives the datum’s place in that partition. Any errors occurring during the write are captured in the error attribute. Now that you understand the internals of Kafka a little better, let’s revisit our example and see if you can find those messages. The following listing provides the code to achieve this.

Listing A.2. Kafka simple subscribe
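
As a rough sketch, the subscribe code might look something like the following, again assuming the kafka-python 0.9.x API; the get_messages arguments are an assumption.

from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("localhost:9092")

# Join the consumer group "mygroup" and consume from the "test" topic.
consumer = SimpleConsumer(kafka, "mygroup", "test")

# Fetch up to three messages (those published in listing A.1) and print them.
for message in consumer.get_messages(count=3):
    print(message)

kafka.close()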

Here we create a SimpleConsumer, which takes as arguments a KafkaClient as well as two string arguments. The first is known as a consumer group, and the second relates to the topic the consumer is to consume from. The concept of a consumer group allows multiple consumers to coordinate, such that a single message published to a topic is only ever delivered to a single member of a consumer group. In this example, there’s only a single consumer in our group, titled mygroup, and it will receive all messages destined for this group.

There’s something else you should know about the SimpleConsumer that may not seem obvious at first. Notice that we haven’t specified any partitions or offsets from which to start reading. This is because SimpleConsumer takes care of this for us. For each partition in the topic, and for each consumer and consumer group, the offset that consumer has reached is recorded. This is stored in Zookeeper, which, if you recall, was started at the same time as Kafka. Thus, multiple executions of the previous code won’t start from offset zero but instead will pick up at the position after the last successful read. Try it and see!
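
Should you want to re-read a topic from the beginning, SimpleConsumer can also be repositioned explicitly. The following line is a sketch that assumes the seek(offset, whence) signature of kafka-python 0.9.x, where whence=0 denotes an absolute offset.

# Rewind to offset 0 in every partition owned by this consumer.
consumer.seek(0, 0)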

Replication in Kafka

One of Kafka’s features is its ability to hold replica partitions, for the purposes of both parallelism and durability. Let’s now store our log data with a replication level of 3. That is, for each partition, the data is stored on three separate brokers.

This requires a bit of setup when starting Kafka. In order to achieve a replication level of 3, we need to have at least three brokers running. In our initial example, we used the stock server.properties file. Now we need to modify this and create two additional configuration files. See the next listing for the sections in server.properties that need to be changed.

Listing A.3. Configuration changes in server.properties for broker 2
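
As a sketch, the second broker’s configuration file (server-1.properties, broker.id 1) might differ from the stock server.properties in just these three properties; the port number and log directory shown are illustrative choices.

# server-1.properties: changes relative to the stock server.properties
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1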

We need to change three separate parameters in order to use the properties file to create multiple Kafka instances. First, we need to change broker.id. This is the unique identifier of the broker in the Kafka cluster. Second, because each of the three brokers will run on the same host, we must change the port. Finally, each instance needs an exclusive location in which to store its data. The location of this data is specified by the log.dirs parameter. The example in listing A.3 provides the changes for the relevant sections in the case of the second broker. We’ll leave it as an exercise for you to create the third server.properties file. Once it’s created, you can add two more brokers to your cluster using the following commands, assuming you’ve matched the naming convention and location of our new configuration files:

./bin/kafka-server-start.sh ./config/server-1.properties
./bin/kafka-server-start.sh ./config/server-2.properties

Note that because we haven’t modified the Zookeeper details in server.properties, all three brokers will communicate with the same Zookeeper instance. No further configuration is required to ensure that they work as part of the same cluster. You will, however, need to explicitly create a topic with the new replication level. Navigate to your Kafka folder, and input the following:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 3 --partitions 1 --topic test-replicated-topic

This will create a new topic called test-replicated-topic. This topic has one partition and a replication level of 3. To confirm this, issue the following command:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181

All being well, you should see some output that looks remarkably like the following listing.

Listing A.4. Kafka topic summary
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

Topic:test-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

Let’s take a minute to step through this, because it will help further on in this appendix. The output of this command summarizes what we’ve done to date. In listing A.1, we automatically created the test topic, with the default parameters as specified in the server.properties file. At the time of writing, this is a default of one partition per topic and a replication factor of 1: no replication. The final column heading, Isr, stands for In-Sync-Replicas, and we’ll discuss this, along with leaders, in the next section. You also see the recently added test-replicated-topic topic. As expected, this has only a single partition but with a replication level of 3. In order to publish to our new topic, let’s modify the existing code from listing A.1.

Listing A.5. Kafka publish to cluster
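
As a sketch, and assuming the req_acks option of kafka-python’s SimpleProducer, the cluster publish might look like the following; only the first message string is taken from the text, and the other two are illustrative.

from kafka import KafkaClient, SimpleProducer

kafka = KafkaClient("localhost:9092")

# Synchronous delivery is the default; req_acks makes the producer block
# until the full in-sync replica set has committed each message.
producer = SimpleProducer(kafka,
                          req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)

print(producer.send_messages("test-replicated-topic", "Hello Kafka Cluster!"))
print(producer.send_messages("test-replicated-topic", "A replicated message"))
print(producer.send_messages("test-replicated-topic", "Another replicated message"))

kafka.close()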

We provide an alternative construction of SimpleProducer. Here we specify synchronous communication, blocking until the cluster (more precisely, the full set of in-sync replicas) has acknowledged a new message before SimpleProducer returns success. Note that it’s also possible to set async=True, in which case SimpleProducer won’t wait for a response at all.

After running listing A.5, kill the broker that’s using the server-1.properties file, and rerun listing A.2, replacing "test" with "test-replicated-topic". What do you notice? In this scenario, you’ll still receive the three new messages, starting with “Hello Kafka Cluster!” But why? Let’s delve a little deeper. By querying the topics (via the ./bin/kafka-topics.sh command issued previously), you should now see an updated summary as given in the next listing.

Listing A.6. Kafka topic summary (dead replica)
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

Topic:test-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,2

You can see that Isr has gone from 0,1,2 to 0,2. You might also notice (although it isn’t shown here) that the leader has changed. Both of these are a direct consequence of the unavailability of broker 1, which was just killed. We’ll discuss this in more detail in the following sections.

Message acknowledgement

Before we continue to discuss the low-level details of replication, let’s take a little time to understand message transmission and the different levels of acknowledgment available as we produce messages to our cluster. To start with, we have two types of communication: asynchronous and synchronous. The implication of operating synchronously is that messages are dispatched on the same thread that’s calling the producer. The producer has the opportunity to block, waiting for a response before sending the next message. Conversely, with asynchronous communication, new threads are fired up to dispatch messages in parallel without the overhead of blocking. Although this is the least-safe way to produce messages, it allows for extremely high throughput (messages are batched to amortize communication overhead); thus, where some failure can be tolerated, asynchronous communication may be the most appropriate configuration.

For this example, let’s assume we’re going to operate synchronously. We then have three options available when instantiating SimpleProducer:[4]

4

  • ACK_NOT_REQUIRED—SimpleProducer isn’t dispatching multiple threads for delivery; neither is it waiting for acknowledgement from the leader of the topic being written to. This almost offers the worst of all worlds, because the lack of acknowledgment means no delivery guarantees can be made, whereas the lack of asynchronous delivery places the majority of the load on a single thread.
  • ACK_AFTER_LOCAL_WRITE—Guarantees that the leader has written the data to its local log (although not committed it) before sending a response to the producer. Note that this doesn’t guarantee consistency across the replicas, because should this leader immediately fail after the record has been acknowledged, replicas will never receive the new data. This is an intermediate guarantee that provides some trade-off between write speed and durability.
  • ACK_AFTER_CLUSTER_COMMIT—Using this ensures that the leader will return an acknowledgment only when its local message has been committed: that is, the full set of in-sync replicas (note that this differs from the full set of replicas) has responded to acknowledge their committed data, and the data has been committed locally. This offers the strongest guarantee of durability, but, predictably, the latency between message submission and acknowledgement is the highest. Provided at least one of the in-sync replicas remains available, the data won’t be lost.

Figure A.4 provides a graphical overview of these acknowledgement modes.

Figure A.4. The different levels of acknowledgments in Kafka. From left to right, they offer increasing degrees of availability at the cost of acknowledgement latency.
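
In terms of the kafka-python client used earlier, these three levels map onto the req_acks argument of SimpleProducer; the following sketch shows the three choices side by side (the constant names come from the list above, and the rest is illustrative).

from kafka import KafkaClient, SimpleProducer

kafka = KafkaClient("localhost:9092")

# Fire and forget: no acknowledgement is requested from the leader.
fast_producer = SimpleProducer(kafka, req_acks=SimpleProducer.ACK_NOT_REQUIRED)

# Acknowledge once the leader has written the message to its local log.
local_producer = SimpleProducer(kafka, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)

# Acknowledge only once the in-sync replica set has committed the message.
safe_producer = SimpleProducer(kafka, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)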

Under the hood: replication, leaders, and in-sync replicas

We’ve already touched on Kafka’s replication model, but now it’s time to delve further into the details. So far, we know that topics are made up of partitions, and each of these must reside on a single broker. We also know that topics can be created with a replication factor, which controls the level of data redundancy.

Although replication may be specified at the topic level, it’s controlled at the partition level. If you refer back to listing A.6, you’ll see an entry per partition. It just so happens that our replicated topic has only a single partition for demonstration purposes. Here, partition 0 has three replicas, but only two are in sync. The leader is replica 0. Let’s look at this state from the broker’s perspective.

For each partition, a leader is assigned. This is, in some sense, the owner of that partition. All read/write operations for this partition are mediated by the leader. Thus, when a producer publishes a message to a partition, it’s sent first to the leader, which may or may not wait for commits to its own log and other replicas’ logs before issuing a response. We described the protocol and options for this earlier.

Note that message acknowledgment and the internal mechanisms of synchronization are different but related concepts. The two are decoupled in the sense that any level of message acknowledgement can be requested, but this has no effect on the attempts made by the cluster to achieve consistency. Acknowledgments do, however, allow the producer to block on different events within the replication protocol and modify downstream behavior depending on certain outcomes. How replica synchronization is achieved, even in the case of temporary or permanent network failure, is best demonstrated with an example of in-sync replicas (ISRs) that we reproduce from a paper by Jun Rao[5] and show in figure A.5.

5

Jun Rao, “Intra-cluster Replication in Apache Kafka,” February 2, 2013, http://mng.bz/9Z99.

Figure A.5. Kafka replication, reproduced from Rao. In this example, we have four brokers and one topic with two partitions. Broker 1 leads partition 1, and broker 4 leads partition 2.

In figure A.5, we present a Kafka cluster of four brokers. This cluster manages a single topic with two partitions. The replication factor of this topic is set to 3. Imagine that we’re going to write some data to partition 1. Data is forwarded on to broker 1, and the three brokers, 1, 2, and 3, achieve consistency through the use of in-sync replica sets.

In-sync replicas are those whose commit log is up to date with respect to the leader. Consequently, the leader always has to be in the ISR. When a message is published to the leader, it sends the message to the followers and waits until all replicas in the ISR have committed the message. If any of the replicas fail to commit, those replicas will be dropped from the ISR, and the commit will complete with under-replication.

To take a concrete example from figure A.5, if our producer is to write to topic1-part1, it will first communicate with broker 1. The type of acknowledgement the producer will block for will depend on the acknowledgement level; but regardless of this, the replica set will now proceed to obtain synchronization. Under normal operation, the ISR will be equal to the replica set for that partition. Thus, broker 2 and broker 3 will communicate with broker 1 to transfer, write, and commit the entering data. Once this is done, broker 1 will commit this message locally, and the operation will conclude. If, however, broker 3 can’t be reached (say, some timeout is hit), then broker 1 will remove broker 3 from the ISR, commit the message locally and at broker 2, and terminate. In this scenario, brokers 1 and 2 have commit logs that are in sync, but broker 3 is out of sync.

Should broker 3 reemerge, it will need to catch up with the rest of the replica set. It does this by communicating with the leader, broker 1, to see what it missed. For this purpose, the leader maintains a high watermark (HW), which is a pointer to the last committed message in the ISR. Broker 3 will truncate its commit log at this point, catch up with the leader, and be added back to the ISR. There’s another possibility here: that the leader fails. Because Zookeeper is used to detect failures and manage the configuration of the cluster, it’s relied on to elect a new leader in the event that the original leader fails. If this occurs, Zookeeper must inform all replicas of the new leader so they know which broker to follow.

It’s interesting to note that data loss can happen. For example, should a leader become unavailable, any data written but not committed would be lost upon election of a new leader. Kafka was designed for high throughput and primarily for redundancy within a data center, because its approach to synchronization makes it unsuitable where high latencies exist between replicas. Consequently, the likelihood of ISR set change and/or leader reassignment is low. A small amount of data loss is therefore acceptable for most logging applications given the throughput that can be achieved. The article from which this example is taken provides a much more comprehensive treatment of replication, so we refer you to Rao for more information.

Consumer groups, balancing, and ordering

In listing A.2, we introduced the concept of Kafka consumer groups. This abstraction allows Kafka to be used either as a queue, whereby multiple consumers process items from the head, with each message going to a single consumer, or as a publish-subscribe system, in which each message is broadcast to all consumers.

Recall that a single message published to a topic is guaranteed to be delivered only to a single consumer of a consumer group. So where we have one topic and one group, this emulates the behavior of a queue. Where each consumer is a member of its own group, this provides the same behavior as a publish-subscribe system—each group receives the same set of messages.

Each partition is assigned to a single consumer in a group, such that this is the only consumer in the group to receive messages from it. As you’ll see, provided the partitions are well balanced, this also balances the load on consumption, although it does mean the maximum number of consumers that can consume data from a topic is limited to the number of partitions.

It’s worth noting that Kafka provides ordering guarantees only within a partition, the intuition being that if a was written before b within a partition, it will also be consumed in that order. It provides no guarantee on ordering between partitions: when reading from multiple partitions, you may consume a later-written message before an earlier-written one, but only when the two messages live in different partitions. In this sense, Kafka relaxes some of the guarantees made by a queue in order to achieve high throughput via parallelism. In practice, and for most logging applications, provided you can specify how data is partitioned, this should be sufficient. For example, you may choose to partition incoming web-click logs by some modulo of the cookie/user ID. This ensures that partitions will be balanced and that all logs from a given user are guaranteed to be in the same partition. You can then rely on the ordering within that partition when trying to understand how users interacted with your ads. We’ll look at this in more detail in the next section.

Putting it all together

In this section, we’re going to bring everything together to provide an example that illustrates all the concepts introduced so far. We’ll again use a single topic, which we’ll call click-streams. This topic will be split into three partitions with level-3 replication. We’ll operate with two consumer groups: one containing a single consumer and the other containing three consumers. These will be named group-1 and group-2, respectively. Finally, we’ll ensure that all clicks from the same user end up in the same partition. Figure A.6 provides an overview of our final Kafka setup.

Figure A.6. Our final Kafka cluster. In this example, we use a low-level producer to perform our own partitioning of the stream of data. This cluster is operating with three brokers, three partitions, and level-3 replication. Two consumer groups exist, one with a single consumer called group-1 and the other with three consumers, known as group-2.

We previously presented the code required to bring up a new cluster consisting of any number of brokers and to create a new topic, specifying the number of partitions and the replication level. We also showed in listing A.2 how to bring up a consumer within a consumer group. What we haven’t yet shown, however, is how to perform custom partitioning, because so far this has been taken care of by SimpleProducer. To demonstrate this concept, we’ll introduce a schema of data as illustrated in the following listing. This can also be found in the additional resources available at this book’s website (as file A1.data), should you wish to follow the example programmatically.

Listing A.7. Click user data

This is a fictitious and simplified dataset based on page interaction. Note that the schema has three distinct columns. The first is a uniquely identifying code, or globally unique identifier (GUID); the second is the time of the interaction; and the third is the interaction type. There are only three possible interaction types: adview, which records that an ad has been seen by a user; click, which denotes a click by a given user; and convert, which is set if the user buys a product.

We might imagine a payment method whereby our DSP gets paid a certain amount for every click, with the caveat that any clicks from the same user that happen within a set period of time (say, five minutes) should be counted only once. A quick glance at listing A.7 shows that the annotated user viewed an ad and was recorded as clicking twice before converting (perhaps buying the product in the ad they saw!). It also shows that the clicks happened within a few minutes of each other. Consequently, some downstream processing would need to occur if we were to discount the second click, leaving only a single click in our interaction stream.

If we revisit the example from the start of this appendix, listing A.1, we published to a single partition topic called test. What we’d like to do, though, is to have multiple partitions and somehow ensure that all information regarding a single user ends up at a single partition. As a secondary requirement, it would be ideal if the data were balanced across the available partitions. The next listing provides the code to do this.

Listing A.8. Custom partitioning with a producer
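
As a sketch of the idea: the file name A1.data, the click-streams topic, and the three-column schema come from the text, but the use of kafka-python’s low-level send_produce_request API, the whitespace-separated record format, and the hashing of the GUID (the listing applies modulo directly to the GUID, whose exact format isn’t shown here) are assumptions.

import hashlib

from kafka import KafkaClient, create_message
from kafka.common import ProduceRequest

NUM_PARTITIONS = 3
kafka = KafkaClient("localhost:9092")

with open("A1.data") as f:
    for line in f:
        record = line.strip()
        if not record:
            continue
        # Assume the first column of each record is the user GUID.
        guid = record.split()[0]
        # Derive a stable number from the GUID and take it modulo 3, so that
        # all records for a given user always map to the same partition.
        partition = int(hashlib.md5(guid.encode("utf-8")).hexdigest(), 16) % NUM_PARTITIONS
        request = ProduceRequest("click-streams", partition,
                                 [create_message(record.encode("utf-8"))])
        kafka.send_produce_request(payloads=[request])

kafka.close()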

This toy producer partitions by user GUID. In a production implementation of such a system, the click stream would come directly from a web browser or associated code, but for the purposes of illustration this data comes directly from a file.

If your broker from listing A.6 is still dead, restart it now. If you wish to run this example, you’ll need to create the click-streams topic with replication level 3. Make sure you do this before you run the code in listing A.8; otherwise, you’ll end up automatically creating a topic with too few partitions:

./bin/kafka-topics.sh --create \
                      --zookeeper localhost:2181 \
                      --replication-factor 3 \
                      --partitions 3 \
                      --topic click-streams

Recall that we wish to ensure that all click streams from the same user end up at the same partition. Working through listing A.8, you can see that the modulo operator (modulo 3) is applied to each GUID, and the resulting number is used to determine the partition to publish to. Thus, all entries for the same user produce the same result (modulo 3) and will end up at the same partition! This partition isn’t exclusive to that user—other users’ data will also end up there—but this isn’t a requirement for the system, because the number of partitions is much smaller than the number of users. This method of custom partitioning has another useful property: assuming the GUIDs are randomly distributed, so too will be their values modulo 3. This means users will be, in the limit, evenly balanced across the partitions, which is important in order to distribute both the publish and subscribe loads among the available resources.

From the perspective of consumer groups in figure A.6, let’s work through the partition assignment to ensure that a single consumer really does obtain all the data for a given user. In group-1, consumer A is the only member and thus will receive all the data for the topic click-streams. It does this by periodic communication with the leaders of all partitions of the topic. Because this consumer acquires data from all partitions, for any given user, all data will end up with a single consumer.

Perhaps more interesting is group-2, containing consumers B, C, and D. Recall that the consumer group guarantee is such that a given message is to be received by only a single consumer within a consumer group and that this is achieved through the assignment of a partition to a consumer. In this case, the leader of each partition communicates with a single consumer. Broker 1 (leading partition 1) is assigned to consumer B, broker 2 to consumer C, and broker 3 to consumer D. Note that this is the maximum number of consumers possible in this example because the number of consumers in a group can’t exceed the number of partitions in the topic it’s consuming from.

Because of our custom partitioning strategy, it’s now guaranteed that a single consumer consumes the messages from a given user. It would be possible to do things like click de-duplication (to remove erroneous clicks and work out how much the DSP should bill its client) at the consumer, because all relevant data is processed by that consumer and also because the ordering guarantees provided by Kafka ensure that events that are published in order at a partition will be retrieved in that same order.

There’s also an intermediate example, which we haven’t illustrated here. In the case where we have two consumers and three partitions, one of the consumers will be responsible for two of the partitions. This assignment will be random, and it does impact the load-balance on the consumers; but it again ensures that a single consumer obtains all the data for a given user.

Evaluating Kafka: data collection at scale

Thus far, we’ve presented Kafka along with the kafka-python client, which can be used to communicate with your cluster via Python. We present this as an alternative to a bespoke log-shipping service, which, although simple to understand, has been demonstrated to be non-optimal for our purposes. For completeness, and to illustrate the design of Kafka, we’ll compare a solution based on Kafka against our naïve yardstick—illustrating how it has the potential to perform better with respect to each of the metrics introduced at the start of this appendix.

One of the most important features for a web application is the ability to deal with large volumes of data. When assessing our naïve solution, we noted that in order to parallelize, we’d need to add a degree of coordination into the system that might be difficult to get correct. We also noted that data processed in parallel would need to be recombined at some point. Kafka takes care of this for us. By partitioning the data, we immediately add a level of parallelism on the order of the number of partitions. Zookeeper takes care of discovery and management, and combination can occur at the consumer. In order to get the highest level of parallelism, Kafka should be configured with distinct and unrelated topics whose consumption patterns are independent of each other. This allows consumer groups to operate in parallel, potentially transforming the data as it’s consumed.

Scalability was considered a major sticking point in our naïve solution, because it was tied quite tightly to the chosen architecture. More clicks in a region might mean that regional server processing power needs to be increased, which in turn might have a downstream effect when aggregating logs. Kafka can help in this situation, because it’s possible to add additional brokers to a cluster and rebalance partitions. It’s also possible to increase the number of partitions to make the best use of the broker pool. This centralizes scaling to the cluster, rather than distributing it throughout the log-shipping pathway. As an aside, Kafka can also be configured for multicluster operation through mirroring. This provides a higher-latency link between clusters, suitable for cross–data center synchronization.

Durability is a strong point of Kafka. We’ve already discussed how Kafka replicates data, which provides fault tolerance where replica factors greater than 1 are used. In these cases, brokers can fail completely without affecting data flow within the system. To compare this to our naïve solution, we’d need to manually create and manage copies. Again, although not impossible, this adds a significant amount of complexity to a file-management service.

In our naïve approach, latency isn’t a tunable parameter. When an event occurs, we must wait until it has been logged, that file has been closed, and the data has eventually reached its destination before we can consider the event processed. In contrast, with Kafka, both read and write latency can be parameterized individually. In the write case, we can choose the level of acknowledgement that we require before continuing to write (see the earlier section “Replication in Kafka”), and we can even choose to write asynchronously, not waiting for acknowledgements at all. Here we’re trading write latency for consistency: we can get low-latency writes, but we must deal with the possibility that a leader failure may cause a small amount of data loss while the new leader is elected. In terms of read latency, because reads start from the last recorded offset, consumers can wait as short or as long a period as they wish (up to the data storage window of the cluster) before running again and reading from the offset where they left off.

The naïve solution presented assumed by design that there was a single consumer. It was a pipeline in the sense that generated data would eventually emerge in a database at some point in the future. It was also very much a push-based system. No provision was made to store data in any location other than its final resting place, so replaying data—or having multiple processes consume data—is a departure from the norm. In comparison, Kafka has been designed with these properties in mind. It’s easy for multiple processes to consume data; we simply fire up an additional consumer group. It’s also easy to replay data, provided it’s within the window of storage offered by the cluster. These properties are excellent for prototyping and deploying intelligent algorithms, because relevant data is always available on tap. This means we can train, deploy, and test algorithms quickly, tightening the loop between data generation and decisions made from that data.

Kafka design patterns

To consolidate some of the content covered here, we’ll finish by taking a look at some standard design patterns for the use of Kafka. So far, we’ve determined that Kafka is great as a data-brokerage platform; but to develop useful applications, this alone isn’t enough. Now that we’ve tamed the act of moving data from A to B, it’s imperative that we be able to do something useful with that data. Toward this end, we’ll present two potential use cases that may be of interest to you as you think about how to implement the algorithms introduced in the later chapters of this book.

In the first use case, we’ll deal with how to perform data transformations in flight. In the second, we’ll look at how we might extract data ready for querying in some batch data platform. Note, however, that these aren’t necessarily mutually exclusive! Stream processing could occur before data is transferred to a batch-processing engine, or these steps could occur in parallel.

Kafka plus Storm

Kafka alone can’t perform any transformations to the data it ingests. If we wish to perform in-stream transformations, we must couple Kafka with an additional technology, such as Storm.[6] Storm is a distributed, real-time system for processing stream data. This allows us to process data in arbitrary and complex ways through the use of several simple literals, two of which are the spout and the bolt. The spout acts as a source of data, whereas the bolt is a processor of data that can consume a number of streams and do some processing, possibly emitting a new stream. These literals, when combined with several ways to group and relay data between stages (shuffle randomly to the next stage, partition by a particular field, send to all, and so on), provide a powerful way to process data in flight. Such combinations are known as topologies.

6

Sean T. Allen, Matthew Jankowski, and Peter Pathirana, Storm Applied: Strategies for Real-Time Event Processing (Manning, 2015).

For example, using the combination of Kafka and Storm, it would be possible to remove duplicate clicks from our click-streams topic, as mentioned earlier. This would be achieved using a Kafka spout, such as storm-kafka, which extracts data from Kafka and acts as a data source for a topology. Figure A.7 shows an example.

Figure A.7. In this example, a consumer group of spouts is consuming from the Kafka cluster that has been partitioned by GUID. This ensures that each consumer spout receives all the data for a given user. Through Storm’s field grouping, we can also partition data on GUID as we send this data to one of several deduplication bolts that operate on the incoming clicks. Field grouping again on GUID, we can send these to one of several producers that will write these back into a new, deduplicated topic, again partitioned by GUID.

In this configuration, we’re using the power of Kafka plus Storm to deduplicate clicks in our click stream that occur within a short time of each other. The consumer group illustrated consists of Storm consumer spouts, which pull data from Kafka in the same way as any other consumer but forward information in a way that’s configurable to Storm. In this case, we use the Storm field grouping on GUID, sending the data on to three bolts that will perform the deduplication. Much in the same way that Kafka works, this grouping ensures that all data for a given user will end up at a single bolt. This is necessary to enable us to perform the deduplication. The output from each of the bolts is a stream minus the duplicate data, and in this case we choose to use the field grouping again to send the data to the producers. This keeps per-user event ordering within a given producer. The producers then send messages back into the cluster, partitioning by GUID into a new topic, which will contain the deduplicated data. Because the data is processed in the order in which it arrives, per-user deduplicated data should arrive at its particular partition in more or less the same order as the original, duplicate-containing data.

You’ll notice that we say “more or less.” This doesn’t sound very strict! Storm allows for several levels of guarantees with respect to ordering. In its simplest mode, messages may be dropped—or even replayed if the message times out—creating multiple entries in the output. For exactly-once semantics, Storm Trident must be used, which is an abstraction to provide enhanced guarantees on topology execution. Using this abstraction, we could guarantee that our new deduplicate topic would be in exactly the same order as the original, minus the duplicate clicks.

Note the scalability here. Each of the four steps can theoretically be scaled in parallel without breaking the semantics of the system. Using Kafka plus Storm with field groupings ensures parallel processing pipelines for subsets of the GUID space, which facilitates efficient scaling as the number of users in the system increases.

Just a quick note on the deduplicate bolt. Removing duplicate clicks is a fairly simple process if we can rely on the ordering of arriving data. For each click that occurs for a given user, we can store the click in local memory, listen for any further clicks from that user within a given time window, drop those messages if they occur, and evict the stored click once we see an event for that user with a timestamp beyond the window. This works only if the per-user ordering is guaranteed throughout the system (consider what would happen if a message were replayed into one of the deduplicate bolts out of order, especially if it fell outside the deduplication window). If we were to choose this pattern, we’d need to give careful consideration to the number of bolts required and the memory required by each bolt.
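
To make this concrete, here’s a minimal, Storm-free sketch of the per-user windowing logic in Python; the five-minute window, the event format, and the emit callback are assumptions, and a real bolt would also need to evict stale entries to bound its memory use.

DEDUP_WINDOW = 5 * 60  # assumed deduplication window, in seconds

# Most recent accepted click per user: guid -> timestamp (epoch seconds).
last_click = {}

def process_event(guid, timestamp, interaction, emit):
    """Forward every event except clicks that repeat within the window.

    Assumes events for a given GUID arrive in timestamp order, as guaranteed
    by partitioning the stream on GUID.
    """
    if interaction != "click":
        emit(guid, timestamp, interaction)
        return
    previous = last_click.get(guid)
    if previous is not None and timestamp - previous < DEDUP_WINDOW:
        return  # duplicate click inside the window: drop it
    # First click for this user, or a click outside the window: keep it.
    last_click[guid] = timestamp
    emit(guid, timestamp, interaction)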

Kafka plus Hadoop

Working with data in flight is a powerful mechanism; it allows data to be transformed as it flows into our system. The downside to such an approach, however, is that we need to know the type of transformation up front! In our previous example, we knew all about deduplication and so could deploy it within a stream-processing architecture quite easily.

There are many cases where we don’t know in advance what we’d like to calculate, and this forms much of the workload of a traditional analyst. In order to facilitate this, we need to present the data to Kafka in a queryable manner, and to illustrate this we’ve chosen Hadoop (with some other enabling technologies).

Apache Hadoop (https://hadoop.apache.org) is a software framework that allows large datasets to be processed in parallel across many machines. It uses a simple processing paradigm known as MapReduce to perform computation in a distributed fashion much more quickly than could be achieved on a single machine. Hadoop is an extremely powerful framework, and we urge you to refer to the associated literature to get a better feel for its use.

Integral to Hadoop is the Hadoop Distributed File System (HDFS), which creates a file-system abstraction on top of a distributed cluster of machines. Data residing here can be operated on using MapReduce to generate results from extremely large datasets in reduced time frames. Consequently, getting data from Kafka and into HDFS for processing might be something you want to do!

Figure A.8 shows a simple architecture that makes this a possibility using the Camus project (https://github.com/linkedin/camus). Camus has been built as a MapReduce job to read data from topics in Kafka and write into HDFS in a distributed fashion.[7] In short, this allows for parallel loading of data from one system to the other. This is an extremely powerful feature when you’re working with very large datasets. From the point of view of Kafka, Camus is just another consumer. Upon execution, Camus performs topic discovery, loads in its own partition offsets from HDFS, and allocates the partitions to a fixed number of tasks (MapReduce executions)—these consume the data and write to HDFS. As with other consumers you’ve encountered so far, Camus is based on pull semantics, and thus a scheduler must trigger execution.

7

At the time of writing, Camus is being phased out and replaced by a next-generation project called Gobblin (https://github.com/linkedin/gobblin). This extends Camus and unifies it with several other LinkedIn data-ingestion projects.

Figure A.8. Overview of Kafka plus Hadoop integration. Camus is used to generate MapReduce jobs, which extract data from Kafka in parallel, placing the data in HDFS. Camus jobs must be executed periodically; hence the scheduler. High-level languages such as Hive and Pig can then be used to query the imported data.

With data residing in HDFS, it will now be possible to run MapReduce jobs or register the data with a higher-level query language, such as Apache Hive (https://hive.apache.org) or Pig (https://pig.apache.org). Such languages provide a familiar programming paradigm for analysts running ad hoc queries over the imported data.
