CHAPTER 6

image

Giraph Architecture

The previous chapter introduced a gamut of Giraph APIs and showed how you can use them for various graph-processing applications. This chapter pops the hood on Giraph implementation and shows you what transpires behind the scenes when a graph-processing application is executed and APIs are called.

For the rest of this chapter, you use the example Giraph applications introduced in Chapter 5. Assume that the size of the graph on which the application operates is measured in billions of lines of input. With this assumption in place, the chapter traces every step of the application run on a Hadoop cluster. You start by learning about the functional roles of the different services that form a running Giraph application, how these services are assigned to compute nodes in the cluster, and how they coordinate their activity to perform an overall graph-processing task. You also look into steps that are needed to partition the input and load the initial graph topology in such a way that each vertex is assigned to a worker that performs compute operations on it. You then dive deep into the implementation details of each service and close the chapter by examining failure scenarios that a Giraph implementation needs to cope with.

Genesis of Giraph

So far, you have only used Giraph on tiny data sets that fit comfortably into the disk and memory space of a single host. Although Giraph is certainly applicable for use with small and medium-size graphs, its true power comes from its scalability. To give you a taste of the level of scalability required from a Giraph implementation, consider its use at Facebook: analyzing the social graph of the social network’s members. In 2013, the total number of vertices was estimated at around 1.1 billion, and the number of edges was well into the trillions. Even if storing each chunk of data associated with either a vertex or an edge takes only hundreds of bytes, you are approaching petabyte scale for the graph representation alone. Storing datasets of this size on a single node is impossible; and analyzing them using the CPU resources of a single node would be prohibitively time consuming. The only way to scale for both storage and compute requirements is to exploit clusters of commodity hardware.

It should come as little surprise, then, that a running Giraph application is a collection of distributed, networked services running in parallel on different nodes in the cluster. Each of these services provides a particular function for the rest of the Giraph application, has its own lifecycle, and interacts with the rest of the services running on different nodes via remote API calls. Because the majority of these interactions are network-based, all the complications of network programming (collectively known as fallacies of distributed computing) need to be dealt with. For example, the Giraph implementation spends a lot of time tuning the lifecycles of its internal services to enable the most efficient recovery in case of an individual service failure.

FALLACIES OF DISTRIBUTED COMPUTING

Back in 1994, a few smart engineers working at Sun Microsystems started to realize the power of distributed computing that has been unlocked by advances in local network design. Around the same time, one of those engineers, John Gage, coined the phrase “the network is the computer” to capture the company’s new direction. That was the good news. The bad news was compiled by another Sun engineer (Peter Deutsch) and delivered to future generations of engineers as a list of false assumptions to watch out for when designing this next-generation computer architecture. The list is known as the fallacies of distributed computing, and it is as relevant today as it was 20 years ago:

  • The network is reliable.
  • Latency is zero.
  • Bandwidth is infinite.
  • The network is secure.
  • Topology doesn’t change.
  • There is one administrator.
  • Transport cost is zero.
  • The network is homogeneous.

Keep this list in mind when reading this chapter—it will help you appreciate quite a few of the architectural decisions made by the Giraph implementation.

Giraph Building Blocks and Concepts

Regardless of whether you are running the example application on a single host (as you did in Chapter 5) or on a 1,000-node cluster, everything Giraph does boils down to making sure a piece of Java code that acts as a particular type of a network service can be executed on a given number of hosts. Before moving on, let’s quickly define the three types of network services that serve as fundamental building blocks for Giraph architecture: masters, workers, and coordinators.

Every Giraph computation is a result of the coordinated execution of these services, operating on subsets of vertices that are split into partitions and processed in parallel. Partitioning is a key optimization, allowing parallel execution of graph-processing tasks; it is dynamic, meaning the assignment of partitions to workers can change based on the characteristics of the running application. At every superstep, however, the overall functional structure of a Giraph application looks like Figure 6-1. All the services represented in this figure are internal to the Giraph implementation and are not visible to the end user, but it is still important to know their architecture—if for no other reason than to be able to debug failures in Giraph application runs.

9781484212523_Fig06-01.jpg

Figure 6-1. Functional structure of a running Giraph application

Masters

There is always exactly one active master service, along with a few standby masters bidding to become active if the current master fails. The standby masters are dormant and don’t play an active role in the lifecycle of a running Giraph application. Once a master becomes active, its job is to coordinate computation. This task consists of doing the following:

  • Transitioning the workers from one superstep to the next in a coordinated manner
  • Before each superstep, assigning partitions to active workers
  • Running the master compute code
  • Monitoring the health and statistics of all the workers

Workers

Workers represent the majority of Giraph services. Their primary function is to manage the state of graph partitions assigned to them. Each worker exposes a set of network APIs to let remote workers manipulate the data in partitions assigned to it. Again, note that these network APIs are internal to the Giraph implementation and are not expected to be available to end users. Workers transition from superstep to superstep as directed by the active master. During each superstep, workers iterate over the graph partitions they own and execute the compute() method for all the vertices belonging to these partitions. Workers are also responsible for checkpointing their state from time to time as a means of recovery from a worker failure.

Note that the assignment of graph partitions to workers is not permanent and is subject to master-driven rebalancing before every superstep. A class implementing the MasterGraphPartitioner interface provides the implementation of the partitioning logic. An instance of that class belonging to an active master manages the current mapping of partitions to workers and provides the generateChangedPartitionOwners() method for reevaluating the mapping based on the various statistics coming from the workers. The master updates the mapping via the coordination service. Workers, on the other hand, look up which partitions no longer belong to them and re-create the state of those partitions on a target worker by issuing network API calls.
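To make the rebalancing idea concrete, here is a minimal sketch of the kind of assignment logic such a partitioner might apply. It deliberately does not use the real MasterGraphPartitioner signatures; the class, method, and worker names below are simplified assumptions for illustration only.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only: the real MasterGraphPartitioner interface has a
// different shape and works with PartitionOwner/WorkerInfo objects.
public class RoundRobinPartitionAssigner {
  /**
   * Assigns each partition to one of the currently healthy workers in a
   * round-robin fashion. The master would re-run logic like this before
   * every superstep, which is why a partition can move between workers.
   */
  public Map<Integer, String> assignPartitions(int partitionCount, List<String> healthyWorkers) {
    Map<Integer, String> owners = new HashMap<>();
    for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
      owners.put(partitionId, healthyWorkers.get(partitionId % healthyWorkers.size()));
    }
    return owners;
  }

  public static void main(String[] args) {
    List<String> workers = Arrays.asList("worker-1", "worker-2", "worker-3");
    Map<Integer, String> owners = new RoundRobinPartitionAssigner().assignPartitions(8, workers);
    owners.forEach((p, w) -> System.out.println("partition " + p + " -> " + w));
  }
}

A worker that discovers it no longer owns a partition after such a reassignment would then push that partition’s data to the new owner over the network, as described above.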

Coordinators

Nodes running the coordination service provide the nervous tissue for the rest of the Giraph services. They don’t participate in performing any graph-processing work; instead, they provide distributed configuration, synchronization, and naming registry services for the rest of Giraph.

There are several coordination service implementations you can use, but the default choice in the Hadoop ecosystem has always been Apache ZooKeeper. The ZooKeeper service is so ubiquitous that it is hard to imagine a production Hadoop cluster deployment running without it. It has become synonymous with the coordination of any application in the Hadoop ecosystem.

A collection of nodes running a coordination service is collectively known as a ZooKeeper ensemble. All nodes in the ensemble are considered peers, and each node has a replica of the ZooKeeper state as a means of achieving high availability and fault tolerance. As long as a majority of the nodes are up, the service will function correctly.

Giraph provides two options for managing coordination services. The default option is to spin up ZooKeeper ensemble nodes on demand as part of a Giraph application run. That way, coordination services are no different from masters or workers: they are fully managed by Giraph, and they vanish once the application exits. The second option is to use a stand-alone ZooKeeper ensemble that is centrally maintained as part of the cluster. In that case, Giraph applications act as pure clients, and there is no need to have the coordination service running on nodes used by the Giraph application.

The second approach is recommended for production use of Giraph. It allows for easier postmortem and real-time debugging (because the coordination service stays alive after the Giraph application exits). Also, because it uses a cluster-wide, centralized, highly tuned, well-maintained ZooKeeper ensemble, your application’s performance may be better.

Bootstrapping Giraph Services

Consider any of the examples from Chapter 5, but this time assume that you have to deal with a much bigger input dataset. The size of the dataset makes it prohibitively expensive to execute the application on a single host and requires you to use a cluster. The good news is that you don’t have to change the application to make it run on a cluster of compute resources: the Giraph architecture is flexible enough to scale elastically. All you have to do to begin using a cluster is to tell Giraph how many compute resources it can use for which purpose.

Setting the configuration property giraph.maxWorkers (or passing the same value via the command-line option -w) tells Giraph how many compute resources should be used to execute worker services. Another configuration property, giraph.zkServerCount (which has a default value of 1), tells Giraph how many compute resources need to be allocated for running master and coordination services. Finally, the total number of cluster resources used by an application depends on the configuration property giraph.SplitMasterWorker. If that property is set to false, all services can run on all cluster resources, and the total number of resources used is equal to the value of giraph.maxWorkers. On the other hand, if it is set to true (its default value), the total number of utilized resources is the sum of giraph.maxWorkers and giraph.zkServerCount.

For the rest of this chapter, assume a cluster of at least ten nodes so you can execute the example application with giraph.maxWorkers set to 8 and giraph.zkServerCount set to 2. Also assume the default value (true) for giraph.SplitMasterWorker so the total number of utilized cluster resources is exactly ten (eight plus two). Those resources will be used to run exactly eight worker services, two master services, and two coordination services (with the master and coordination services co-located together).
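Expressed in code, these settings might look like the following sketch. It uses the generic Configuration-style setters with the property names from the text (GiraphConfiguration also offers typed helper methods, and the same values are usually passed on the command line when submitting the job); the surrounding class is purely illustrative.

import org.apache.giraph.conf.GiraphConfiguration;

public class ClusterSizingExample {
  public static void main(String[] args) {
    GiraphConfiguration conf = new GiraphConfiguration();

    // Eight compute resources run worker services (equivalent to -w 8 on the command line).
    conf.setInt("giraph.maxWorkers", 8);

    // Two compute resources are set aside for master and coordination services.
    conf.setInt("giraph.zkServerCount", 2);

    // Keep workers separate from masters/coordinators (the default), so the
    // total resource count is 8 + 2 = 10.
    conf.setBoolean("giraph.SplitMasterWorker", true);

    System.out.println("Total resources requested: "
        + (conf.getInt("giraph.maxWorkers", 1) + conf.getInt("giraph.zkServerCount", 1)));
  }
}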

With these settings in mind, let’s look into the underlying mechanism that pushes the required bits of Java code for each service to the desired cluster resources and makes them behave like a set of eight worker services, two master services, and two coordination services. See Figure 6-2.

9781484212523_Fig06-02.jpg

Figure 6-2. Running an example on a cluster of 10 hosts

Although it is nice to know that given the right combination of command-line options, Giraph can start using the power of a Hadoop cluster, the question this chapter answers is how that happens. One way to distribute services on a cluster of compute resources requires building a custom Giraph-specific cluster-management layer, purposefully designed with a graph-processing application in mind. There is nothing wrong with doing that; Apache Hama (a graph-processing framework very similar to Giraph) made exactly that type of design decision. It allows for a greater degree of control, but the price is maintaining low-level cluster-management code that has nothing to do with graph processing.

Unlike Hama, Giraph decided to use existing cluster-management solutions to do all the heavy lifting. Apache Hadoop is the most popular solution and also the one that comes by default with the Giraph distribution. It is not the only option, though. Even within the Hadoop project, Giraph has two alternatives: a more classic MapReduce framework (also available in prior versions of Hadoop) and a brand-new collection of resource-management APIs known as YARN. The YARN back end is still experimental, and this chapter assumes that the example application is executed using the Hadoop MapReduce framework.

WHEN MAPREDUCE IS NOT REALLY MAPREDUCE

The MapReduce framework for distributed data analysis was first described by two Google engineers (Jeffrey Dean and Sanjay Ghemawat) in the seminal 2004 paper “MapReduce: Simplified Data Processing on Large Clusters.” The core idea behind the framework was simple enough, but it required a radical shift in application design. The paper proposed that every data-processing application should be built around two things: a piece of code called a mapper and another piece of code called a reducer. Both pieces of code are transparently instantiated on a large cluster of compute resources, thus allowing the application to process as much data as there are compute resources. In other words, thousands of mapper copies run in parallel, all supplying data to hundreds of reducers, also running in parallel. The programmer is free from low-level cluster-management plumbing and can focus on implementing the mappers and reducers.

Apache Hadoop was the first highly popular, open source implementation of a MapReduce framework. Its availability led to explosive growth of data-processing applications that can be neatly decomposed into mappers and reducers. Graph processing is not one of those applications; it is typically much more iterative in nature, and it requires a different kind of data partitioning.

Thus, when running with a MapReduce back end, Giraph doesn’t have a chance to exploit the spirit of the framework. All it needs from Hadoop is a way to push a bit of its own code to a given number of compute resources. The trick it plays is that it constructs a fake MapReduce application with a fixed number of mappers and no reducers. This is known as a map-only application. Because it is disconnected from all the Hadoop data pipelines, from Hadoop’s standpoint it “runs forever” (as opposed to a normal MapReduce application, which runs as long as unprocessed data remains). Because of that, the Giraph implementation has to deal with ingesting the initial data and limiting the application’s runtime.

Even though a running Giraph application looks to a Hadoop cluster like a MapReduce application, it has very little to do with the behavior of a typical MapReduce job.

How does Giraph use MapReduce to spin up all the required internal services? Giraph encapsulates its business logic in the GraphTaskManager class and expects an instance of it to be created on every node available to the Giraph application. GraphTaskManager doesn’t need to worry about the details of how it gets instantiated; it focuses on the job it needs to perform in the overall mesh of Giraph services. When Giraph uses MapReduce as a back end, the job of instantiating a GraphTaskManager object belongs to a mapper. This is the only thing this unusual mapper ever does.
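Conceptually, the mapper that Giraph submits looks something like the following sketch. This is not Giraph’s actual mapper source; the GiraphTask class is a simplified stand-in for GraphTaskManager, and the real code wires in far more plumbing. It only illustrates the pattern of a map-only task whose sole job is to create and drive the task manager.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch only: GiraphTask below is a simplified stand-in for GraphTaskManager.
public class GiraphLikeMapper
    extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {

  /** Stand-in for GraphTaskManager: decides which service this node runs. */
  static class GiraphTask {
    void setup()   { /* pick a role: master, worker, and/or coordination service */ }
    void execute() { /* run that service until the whole application finishes */ }
    void cleanup() { /* release resources and report final status */ }
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    // A "map-only" job with no real input: this mapper never reads key/value
    // pairs. It simply hands control to the Giraph task for the lifetime of
    // the application, which is why the job appears to "run forever" to Hadoop.
    GiraphTask task = new GiraphTask();
    task.setup();
    task.execute();
    task.cleanup();
  }
}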

At the highest level, the entire process of executing a Giraph application on a cluster of machines consists of the following steps (as illustrated in Figure 6-3):

  1. An underlying cluster-management framework (most likely Hadoop’s MapReduce or YARN, although Giraph can be plugged in to other ones as well) instantiates a GraphTaskManager class implementation on a number of nodes. The exact number is determined by Giraph’s configuration.
  2. The same framework then calls the GraphTaskManager.setup() method, which takes care of the initialization and also determines which internal Giraph service (master, worker, or coordination service) this node is supposed to provide to the rest of the Giraph application.
  3. The GraphTaskManager.execute() method gets control. The Giraph application run commences, with each node assuming the role of one of the three internal services: master, worker or coordinator.

9781484212523_Fig06-03.jpg

Figure 6-3. Bootstrapping a Giraph application on a cluster of nodes

Despite its simplicity, this architecture means Giraph doesn’t have to be concerned with the details of finding appropriate cluster resources, scheduling containers to run there, restarting containers on different hosts if they fail, and otherwise allocating low-level resources. This is a very elastic approach that lets the Giraph application have as many workers as needed; the cluster-management framework is responsible for maintaining the worker pool. This is all good news, except for one detail: containers can be instantiated on any host in a cluster, so how can they communicate with each other if they don’t know the host names (or IP addresses) of the other services?

The answer to this question lies in Giraph’s use of a coordination service that every container can communicate with. Different Giraph services use the coordination service as a bulletin board or directory where all containers can post information about themselves (such as their network coordinates, operational status, and so on) and inspect information posted by others. Thus, instead of every container keeping track of the network address and state of every other container, the only information needed to bootstrap the Giraph framework is the location of a coordination service.

An interesting consequence of a coordination service playing the role of central information radiator is that it bears the brunt of communicating with every container. To avoid overwhelming it, the Giraph implementation tries to minimize the amount of traffic between each container and the coordination service. The approach is to manage a small amount of metadata via the coordination service and opt for direct communication between containers whenever possible.

As long as the Giraph services are running as they should, a cluster-management framework such as Hadoop stays out of the way. If services fail (due to software bugs or hardware failure), it is up to the cluster-management framework to spin up new instances. The coordination service plays a vital role in allowing the Giraph implementation to detect when services fail and when they are being brought back online (perhaps on a different host). That way, an active master always has a way to rebalance the current graph computation between the active workers.

Of course, detecting service failure and bringing back new instances solves only half the problem. When a service goes down, it also brings down its in-memory state. It would be unfortunate if Giraph had to restart an entire computation simply because one service out of thousands went down. An elegant solution to that problem is to use periodic checkpoints. At a given interval (requested via the giraph.checkpointFrequency configuration property), Giraph records the state of all the services in a permanent location in HDFS (under the home directory of the current user). Suppose giraph.checkpointFrequency is set to 5. This means after every five supersteps, the state of all the services is checkpointed. Should any service fail, the computation has to redo at most the five most recent supersteps, because it is restarted from the checkpointed state. The right checkpoint frequency is highly application specific. On one hand, more frequent checkpointing increases the overall runtime of the application; on the other, it speeds up recovery if a worker fails. Experiment with different values, and pick the one that works for you. Just keep in mind that the default value for this property is 0: by default, checkpointing is disabled.
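For example, checkpointing every five supersteps could be requested like this. The sketch sets the raw property name from the text on a GiraphConfiguration object (typed helper setters exist as well); the surrounding class is purely illustrative.

import org.apache.giraph.conf.GiraphConfiguration;

public class CheckpointingExample {
  public static void main(String[] args) {
    GiraphConfiguration conf = new GiraphConfiguration();
    // Checkpoint the state of all services every 5 supersteps.
    // The default value of 0 means checkpointing is disabled entirely.
    conf.setInt("giraph.checkpointFrequency", 5);
    System.out.println("checkpoint frequency: " + conf.getInt("giraph.checkpointFrequency", 0));
  }
}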

Now that you understand the mechanics of how Giraph uses cluster compute resources, let’s proceed with a detailed overview of what each Giraph service does and how they interact.

Anatomy of Giraph Services

Once fully bootstrapped and running, every Giraph application consists of a network of services that, collectively, accomplish graph processing by communicating with each other via network API calls. The two services implemented by Giraph (masters and workers) share a common design based on the CentralizedService interface and the BspService abstract class implementing it. The coordination service is implemented separately by a stand-alone Apache ZooKeeper project and is not discussed in this book.

WHAT IS APACHE ZOOKEEPER?

Apache ZooKeeper is an effort to develop and maintain an open source server that enables highly reliable distributed coordination. It was developed in response to the proliferation of highly distributed applications, all trying to implement similar functionality in an ad hoc fashion. You can find information about the design and implementation of ZooKeeper at http://zookeeper.apache.org or in a book written by two of its authors, Flavio Junqueira and Benjamin Reed: ZooKeeper: Distributed Process Coordination (O’Reilly, 2013).

All the functionality offered by masters is implemented in a BspServiceMaster class. On the worker side, BspServiceWorker is the corresponding implementation. Both classes extend the common functionality of BspService by subclassing it and implementing more specific interfaces, as shown in the class hierarchy in Figure 6-4.

9781484212523_Fig06-04.jpg

Figure 6-4. Giraph services class hierarchy

Both services act as network servers and clients at the same time by using the classes implementing the following interfaces:

  • For masters: MasterClient and MasterServer.
  • For workers: WorkerClient and WorkerServer.

Currently, the only implementation available for all four of these interfaces is based on the Netty framework. Netty is a new I/O (NIO) client/server framework that enables quick and easy development of network applications such as protocol servers and clients. Don’t worry if you are not familiar with Netty; as long as you understand that it implements NIO, you should have no trouble understanding how Giraph uses it. Giraph abstracts that away by wrapping the code that provides general-purpose, Netty-enabled client/server interactions into two stand-alone classes: NettyClient and NettyServer. That part has little to do with the Giraph graph-processing model. The part that does is then wrapped in the four classes shown in Figure 6-5.

9781484212523_Fig06-05.jpg

Figure 6-5. Netty-enabled client/server architecture

As you can see, each of the classes implements the appropriate interface by delegating all non-Giraph-specific client/server interactions to a NettyClient or NettyServer object that it owns.
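The delegation pattern looks roughly like the following sketch. The interface and classes here are simplified stand-ins (the real MasterClient, NettyClient, and their methods are richer); the point is only that the Giraph-specific class owns a Netty-based helper and forwards all generic transport work to it.

public class NettyDelegationSketch {
  /** Simplified stand-in for a Giraph-specific client interface. */
  interface MasterClient {
    void sendToOwner(String aggregatorName, byte[] value);
    void flush();
  }

  /** Generic, Giraph-agnostic transport helper (stand-in for NettyClient). */
  static class SimpleNettyClient {
    void send(String host, int port, byte[] payload) {
      System.out.println("sending " + payload.length + " bytes to " + host + ":" + port);
    }
    void waitAllRequests() {
      System.out.println("waiting for outstanding requests to complete");
    }
  }

  /** The Giraph-specific class owns a Netty helper and delegates raw I/O to it. */
  static class NettyMasterClient implements MasterClient {
    private final SimpleNettyClient client = new SimpleNettyClient();

    @Override
    public void sendToOwner(String aggregatorName, byte[] value) {
      // Giraph-specific part: look up which worker owns this aggregator
      // (the host below is a made-up placeholder for that lookup)...
      String ownerHost = "worker-3.example.com";
      // ...then delegate the actual network I/O to the generic helper.
      client.send(ownerHost, 30000, value);
    }

    @Override
    public void flush() {
      client.waitAllRequests();
    }
  }

  public static void main(String[] args) {
    MasterClient client = new NettyMasterClient();
    client.sendToOwner("example.edgeCount", new byte[8]);
    client.flush();
  }
}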

Determining which node should provide which Giraph service is handled by GraphTaskManager.setup(). That determination is permanent for the duration of the Giraph application run and is driven by the configuration parameters given to the Giraph framework. For example, if an external ZooKeeper ensemble has been specified via the giraph.zkList configuration option, then there is no need for Giraph to assign this service to the nodes it manages. The implementation has the flexibility to assign any combination of services to a given node, with one exception: a coordination service must always be co-located with a master service. In other words, a node running a coordination service also runs either a master service or all three services. The only two impossible combinations of node service assignments are these:

  • A node exclusively dedicated to running a coordination service
  • A node running a coordination service and a worker service

Now that you have enough background about the generic aspects of how Giraph implements its master, worker, and coordination services, it is time to dive deeper into their detailed architecture.

Master Services

The master service always runs as an extra Java thread implemented by the MasterThread class. An instance of that class owns a reference to the BspServiceMaster object and implements its functionality by orchestrating calls to the APIs exposed by BspServiceMaster. The master algorithm is pretty simple, but it needs to be designed in a way that is resilient if the active master fails and a different master (typically running on a different host) assumes the responsibilities of an active master. Figure 6-6 shows the high-level steps that every master service goes through.

9781484212523_Fig06-06.jpg

Figure 6-6. A single master service lifecycle

The first thing a master does is place a bid on becoming an active master via a BspServiceMaster.becomeMaster() call. Note that although all masters issue this call, all but one will block. The one that succeeds will determine the active master; the others will stay blocked until the active master fails. At that point, one of the blocked calls will return and thus determine the next active master. This synchronization barrier is implemented by using the functionality of a coordination service, as you see later.

On becoming an active master, the master thread instantiates a MasterCompute object, provided one was requested as part of the Giraph application configuration. The MasterCompute object, in turn, gets a chance to register the application’s aggregators with the active master.
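As a reminder of what that registration looks like from the application side (the MasterCompute API was introduced in Chapter 5), here is a minimal sketch; the aggregator name is made up for the example.

import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.master.DefaultMasterCompute;

public class ExampleMasterCompute extends DefaultMasterCompute {
  @Override
  public void initialize() throws InstantiationException, IllegalAccessException {
    // Runs on the active master once it wins the election; registers an
    // aggregator that workers can contribute values to during every superstep.
    registerAggregator("example.edgeCount", LongSumAggregator.class);
  }
}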

Before entering a superstep loop, an active master tries to perform one more important function: creating a set of input splits for the graph data (both vertex and edge data) to be loaded by the appropriate input formats. Because Giraph applications typically operate on a set of huge files stored in a distributed filesystem, effectively spreading the work of loading that data among all the available workers is a key scalability requirement. This functionality is implemented in the BspServiceMaster.createInputSplits() method, and it works as follows. First it consults a coordination service to see if the mapping of which worker is supposed to load which file has been established. If it hasn’t, the map is generated and externalized into the coordination service. This accomplishes two things at once. First, because every worker is watching the coordination service for updates, once the mapping is made available, the workers can load their portion of the input data (the details of this step are laid out in the section “Coordination Services” below). Second, persisting the mapping to the coordination service as a transaction makes it possible for the standby master to effectively take over in case the active master dies.

Once the input splits’ mapping has been made available to all the workers, the active master transitions into the superstep loop that runs for as long as supersteps remain. The bulk of the functionality driving the superstep loop is implemented in the BspServiceMaster method coordinateSuperstep() and consists of the following steps (a simplified sketch of the loop appears after the list):

  1. Determine the set of healthy workers by observing the self-reporting available in the coordination service. The master observes the healthy workers for the duration of the superstep by listening to the coordination service events and takes corrective actions if they fail.
  2. Assign graph partitions to the healthy workers. This is a dynamic assignment for a given superstep and may change in the next superstep based on worker load and other considerations. If any partition is assigned to a worker that doesn’t have the graph data for that partition in its local memory, the worker that owns the partition is in charge of sending graph data by issuing API calls re-creating the partition on a target worker. The partition data may also come from the checkpoint file if this superstep was restarted from a checkpoint.
  3. Finalize the state of the aggregators from the previous superstep, and send them to the new worker owners. This is the last step in the two-phase distributed aggregator management. At this point the master has done the final aggregation and is ready to send the values back for further aggregation during the current superstep. See the section “Worker Services” below for the worker side of the two-phase distributed aggregator implementation.
  4. If the current superstep is an input superstep (a fake superstep that is needed to make workers load the initial graph data from input splits), coordinate the loading of vertex and edge data. This is accomplished by partitioning the data files into input splits and writing the mapping of input splits to workers to the coordination service. The workers watching updates in the coordination service will notice that the mapping belonging to them is available and will interpret that as a signal to start loading the data.
  5. Wait for all the workers to finish processing the current superstep. If any of the workers fail while processing the current superstep, attempt to restart from a checkpointed state. If that is successful, the next iteration of the superstep loop will assume that the previous superstep was the one restored from a checkpointed state.
  6. If the checkpoint frequency is met, wait for all the workers to checkpoint their state, and then finalize the checkpointing process globally (including checkpointing the aggregators).
  7. At this point, assume that all the workers have successfully completed the current superstep and it is safe to collect the values of the aggregators assigned to them.
  8. Once all the aggregator values are collected and the global state of the aggregators (as stored in the active master’s memory) is brought up to date, call a master compute’s compute() method.
  9. Determine whether it is time to stop the entire computation. To do so, first check whether an external flag was set in a coordination service, signaling to halt the computation for an arbitrary reason. If it wasn’t, check whether all the vertices voted to halt and there are no more messages pending to be delivered to the vertices. Finally, check whether more supersteps have happened than a maximum number of supersteps allowed for this application (you can set a value for this using the giraph.maxNumberOfSupersteps property). If any of these conditions are true, record the fact that the computation has been halted in the global statistics for this job, and bail out of the superstep loop.
  10. Publish the aggregated application state into the coordination service.
  11. Depending on the value of giraph.textAggregatorWriter.frequency, check whether it is time to output the state of all the registered aggregators. If it is, call the supplied AggregatorWriter implementation.
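Putting these steps together, the control flow of the active master can be summarized in the following simplified sketch. The helper methods are hypothetical placeholders for the BspServiceMaster functionality described above (the input superstep and several bookkeeping details are omitted); this is not the real coordinateSuperstep() code.

/** Illustrative outline of the active master's superstep loop (not real Giraph code). */
public class MasterLoopSketch {
  // Hypothetical placeholders for the real BspServiceMaster functionality.
  void assignPartitions() { }
  void distributeAggregators() { }
  boolean waitForWorkers() { return true; }   // false would mean a worker failed
  void maybeCheckpoint(long superstep) { }
  void collectAggregators() { }
  void runMasterCompute() { }
  boolean shouldHalt() { return false; }
  void publishGlobalState() { }

  public void coordinate() {
    long superstep = 0;
    while (true) {
      assignPartitions();          // steps 1-2: healthy workers get partitions
      distributeAggregators();     // step 3: previous superstep's final values go out
      if (!waitForWorkers()) {     // step 5: a worker failed...
        continue;                  // ...restart from the last checkpoint and redo the superstep
      }
      maybeCheckpoint(superstep);  // step 6: checkpoint if the frequency is met
      collectAggregators();        // step 7: gather final aggregator values
      runMasterCompute();          // step 8: MasterCompute.compute()
      if (shouldHalt()) {          // step 9: halt flag, all vertices halted, or max supersteps
        break;
      }
      publishGlobalState();        // steps 10-11: publish state, maybe write aggregators
      superstep++;
    }
  }
}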

Once the superstep loop is over, the only thing left for the master to do is to call the cleanup routine BspServiceMaster.cleanup(). All the master processes should signal that they are done by posting a special kind of note to a coordination service. When the number of notes equals the number of partitions for workers and masters, the master cleans up the global state associated with a job. This is how a global barrier is implemented with the help of a coordination service.

Getting back to the example application, there are always exactly two master services running: one active and one standby. Should the active master fail, the standby resumes coordination activities. As long as the master is active, its superstep loop goes through the steps outlined in Figure 6-7.

9781484212523_Fig06-07.jpg

Figure 6-7. Example application’s active master superstep loop

Worker Services

The key function of a worker service in a Giraph application is to manage the state of the few graph partitions assigned to it by the master. Just as with the master, the lifecycle of a worker is managed by the GraphTaskManager.execute() method orchestrating calls to the API methods exposed by a BspServiceWorker object, which provides the bulk of the implementation.

Each worker service operates under the same resiliency constraints that masters operate under. Worker failures are a fact of life in a Giraph application, and the worker algorithm has to be designed with that possibility in mind.

The algorithm consists of two phases: the setup phase and a superstep loop phase. The setup phase is mostly concerned with loading initial graph data into the memory of all the workers. The key steps of how it happens are outlined in Figure 6-8.

9781484212523_Fig06-08.jpg

Figure 6-8. Setup phase of the worker service lifecycle

Not surprisingly, BspServiceWorker.setup() provides the implementation of the setup phase, which, before it loads the data, checks whether setup() was called as part of restarting a failed superstep. If that’s the case, it loads the state from the checkpoint and bails out, because the rest of the setup() machinery only applies when the entire Giraph application is initialized for the first time:

  1. Start a fake superstep that does no computation but instead waits for the master to calculate input splits and then loads the slice of input data that is assigned to this worker. Internally, this superstep is called INPUT_SUPERSTEP and has a numeric ID of -1. An important note about loading input splits is that in general, you shouldn’t expect the input data from a split assigned to the worker to generate vertices belonging to one of the graph partitions assigned to the same worker. Thus, a worker loading vertex and edge data is virtually guaranteed to generate network traffic, sending messages to other workers and requesting them to create vertices in their partitions.
  2. Keep loading data and creating new vertices or edges as long as unprocessed input splits remain. The two classes VertexInputSplitsCallable and EdgeInputSplitsCallable provide an implementation of this functionality.
  3. When there are no splits left to process, signal that this worker is done by posting a message to the coordination service, and wait for the other workers to finish. This is necessary because it allows a worker that is finished to still receive network API calls and update its partitions with the vertices loaded by workers still processing their input splits. The fact that all workers are finished is recognized by the master and reflected in the coordination service.
  4. When the master has signaled that all workers are finished loading their data, do a few housekeeping tasks: create the remaining partitions owned by this worker (the ones for which no input data has arrived), and finalize mutations deriving from requests to add edges to the vertices belonging to partitions managed by this worker.
  5. Wait for the master to signal that the fake INPUT_SUPERSTEP has completed and all the vertices are ready to move into the superstep loop.

Once all workers are ready to transition to the superstep loop, they do so at once and enter the main computational loop of the Giraph application. This worker service superstep loop cycles through the following steps as long as supersteps remain (a simplified sketch appears after the list):

  1. Prepare for the current superstep by calling BspServiceWorker.startSuperstep(). This combines all API requests received during the previous superstep and mutates the graph partitions accordingly (adding, removing, and updating vertices, and so on). It also waits until the master is finished rebalancing the partitions between workers and returns this worker’s partition assignment.
  2. Based on the current partition assignment, the worker may have to push some of its partitions to the workers they were assigned to. It does so by calling BspServiceWorker.exchangeVertexPartitions(). The method makes sure not to return until all of the worker’s dependencies are finished sending their data to it. As usual, the worker signals completion of pushing its own data by sending a message to the coordination service; it relies on the master to aggregate those messages and signal when the entire worker collective is done exchanging graph partitions.
  3. Check whether this superstep is supposed to be loaded from a checkpointed state or whether, instead, it is time to save the current state into a checkpoint (as signaled by the checkpointFrequencyMet() method). The implementation of both actions is provided by two complementary methods defined in BspServiceWorker: loadCheckpoint() and storeCheckpoint().
  4. Call the BspServiceWorker.prepareSuperstep() method, which is currently mostly concerned with synchronizing the state of the aggregators owned by this worker and the rest of the workers. In this step, the worker fetches the state of the aggregators it manages from the master and blocks until they are pushed to all the other workers.
  5. Call compute() for all the vertices belonging to partitions managed by this worker. Iterate over all the vertices, but only call compute() for those that either are not halted or have messages available. This is done in a multithreaded fashion, with threads using the ComputeCallable class as an implementation for the task that needs to be added to the task queue and later executed.
  6. Call the BspServiceWorker.finishSuperstep() method, which does all the necessary work to complete the current superstep and then blocks, waiting for the master to signal that all the workers are ready for the next superstep to begin. As part of this step, all the aggregator values partially aggregated by this worker are sent to their rightful owners (the owners are picked dynamically by the master). The worker then waits to receive, from all the other workers, the values of the aggregators it owns. Once that is done, it aggregates them one final time and pushes the results back to the master. The last bit of housekeeping performed by finishSuperstep() is to reflect the statistics from the current superstep in the coordination service. This is needed to keep track of what’s happening on which worker for observability and load-balancing reasons.
  7. Because the previous step makes the global Giraph statistics available to each worker, the superstep loop continues to the next superstep, provided there are active vertices left in the graph.
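The worker side of the loop can be summarized the same way. The following simplified sketch uses placeholder methods loosely named after the BspServiceWorker calls mentioned above; the real implementation carries much more state and error handling.

/** Illustrative outline of a worker's superstep loop (not real Giraph code). */
public class WorkerLoopSketch {
  // Placeholders loosely named after the BspServiceWorker methods described above.
  void startSuperstep() { }            // apply pending mutations, get partition assignment
  void exchangeVertexPartitions() { }  // push/receive partitions that changed owners
  void checkpointIfNeeded() { }        // loadCheckpoint()/storeCheckpoint()
  void prepareSuperstep() { }          // sync aggregator values with other workers
  void computeAllPartitions() { }      // run compute() for active vertices (multithreaded)
  boolean finishSuperstep() { return false; } // report stats; true means keep going

  public void run() {
    boolean moreWork = true;
    while (moreWork) {
      startSuperstep();             // step 1
      exchangeVertexPartitions();   // step 2
      checkpointIfNeeded();         // step 3
      prepareSuperstep();           // step 4
      computeAllPartitions();       // step 5
      moreWork = finishSuperstep(); // steps 6-7: block until the master says go on
    }
  }
}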

Once again, if you turn to the example application, each worker (once it enters the superstep loop phase) goes through the set of transitions outlined in Figure 6-9.

9781484212523_Fig06-09.jpg

Figure 6-9. Example application’s worker superstep loop

The worker superstep loop is the central process driving the execution of every Giraph application. Most of the steps are pretty easy to follow, and the only part worth reiterating is the implementation of sharded aggregators. Steps 4 and 6 are the key to understanding how sharded aggregators work.

First, during each superstep, values provided to the aggregators are partially aggregated by the workers locally. No network API calls are involved. Once the superstep is done, all of these partially aggregated values coming from different workers need to be aggregated once more, and the resulting value must be made available to MasterCompute.compute(). One option is to let the master manage all the aggregators because their value needs to end up there anyway. This simple approach unfortunately doesn’t scale if an application uses a lot of aggregators; it turns the master into a bottleneck from both the computation and network communication standpoints.

The approach that Giraph takes is to dynamically shard the aggregators between the workers in the cluster. Here’s how it works: when the superstep ends, each aggregator is assigned to one of the workers, and that worker becomes responsible for receiving partially aggregated values from its neighbors. It is also expected to push the final aggregated value back to the master. That way, the master doesn’t need to perform any aggregation, and it always receives one final value for each aggregator. Once the values are received by the master and it is finished executing MasterCompute.compute(), the only worker to which that master sends each value is the owner of that aggregator. As a final step, the owners are expected to distribute these values to their neighbors.
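The following toy example illustrates the two-phase flow with a simple sum: each worker first combines values locally, then the worker that owns the aggregator combines the per-worker partial sums, and only that single final value travels to the master. None of these classes are Giraph classes; they exist purely to show the data flow.

import java.util.Arrays;
import java.util.List;

/** Toy illustration of sharded aggregation for a sum aggregator (not Giraph code). */
public class ShardedSumSketch {
  /** Phase 1: each worker aggregates locally, with no network traffic. */
  static long partialSumOnWorker(List<Long> valuesProvidedDuringSuperstep) {
    long sum = 0;
    for (long v : valuesProvidedDuringSuperstep) {
      sum += v;
    }
    return sum;
  }

  /** Phase 2: the worker that owns this aggregator combines the partial sums
   *  it received from the other workers and sends one value to the master. */
  static long finalSumOnOwner(List<Long> partialSumsFromWorkers) {
    return partialSumsFromWorkers.stream().mapToLong(Long::longValue).sum();
  }

  public static void main(String[] args) {
    long w1 = partialSumOnWorker(Arrays.asList(1L, 2L, 3L));
    long w2 = partialSumOnWorker(Arrays.asList(10L, 20L));
    // The owner worker, not the master, does the final aggregation.
    System.out.println("value delivered to the master: " + finalSumOnOwner(Arrays.asList(w1, w2)));
  }
}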

Coordination Services

As noted earlier, a coordination service is not implemented by Giraph. The implementation comes from a stand-alone Apache ZooKeeper project. Still, it is important to understand the APIs it offers and how masters and workers use them. Most of the data that Giraph puts into a ZooKeeper is permanent, so knowing the details of how Giraph externalizes its state can be an invaluable tool for debugging failed applications. And if you want to make it even easier to observe and diagnose running Giraph applications, run them with an external ZooKeeper ensemble (by setting the giraph.zkList property) instead of relying on Giraph to spin up coordination services on demand. By running with an external ZooKeeper ensemble, you can inspect the internal state of the coordination service (and thus the internal state of your Giraph application) whenever you like, as opposed to only while the application is running.

One of the key properties of ZooKeeper is that the service it provides is highly available. The way it is implemented requires that all nodes that are part of the ZooKeeper ensemble know the network addresses (hostnames and ports) of every other node that is part of the same ensemble. That way, they can make the best effort to synchronize the common state between as many nodes as possible. ZooKeeper clients (remember, Giraph is only a ZooKeeper client) talk to only one ZooKeeper node at a time. But even a client has to know the network addresses of as many nodes in the ZooKeeper ensemble as possible. After all, if the node the client is currently talking to goes down, the client has to know where to reconnect.

As long as Giraph uses an external ZooKeeper service, the network addresses of all the nodes in the ensemble are passed to the worker and master services as a static, comma-separated list specified via the giraph.zkList property. If, on the other hand, Giraph needs to spin up a coordination service on demand, it can no longer know in advance which nodes in the cluster those services will be instantiated on. It is a curious chicken-and-egg problem: Giraph needs ZooKeeper to store all the information that must be shared between services running on different nodes, but the network addresses of the ZooKeeper nodes are exactly the kind of information that needs to be stored in ZooKeeper. It seems as though Giraph needs ZooKeeper to be able to bootstrap ZooKeeper. How do you avoid this infinite regression?

Giraph uses another storage substrate that all Giraph services know how to access: HDFS. Although most of the time HDFS is an extremely poor choice for keeping small files that need to be frequently updated, it is OK to use it on a case-by-case basis. That is exactly how Giraph uses it. It records the network addresses of the ZooKeeper ensemble nodes as names of HDFS files. The exact location where it stores these files is set by the giraph.zkManagerDirectory property; the default value is _bsp/_defaultZkManagerDir (note that because the default value has a relative, not absolute, path name, it is rooted under the home directory of the user running the job).
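The idea can be sketched as follows: the nodes that end up hosting ZooKeeper write marker files whose names encode their network address, and every other service simply lists that directory using the Hadoop filesystem API. The directory layout and file naming below are illustrative assumptions, not Giraph’s actual conventions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ZkAddressLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Hypothetical directory; Giraph's real location is controlled by
    // giraph.zkManagerDirectory and its internal naming conventions differ.
    Path dir = new Path("_bsp/_defaultZkManagerDir");

    // Every service can discover the ensemble by listing file names such as
    // "node17.example.com-22181" and parsing the host and port out of them.
    for (FileStatus status : fs.listStatus(dir)) {
      System.out.println("ZooKeeper server candidate: " + status.getPath().getName());
    }
  }
}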

Now that you know how bootstrapping the ZooKeeper ensemble works and how master and worker services are enabled to connect to it as clients, let’s see what ZooKeeper client APIs they use. From a client perspective, ZooKeeper offers a filesystem-like hierarchical API with a namespace structured as a tree of nodes (each node in a tree, including inner nodes, is called a znode), with every node answering to a small set of API calls. Each znode in the hierarchy can hold some data, be a parent node to other nodes, or, unlike in any of the filesystems, do both. ZooKeeper is designed for coordination, which typically requires small chunks of data stored in znodes (the amount of data is capped at 1MB). All APIs are atomic, which means clients receive either all the data or none of it. When a client creates a znode, it may elect to declare the znode to be ephemeral and thus guarantee that it is visible only as long as the client that created it is connected to ZooKeeper. If the node is not ephemeral, it is considered to be persistent, and an explicit delete call would be required to remove it from the tree. Both types of znodes can have an additional property of being sequential (thus ZooKeeper znodes fall into one of four categories: ephemeral, ephemeral sequential, non-ephemeral, or non-ephemeral sequential). A sequential znode’s name is derived from the initial name supplied as part of the create API call and the sequence number ZooKeeper assigns it. So, if multiple clients try to create a sequential znode with the same path name, each of them will end up creating a unique znode with ordering guaranteed by the ZooKeeper service. Finally, clients can register watches that are triggered when other clients perform certain types of operations on the tree of znodes. Events such as creation, deletion, and modification can be tracked that way.
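To make the znode vocabulary concrete, here is a small sketch that exercises the standard ZooKeeper client API. The ensemble address and paths are made up for the example, and error handling is omitted.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeBasics {
  public static void main(String[] args) throws Exception {
    // Hypothetical ensemble address; a real Giraph run would use giraph.zkList.
    ZooKeeper zk = new ZooKeeper("zk1.example.com:2181", 30000, event -> { });

    // A persistent znode survives client disconnects and must be deleted explicitly.
    zk.create("/demo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // An ephemeral znode disappears automatically when this client's session ends.
    zk.create("/demo/worker-1", "host:port".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    // A sequential znode gets a unique, monotonically increasing suffix appended.
    String bid = zk.create("/demo/bid-", new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    // A watch fires once when the children of /demo are created or deleted.
    zk.getChildren("/demo", event -> System.out.println("children changed: " + event));

    System.out.println("my bid znode: " + bid);
    zk.close();
  }
}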

INSPECTING THE ZOOKEEPER ZNODES TREE

By default, Giraph doesn’t provide any tools to inspect the tree of znodes it maintains in the ZooKeeper service. It is highly recommended that you download and install Apache ZooKeeper separately and use the command-line utility zkCli.sh or zkCli.cmd to browse the file tree. The only command-line argument you need to supply is -server, the comma-separated list of ZooKeeper ensemble nodes that the client connects to. Once the client is started, it behaves similarly to an ftp client. Type help to get a quick overview of supported commands.

Giraph keeps all of its coordination metadata and internal state data rooted under the _hadoopBsp znode. By default, it is created directly at the root of ZooKeeper’s hierarchy, but you can also instruct Giraph to root it at a subtree by setting the giraph.zkBaseZNode property. For example, setting giraph.zkBaseZNode to a value of /giraph/examples results in a running Giraph application creating the set of znodes shown in Listing 6-1.

As you can see, Giraph exposes a treasure trove of information in the ZooKeeper state it maintains. An obvious implication of this is help with debugging; a less obvious one is integration with external applications that may need to track the progress of running Giraph applications. Keep in mind that the location and content of Giraph znodes are considered internal APIs of Giraph and may change without notice between releases. Figure 6-10 gives you a complete view of a hierarchy of znodes used by various Giraph services for coordination and metadata management.

9781484212523_Fig06-10.jpg

Figure 6-10. Tree of znodes used by various Giraph services for coordination

All the znodes from the example are rooted under /giraph/examples/_hadoopBsp/job_1394131425944_0001. This common prefix was determined by the giraph.zkBaseZNode setting and the ID of a graph-processing job currently run by Giraph. Under that common root are roughly five different classes of znodes, color-coded according to the function they serve. Magenta znodes (names starting with _vertexInputSplit) are used to coordinate workers processing splits. Yellow znodes (names starting with _edgeInputSplit) are used for the same purpose when edge-split processing is requested. For both of these, the postfix of the znode name determines what it is used for:

  • _vertexInputSplitDir and _edgeInputSplitDir contain the input splits written by the master.
  • _vertexInputSplitDoneDir and _edgeInputSplitDoneDir track the input splits that have been fully processed.
  • _vertexInputSplitsAllReady and _edgeInputSplitsAllReady signal that all input splits have been written and are ready to be processed by the workers.
  • _vertexInputSplitsAllDone and _edgeInputSplitsAllDone signal that all input splits have been fully processed, notifying the workers that they should proceed.

Using these four kinds of znodes, you can create a fault-tolerant work-sharing scheme with an active master coordinating the processing and assignment of edge and/or vertex input splits as part of creating the initial graph topology. Green znodes are used for electing the active master (_masterElectionDir) and also for keeping the global, master-specific job state in a safe location (_masterJobState) in case an active master dies. Violet znodes are used by clients to signal that the current graph-processing job should stop and also to track worker cleanup activities:

  • _haltComputation: A client setting this global flag can signal Giraph to halt the computation regardless of whether it should be halted naturally. This is the quickest way to abort the job without attempting to save anything.
  • _checkpointAndStop: A client setting this global flag can signal Giraph to do the checkpointing of its current state and then halt the computation. This is a graceful way to stop the job.
  • _cleanedUpDir: This znode denotes which workers have been cleaned up.

Finally, blue znodes are the heart of graph processing. Each graph-processing application goes through a number of attempts to run a given graph-processing job the same way a Hadoop MapReduce execution goes through a certain number of attempts to run a mapping phase before declaring a failure. All the attempts are tracked under the znode path _applicationAttemptsDir/<attempt ID>. Within each attempt’s znode, _superstepDir tracks supersteps by ID (starting from -1). During each superstep, the master coordinates the progression of work among workers and rebalances the graph partitions. The following znodes are used to do that:

  • _workerHealthyDir and _workerUnhealthyDir are used to partition workers (nodes) in the cluster into a set that can be assigned work and a set that needs to be skipped for work assignment. Each worker can be assigned to one or the other.
  • _workerFinishedDir and _workerWroteCheckpointDir contain znodes of workers that are either done with the current superstep or done checkpointing the state of the current superstep, if checkpointing is enabled.
  • _addressesAndPartitions is used for tracking master and worker addresses and partition assignments.
  • _partitionExchangeDir helps coordinate the partition exchange between workers.
  • _superstepFinished flags this superstep as finished.

Most of the time, the Giraph implementation doesn’t interact with ZooKeeper client APIs explicitly, and instead wraps all API calls into methods provided by the ZooKeeperExt class. This class not only acts as a façade for ZooKeeper client APIs but also provides extended functionality such as recursive path creation and non-atomic operations.

The majority of znodes created by Giraph fall under the category of regular (non-ephemeral, non-sequential) znodes; working with them is very similar to working with files in a regular filesystem. There are, however, a few exceptions when Giraph needs to use the APIs provided by the ephemeral and sequential flavors of znodes:

  • Depending on whether they consider themselves healthy, workers create ephemeral, non-sequential znodes (named after the hostname the worker is running on) under either the _workerHealthyDir or _workerUnhealthyDir znode. Giraph offers a hook for defining what makes a node unhealthy, but the default implementation always reports every node as healthy. Because these znodes are ephemeral, ZooKeeper guarantees that when a worker dies, the znode associated with it disappears as well, and an active master is notified because it maintains a watch over the two locations.
  • Workers processing splits assigned to them by the master stake a claim on the work that needs to be done by creating an ephemeral, non-sequential znode with a name corresponding to the input split assigned to them. When a worker fails, its work-reservation znode disappears, allowing other workers to claim its previously read splits.
  • Different master services place a bid to become an active master by creating an ephemeral, sequential znode under _masterElectionDir. Once the bids are placed, ZooKeeper imposes a total ordering on the znode names. The master whose bid has the smallest ID in that order proceeds under the assumption that it can now act as an active master. The rest of the masters block, watching over the children of the _masterElectionDir znode. Whenever an active master dies, its ephemeral znode is automatically removed by ZooKeeper. This, in turn, wakes up all the candidate masters. The candidate master with the smallest sequential ID associated with its bid assumes the duties of an active master (a minimal sketch of this election pattern appears after the list).
  • An active master service maintains periodic snapshots of the global state of the entire Giraph job by writing it out to a persistent (non-ephemeral), but at the same time sequential, znode named _masterJobState/jobState_XXX. This guarantees that the master never overwrites the previous state but keeps creating additional snapshots with increasing IDs. This snapshot of the internal state is used during recovery from a master failure.
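The master-election scheme just described is an instance of the classic ZooKeeper leader-election recipe. Here is a minimal sketch of the pattern; the paths are made up, the parent znode is assumed to already exist, polling stands in for a proper watch, and this is not Giraph’s BspServiceMaster code.

import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MasterElectionSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("zk1.example.com:2181", 30000, event -> { });

    // Place a bid: an ephemeral, sequential znode. ZooKeeper appends a unique,
    // increasing suffix and deletes the znode if this process dies.
    // (Assumes the persistent parent znode was created during setup.)
    String myBid = zk.create("/_masterElectionDir/bid-", new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    while (true) {
      List<String> bids = zk.getChildren("/_masterElectionDir", false);
      Collections.sort(bids);
      if (myBid.endsWith(bids.get(0))) {
        System.out.println("I am the active master");
        break;
      }
      // Not the smallest bid: remain a standby master.
      // (A real implementation would block on a watch instead of polling.)
      Thread.sleep(1000);
    }
  }
}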

This concludes the review of the design and implementation of each of the three services: masters, workers, and coordinators. So far, the chapter has focused on the normal execution path and has not spent much time covering fault-tolerance considerations. This is the subject for the rest of the chapter.

Fault Tolerance

As mentioned earlier, much of the Giraph architecture is dictated by the need to effectively address the issues captured by the fallacies of distributed computing. The biggest one is often the simple fact that at Giraph’s typical operating scale, nothing can be assumed to be reliable.

HOW OFTEN DOES IT BREAK?

Everybody knows Google is an authority on managing the world’s information. What’s less well known is that based on its track record of building and operating large datacenters, Google has become one of the leading authorities on datacenter buildout and maintenance. The company is extremely careful about the design of everything that goes into its datacenters, but even Google, according to Google Fellow Jeff Dean (in his “Software Engineering Advice from Building Large-Scale Distributed Systems” talk), sees a lot of things go wrong.

In each cluster’s first year, it’s typical for 1,000 individual machine failures to occur, along with thousands of hard drive failures. One power distribution unit will fail, bringing down 500 to 1,000 machines for about 6 hours; 20 racks will fail, each time causing 40 to 80 machines to vanish from the network; 5 racks will “go wonky,” with half their network packets missing in action; and the cluster will have to be rewired once, affecting 5% of the machines at any given moment over a 2-day span, Dean says. And there’s about a 50% chance that the cluster will overheat, taking down most of the servers in less than 5 minutes and requiring one or two days to recover.

So far in this chapter, you have seen many techniques employed by Giraph to withstand various failure scenarios. This section looks at those scenarios and reviews the steps Giraph must go through to recover from disk failures, node failures, and network failures.

Disk Failure

As mentioned, Giraph is not concerned with managing storage. It does, however, interact with various storage layers to accomplish the following goals:

  • Reading the initial graph data and possibly outputting the final graph data at the end of the application run. Most of the time, the data comes from HDFS; but with an increased number of Giraph input/output formats available for other storage frameworks, HDFS may be out of the picture.
  • Storing checkpoints of each vertex state during the computation. Unlike with input/output formats, which provide unlimited flexibility for connecting Giraph to various data sources, when it comes to checkpoints, flexibility is very limited. Currently, the Giraph implementation can only use Hadoop-compatible filesystems, but most of the time stock HDFS is used.

For both use cases, what happens when a disk fails depends a great deal on what storage substrate is being used. In the case of a correctly deployed and configured HDFS, Giraph will not notice a failure rate of a few disks per day. HDFS is specifically designed to cope with those types of failures behind the scenes and mask them from the client. This is accomplished by making copies of all the data on at least three nodes (one on the same server rack and one on a different server rack). As far as metadata (a mapping between file names and blocks) is concerned, it is also typically stored in a few alternative locations. This makes Giraph checkpoints and the input/output formats operating on storage frameworks riding on top of HDFS (HBase, Hive, and so on) extremely reliable and oblivious to the disk failure rate of a typical datacenter. On a correctly configured HDFS deployment, the failure of a Giraph application due to a disk failure requires a very improbable set of events (all three disks hosting the same block failing at the same time).

Of course, however improbable it is, it can still happen. In addition, there are input/output formats for non-HDFS backed storage frameworks. These frameworks may not be as tolerant to individual disk failures as HDFS is. Either way, if a disk fails and a storage framework cannot silently recover, the error is propagated to the level of Giraph. At that point, if the error was encountered while an input/output format was reading or writing the data, the application will fail with an error message.

Node Failure

Because nodes in a cluster typically host a collection of different services, a useful way to analyze what happens when a node fails is to look at what happens when a particular service fails. That way, a failure of a single node can be viewed as a complex event consisting of a simultaneous failure of all the services hosted by it.

Giraph Master Failure

A master service can be in one of two states: it can be blocked on a bid to become an active master, or it can function as one. The master service that is blocked on a bid doesn’t have any internal state. Thus, when it fails, the only event of any significance is that the znode advertising its bid for becoming an active master disappears from the coordination service. There is no internal state to recover (because a blocked master doesn’t have any state), and additional master services can be spun up (and wait to become active) very easily.

A failure of an active master is more complex. If an active master fails, its ephemeral, sequential znode disappears as well, and that event wakes up all the master services blocked on a bid to become active. One of those master services will discover that its znode is now the first child of _masterElectionDir and will proceed to become an active master (the rest remain blocked on a bid). Because an active master keeps most of its state in a coordination service, a newly appointed active master doesn’t need to recover the state of the failed master. Even the aggregator values don’t need to be recovered, because they are fetched from the workers they are assigned to at the end of every superstep.

All in all, regardless of where an active master fails, a new active master can start coordinating the current superstep from the beginning, fetch the required state from the other services, and retrace all the steps up to the point where the previous master failed.

Giraph Worker Failure

The purpose of the worker service is to manage the graph partitions and the aggregators assigned to it. This means that, unlike a master service, a failed worker takes down quite a bit of state with it. That state is unique to each worker and is not replicated anywhere else, so it would be undesirable for Giraph to have to restart an entire computation because of one failed worker. Fortunately, as mentioned earlier in this chapter, you can protect a long-running computation against this risk by using checkpoints.

Before you go any further with checkpoints, recall that a worker service goes through two phases: a split-loading phase and a superstep loop. During the split-loading phase (designated as a fake superstep with ID -1), each worker is tasked with loading initial graph data, turning it into vertices and/or edges, and sending those objects to an owning worker via network API calls. Each worker tries to load as many input splits as possible. The master monitors overall progress and waits for all the input splits to be loaded and (possibly) checkpointed on the receiving worker. If any worker fails during the split-loading phase, the master aborts the entire application run. This approach means there’s no need for a special recovery mechanism during the split-loading phase.

After the split-loading phase, each worker transitions into the superstep loop. If checkpoints are enabled, you are guaranteed to have the initial graph data safely stored in HDFS. From that point on, whenever a healthy worker fails, its ephemeral znode disappears from _workerHealthyDir and an active master is notified by the coordination service. On receiving this notification, the active master declares the current superstep failed, restarts its own superstep loop from the last checkpointed superstep, and instructs all the healthy workers to load the state corresponding to that checkpoint. Once that is done, the computation continues, with the current superstep being the last checkpointed superstep plus one. Of course, if checkpoints are not enabled, the loss of even a single worker means the entire application run terminates immediately and is marked as a failure.
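To make the recovery path just described available, checkpointing has to be enabled explicitly. The following is a minimal sketch of doing so programmatically; giraph.checkpointFrequency is the property discussed in this chapter, while the checkpoint-directory property name and the example path are assumptions to verify against GiraphConstants for your Giraph version.

import org.apache.giraph.conf.GiraphConfiguration;

public class CheckpointConfigSketch {
  // A minimal sketch: checkpoint every 5 supersteps so that a worker
  // failure rolls the computation back only to the last checkpoint
  // instead of failing the entire application run.
  public static GiraphConfiguration withCheckpoints(GiraphConfiguration conf) {
    conf.setInt("giraph.checkpointFrequency", 5);
    // Checkpoints must land on a Hadoop-compatible filesystem; the
    // property name below is assumed -- verify it against GiraphConstants.
    conf.set("giraph.checkpointDirectory", "/user/giraph/_checkpoints");
    return conf;
  }
}

A larger frequency value means checkpoints are taken less often, trading lower I/O overhead per superstep for more recomputation after a failure; a value of zero disables checkpointing altogether.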

ZooKeeper Service Failure

ZooKeeper is architected to be resilient to the failures of the individual nodes participating in its ensemble by reliably replicating its state to the majority of nodes. It also guarantees a reliable process for nodes to rejoin the ensemble after they go down or are cut off from the rest of the service. As long as the majority of the nodes in the ensemble are available, the service will be available and consistent.

From a client perspective, a single ZooKeeper node failure triggers a connection loss event. But because clients are supplied with a list of all the nodes in the ensemble, they can try to reconnect to the nodes that are still functioning.

If Giraph uses an external ZooKeeper service, the number of nodes in the ensemble is predetermined and can’t be changed. If Giraph is managing its own coordination service nodes, you can control the size of the ensemble by setting the giraph.zkServerCount configuration property. The default value is 1, which doesn’t allow for any kind of recovery. If you have to run with a Giraph-managed coordination service, be sure to set it to at least 3. In general, it is wise to set it to an odd number, because an even number doesn’t add much to the reliability of the overall service compared to the lower odd number. For example, if you set it to 4, ZooKeeper will tolerate at most one node failure (because if more than one node fails, the remaining one or two nodes won’t constitute a majority). This is no different from setting it to 3, which can also tolerate at most one node failure.
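As a hedged illustration of the two deployment options, the sketch below shows a Giraph-managed ensemble sized via giraph.zkServerCount and an external ensemble. The giraph.zkList property name and the host list used for the external case are assumptions to verify against GiraphConstants for your Giraph version.

import org.apache.giraph.conf.GiraphConfiguration;

public class CoordinationConfigSketch {
  // Giraph-managed coordination service: request a 3-node ensemble so
  // the job can survive the loss of a single coordination node.
  public static void useManagedEnsemble(GiraphConfiguration conf) {
    conf.setInt("giraph.zkServerCount", 3);
  }

  // External ZooKeeper ensemble: Giraph connects to an ensemble that is
  // deployed and sized independently of the Giraph job.
  public static void useExternalEnsemble(GiraphConfiguration conf) {
    conf.set("giraph.zkList", "zk1:2181,zk2:2181,zk3:2181");
  }
}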

Network Failure

A network failure occurs when one node can no longer communicate with a subset of nodes in a cluster. A particularly nasty type of network failure, known as network partitioning, happens when two subsets of cluster nodes can no longer communicate with each other but retain perfect communication between the nodes within each subset.

Network failures affect Giraph in two major ways. First, network errors may prevent communication between the services (masters, workers, and coordinators) that form a running Giraph application. As you’ve seen in this chapter, the entire graph computation is done by master and worker services communicating via a set of network-enabled API calls. Masters and workers communicate with each other using the Netty-based NettyClient and NettyServer implementations. In general, the NettyClient implementation tries to be as asynchronous as possible: the remote API calls made in a superstep are non-blocking and go into a queue of outstanding requests for asynchronous processing. The superstep is considered finished when the queue of outstanding requests is drained. This is accomplished by relying on the synchronization method waitAllRequests(), which blocks until all the outstanding API calls are finished. It doesn’t just passively block, though; it keeps polling the status of outstanding requests with a frequency determined by the giraph.waitingRequestMsecs property (default value 15 seconds). Each API request sits in the queue of outstanding requests for no more than the time determined by the giraph.maxRequestMilliseconds property (default value 10 minutes); any request that takes longer is considered to have failed and is resent. The size of the queue of outstanding requests is capped by the giraph.maxNumberOfOpenRequests property (default value 10,000).
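If these defaults prove too aggressive or too lenient for a given cluster, the Netty-related properties can be tuned like any other Giraph configuration. The following is a minimal sketch using the property names just listed; the values are illustrative only.

import org.apache.giraph.conf.GiraphConfiguration;

public class NettyTuningSketch {
  public static void tune(GiraphConfiguration conf) {
    // Poll the status of outstanding requests every 10 seconds while
    // waiting for the superstep's request queue to drain.
    conf.setInt("giraph.waitingRequestMsecs", 10 * 1000);
    // Consider a request failed (and resend it) after 5 minutes.
    conf.setInt("giraph.maxRequestMilliseconds", 5 * 60 * 1000);
    // Cap the queue of outstanding requests at 5,000 entries per worker.
    conf.setInt("giraph.maxNumberOfOpenRequests", 5000);
  }
}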

Because communication with the coordination service is not Netty based but relies on native ZooKeeper Java APIs, it works slightly differently and relies on a different set of timeouts and polling intervals. As you’ve seen, an object of class ZooKeeperExt manages a connection to a coordination service for both masters and workers. As part of its initialization, it expects to receive values for the following network-related ZooKeeper client settings:

  • How long a connection can be in a failed state before the entire session with a ZooKeeper ensemble is declared to be timed out (set by the configuration property giraph.zkSessionMsecTimeout with a default value of 1 minute).
  • How many times to retry connecting to a ZooKeeper ensemble before giving up (set by the giraph.zkOpsMaxAttempts property with a default value of 3).
  • How much time to wait before retrying due to a connection loss. This timeout interval is controlled by two properties. The giraph.zkOpsRetryWaitMsecs property (default value 5 seconds) on the client side determines the timeout for the masters’ and workers’ connections to the ZooKeeper ensemble. The giraph.zkServerlistPollMsecs property controls the same timeout for some internal operations and is also used for spinning up coordination service nodes on demand (instead of using a permanent ZooKeeper ensemble). Its default value is 3 seconds.
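On flaky networks, you may want to loosen these client-side settings. The sketch below uses the property names from the list above; the specific values are illustrative only and should be adjusted to your environment.

import org.apache.giraph.conf.GiraphConfiguration;

public class ZkClientTimeoutsSketch {
  public static void tune(GiraphConfiguration conf) {
    // Allow a connection to stay in a failed state for 2 minutes before
    // the whole session with the ZooKeeper ensemble is declared timed out.
    conf.setInt("giraph.zkSessionMsecTimeout", 2 * 60 * 1000);
    // Retry connecting to the ensemble up to 5 times before giving up.
    conf.setInt("giraph.zkOpsMaxAttempts", 5);
    // Wait 10 seconds between retries after a connection loss.
    conf.setInt("giraph.zkOpsRetryWaitMsecs", 10 * 1000);
  }
}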

Finally, unbeknownst to Giraph, network-related failures may affect the internal state of the services it interacts with. For example, if you are using an external ZooKeeper ensemble and a network partition splits that ensemble in two, different masters and workers may end up interacting with different subsets of a single ZooKeeper ensemble. It is up to the ZooKeeper service to make sure it does whatever is necessary to present a consistent view of the world to all Giraph masters and workers. The last thing you want is an active master that has a different view of the workers than was reported by the workers themselves. This could happen in a scenario where a few workers report back their state to part of a ZooKeeper ensemble that is currently partitioned away from the subset of the ensemble that the master is connected to. The reason it doesn’t happen has to do with how ZooKeeper combats network partitioning; describing the ZooKeeper implementation is outside the scope of this book.

Remember that whenever Giraph services are talking to external distributed services, they rely on those services to either report an error or maintain a fully consistent model of their world. In general, Giraph is not architected for interacting with eventually consistent services.

Summary

This chapter covered these topics:

  • High-level architectural overview of the Giraph framework
  • A detailed walkthrough of internal services that together form the Giraph implementation
  • How Giraph uses Apache Hadoop for all low-level cluster-management tasks while remaining flexible enough to allow alternative cluster-management frameworks to be plugged in by putting Giraph business logic into a stand-alone GraphTaskManager class
  • How Giraph uses the Apache ZooKeeper implementation for all its coordination tasks, and the overall structure of the znode tree that various Giraph services use to communicate with each other
  • Review of failure scenarios and scalability challenges and how Giraph deals with them

The key takeaway from this chapter is that the Giraph architecture is fundamentally built around three types of internal network services (masters, workers, and coordinators) running in a distributed fashion and collectively orchestrating the work that needs to be done for each superstep:

  • Workers go through two phases: a split-loading (setup) phase and a superstep loop, each of which operates on a particular partition of either input data or graph data assigned to the worker by the active master.
  • Masters play a central role in coordinating worker partition assignments (for both input data and graph data) and also coordinating worker transitions from phase to phase and from superstep to superstep.
  • Coordinators form a ZooKeeper ensemble and let masters and workers coordinate with each other while also tracking the overall composition of roles in the Giraph service assignment.

Throughout this chapter, you have looked at various “knobs” specific to each of the services and how different settings for these knobs affect the overall execution of a Giraph application (such as setting the number of masters via giraph.zkServerCount or specifying a checkpoint interval by setting a non-zero value for giraph.checkpointFrequency). Overall, the Giraph implementation is extremely tunable; to understand all the ways you can change its behavior, you have to dig into Giraph’s source code (start with GiraphConstants) or consult the manual at http://giraph.apache.org/options.html.

Although this chapter is heavy on technical details, it provides an important overview of the internals of the Giraph implementation—something that comes in handy any time you need to debug or tune a Giraph application. One thing you haven’t spent much time on is how Giraph interacts with external storage frameworks by using its flexible input/output format extensions. This is the focus of the next chapter.
