The previous chapter introduced a gamut of Giraph APIs and showed how you can use them for various graph-processing applications. This chapter pops the hood on the Giraph implementation and shows you what transpires behind the scenes when a graph-processing application is executed and APIs are called.
For the rest of this chapter, you use the example Giraph applications introduced in Chapter 5. Assume that the size of the graph on which the application operates is measured in billions of lines of input. With this assumption in place, the chapter traces every step of the application run on a Hadoop cluster. You start by learning about the functional roles of the different services that form a running Giraph application, how these services are assigned to compute nodes in the cluster, and how they coordinate their activity to perform an overall graph-processing task. You also look into steps that are needed to partition the input and load the initial graph topology in such a way that each vertex is assigned to a worker that performs compute operations on it. You then dive deep into the implementation details of each service and close the chapter by examining failure scenarios that a Giraph implementation needs to cope with.
So far, you have only used Giraph on tiny data sets that fit comfortably into the disk and memory space of a single host. Although Giraph is certainly applicable for use with small and medium-size graphs, its true power comes from its scalability. To give you a taste of the level of scalability required from a Giraph implementation, consider its use at Facebook: analyzing the social graph of the social network’s members. In 2013, the total number of vertices was estimated at around 1.1 billion, and the number of edges was well into the trillions. Even if storing each chunk of data associated with either a vertex or an edge takes only hundreds of bytes, you are approaching petabyte scale for the graph representation alone. Storing datasets of this size on a single node is impossible; and analyzing them using the CPU resources of a single node would be prohibitively time consuming. The only way to scale for both storage and compute requirements is to exploit clusters of commodity hardware.
It should come as little surprise, then, that a running Giraph application is a collection of distributed, networked services running in parallel on different nodes in the cluster. Each of these services provides a particular function for the rest of the Giraph application, has its own lifecycle, and interacts with the rest of the services running on different nodes via remote API calls. Because the majority of these interactions are network-based, all the complications of network programming (collectively known as fallacies of distributed computing) need to be dealt with. For example, the Giraph implementation spends a lot of time tuning the lifecycles of its internal services to enable the most efficient recovery in case of an individual service failure.
FALLACIES OF DISTRIBUTED COMPUTING
Back in 1994, a few smart engineers working at Sun Microsystems started to realize the power of distributed computing that had been unlocked by advances in local network design. Around the same time, one of those engineers, John Gage, coined the phrase “the network is the computer” to capture the company’s new direction. That was the good news. The bad news was compiled by another Sun engineer (Peter Deutsch) and delivered to future generations of engineers as a list of false assumptions to watch out for when designing this next-generation computer architecture. The list is known as the fallacies of distributed computing, and it is as relevant today as it was 20 years ago:
The network is reliable.
Latency is zero.
Bandwidth is infinite.
The network is secure.
Topology doesn’t change.
There is one administrator.
Transport cost is zero.
The network is homogeneous.
Keep this list in mind when reading this chapter—it will help you appreciate quite a few of the architectural decisions made by the Giraph implementation.
Giraph Building Blocks and Concepts
Regardless of whether you are running the example application on a single host (as you did in Chapter 5) or on a 1,000-node cluster, everything Giraph does boils down to making sure a piece of Java code that acts as a particular type of network service can be executed on a given number of hosts. Before moving on, let’s quickly define the three types of network services that serve as fundamental building blocks for Giraph architecture: masters, workers, and coordinators.
Every Giraph computation is a result of the coordinated execution of these services, operating on subsets of vertices that are split into partitions and processed in parallel. Partitioning is a key optimization, allowing parallel execution of graph-processing tasks; it is dynamic, meaning the assignment of partitions to workers can change based on the characteristics of the running application. At every superstep, however, the overall functional structure of a Giraph application looks like Figure 6-1. All the services represented in this figure are internal to the Giraph implementation and are not visible to the end user, but it is still important to know their architecture—if for no other reason than to be able to debug failures in Giraph application runs.
Figure 6-1. Functional structure of a running Giraph application
There is always one active master service, along with a few standby masters bidding to become active if the current master fails. The standby masters are dormant and don’t play an active role in the lifecycle of a running Giraph application. Once a master becomes active, its job is to coordinate computation. This task consists of doing the following:
Workers represent the majority of Giraph services. Their primary function is to manage the state of graph partitions assigned to them. Each worker exposes a set of network APIs to let remote workers manipulate the data in partitions assigned to it. Again, note that these network APIs are internal to the Giraph implementation and are not expected to be available to end users. Workers transition from superstep to superstep as directed by the active master. During each superstep, workers iterate over the graph partitions they own and execute the compute() method for all the vertices belonging to these partitions. Workers are also responsible for checkpointing their state from time to time as a means of recovery from a worker failure.
Note that the assignment of graph partitions to workers is not permanent and is subject to master-driven rebalancing before every superstep. A class implementing the MasterGraphPartitioner interface provides the implementation of the partitioning logic. An instance of that class belonging to an active master manages the current mapping of partitions to workers and provides the generateChangedPartitionOwners() method for reevaluating the mapping based on the various statistics coming from the workers. The master updates the mapping via the coordination service. Workers, on the other hand, look up which partitions no longer belong to them and re-create the state of those partitions on a target worker by issuing network API calls.
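To make the idea of reevaluating the partition-to-worker mapping concrete, here is a minimal sketch in plain Java. It is not Giraph's MasterGraphPartitioner implementation: the class RebalanceSketch, its changedOwners() method, and the "move orphaned partitions to the least-loaded healthy worker" policy are all assumptions chosen for illustration. The one property it shares with the description above is that it returns only the partitions whose owner changes.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy rebalancer: a hypothetical stand-in, not Giraph's partitioner. */
class RebalanceSketch {

    /**
     * Returns only the partitions whose owner must change: every partition
     * owned by a worker that is no longer healthy moves to the healthy
     * worker that currently owns the fewest partitions.
     * Throws NoSuchElementException if a move is needed but no worker is healthy.
     */
    static Map<Integer, String> changedOwners(Map<Integer, String> owners,
                                              List<String> healthyWorkers) {
        // Count how many partitions each healthy worker already owns.
        Map<String, Integer> load = new TreeMap<>();
        for (String worker : healthyWorkers) {
            load.put(worker, 0);
        }
        for (String owner : owners.values()) {
            load.computeIfPresent(owner, (worker, count) -> count + 1);
        }

        // Walk partitions in id order so the result is deterministic.
        Map<Integer, String> changed = new TreeMap<>();
        for (Map.Entry<Integer, String> entry : new TreeMap<>(owners).entrySet()) {
            if (!load.containsKey(entry.getValue())) {
                String target = Collections
                        .min(load.entrySet(), Map.Entry.comparingByValue())
                        .getKey();
                changed.put(entry.getKey(), target);
                load.merge(target, 1, Integer::sum);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = Map.of(0, "w1", 1, "w2", 2, "w3", 3, "w3");
        // Worker w3 is gone; its partitions are spread over w1 and w2.
        System.out.println(changedOwners(owners, List.of("w1", "w2")));
    }
}
```

Returning only the delta keeps the amount of metadata that has to travel through the coordination service small; the bulk partition data would still move directly between workers.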
Nodes running the coordination service provide the nervous tissue for the rest of the Giraph services. They don’t participate in performing any graph-processing work; instead, they provide distributed configuration, synchronization, and naming registry services for the rest of Giraph.
There are several coordination service implementations you can use, but the default choice in the Hadoop ecosystem has always been Apache ZooKeeper. The ZooKeeper service is so ubiquitous that it is hard to imagine a production Hadoop cluster deployment running without it. It has become synonymous with the coordination of any application in the Hadoop ecosystem.
A collection of nodes running a coordination service is collectively known as a ZooKeeper ensemble. All nodes in the ensemble are considered peers, and each node has a replica of a ZooKeeper state as a means of achieving high availability and fault tolerance. As long as the majority of the nodes are up, the service will function correctly.
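The majority rule can be stated as simple arithmetic. The following sketch (the class and method names are made up for this illustration) computes how many nodes form a majority and how many failures an ensemble of a given size can survive.

```java
/** Majority arithmetic for an ensemble of peers; names are illustrative only. */
class QuorumSketch {

    /** Smallest number of nodes that forms a strict majority of the ensemble. */
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** Number of nodes that can be down while a majority is still up. */
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    /** True if the service can keep functioning with the given number of live nodes. */
    static boolean isAvailable(int ensembleSize, int nodesUp) {
        return nodesUp >= quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " nodes: quorum " + quorumSize(n)
                    + ", tolerated failures " + toleratedFailures(n));
        }
    }
}
```

One consequence worth noticing is that an ensemble of four nodes tolerates no more failures than an ensemble of three, which is why ensembles are usually sized with an odd number of nodes.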
Giraph provides two options for managing coordination services. The default option is to spin up ZooKeeper ensemble nodes on demand as part of a Giraph application run. That way, coordination services are no different from masters or workers: they are fully managed by Giraph, and they vanish once the application exits. The second option is to use a stand-alone ZooKeeper ensemble that is centrally maintained as part of the cluster. In that case, Giraph applications act as pure clients, and there is no need to have the coordination service running on nodes used by the Giraph application.
The second approach is recommended for production use of Giraph. It allows for easier postmortem and real-time debugging (because the coordination service stays alive after the Giraph application exits). Also, because it uses a cluster-wide, centralized, highly tuned, well-maintained ZooKeeper ensemble, your application’s performance may be better.
Consider any of the examples from Chapter 5, but this time assume that you have to deal with a much bigger input dataset. The size of the dataset makes it prohibitively expensive to execute the application on a single host and requires you to use a cluster. The good news is that you don’t have to change the application to make it run on a cluster of compute resources: the Giraph architecture is flexible enough to scale elastically. All you have to do to begin using a cluster is to tell Giraph how many compute resources it can use for which purpose.
Setting the configuration property giraph.maxWorkers (or passing the same value via the command-line option -w) tells Giraph how many compute resources should be used to execute worker services. Another configuration property, giraph.zkServerCount (which has a default value of 1), tells Giraph how many compute resources need to be allocated for running master and coordination services. Finally, the total number of cluster resources used by an application is determined based on the configuration property giraph.SplitMasterWorker. If that property is set to false, all services can run on all cluster resources, and the total number of resources used is equal to the value of giraph.maxWorkers. On the other hand, if it is set to true (its default value), the total number of utilized resources is the sum of giraph.maxWorkers and giraph.zkServerCount.
For the rest of this chapter, assume a cluster of at least ten nodes so you can execute the example application with giraph.maxWorkers set to 8 and giraph.zkServerCount set to 2. Also assume the default value (true) for giraph.SplitMasterWorker so the total number of utilized cluster resources is exactly ten (eight plus two). Those resources will be used to run exactly eight worker services, two master services, and two coordination services (with the master and coordination services co-located together).
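The resource arithmetic described above can be written down in a few lines. This is a sketch only: it uses java.util.Properties as a stand-in for the real Hadoop configuration object, and the class ResourcePlanSketch is hypothetical. The property names and defaults are the ones given in the text.

```java
import java.util.Properties;

/** Restates the resource-count rule from the text; not Giraph code. */
class ResourcePlanSketch {

    /**
     * Total cluster resources requested, following the rule in the text.
     * giraph.maxWorkers has no default here, so a missing value makes
     * Integer.parseInt throw NumberFormatException.
     */
    static int totalResources(Properties conf) {
        int workers = Integer.parseInt(conf.getProperty("giraph.maxWorkers"));
        int zkServers = Integer.parseInt(conf.getProperty("giraph.zkServerCount", "1"));
        boolean split = Boolean.parseBoolean(
                conf.getProperty("giraph.SplitMasterWorker", "true"));
        // Split mode dedicates separate resources to masters/coordination.
        return split ? workers + zkServers : workers;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("giraph.maxWorkers", "8");
        conf.setProperty("giraph.zkServerCount", "2");
        System.out.println("resources requested: " + totalResources(conf));
    }
}
```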
With these settings in mind, let’s look into the underlying mechanism that pushes the required bits of Java code for each service to the desired cluster resources and makes them behave like a set of eight worker services, two master services, and two coordination services. See Figure 6-2.
Figure 6-2. Running an example on a cluster of 10 hosts
Although it is nice to know that given the right combination of command-line options, Giraph can start using the power of a Hadoop cluster, the question you answer in this chapter is how that happens. One way to distribute services on a cluster of compute resources requires building a custom Giraph-specific cluster-management layer, purposefully designed with a graph-processing application in mind. There is nothing wrong with doing that; Apache Hama (a graph-processing framework very similar to Giraph) made exactly that type of design decision. It allows for a greater degree of control, but the price is maintaining low-level cluster-management code that has nothing to do with graph processing.
Unlike Hama, Giraph decided to use existing cluster-management solutions to do all the heavy lifting. Apache Hadoop is the most popular solution and also the one that comes by default with the Giraph distribution. It is not the only option, though. Even within the Hadoop project, Giraph has two alternatives: a more classic MapReduce framework (also available in prior versions of Hadoop) and a brand-new collection of resource-management APIs known as YARN. The YARN back end is still experimental, and this chapter assumes that the example application is executed using the Hadoop MapReduce framework.
WHEN MAPREDUCE IS NOT REALLY MAPREDUCE
The MapReduce framework for distributed data analysis was first described by two Google engineers (Jeffrey Dean and Sanjay Ghemawat) in the seminal 2004 paper “MapReduce: Simplified Data Processing on Large Clusters.” The core idea behind the framework was simple enough, but it required a radical shift in application design. The paper proposed that every data-processing application should be built around two things: a piece of code called a mapper and another piece of code called a reducer. Both pieces of code are transparently instantiated on a large cluster of compute resources, thus allowing the application to process as much data as there are compute resources. In other words, thousands of mapper copies run in parallel, all supplying data to hundreds of reducers, also running in parallel. The programmer is free from low-level cluster-management plumbing and can focus on implementing the mappers and reducers.
Apache Hadoop was the first highly popular, open source implementation of a MapReduce framework. Its availability led to explosive growth of data-processing applications that can be neatly decomposed into mappers and reducers. Graph processing is not one of those applications; it is typically much more iterative in nature, and it requires a different kind of data partitioning.
Thus, when running with a MapReduce back end, Giraph doesn’t have a chance to exploit the spirit of the framework. All it needs from Hadoop is a way to push a bit of its own code to a given number of compute resources. The trick it plays is that it constructs a fake MapReduce application with a fixed number of mappers and no reducers. This is known as a map-only application. Because it is disconnected from all the Hadoop data pipelines, from Hadoop’s standpoint it “runs forever” (as opposed to a normal MapReduce application, which runs as long as unprocessed data remains). Because of that, the Giraph implementation has to deal with ingesting the initial data and limiting the application’s runtime.
Even though a running Giraph application looks to a Hadoop cluster like a MapReduce application, it has very little to do with the behavior of a typical MapReduce job.
How does Giraph use MapReduce to spin up all the required internal services? It involves encapsulating Giraph’s business logic in the GraphTaskManager class implementation and expecting it to be instantiated on every node available to the Giraph application. The Hadoop MapReduce framework expects GraphTaskManager to provide a way to execute a mapper. Giraph doesn’t need to worry about the details of how an instance of GraphTaskManager is created; rather, it focuses on the job it needs to perform in the overall mesh of Giraph services. When Giraph uses MapReduce as a back end, the job of instantiating a GraphTaskManager object belongs to a mapper. This is the only thing this unusual mapper ever does.
At the highest level, the entire process of executing a Giraph application on a cluster of machines consists of the following steps (as illustrated in Figure 6-3):
Figure 6-3. Bootstrapping a Giraph application on a cluster of nodes
Despite its simplicity, this architecture means Giraph doesn’t have to be concerned with the details of finding appropriate cluster resources, scheduling containers to run there, restarting containers on different hosts if they fail, and otherwise allocating low-level resources. This is a very elastic approach that lets the Giraph application have as many workers as needed; the cluster-management framework is responsible for maintaining the worker pool. This is all good news, except for one detail: containers can be instantiated on any host in a cluster, and how can they communicate with each other if they don’t know the host names (or IP addresses) of other services?
The answer to this question lies in Giraph’s use of a coordination service that every container can communicate with. Different Giraph services use the coordination service as a bulletin board or directory where all containers can post information about themselves (such as their network coordinates, operational status, and so on) and inspect information posted by others. Thus, instead of every container keeping track of the network address and state of every other container, the only information needed to bootstrap the Giraph framework is the location of a coordination service.
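The bulletin-board idea can be sketched with an in-memory map. This is an illustration only: BulletinBoardSketch and its methods are hypothetical, a real deployment would keep these notes in ZooKeeper rather than in a local map, and the service IDs and addresses are made up.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a coordination service used as a directory. */
class BulletinBoardSketch {
    // service id -> "host:port" note posted by that service
    private final Map<String, String> notes = new ConcurrentHashMap<>();

    /** A container announces where it can be reached. */
    void post(String serviceId, String hostAndPort) {
        notes.put(serviceId, hostAndPort);
    }

    /** Any other container can look the address up instead of tracking it itself. */
    Optional<String> lookup(String serviceId) {
        return Optional.ofNullable(notes.get(serviceId));
    }

    /** Called when a service goes away. */
    void remove(String serviceId) {
        notes.remove(serviceId);
    }

    public static void main(String[] args) {
        BulletinBoardSketch board = new BulletinBoardSketch();
        board.post("worker-3", "node17.example.com:30003"); // hypothetical address
        System.out.println(board.lookup("worker-3").orElse("unknown"));
    }
}
```

The only address a container needs in advance is that of the board itself; everything else is discovered through it.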
An interesting consequence of a coordination service playing the role of central information radiator is that it bears the brunt of communicating with every container. To avoid overwhelming it, the Giraph implementation tries to minimize the amount of traffic between each container and the coordination service. The approach is to manage a small amount of metadata via the coordination service and opt for direct communication between containers whenever possible.
As long as the Giraph services are running as they should, a cluster-management framework such as Hadoop stays out of the way. If services fail (due to software bugs or hardware failure), it is up to the cluster-management framework to spin up new instances. The coordination service plays a vital role in allowing the Giraph implementation to detect when services fail and when they are being brought back online (perhaps on a different host). That way, an active master always has a way to rebalance the current graph computation between the active workers.
Of course, detecting service failure and bringing back new instances solves only half the problem. When a service goes down, it also brings down its in-memory state. It would be unfortunate if Giraph had to restart an entire computation simply because one service out of thousands went down. An elegant solution to that problem is to use periodic checkpoints. At a given interval (requested via the giraph.checkpointFrequency configuration property), Giraph records the state of all the services in a permanent location in HDFS (under the home directory of the current user). Suppose giraph.checkpointFrequency is set to 5. This means after every five supersteps, the state of all the services is checkpointed. Should any service fail, the computation must redo at most five recent supersteps, because it is restarted from a checkpointed state. The exact value the checkpoint frequency is set to is highly application specific. On one hand, more frequent checkpointing increases the overall runtime of the application; on the other, it speeds up recovery if a worker fails. Experiment with different values, and pick the one that works for you. Just keep in mind that the default value for this property is 0: by default, checkpointing is disabled.
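The "at most five supersteps" bound follows from simple modular arithmetic. The sketch below assumes a simplified model in which a checkpoint is written each time the number of completed supersteps is a multiple of the frequency; Giraph's actual scheduling may differ in detail, and the class CheckpointSketch is hypothetical.

```java
/** Simplified checkpoint arithmetic; an assumed model, not Giraph's scheduler. */
class CheckpointSketch {

    /** True if a checkpoint is written once this many supersteps have completed. */
    static boolean checkpointAfter(long completedSupersteps, int frequency) {
        return frequency > 0
                && completedSupersteps > 0
                && completedSupersteps % frequency == 0;
    }

    /**
     * Supersteps to execute again if a failure hits the superstep that is
     * currently in progress: the completed ones since the last checkpoint,
     * plus the interrupted one. A frequency of 0 means checkpointing is off.
     */
    static long superstepsToRedo(long completedSupersteps, int frequency) {
        long lostSinceCheckpoint = frequency > 0
                ? completedSupersteps % frequency
                : completedSupersteps;
        return lostSinceCheckpoint + 1;
    }

    public static void main(String[] args) {
        int frequency = 5;
        for (long done = 8; done <= 11; done++) {
            System.out.println("completed=" + done
                    + " redo=" + superstepsToRedo(done, frequency));
        }
    }
}
```

With checkpointing disabled, the same function shows why a single failure forces the whole computation to start over.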
Now that you understand the mechanics of how Giraph uses cluster compute resources, let’s proceed with a detailed overview of what each Giraph service does and how they interact.
Anatomy of Giraph Services
Once fully bootstrapped and running, every Giraph application consists of a network of services that, collectively, accomplish graph processing by communicating with each other via network API calls. The two services implemented by Giraph (masters and workers) share a common design based on the CentralizedService interface and the BspService abstract class implementing it. The coordination service is implemented separately by a stand-alone Apache ZooKeeper project and is not discussed in this book.
WHAT IS APACHE ZOOKEEPER?
Apache ZooKeeper is an effort to develop and maintain an open source server that enables highly reliable distributed coordination. It was developed in response to the proliferation of highly distributed applications, all trying to implement similar functionality in an ad hoc fashion. You can find information about the design and implementation of ZooKeeper at http://zookeeper.apache.org or in a book written by two of its authors, Flavio Junqueira and Benjamin Reed: ZooKeeper: Distributed Process Coordination (O’Reilly, 2013).
All the functionality offered by masters is implemented in a BspServiceMaster class. On the worker side, BspServiceWorker is the corresponding implementation. Both classes extend the common functionality of BspService by subclassing it and implementing more specific interfaces, as shown in the class hierarchy in Figure 6-4.
Figure 6-4. Giraph services class hierarchy
Both services act as network servers and clients at the same time by using the classes implementing the following interfaces:
Currently, the only implementation available for all four of these interfaces is based on the Netty framework. Netty is a new I/O (NIO) client/server framework that enables quick and easy development of network applications such as protocol servers and clients. Don’t worry if you are not familiar with Netty; as long as you understand that it implements NIO, you should have no trouble understanding how Giraph uses it. Giraph abstracts that away by wrapping the code that provides general-purpose, Netty-enabled client/server interactions into two stand-alone classes: NettyClient and NettyServer. That part has little to do with the Giraph graph-processing model. The part that does is then wrapped in the four classes shown in Figure 6-5.
Figure 6-5. Netty-enabled client/server architecture
As you can see, each of the classes implements the appropriate interface by delegating all non-Giraph-specific client/server interactions to a NettyClient or NettyServer object that it owns.
Determining which node should provide which Giraph service is handled by GraphTaskManager.setup(). That determination is permanent for the duration of the Giraph application’s run and is driven by the configuration parameters given to the Giraph framework. For example, if an external ZooKeeper ensemble has been specified via the giraph.zkList configuration option, then there is no need for Giraph to assign this service to the nodes it manages. The implementation has the flexibility to assign any combination of services to a given node, with one exception: a coordination service cannot be the only service assigned to a node. In other words, nodes running coordination services always have an additional master or all three services running there as well. The only two impossible combinations of node service assignments are these:
Now that you have enough background about the generic aspects of how Giraph implements its master, worker, and coordination services, it is time to dive deeper into their detailed architecture.
The master service always runs as an extra Java thread implemented by the MasterThread class. An instance of that class owns a reference to the BspServiceMaster object and implements its functionality by orchestrating calls to the APIs exposed by BspServiceMaster. The master algorithm is pretty simple, but it needs to be designed in a way that is resilient if the active master fails and a different master (typically running on a different host) assumes the responsibilities of an active master. Figure 6-6 shows the high-level steps that every master service goes through.
Figure 6-6. A single master service lifecycle
The first thing a master does is place a bid on becoming an active master via a BspServiceMaster.becomeMaster() call. Note that although all masters issue this call, all but one will block. The one that succeeds will determine the active master; the others will stay blocked until the active master fails. At that point, one of the blocked calls will return and thus determine the next active master. This synchronization barrier is implemented by using the functionality of a coordination service, as you see later.
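One common way to build this kind of election on top of a coordination service is for every candidate to create a numbered bid and for the lowest-numbered surviving bid to win. The sketch below models that recipe in memory. It is an assumption about the general technique, not a transcription of BspServiceMaster.becomeMaster(), and it replaces blocking with an explicit query so the behavior is easy to follow.

```java
import java.util.TreeMap;

/** In-memory model of "lowest sequence number wins" leader election. */
class MasterElectionSketch {
    private final TreeMap<Long, String> bids = new TreeMap<>(); // sequence -> master id
    private long nextSequence = 0;

    /** A master places its bid and receives the sequence number assigned to it. */
    long bid(String masterId) {
        long sequence = nextSequence++;
        bids.put(sequence, masterId);
        return sequence;
    }

    /** The holder of the lowest surviving bid is the active master (null if none). */
    String activeMaster() {
        return bids.isEmpty() ? null : bids.firstEntry().getValue();
    }

    /** Models a master failure: its bid disappears from the board. */
    void fail(long sequence) {
        bids.remove(sequence);
    }

    public static void main(String[] args) {
        MasterElectionSketch election = new MasterElectionSketch();
        long first = election.bid("master-0");
        election.bid("master-1");
        System.out.println("active: " + election.activeMaster());
        election.fail(first); // the active master dies
        System.out.println("active after failure: " + election.activeMaster());
    }
}
```

In a real deployment the "bid disappears" step would be automatic if the bid were tied to the failed master's connection to the coordination service.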
On becoming an active master, the master thread instantiates a MasterCompute object, provided it was requested as part of the Giraph application configuration. This, in turn, has a chance to register the application aggregators on the active master.
Before entering a superstep loop, an active master tries to perform one more important function: creating a set of input splits for the graph data (both vertex and edge data) to be loaded by the appropriate input formats. Because Giraph applications typically operate on a set of huge files stored in a distributed filesystem, effectively spreading the work of loading that data among all the available workers is a key scalability requirement. This functionality is implemented in the BspServiceMaster.createInputSplits() method, and it works as follows. First it consults a coordination service to see if the mapping of which worker is supposed to load which file has been established. If it hasn’t, the map is generated and externalized into the coordination service. This accomplishes two things at once. First, because every worker is watching the coordination service for updates, once the mapping is made available, the workers can load their portion of the input data (the details of this step are laid out in the section “Coordination Services” below). Second, persisting the mapping to the coordination service as a transaction makes it possible for the standby master to effectively take over in case the active master dies.
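The "check first, generate only if missing" logic is what makes a master takeover safe, and it can be sketched as follows. Everything here is a stand-in: a plain Map plays the role of the coordination service, the path /inputSplits is made up, and round-robin assignment is an assumed policy rather than what BspServiceMaster.createInputSplits() actually does.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Idempotent creation of a file-to-worker mapping; an illustrative model only. */
class InputSplitSketch {

    /**
     * Returns the mapping stored in the coordination store, generating and
     * publishing it only if no master has done so before.
     */
    static Map<String, String> createInputSplits(
            Map<String, Map<String, String>> coordinationStore,
            List<String> files,
            List<String> workers) {
        return coordinationStore.computeIfAbsent("/inputSplits", path -> {
            Map<String, String> mapping = new LinkedHashMap<>();
            for (int i = 0; i < files.size(); i++) {
                // Assumed policy: hand files out to workers round-robin.
                mapping.put(files.get(i), workers.get(i % workers.size()));
            }
            return mapping;
        });
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> store = new HashMap<>();
        List<String> files = List.of("part-0", "part-1", "part-2");
        System.out.println(createInputSplits(store, files, List.of("w0", "w1")));
        // A standby master that takes over finds the mapping and reuses it.
        System.out.println(createInputSplits(store, files, List.of("w9")));
    }
}
```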
Once the input splits’ mapping has been made available to all the workers, the active master transitions into the superstep loop that runs for as long as supersteps remain. The bulk of the functionality driving the superstep loop is implemented in the BspServiceMaster method coordinateSuperstep() and consists of the following steps:
Once the superstep loop is over, the only thing left for the master to do is to call the cleanup routine BspServiceMaster.cleanup(). All the master processes should signal that they are done by posting a special kind of note to a coordination service. When the number of notes equals the number of partitions for workers and masters, the master cleans up the global state associated with a job. This is how a global barrier is implemented with the help of a coordination service.
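A note-counting barrier of this kind can be modeled in a few lines. The sketch below is hypothetical (CleanupBarrierSketch is not a Giraph class) and keeps the notes in a local set instead of a coordination service; using a set means a task that posts its note twice is still counted once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Counting barrier: cleanup may proceed once every task has posted a note. */
class CleanupBarrierSketch {
    private final Set<String> doneNotes = ConcurrentHashMap.newKeySet();
    private final int expectedTasks;

    CleanupBarrierSketch(int expectedTasks) {
        this.expectedTasks = expectedTasks;
    }

    /** A master or worker task signals that it has finished. */
    void postDone(String taskId) {
        doneNotes.add(taskId);
    }

    /** True once the number of distinct notes equals the number of tasks. */
    boolean allDone() {
        return doneNotes.size() == expectedTasks;
    }

    public static void main(String[] args) {
        CleanupBarrierSketch barrier = new CleanupBarrierSketch(3);
        barrier.postDone("task-0");
        barrier.postDone("task-1");
        System.out.println("after two notes: " + barrier.allDone());
        barrier.postDone("task-2");
        System.out.println("after three notes: " + barrier.allDone());
    }
}
```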
Getting back to the example application, there are always exactly two master services running: one active and one standby. Should the active master fail, the standby resumes coordination activities. As long as the master is active, its superstep loop goes through the steps outlined in Figure 6-7.
Figure 6-7. Example application’s active master superstep loop
The key function of a worker service in a Giraph application is to manage the state of a few graph partitions assigned to it by the master. Just as with the master, the lifecycle of a worker is managed by the GraphTaskManager.execute() method orchestrating calls to the API methods exposed by the object of the BspServiceWorker class, which provides the bulk of the implementation.
Each worker service operates under the same resiliency constraints that masters operate under. Worker failure is a norm in a Giraph application, and the worker algorithm should be designed with that possibility in mind.
The algorithm consists of two phases: the setup phase and a superstep loop phase. The setup phase is mostly concerned with loading initial graph data into the memory of all the workers. The key steps of how it happens are outlined in Figure 6-8.
Figure 6-8. Setup phase of the worker service lifecycle
Not surprisingly, BspServiceWorker.setup() provides the implementation of a setup phase, which, before it loads the data, checks whether setup() was called as part of restarting the failed superstep. If that’s the case, it loads the state from the checkpoint and bails, because the rest of the setup() machinery only applies when the entire Giraph application is initialized for the first time:
Once all workers are ready to transition to the superstep loop, they do so at once and enter the main computational loop of the Giraph application. This worker service superstep loop cycles through the following steps as long as supersteps remain:
Once again, if you turn to the example application, each worker (once it enters the superstep loop phase) goes through the set of transitions outlined in Figure 6-9.
Figure 6-9. Superstep loop phase of the worker service lifecycle
The worker superstep loop is the central process driving the execution of every Giraph application. Most of the steps are pretty easy to follow, and the only part worth reiterating is the implementation of sharded aggregators. Steps 4 and 6 are the key to understanding how sharded aggregators work.
First, during each superstep, values provided to the aggregators are partially aggregated by the workers locally. No network API calls are involved. Once the superstep is done, all of these partially aggregated values coming from different workers need to be aggregated once more, and the resulting value must be made available to MasterCompute.compute(). One option is to let the master manage all the aggregators because their value needs to end up there anyway. This simple approach unfortunately doesn’t scale if an application uses a lot of aggregators; it turns the master into a bottleneck from both the computation and network communication standpoints.
The approach that Giraph takes is to dynamically shard the aggregators between the workers in the cluster. Here’s how it works: when the superstep ends, each aggregator is assigned to one of the workers, and that worker becomes responsible for receiving partially aggregated values from its neighbors. It is also expected to push the final aggregated value back to the master. That way, the master doesn’t need to perform any aggregation, and it always receives one final value for each aggregator. Once the values are received by the master and it is finished executing MasterCompute.compute(), the only worker to which that master sends each value is the owner of that aggregator. As a final step, the owners are expected to distribute these values to their neighbors.
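The data flow of sharded aggregation can be sketched for the simple case of sum aggregators. This is an illustration under stated assumptions: ownership is decided by hashing the aggregator name (Giraph's actual assignment rule may differ), all "network" traffic is simulated with in-memory maps, and the class ShardedAggregatorSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates sharding sum aggregators across workers; not Giraph code. */
class ShardedAggregatorSketch {

    /** Assumed ownership rule: hash the aggregator name onto a worker index. */
    static int ownerOf(String aggregatorName, int workerCount) {
        return Math.floorMod(aggregatorName.hashCode(), workerCount);
    }

    /**
     * partials.get(w) holds the values worker w aggregated locally.
     * Each partial value is "sent" to the aggregator's owner, which combines
     * them; the master then receives exactly one final value per aggregator.
     */
    static Map<String, Long> finalValuesAtMaster(List<Map<String, Long>> partials) {
        int workerCount = partials.size();
        List<Map<String, Long>> ownedByWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            ownedByWorker.add(new TreeMap<>());
        }
        // Step 1: workers push partial values to the owning worker.
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                int owner = ownerOf(e.getKey(), workerCount);
                ownedByWorker.get(owner).merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        // Step 2: each owner pushes its finished aggregators to the master.
        Map<String, Long> atMaster = new TreeMap<>();
        for (Map<String, Long> owned : ownedByWorker) {
            atMaster.putAll(owned);
        }
        return atMaster;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> partials = List.of(
                Map.of("edges", 4L, "vertices", 2L),
                Map.of("edges", 6L, "vertices", 3L),
                Map.of("edges", 1L));
        System.out.println(finalValuesAtMaster(partials));
    }
}
```

The master performs no aggregation in this flow; the combining work is spread over as many workers as there are distinct owners.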
Coordination Services
As noted earlier, a coordination service is not implemented by Giraph. The implementation comes from a stand-alone Apache ZooKeeper project. Still, it is important to understand the APIs it offers and how masters and workers use them. Most of the data that Giraph puts into ZooKeeper is permanent, so knowing the details of how Giraph externalizes its state can be an invaluable tool for debugging failed applications. And if you want to make it even easier to observe and diagnose running Giraph applications, run them with an external ZooKeeper ensemble (by setting the giraph.zkList property) instead of relying on Giraph to spin up coordination services on demand. By running with an external ZooKeeper ensemble, you can inspect the internal state of the coordination service (and thus the internal state of your Giraph application) whenever you like, as opposed to only while the application is running.
One of the key properties of ZooKeeper is that the service it provides is highly available. The way it is implemented requires that all nodes that are part of the ZooKeeper ensemble know the network addresses (hostnames and ports) of every other node that is part of the same ensemble. That way, they can make the best effort to synchronize the common state between as many nodes as possible. ZooKeeper clients (remember, Giraph is only a ZooKeeper client) talk to only one ZooKeeper node at a time. But even the client has to know the network addresses of as many nodes in the ZooKeeper ensemble as possible. After all, if the node the client is currently talking to goes down, the client has to know where to reconnect.
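ZooKeeper clients are typically handed the ensemble's addresses as a comma-separated list of host:port entries, which is also the shape of value that the giraph.zkList property carries. The following sketch shows how such a list breaks down into individual addresses. The class ZkListSketch and the hostnames are made up for illustration; 2181 is ZooKeeper's conventional client port.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Parses a "host:port,host:port" list; a sketch, not ZooKeeper's own parser. */
class ZkListSketch {

    /** Splits the list into addresses without performing any DNS lookups. */
    static List<InetSocketAddress> parse(String zkList) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : zkList.split(",")) {
            String[] hostAndPort = entry.trim().split(":");
            addresses.add(InetSocketAddress.createUnresolved(
                    hostAndPort[0], Integer.parseInt(hostAndPort[1])));
        }
        return addresses;
    }

    public static void main(String[] args) {
        // Hypothetical three-node ensemble.
        String zkList = "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181";
        for (InetSocketAddress address : parse(zkList)) {
            System.out.println(address.getHostString() + " port " + address.getPort());
        }
    }
}
```

Listing every ensemble member gives the client somewhere else to go when the node it is connected to becomes unreachable.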
As long as Giraph uses an external ZooKeeper service, the network addresses of all the nodes in the ensemble are passed to the worker and master services as a static, comma-separated list specified via the giraph.zkList property. If, on the other hand, Giraph needs to spin up a coordination service on demand, it can no longer know in advance which nodes in the cluster those services will be instantiated on. It is a curious chicken-and-egg problem: Giraph needs ZooKeeper to store all the information that must be shared between services running on different nodes, but the network addresses of the ZooKeeper nodes are exactly the kind of information that needs to be stored in ZooKeeper. It seems as though Giraph needs ZooKeeper to be able to bootstrap ZooKeeper. How do you avoid this infinite regress?
Giraph uses another storage substrate that all Giraph services know how to access: HDFS. Although most of the time HDFS is an extremely poor choice for keeping small files that need to be frequently updated, it is OK to use it on a case-by-case basis. That is exactly how Giraph uses it. It records the network addresses of the ZooKeeper ensemble nodes as names of HDFS files. The exact location where it stores these files is set by the giraph.zkManagerDirectory property; the default value is _bsp/_defaultZkManagerDir (note that because the default value has a relative, not absolute, path name, it is rooted under the home directory of the user running the job).
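The trick of recording addresses as file names can be sketched in a few lines. The following Python sketch uses a local temporary directory as a stand-in for HDFS; the `<host>_<port>` naming scheme, the function names, and the host names are assumptions made for illustration and are not Giraph's actual on-disk layout.

```python
import os
import tempfile


def publish_address(directory, host, port):
    """Advertise one server by creating an empty file whose name encodes its address."""
    os.makedirs(directory, exist_ok=True)
    # Hypothetical naming scheme: "<host>_<port>". The file itself stays empty.
    path = os.path.join(directory, f"{host}_{port}")
    open(path, "w").close()
    return path


def discover_addresses(directory):
    """Rebuild a comma-separated host:port list by listing the directory."""
    servers = []
    for name in sorted(os.listdir(directory)):
        # Split at the last underscore so host names may contain underscores.
        host, _, port = name.rpartition("_")
        servers.append(f"{host}:{int(port)}")
    return ",".join(servers)


def demo():
    # A temporary local directory plays the role of the shared filesystem.
    with tempfile.TemporaryDirectory() as root:
        zk_dir = os.path.join(root, "_bsp", "_defaultZkManagerDir")
        publish_address(zk_dir, "node7.example.com", 22181)
        publish_address(zk_dir, "node3.example.com", 22181)
        return discover_addresses(zk_dir)
```

Because the information lives entirely in the file names, a reader needs only a directory listing, which avoids the frequent small-file updates that HDFS handles poorly.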
Now that you know how bootstrapping the ZooKeeper ensemble works and how master and worker services are enabled to connect to it as clients, let’s see what ZooKeeper client APIs they use. From a client perspective, ZooKeeper offers a filesystem-like hierarchical API with a namespace structured as a tree of nodes (each node in the tree, including inner nodes, is called a znode), with every node answering to a small set of API calls. Each znode in the hierarchy can hold some data, be a parent node to other nodes, or, unlike a file or a directory in a typical filesystem, do both. ZooKeeper is designed for coordination, which typically requires small chunks of data stored in znodes (the amount of data per znode is capped at 1MB). All APIs are atomic, which means clients receive either all the data or none of it. When a client creates a znode, it may elect to declare the znode to be ephemeral and thus guarantee that it is visible only as long as the client that created it is connected to ZooKeeper. If the node is not ephemeral, it is considered to be persistent, and an explicit delete call is required to remove it from the tree. Both types of znodes can have an additional property of being sequential (thus ZooKeeper znodes fall into one of four categories: ephemeral, ephemeral sequential, persistent, or persistent sequential). A sequential znode’s name is derived from the initial name supplied as part of the create API call and the sequence number ZooKeeper assigns it. So, if multiple clients try to create a sequential znode with the same path name, each of them will end up creating a unique znode with ordering guaranteed by the ZooKeeper service. Finally, clients can register watches that are triggered when other clients perform certain types of operations on the tree of znodes. Events such as creation, deletion, and modification can be tracked that way.
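To make the ephemeral and sequential flavors concrete, here is a toy in-memory model of a znode namespace. It is a sketch for illustration only: the class and method names are hypothetical and are not the ZooKeeper client API, watches are omitted, and the model does not check that parent znodes exist. The one detail taken from ZooKeeper itself is that a sequential znode's name gets a zero-padded, 10-digit counter appended to it.

```python
class ToyZnodeTree:
    """In-memory stand-in for a znode namespace (not the ZooKeeper client API)."""

    def __init__(self):
        self.nodes = {}      # path -> (data, owning session or None if persistent)
        self.counters = {}   # parent path -> next sequence number

    def create(self, path, data=b"", session=None, ephemeral=False, sequential=False):
        if ephemeral and session is None:
            raise ValueError("an ephemeral znode needs an owning session")
        if sequential:
            # The counter is kept per parent and appended as 10 zero-padded digits.
            parent = path.rsplit("/", 1)[0]
            seq = self.counters.get(parent, 0)
            self.counters[parent] = seq + 1
            path = f"{path}{seq:010d}"
        if path in self.nodes:
            raise KeyError(f"znode exists: {path}")
        self.nodes[path] = (data, session if ephemeral else None)
        return path

    def close_session(self, session):
        """Drop every ephemeral znode owned by the departing session."""
        if session is None:
            return
        for path in [p for p, (_, owner) in self.nodes.items() if owner == session]:
            del self.nodes[path]

    def children(self, parent):
        """Return the sorted names of the direct children of `parent`."""
        prefix = parent.rstrip("/") + "/"
        return sorted(p[len(prefix):] for p in self.nodes
                      if p.startswith(prefix) and "/" not in p[len(prefix):])


tree = ToyZnodeTree()
tree.create("/election")  # persistent: stays until explicitly deleted
# Two clients create a sequential znode with the same path name.
a = tree.create("/election/bid_", session="A", ephemeral=True, sequential=True)
b = tree.create("/election/bid_", session="B", ephemeral=True, sequential=True)
tree.close_session("A")   # client A disconnects, so its ephemeral znode vanishes
remaining = tree.children("/election")
```

Both clients asked for the same path yet ended up with distinct, ordered znodes, and only the znode of the still-connected client survives.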
INSPECTING THE ZOOKEEPER ZNODES TREE
By default, Giraph doesn’t provide any tools to inspect the tree of znodes it maintains in the ZooKeeper service. It is highly recommended that you download and install Apache ZooKeeper separately and use the command-line utility zkCli.sh or zkCli.cmd to browse the znode tree. The only command-line argument you need to supply is -server. This is the comma-separated list of ZooKeeper ensemble nodes that the client connects to. Once the client is started, it behaves similarly to an ftp client. Type help to get a quick overview of supported commands.
Giraph keeps all of its coordination metadata and internal state data rooted under the _hadoopBsp znode. By default, it is created directly at the root of ZooKeeper’s hierarchy, but you can also instruct Giraph to root it at a subtree by setting the giraph.zkBaseZNode property. For example, setting giraph.zkBaseZNode to a value of /giraph/examples results in a running Giraph application creating the set of znodes shown in Listing 6-1.
As you can see, Giraph exposes a treasure trove of information in the ZooKeeper state it maintains. An obvious implication of this is help with debugging; a less obvious one is integration with external applications that may need to track the progress of running Giraph applications. Keep in mind that the location and content of Giraph znodes are considered internal APIs of Giraph and may change without notice between releases. Figure 6-10 gives you a complete view of a hierarchy of znodes used by various Giraph services for coordination and metadata management.
Figure 6-10. Tree of znodes used by various Giraph services for coordination
All the znodes from the example are rooted under /giraph/examples/_hadoopBsp/job_1394131425944_0001. This common prefix was determined by the giraph.zkBaseZNode setting and the ID of a graph-processing job currently run by Giraph. Under that common root are roughly five different classes of znodes, color-coded according to the function they serve. Magenta znodes (names starting with _vertexInputSplit) are used to coordinate workers processing splits. Yellow znodes (names starting with _edgeInputSplit) are used for the same purpose when edge-split processing is requested. For both of these, the postfix of the znode name determines what it is used for:
Using these four znodes, you can create a fault-tolerant work-sharing scheme with an active master coordinating the processing and assignment of edge and/or vertex input splits as part of creating the initial graph topology. Green znodes are used for electing the active master (_masterElectionDir) and also for keeping the global, master-specific job state in a safe location (_masterJobState) in case an active master dies. Violet znodes are used to signal from the client side to stop the current graph-processing job and also to track worker cleanup activities:
Finally, blue znodes are the heart of graph processing. Each graph-processing application goes through a number of attempts to run a given graph-processing job the same way a Hadoop MapReduce execution goes through a certain number of attempts to run a mapping phase before declaring a failure. All the attempts are tracked under the znode path _applicationAttemptsDir/<attempt ID>. Within each attempt’s znode, _superstepDir tracks supersteps by ID (starting from -1). During each superstep, the master coordinates the progression of work among workers and rebalances the graph partitions. The following znodes are used to do that:
Most of the time, the Giraph implementation doesn’t interact with ZooKeeper client APIs explicitly, and instead wraps all API calls into methods provided by the ZooKeeperExt class. This class not only acts as a façade for ZooKeeper client APIs but also provides extended functionality such as recursive path creation and non-atomic operations.
The majority of znodes created by Giraph fall under the category of regular (non-ephemeral, non-sequential) znodes; working with them is very similar to working with files in a regular filesystem. There are, however, a few exceptions when Giraph needs to use the APIs provided by the ephemeral and sequential flavors of znodes:
This concludes the review of the design and implementation of each of the three services: masters, workers, and coordinators. So far, the chapter has focused on the normal execution path and has not spent much time covering fault-tolerance considerations. This is the subject for the rest of the chapter.
As mentioned earlier, much of the Giraph architecture is dictated by the need to effectively address the issues captured by the fallacies of distributed computing. Often the most important of these is that at Giraph’s typical operating scale, nothing can be assumed to be reliable.
HOW OFTEN DOES IT BREAK?
Everybody knows Google is an authority on managing the world’s information. What’s less well known is that based on its track record of building and operating large datacenters, Google has become one of the leading authorities on datacenter buildout and maintenance. The company is extremely careful about the design of everything that goes into its datacenters, but even Google, according to Google Fellow Jeff Dean (in his “Software Engineering Advice from Building Large-Scale Distributed Systems” talk), sees a lot of things go wrong.
In each cluster’s first year, it’s typical for 1,000 individual machine failures to occur, along with thousands of hard drive failures. One power distribution unit will fail, bringing down 500 to 1,000 machines for about 6 hours; 20 racks will fail, each time causing 40 to 80 machines to vanish from the network; 5 racks will “go wonky,” with half their network packets missing in action; and the cluster will have to be rewired once, affecting 5% of the machines at any given moment over a 2-day span, Dean says. And there’s about a 50% chance that the cluster will overheat, taking down most of the servers in less than 5 minutes and requiring one or two days to recover.
So far in this chapter, you have seen many techniques employed by Giraph to withstand various failure scenarios. This section looks at those scenarios and reviews the steps Giraph must go through to recover from disk failures, node failures, and network failures.
As mentioned, Giraph is not concerned with managing storage. It does, however, interact with various storage layers to accomplish the following goals:
For both use cases, what happens when a disk fails depends a great deal on what storage substrate is being used. In the case of a correctly deployed and configured HDFS, Giraph will not notice a failure rate of a few disks per day. HDFS is specifically designed to cope with those types of failures behind the scenes and mask them from the client. This is accomplished by keeping, by default, three copies of every block on different nodes spread across at least two server racks. As far as metadata (a mapping between file names and blocks) is concerned, it is also typically stored in a few alternative locations. This makes Giraph checkpoints and the input/output formats operating on storage frameworks riding on top of HDFS (HBase, Hive, and so on) extremely reliable and oblivious to the disk failure rate of a typical datacenter. On a correctly configured HDFS deployment, the failure of a Giraph application due to a disk failure requires a very improbable set of events (all three disks hosting the same block failing at the same time).
Of course, however improbable it is, it can still happen. In addition, there are input/output formats for non-HDFS backed storage frameworks. These frameworks may not be as tolerant to individual disk failures as HDFS is. Either way, if a disk fails and a storage framework cannot silently recover, the error is propagated to the level of Giraph. At that point, if the error was encountered while an input/output format was reading or writing the data, the application will fail with an error message.
Node Failure
Because nodes in a cluster typically host a collection of different services, a useful way to analyze what happens when a node fails is to look at what happens when a particular service fails. That way, a failure of a single node can be viewed as a complex event consisting of a simultaneous failure of all the services hosted by it.
A master service can be in one of two states: it can be blocked on a bid to become an active master, or it can function as one. The master service that is blocked on a bid doesn’t have any internal state. Thus, when it fails, the only event of any significance is that the znode advertising its bid for becoming an active master disappears from the coordination service. There is no internal state to recover (because a blocked master doesn’t have any state), and additional master services can be spun up (and wait to become active) very easily.
A failure of an active master is more complex. If an active master fails, its ephemeral, sequential znode disappears as well, and that event wakes up all the master services blocked on a bid to become active. One of those master services will discover that its znode is now the first child of _masterElectionDir and will proceed to become an active master (the rest remain blocked on a bid). Because an active master keeps most of its state in a coordination service, a newly appointed active master doesn’t need to recover the state of the failed master. Even the aggregator values don’t need to be recovered, because they are fetched from the workers they are assigned to at the end of every superstep.
All in all, regardless of where an active master fails, a new active master can start from the beginning of coordinating the current superstep, fetch the state from the other services, and retrace all the steps just before the previous master failed.
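The election rule described above (the master whose znode is the first child of _masterElectionDir becomes active) boils down to picking the lowest sequence number. The sketch below shows only that rule; the znode names are hypothetical, and a real master would obtain the list of children from the coordination service and block on a watch rather than work with a plain list.

```python
def sequence_number(znode_name):
    """Extract the 10-digit counter appended to a sequential znode's name."""
    return int(znode_name[-10:])


def active_master(bids):
    """Return the bid with the lowest sequence number, or None if there are no bids."""
    return min(bids, key=sequence_number) if bids else None


# Hypothetical children of the election directory, in arbitrary order.
bids = ["master-b_0000000004", "master-a_0000000002", "master-c_0000000007"]
first = active_master(bids)

# The active master fails: its ephemeral znode vanishes and the others re-check.
bids.remove(first)
successor = active_master(bids)
```

Because every blocked master applies the same deterministic rule to the same list of children, exactly one of them concludes that it is the new active master.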
The purpose of the worker service is to manage the graph partitions and the aggregators assigned to it. This means, unlike a master service, a failed worker brings down quite a bit of unique state associated with it. The state is unique to each worker and is not redundant, and it would be undesirable for Giraph to have to restart an entire computation because of one failed worker. Fortunately, as mentioned earlier in this chapter, you can protect a long-running computation against this risk by using checkpoints.
Before you go any further with checkpoints, recall that a worker service goes through two phases: a split-loading phase and a superstep loop. During the split-loading phase (designated as a fake superstep with ID -1), each worker is tasked with loading initial graph data, turning it into vertices and/or edges, and sending those objects to an owning worker via network API calls. Each worker tries to load as many input splits as possible. The master monitors overall progress and waits for all the input splits to be loaded and (possibly) checkpointed on the receiving worker. If any worker fails during the split-loading phase, the master aborts the entire application run. This approach means there’s no need for a special recovery mechanism during the split-loading phase.
After the split-loading phase, each worker transitions into the superstep loop. If checkpoints are enabled, you are guaranteed to have the initial graph data safely stored in HDFS. From that point on, whenever a healthy worker fails, its ephemeral znode disappears from _workerHealthyDir and an active master is notified by the coordination service. On receiving this notification, the active master declares the current superstep as failed, immediately restarts its own superstep loop from the last checkpointed one, and instructs all the healthy workers to load the state corresponding to that last checkpointed superstep. Once that is done, the computation continues, and the current superstep is the last checkpointed superstep plus one. Of course, if checkpoints are not enabled, the loss of even a single worker will mean an entire application run will terminate immediately and be marked as a failure.
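The recovery decision can be expressed as a small function. This is a sketch under stated assumptions rather than Giraph's implementation: the function name and the shape of its result are hypothetical, and the set of completed checkpoints is passed in explicitly instead of being derived from a checkpoint frequency. Only checkpoints of supersteps earlier than the failed one are treated as usable.

```python
def recovery_plan(failed_superstep, completed_checkpoints):
    """Decide where to restart after a worker failure.

    Returns None when no usable checkpoint exists, which corresponds to the
    whole application run being marked as failed.
    """
    usable = [s for s in completed_checkpoints if s < failed_superstep]
    if not usable:
        return None
    restart = max(usable)
    return {
        "reload": restart,                     # state every healthy worker loads
        "resume_at": restart + 1,              # last checkpointed superstep plus one
        "redone": failed_superstep - restart,  # supersteps that must be recomputed
    }


plan = recovery_plan(failed_superstep=5, completed_checkpoints=[0, 3])
no_plan = recovery_plan(failed_superstep=2, completed_checkpoints=[])
```

The `redone` figure makes the trade-off visible: checkpointing more often costs extra I/O during normal execution but shrinks the amount of work repeated after a failure.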
ZooKeeper is architected to be resilient to the failures of the individual nodes participating in its ensemble by reliably replicating its state to the majority of nodes. It also guarantees a reliable process for nodes to rejoin the ensemble after they go down or are cut off from the rest of the service. As long as the majority of the nodes in the ensemble are available, the service will be available and consistent.
From a client perspective, a single ZooKeeper node failure triggers a connection loss event. But because clients are supplied with a list of all the nodes in the ensemble, they can try to reconnect to the nodes that are still functioning.
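The following sketch illustrates why a client benefits from knowing the whole ensemble. It parses a comma-separated list in the style of giraph.zkList and, after a connection loss, walks the remaining addresses. The function names, the host names, and the simple round-robin order are assumptions for illustration; the real ZooKeeper client handles reconnection internally.

```python
def parse_server_list(zk_list):
    """Split a comma-separated host:port string into (host, port) tuples."""
    servers = []
    for entry in zk_list.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def reconnect(servers, current, is_alive):
    """Try the servers after `current` in round-robin order; return the first live one."""
    start = servers.index(current)
    for offset in range(1, len(servers) + 1):
        candidate = servers[(start + offset) % len(servers)]
        if is_alive(candidate):
            return candidate
    raise ConnectionError("no reachable server in the ensemble")


servers = parse_server_list(
    "zk1.example.com:2181, zk2.example.com:2181,zk3.example.com:2181")
# Simulate two ensemble nodes being down at the moment of the reconnect.
down = {("zk1.example.com", 2181), ("zk2.example.com", 2181)}
chosen = reconnect(servers, servers[0], lambda server: server not in down)
```

A client configured with only a single address would have nowhere to go in this scenario, even though the third node is still up.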
If Giraph uses an external ZooKeeper service, the number of nodes in the ensemble is predetermined and can’t be changed. If Giraph is managing its own coordination service nodes, you can control the size of the ensemble by setting the giraph.zkServerCount configuration property. The default value is 1, which doesn’t allow for any kind of recovery. If you have to run with a Giraph-managed coordination service, be sure to set it to at least 3. In general, it is wise to set it to an odd number, because an even number doesn’t add much to the reliability of the overall service compared to the lower odd number. For example, if you set it to 4, ZooKeeper will tolerate at most one node failure (because if more than one node fails, the remaining one or two nodes won’t constitute a majority). This is no different from setting it to 3, which can also tolerate at most one node failure.
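The arithmetic behind the odd-number advice is short enough to write down. The helper names below are hypothetical; the majority rule itself is the one described above.

```python
def quorum_size(ensemble_size):
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """Number of nodes that can fail while a majority is still available."""
    return ensemble_size - quorum_size(ensemble_size)


# Ensemble size -> failures tolerated, for sizes 1 through 5.
table = {n: tolerated_failures(n) for n in range(1, 6)}
```

Sizes 3 and 4 both tolerate a single failure, and size 1 tolerates none, which is why the default value of giraph.zkServerCount offers no recovery at all.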
A network failure occurs when one node can no longer communicate with a subset of nodes in a cluster. A particularly nasty type of a network failure, known as network partitioning, happens when two subsets of cluster nodes can no longer communicate with each other but retain perfect communication abilities between the nodes in each subset.
Network failures affect Giraph in two major ways. First, network errors may prevent communication between the services (masters, workers, and coordinators) that form a running Giraph application. As you’ve seen in this chapter, the entire graph computation is done by master and worker services communicating via a set of network-enabled API calls. Masters and workers communicate with each other by using the Netty-based implementation of NettyClient and NettyServer. In general, the NettyClient implementation tries to be as asynchronous as possible. The remote API calls made in a superstep are non-blocking and go into a queue of outstanding requests for asynchronous processing. The superstep is considered finished when the queue of outstanding requests is drained. This is accomplished by relying on the synchronization method waitAllRequests(), which blocks until all the outstanding API calls are finished. It doesn’t just passively block, though; it keeps polling for the status of outstanding requests with a frequency determined by the giraph.waitingRequestMsecs property (default value 15 seconds). Each API request sits in a queue of outstanding requests for not more than the time determined by the giraph.maxRequestMilliseconds property (default value 10 minutes). Any requests that take longer than that are considered to have failed and are resent. The size of the queue of outstanding requests is determined by the giraph.maxNumberOfOpenRequests property (default value 10,000).
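The interplay between the polling interval and the request timeout can be sketched with a simulated clock. The class below is a toy model, not the NettyClient implementation: its name, its methods, and the `acks` schedule are hypothetical, and the two default values simply mirror the property defaults quoted above.

```python
class OutstandingRequests:
    """Toy model of a client-side queue of unacknowledged requests."""

    def __init__(self, max_request_ms=600_000):
        self.max_request_ms = max_request_ms
        self.sent_at = {}   # request id -> time of the latest send, in ms
        self.resends = 0

    def send(self, request_id, now_ms):
        self.sent_at[request_id] = now_ms

    def acknowledge(self, request_id):
        self.sent_at.pop(request_id, None)

    def poll(self, now_ms):
        """Resend every request that has been waiting longer than the limit."""
        for request_id, sent in list(self.sent_at.items()):
            if now_ms - sent > self.max_request_ms:
                self.send(request_id, now_ms)
                self.resends += 1

    def wait_all(self, now_ms, acks, poll_interval_ms=15_000):
        """Advance the simulated clock until every request is acknowledged.

        `acks` maps a time in ms to the request ids acknowledged at that time.
        """
        while self.sent_at:
            if not acks:
                raise TimeoutError("requests outstanding but no acknowledgment pending")
            now_ms += poll_interval_ms
            for ack_time in sorted(t for t in acks if t <= now_ms):
                for request_id in acks.pop(ack_time):
                    self.acknowledge(request_id)
            self.poll(now_ms)
        return now_ms


queue = OutstandingRequests()
queue.send("r1", 0)
queue.send("r2", 0)
# r1 is acknowledged quickly; r2 only after the 10-minute limit has passed.
finished = queue.wait_all(0, {30_000: ["r1"], 700_000: ["r2"]})
```

In this run the slow request is resent once, at the first poll after the 10-minute limit, and the wait returns at the first poll after the late acknowledgment arrives.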
Because communication with the coordination service is not Netty based but relies on native ZooKeeper Java APIs, it works slightly differently and relies on a different set of timeouts and polling intervals. As you’ve seen, an object of class ZooKeeperExt manages a connection to a coordination service for both masters and workers. As part of its initialization, it expects to receive values for the following network-related ZooKeeper client settings:
Finally, unbeknownst to Giraph, network-related failures may affect the internal state of the services it interacts with. For example, if you are using an external ZooKeeper ensemble and a network partition splits that ensemble in two, different masters and workers may end up interacting with different subsets of a single ZooKeeper ensemble. It is up to the ZooKeeper service to make sure it does whatever is necessary to present a consistent view of the world to all Giraph masters and workers. The last thing you want is an active master that has a different view of the workers than was reported by the workers themselves. This could happen in a scenario where a few workers report back their state to part of a ZooKeeper ensemble that is currently partitioned away from the subset of the ensemble that the master is connected to. The reason it doesn’t happen has to do with how ZooKeeper combats network partitioning; describing the ZooKeeper implementation is outside the scope of this book.
Remember that whenever Giraph services are talking to external distributed services, they rely on those services to either report an error or maintain a fully consistent model of their world. In general, Giraph is not architected for interacting with eventually consistent services.
Summary
This chapter covered these topics:
The key takeaway from this chapter is that the Giraph architecture is fundamentally built around three types of internal network services (masters, workers, and coordinators) running in a distributed fashion and collectively orchestrating the work that needs to be done for each superstep:
Throughout this chapter, you have looked at various “knobs” specific to each of the services and how different settings for these knobs affect the overall execution of a Giraph application (such as setting the number of masters via giraph.zkServerCount or specifying a checkpoint interval by setting a non-zero value for giraph.checkpointFrequency). Overall, the Giraph implementation is extremely tunable; to understand the ways you can change its behavior, you have to dig into Giraph’s source code (start with GiraphConstants) or consult the manual at http://giraph.apache.org/options.html.
Although this chapter is heavy on technical details, it provides an important overview of the internals of the Giraph implementation—something that comes in handy any time you need to debug or tune a Giraph application. One thing you haven’t spent much time on is how Giraph interacts with external storage frameworks by using its flexible input/output format extensions. This is the focus of the next chapter.