CHAPTER 10


Advanced IO

This chapter covers

  • Using Hive tables for IO
  • Accessing data through Gora

In this chapter, you continue with the more advanced features of Giraph, focusing on ways to read input data and write output data. Recall that your input graph is usually stored in some special format on a storage system, so Giraph must be told how to read data from that storage system and how to convert it to its own internal representation, that is, to vertices and edges, using VertexInputFormat and EdgeInputFormat implementations. For example, in Chapter 7, you saw the basics of reading and writing data on the Hadoop Distributed File System (HDFS), where data is typically stored in text format, a very common scenario for graph data.

However, HDFS is not the only option. There is a variety of storage systems and formats in which you might store your data. For instance, you might use storage systems like Hive and HBase to store your data in the form of semistructured tables. Accumulo is another distributed key-value store that lets you store data in table form and provides access control mechanisms for enforcing security policies. In-memory data stores, such as Redis, have also become popular because they allow low-latency data access.

To support all of these storage options, Giraph provides an API that is easy to extend to new formats and storage systems. It also ships implementations of this API for a variety of storage systems.

In this chapter, you look at two specific cases of storage systems: Hive and Gora. Note that the goal of this chapter is not to present an exhaustive list of systems or data formats. Rather, by the end of this chapter, you will know how to customize the existing API implementations or extend them to cover your particular application scenario.

Accessing Data in Hive

First, you will look at table-based formats, Hive in particular. In general, table-based formats for storing data are quite common. Hive lets you view your data as a semistructured table and query it using an SQL-like query language. Hive runs on top of Hadoop clusters: Hive tables are stored on HDFS, and Hive queries are executed as Hadoop jobs, providing a scalable platform for storing and querying semistructured table data. For these reasons, it is a popular storage system.

A Hive table stores data in rows, with each row consisting of a number of columns. In Hive, the number of columns and the type of data each column holds are fixed in advance; this is the schema of the table.

Here, you see how a Hive table may store information representing an input graph. Now, just like with the text-based input formats that you saw in the early chapters, a Hive table might store data about the input graph in an edge-based format or a vertex-based format. In an edge-based format, a Hive row contains all the information required to construct an edge; that is, the identities of the two endpoints of the edge and its edge value. In a vertex-based format, a Hive row may contain all the information required to construct a vertex; that is, the vertex ID, its value, and information about all the edges of the vertex. For instance, in a vertex-based format, a single Hive column may contain a list of all the vertex edges.

Reading Input Data

Let’s assume that you run a microblogging service where users can post articles and share them with other users. Typically, whenever somebody makes such a post, you want to store it for further analysis: for instance, to find which users post the most articles, which people interacted the most over the last week, what topics users post about most often, and other useful metrics. Table-based stores are a good choice for storing this data. Table 10-1 shows an example of how you might store it.

Table 10-1. A Table Storing Article Share Events from Our Example Service


Even though it might not be immediately obvious, the users who are active on your service form a network. Whenever user A shares an article with user B, this creates a connection between the two users. Now let’s assume that George is a popular and frequent user of your service and that you want to find out how fast information that George posts can travel on your service. In other words, once George posts an article, how many times does it have to be shared between users before it reaches everybody?

This type of analysis may sound familiar; you essentially want to compute distances between users. In Chapter 4, you saw how to write a Giraph application that computes the shortest paths between users. This is exactly what you want here too. Since you have already seen how to compute this metric, in this chapter you will only look into how to form the input graph from the input table data.

Now, recall that Giraph provides two basic kinds of input formats: edge-based and vertex-based. In edge-based formats, your data contains information about the connections between vertices in the graph, whereas in vertex-based formats, the data contains information about the vertices themselves. If you look at Table 10-1, you see that it stores information about which user shares articles with which other user, that is, the connections among users. This makes an edge-based input format the natural fit.

As with text-based formats and HDFS, Giraph already abstracts most of the details of reading data from Hive tables and provides a simple interface that you need to implement. Implementing this interface essentially requires you to specify how to create an edge from a table row, like the ones you saw in Table 10-1. Recall that the constituents of an edge definition are its source vertex ID, its destination vertex ID, and its edge value. These are the pieces of information that you have to extract from a Hive table row. But let’s look at the exact methods that you need to implement. In a moment, you will see them in action with an example implementation for your particular scenario. Listing 10-1 shows the SimpleHiveToEdge abstract class that Giraph provides.

This is the simplest class that you may have to implement to read data from Hive. The first method that you have to implement, getSourceVertexId(), receives as input an object of type HiveReadableRecord, which is an abstraction of a Hive row. The role of this method is to construct the source vertex ID from the HiveReadableRecord. You will see how to do this in a moment. The second method, getTargetVertexId(), has a similar role: it extracts the target vertex ID from the Hive record. Finally, the getEdgeValue() method defines the edge value for a given Hive record. Notice that, as with the input formats you saw in earlier chapters, this class is parameterized too: by the type of the vertex ID (I) and the type of the edge value (E).

Now, let’s see all of these methods in action. In this particular scenario, there are two columns in the Hive table that you are interested in: the “user” column, which is going to be the source of an edge, and the “shared_with” column, which is going to be the target of an edge. The vertex IDs represent the names of the users, so in your implementation, they are of type Text. Listing 10-2 shows the implementation.

First, notice that all the methods are passed as input an object of type HiveReadableRecord, which represents a Hive record, essentially a row in the table. In your implementation, you will use the methods of the HiveReadableRecord class to get access to the different columns of a Hive row.

Let’s look at the implementation of the methods one by one. The first method, getEdgeValue(), must return an object representing the edge value. In this particular scenario, you do not want to assign a particular value to an edge; rather, the edge value is of type NullWritable, so the method simply returns the NullWritable object obtained through the static NullWritable.get() method.

Next, you must read the source and target IDs. You already know that the source ID of an edge is placed in the first column of a row and the target ID is placed in the second column. A HiveReadableRecord allows you to retrieve the data stored in a row by using the index of the column, with indices starting at column 0. Listing 10-3 shows all the methods of the HiveReadableRecord interface.

Another aspect you must be aware of is the type of data that each column stores. Each column may store data of a different type. For instance, usernames may be stored in a column as strings, whereas a date may be stored in a column as a timestamp of type long. Depending on the data type of each column, you use the matching method of the HiveReadableRecord interface. In Listing 10-3, you see that the interface provides methods for accessing data of all basic types, such as integers, floats, and strings, as well as structures such as lists and maps.
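To make the index-and-type rule concrete, here is a minimal, self-contained Java sketch. `RowRecord` is a hypothetical stand-in, not the real HiveReadableRecord: it keeps only a few typed getters modeled on the ones described above, and it fails fast when a getter does not match the column's type. The sample row, including a third column holding a timestamp of type long, is invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Hive row (not the real HiveReadableRecord):
// columns are addressed by a 0-based index and read with a type-specific getter.
class RowRecord {
  private final List<Object> columns;

  RowRecord(Object... columns) {
    this.columns = Arrays.asList(columns);
  }

  // Returns column `index` if it holds a value of `type`; otherwise fails fast.
  private <T> T typed(int index, Class<T> type) {
    Object value = columns.get(index);
    if (!type.isInstance(value)) {
      throw new ClassCastException(
          "column " + index + " does not hold a " + type.getSimpleName());
    }
    return type.cast(value);
  }

  String getString(int index) { return typed(index, String.class); }

  long getLong(int index) { return typed(index, Long.class); }

  double getDouble(int index) { return typed(index, Double.class); }

  List<?> getList(int index) { return typed(index, List.class); }
}
```

With a row such as `new RowRecord("George", "John", 1380000000L)`, calling `getString(0)` returns the username, while `getLong(0)` fails because column 0 holds a string, which is why you pick the accessor that matches each column in the table schema.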

Going back to your implementation, you can now see how to implement the getSourceVertexId() and getTargetVertexId() methods. You use the getString() method of the HiveReadableRecord interface to retrieve the columns with indices 0 and 1, respectively, since these are of type string, and then wrap each value in a Text object. And you are done.
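A compact sketch of this edge reader follows. To keep it runnable without Giraph or Hive on the classpath, `HiveRow`, `NullValue`, and `RowToEdge` are hypothetical stand-ins for HiveReadableRecord, NullWritable, and SimpleHiveToEdge, and plain Strings replace Text. The method names mirror the ones described in the text, but the exact Giraph signatures may differ.

```java
// Hypothetical stand-in for HiveReadableRecord: only the accessor this example needs.
interface HiveRow {
  String getString(int index);
}

// Hypothetical stand-in for Hadoop's NullWritable: a single shared "no value" object.
final class NullValue {
  private static final NullValue INSTANCE = new NullValue();

  private NullValue() {}

  static NullValue get() { return INSTANCE; }
}

// Hypothetical mirror of the SimpleHiveToEdge contract:
// I is the vertex ID type, E is the edge value type.
abstract class RowToEdge<I, E> {
  abstract I getSourceVertexId(HiveRow row);

  abstract I getTargetVertexId(HiveRow row);

  abstract E getEdgeValue(HiveRow row);
}

// Reads "user" (column 0) as the edge source and "shared_with" (column 1) as the target.
// Real Giraph code would wrap each String in new Text(...).
class SharedWithToEdge extends RowToEdge<String, NullValue> {
  @Override
  String getSourceVertexId(HiveRow row) { return row.getString(0); }

  @Override
  String getTargetVertexId(HiveRow row) { return row.getString(1); }

  @Override
  NullValue getEdgeValue(HiveRow row) { return NullValue.get(); }
}
```

Because the edge value carries no information, every edge shares the same singleton, just as NullWritable.get() does.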

The only thing missing now is to instruct Giraph to use this particular input format implementation and to tell it which Hive table to read data from. You do this using the familiar -eif command-line parameter when you start the Giraph job.

Note that the implementation of the actual computation was not discussed here. The important thing is that Giraph allows you to run the same computation on data stored in different storage systems with no changes. All you have to do is write an input format; the rest remains the same. In fact, in many cases, you may not even have to write your own implementation. The Giraph code base already includes a variety of input format implementations to read data from Hive tables. One of these may already suit your particular scenario.

In this example, the input graph was stored in an edge-based format. As mentioned, it is possible to have vertex-based input formats in Hive too. Even though we do not go into the details, the concept of a vertex-based input format in Hive does not differ much from the text-based format that you saw in previous chapters. Giraph provides an interface to convert a single Hive row, just like with text lines in text-based formats, to a vertex. Listing 10-4 shows the interface that you would have to implement.

As with the vertex-based text formats, in this case you have to specify how to extract the following information from a row: the ID of the vertex, the value of the vertex, and the edges of the vertex. Just like with the edge-based format, you access the columns of the Hive table using the methods provided by the HiveReadableRecord interface. As an exercise, we suggest that you implement a vertex-based input format that builds a graph from Table 10-2.

Table 10-2. Example Input Hive Table in Vertex-Based Format

| user | shared_with |
|---|---|
| George | {John, Mark} |
| Mary | {Nick, Maria, Mark, George} |
| Mark | {George} |
| John | {Maria, Helen, George} |
| Maria | {Mark, John, Nick, Helen} |

This table contains information similar to the previous one, except that here all the information necessary to create a vertex, namely the vertex ID representing a user and the people he or she shares with, is in the same row.
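One possible sketch of the vertex-based exercise follows, again with hypothetical stand-ins (`VertexRow`, `SimpleEdge`, and `RowToVertex`) in place of the Giraph and Hive classes, so treat the names and signatures as assumptions. It assumes the shared_with column is exposed as a list and, as an arbitrary placeholder, gives every vertex an initial value of 0.0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Hive row whose column 1 holds a list of usernames.
interface VertexRow {
  String getString(int index);

  List<?> getList(int index);
}

// Hypothetical minimal edge: a target vertex ID plus an edge value.
class SimpleEdge<I, E> {
  final I targetId;
  final E value;

  SimpleEdge(I targetId, E value) { this.targetId = targetId; this.value = value; }
}

// Hypothetical mirror of a vertex-based Hive reader: one row becomes one vertex.
abstract class RowToVertex<I, V, E> {
  abstract I getVertexId(VertexRow row);

  abstract V getVertexValue(VertexRow row);

  abstract Iterable<SimpleEdge<I, E>> getEdges(VertexRow row);
}

// Reads Table 10-2: "user" (column 0) is the vertex ID, "shared_with" (column 1) the edges.
class SharedWithToVertex extends RowToVertex<String, Double, Void> {
  @Override
  String getVertexId(VertexRow row) { return row.getString(0); }

  // Assumed placeholder value; the distance computation sets the real one.
  @Override
  Double getVertexValue(VertexRow row) { return 0.0; }

  @Override
  Iterable<SimpleEdge<String, Void>> getEdges(VertexRow row) {
    List<SimpleEdge<String, Void>> edges = new ArrayList<>();
    for (Object target : row.getList(1)) {
      // One edge per user in the list; edges carry no value (null for Void).
      edges.add(new SimpleEdge<String, Void>(target.toString(), null));
    }
    return edges;
  }
}
```

For George's row, this yields a vertex with ID George and outgoing edges to John and Mark.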

Writing Output Data

So far, we have talked about how to read input data from Hive, but naturally you may want the output of your application to be stored in a Hive table too. This way, you can easily query the results of your analysis using the Hive Query Language. In this section, you see how to achieve this.

First, let’s decide what the output Hive table will look like. In this particular scenario, you compute every user’s distance from George. In the simplest form, the output table contains the computed distance for each user (see Table 10-3).

Table 10-3. Example Hive Output Table

| user | value |
|---|---|
| George | 0 |
| Mary | 4 |
| Mark | 3 |
| John | 1 |
| Maria | 2 |

The output table contains one row per user. Each row has one column for the username, called “user”, that contains data of type string, and one column to store the computed distance, called “value”, that contains data of type double. Here, you assume that this table already exists and has this particular schema. If you want to learn more about how to create Hive tables, please refer to the Hive documentation.

Similar to input formats, Giraph abstracts most of the details and makes it easy for you to fill this table with output. In practice, you are performing the inverse actions: you are telling Giraph how to extract information from a vertex and where to place it in a table row. Let’s go over the implementation shown in Listing 10-5.

The only method that you need to implement here is the fillRecord() method, which receives as input a Vertex object and a HiveWritableRecord object. Giraph calls this method for every vertex in your graph, passing the Vertex object along with a HiveWritableRecord object that represents an output Hive table row. Your responsibility is to specify how to fill that row with information extracted from the Vertex object.

Like HiveReadableRecord, the HiveWritableRecord interface provides a set of methods that make it easy for you to store information of different types in the columns of a Hive row. These are shown in Listing 10-6.

Typically, you just need to specify the index of the column in which you want to store information and the value that you want to store. In addition, you need to make sure that you use the right method for the schema of the table, that is, for the type of data stored in each column.

Now, let’s consider your particular scenario. The output table has two columns: one of type string, where you store the name of the user, and one of type double, where you store the computed distance for that user. Listing 10-7 shows the implementation.

As with input tables, you use an index, starting at 0, to identify which column you are referring to. Keep in mind that you must know the schema of the output table and write values of the correct type to the correct columns. Attempting to write a value to a column of a different type raises an exception.
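The following sketch shows the fillRecord() idea together with the schema check just described. `SchemaRecord`, `SimpleVertex`, and `VertexToRow` are hypothetical stand-ins for HiveWritableRecord, Vertex, and Giraph's vertex-to-Hive base class; the typed setters are modeled on the text's description rather than on the real interface, and the exception type is an assumption.

```java
// Hypothetical stand-in for an output Hive row whose schema is fixed up front.
class SchemaRecord {
  private final Class<?>[] schema;
  private final Object[] values;

  SchemaRecord(Class<?>... schema) {
    this.schema = schema;
    this.values = new Object[schema.length];
  }

  // Rejects writes whose type does not match the column's declared type.
  private void set(int index, Object value) {
    if (!schema[index].isInstance(value)) {
      throw new IllegalArgumentException(
          "column " + index + " expects " + schema[index].getSimpleName());
    }
    values[index] = value;
  }

  void setString(int index, String value) { set(index, value); }

  void setDouble(int index, double value) { set(index, value); }

  Object get(int index) { return values[index]; }
}

// Hypothetical minimal vertex: an ID and a value (the computed distance).
class SimpleVertex<I, V> {
  private final I id;
  private final V value;

  SimpleVertex(I id, V value) { this.id = id; this.value = value; }

  I getId() { return id; }

  V getValue() { return value; }
}

// Hypothetical mirror of the fillRecord() contract: copy one vertex into one row.
abstract class VertexToRow<I, V> {
  abstract void fillRecord(SimpleVertex<I, V> vertex, SchemaRecord record);
}

// Column 0 ("user", string) gets the vertex ID; column 1 ("value", double) the distance.
class DistanceToRow extends VertexToRow<String, Double> {
  @Override
  void fillRecord(SimpleVertex<String, Double> vertex, SchemaRecord record) {
    record.setString(0, vertex.getId());
    record.setDouble(1, vertex.getValue());
  }
}
```

Declaring the schema once in the record keeps the type check in one place, so a mistake such as writing a string into the "value" column fails immediately instead of producing a malformed row.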

And this concludes the implementation of your output format. Now, as with the input format, you need to instruct Giraph to use this particular output format implementation and tell it which Hive table to write the data to. To do this, you use the -of command-line parameter when you start the Giraph job.

Accessing Data in Gora

Next, let’s look at another type of storage system, specifically in-memory storage systems. In-memory storage systems are another popular class of systems because they provide fast access to data. This section discusses Apache Gora, a framework that provides an in-memory data model that also supports data persistence to different underlying storage systems: databases (like MySQL), column stores (like HBase and Cassandra), key-value stores (like Redis), and even simple files on HDFS. For a complete list of the supported storage systems, visit the Apache Gora site.

While Apache Gora provides persistence to such systems, it abstracts the details of how to persist objects, allowing the user to work with the in-memory representation of objects, making access and manipulation of objects much easier from a programming perspective. Typically, the Gora system is set up to map data in the underlying storage system; for instance, an HBase row to an in-memory Java object. From then on, a user may access data from Gora based on an object key, like an index.

In the remainder of this section, you will go over using the Giraph input formats to read data from Gora. Setting up Gora with an underlying persistent storage system is not covered, nor is how to define the mapping between the underlying storage system and the in-memory objects. Instead, you see how to access in-memory Gora objects and convert them to Vertex and Edge objects that represent your graph inside Giraph.

You will use the same application scenario that you used in the previous section. The input data represents a set of users that share articles with each other, forming a social network, and you want to compute the distance of all users from user George in this social network.

Reading Input Data

Let’s look at how you can read input data from the Gora in-memory storage system. First, recall that Gora provides an in-memory data model. This means that it presents data to applications in the form of Java objects. While under the hood the data may persist in different storage systems, as a user you manipulate its in-memory version. The conversion is left to the Gora system itself.

Let’s assume for a moment that the in-memory representation of a user in Gora is an object called GoraUser that has the form shown in Listing 10-8.

This object represents a user and contains information similar to what you saw in the previous section with data stored in Hive. Each user has a name by which he or she is identified, as well as a set of users with whom he or she shares articles. The sharedArticles map contains a mapping from a username to the article that was shared with that user. For simplicity, assume here that a user can share only one article with another user.
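A plain-Java sketch of a user object with the shape just described follows. This `GoraUser` is a hypothetical simplification: a real Gora data class would implement Gora's Persistent interface, and its field and accessor names may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for the GoraUser in-memory object.
class GoraUser {
  private final String name;
  // Maps the name of the user an article was shared with to that article.
  private final Map<String, String> sharedArticles = new HashMap<>();

  GoraUser(String name) { this.name = name; }

  String getName() { return name; }

  // One article per recipient, as the text assumes: a later share replaces the earlier one.
  void shareArticle(String withUser, String article) {
    sharedArticles.put(withUser, article);
  }

  Map<String, String> getSharedArticles() {
    return Collections.unmodifiableMap(sharedArticles);
  }
}
```

Keying the map by recipient is what enforces the "one article per recipient" simplification: the keys are exactly the users this user is connected to.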

As with input format in general, you have to decide whether you need to implement an edge-based or a vertex-based input format. In fact, the Giraph code contains both edge-based and vertex-based implementations for reading data from Gora. Now, you must have noticed that the representation of a user in memory contains all the necessary information to construct a vertex; therefore, a vertex-based input format is the natural choice here.

So let’s look at the interface that you have to implement to read vertex-based data from Gora. As expected, the Gora input format is implemented by extending the familiar VertexInputFormat class. Several of the details for setting up a GoraVertexInputFormat are already implemented. The one remaining method, shown in Listing 10-9, essentially requires you to create a GoraVertexReader.

As you saw in Chapter 7, a VertexReader is responsible for performing the bulk of the work when reading input data; that is, for converting input data to the Vertex objects that comprise the graph. A GoraVertexReader, in particular, is responsible for converting an in-memory Gora object to a Vertex. Listing 10-10 shows the single method that you have to implement to be able to read data from Gora.

Notice that the API provided is quite flexible. It passes a generic Java object to you and expects you to transform it to a vertex. This object is nothing more than the in-memory representation of a user, which you saw earlier, and your implementation of the input format specifies how to translate this to a Giraph Vertex object. Let’s look at the implementation in Listing 10-11.

Before getting into the details, notice how this class takes the types of the vertex ID, vertex value, and edge value as parameters. As in the application scenario in the previous section, you represent the ID as an object of type Text, the value of the vertex (that is, the computed distance) as an object of type DoubleWritable, and finally, since you do not care about the value of edges, you use objects of type NullWritable.

#1 Create an empty vertex object

#2 Cast the input generic object to the GoraUser specific type

#3 Extract the name and create a new vertex Id of type Text out of it

#4 The initial value of the vertex should be 0

#5 You initialize the empty vertex object with its id and value

#6 For every entry in the shared articles map for this user

#7 Create an edge with the key of the entry as the target id

#8 Add each newly created edge to the vertex

#9 At this point you have created your vertex object and can return it

Now, let’s dive into the implementation of the API. The first action is to create an empty Vertex object that you fill with information from the Gora object. Recall that Giraph already provides some utility methods to instantiate objects, like edges and vertices. In this case, you can simply call getConf().createVertex() to create an empty, uninitialized Vertex object. Next, you will see how to fill it with the necessary information: its ID, its value, and its edges.

Next, you need to extract the information from the in-memory object. Since the API input is a generic object, you first need to cast it to the representation that you expect it to have; in this case, the GoraUser class. If the actual type of the object in memory does not match the type that you cast it to, the cast throws a ClassCastException at runtime.

First, you extract the ID of the vertex. For this, you use the “name” field of the GoraUser object, assuming that the username uniquely identifies a user. As you have already seen, in Giraph you typically represent string IDs as Text objects, so you create a new Text object from the string field to represent the vertex ID.

Then, you set the value of the vertex. Recall that in the algorithm that computes distances, the initial value of a vertex is zero. All you have to do is create a DoubleWritable object set to 0. Now that you have created the ID and the value of the vertex, you can call the Vertex.initialize() method to set these values in your newly created Vertex object.

Finally, you are going to extract the edges of the vertex. Here, you assume that each user object stores the articles that the user shares with others in a map. The vertex reader implementation iterates over all the map entries containing user-article pairs and creates an edge using the EdgeFactory utility class. Each edge created has the corresponding username as the target ID, which is the key of the map entry. In this scenario, you do not need an edge value, so you set it to NullWritable. Once you have created an Edge object, you can add it to the vertex using the familiar Vertex.addEdge() API.
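The nine annotated steps can be sketched as follows. To keep the example runnable on its own, `GoraUser`, `Edge`, `Vertex`, and `GoraUserVertexReader` are hypothetical stand-ins: Strings replace Text, Double replaces DoubleWritable, and a null Void value replaces NullWritable. The #1 to #9 comments map each line to the corresponding step; in real Giraph code, the vertex would come from getConf().createVertex() and the edges from EdgeFactory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the GoraUser in-memory object.
class GoraUser {
  final String name;
  // Recipient username -> the one article shared with that user.
  final Map<String, String> sharedArticles = new LinkedHashMap<>();

  GoraUser(String name) { this.name = name; }
}

// Hypothetical minimal edge: target vertex ID plus edge value.
class Edge<I, E> {
  final I targetId;
  final E value;

  Edge(I targetId, E value) { this.targetId = targetId; this.value = value; }
}

// Hypothetical minimal vertex exposing initialize() and addEdge() as in the text.
class Vertex<I, V, E> {
  I id;
  V value;
  final List<Edge<I, E>> edges = new ArrayList<>();

  void initialize(I id, V value) { this.id = id; this.value = value; }

  void addEdge(Edge<I, E> edge) { edges.add(edge); }
}

class GoraUserVertexReader {
  Vertex<String, Double, Void> transformVertex(Object goraObject) {
    // #1 Empty vertex; real Giraph code would call getConf().createVertex().
    Vertex<String, Double, Void> vertex = new Vertex<>();
    // #2 Throws ClassCastException if the object is not a GoraUser.
    GoraUser user = (GoraUser) goraObject;
    String id = user.name;          // #3 the username becomes the vertex ID
    Double value = 0.0;             // #4 initial vertex value
    vertex.initialize(id, value);   // #5
    for (Map.Entry<String, String> entry : user.sharedArticles.entrySet()) {  // #6
      // #7 The map key is the target ID; the edge carries no value.
      Edge<String, Void> edge = new Edge<>(entry.getKey(), null);
      vertex.addEdge(edge);         // #8
    }
    return vertex;                  // #9
  }
}
```

Only the map keys matter for the graph structure; the articles themselves are ignored because the distance computation does not use edge values.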

Writing Output Data

Next, you learn how to write output data to Gora. In this case, you need to convert the Giraph representation of vertices and edges to the Gora in-memory model. Again, you do not need to worry about how Gora persists objects; this is left to the underlying system, simplifying your job here. Giraph abstracts the details of this process by providing output format implementations, the GoraVertexOutputFormat and the GoraEdgeOutputFormat, and requires you to implement only a couple of methods, making the process easy.

Before getting into the details of the output format implementations, as always, you need to make a choice about whether you need a vertex-based or an edge-based output format. In this case, you want to output the per-vertex distances that the job calculates, so a vertex-based output format is the natural choice. Next, let’s look at the API that you have to implement.

Recall that VertexWriters are the way Giraph outputs a Vertex object to any storage system. Gora is no different. Giraph abstracts the details into the GoraVertexWriter class, shown in Listing 10-12.

The interface is quite simple and requires you to implement only two methods. The first method, getGoraKey(), takes the Vertex object as input and returns a key. This is going to be used to uniquely identify this object in the in-memory storage. Normally, the Gora key coincides with the vertex ID, used to uniquely identify the vertex. You will see this in action in a bit.

The second method that you have to implement, getGoraVertex(), takes the Vertex object as input and returns an object that implements the Persistent interface of Gora. Even though you can write your own implementation of the Persistent interface for storing the output, Giraph already provides a generic implementation, called GVertexResult, that allows you to store any information you want about a vertex, such as its ID and value. In the following examples, you will see it put to use.

First, let’s take a look at the implementation of the output format shown in Listing 10-13. As mentioned, Giraph abstracts many of the details through GoraVertexOutputFormat, an abstract class that you have to extend. Call your implementation GoraUserVertexOutputFormat. As usual, you need to implement the createVertexWriter() method, which returns an object implementing the VertexWriter interface that does the bulk of the work. Let’s call this GoraUserVertexWriter. Now let’s look at the VertexWriter implementation in more detail.

#1 Extract the vertex ID and return its string representation as the Gora key

#2 Create an empty GVertexResult object to hold the output for this vertex

#3 Set its vertex ID using the provided API

#4 Similarly, set its vertex value

First, let’s take a look at the implementation of the getGoraKey() method. This method should return an object that uniquely identifies the object in memory. Although not strictly required, the vertex ID is a natural candidate, since you already know that it uniquely identifies a vertex. For the Gora key, you do not need a Writable object, such as Text, so you can simply return the String representation of the vertex ID.

Next, let’s look into the implementation of the getGoraVertex() method, which performs most of the work. In this method, you need to extract all the information that you want to store from a Vertex object and encapsulate it in an object of a type that Gora understands, that is, an object that implements the Persistent interface. The Giraph source code contains a generic class, called GVertexResult, that implements this interface and allows you to store information in it. We are not going to describe its implementation in detail here; the important aspect is that it abstracts the details of the Persistent interface and provides simple API calls, such as getting and setting the ID, value, and edges of a vertex. The implementation handles the details of converting those fields to the correct underlying representation.

Let’s now look at the body of the method implementation. The first action is to create a new object of type GVertexResult that is going to hold the output. Recall that in this particular scenario, the output you care about is the vertex ID and its value, that is, the computed distance. For this, you use the setVertexId() and setVertexValue() methods of the GVertexResult class. Although not shown here, the internal implementations of these methods convert the string and double values into the formats that GVertexResult expects.
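A minimal sketch of both writer methods follows. `SimpleVertex`, `PersistentSketch`, `VertexResult`, and `VertexWriterSketch` are hypothetical stand-ins for Giraph's Vertex, Gora's Persistent, GVertexResult, and GoraVertexWriter. The real GVertexResult is a Gora Persistent object whose setters may take different types, so treat the signatures below as assumptions.

```java
// Hypothetical minimal vertex: an ID and a value (the computed distance).
class SimpleVertex<I, V> {
  private final I id;
  private final V value;

  SimpleVertex(I id, V value) { this.id = id; this.value = value; }

  I getId() { return id; }

  V getValue() { return value; }
}

// Hypothetical marker standing in for Gora's Persistent interface.
interface PersistentSketch {}

// Hypothetical stand-in for GVertexResult: holds the output for one vertex.
class VertexResult implements PersistentSketch {
  private String vertexId;
  private double vertexValue;

  void setVertexId(String vertexId) { this.vertexId = vertexId; }

  void setVertexValue(double vertexValue) { this.vertexValue = vertexValue; }

  String getVertexId() { return vertexId; }

  double getVertexValue() { return vertexValue; }
}

// Hypothetical mirror of the two methods a Gora vertex writer must implement.
abstract class VertexWriterSketch<I, V> {
  abstract Object getGoraKey(SimpleVertex<I, V> vertex);

  abstract PersistentSketch getGoraVertex(SimpleVertex<I, V> vertex);
}

class GoraUserVertexWriter extends VertexWriterSketch<String, Double> {
  // #1 The vertex ID already identifies the vertex uniquely, so its string form is the key
  // (with a real Text ID, this would be vertex.getId().toString() as well).
  @Override
  Object getGoraKey(SimpleVertex<String, Double> vertex) {
    return vertex.getId().toString();
  }

  @Override
  VertexResult getGoraVertex(SimpleVertex<String, Double> vertex) {
    VertexResult result = new VertexResult();   // #2 empty holder for this vertex
    result.setVertexId(vertex.getId());         // #3 vertex ID
    result.setVertexValue(vertex.getValue());   // #4 computed distance
    return result;
  }
}
```

The override narrows the return type to VertexResult, which Java allows for covariant returns; it simply saves callers a cast.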

Further, although this is not necessary in your particular scenario, the GVertexResult allows you to set the edges of the vertex as well. The data structure used to maintain information about the edges is essentially a map. The code fragment in Listing 10-14 shows how to use it.

#1 Iterate over the edges of a vertex

#2 Access the edges data structure through the getEdges() method

#3 Use the put() API to insert the target ID and edge value
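A sketch of the edge-copying loop that these callouts describe, using hypothetical stand-ins, follows. It assumes, for illustration only, that the result keeps its edges as a map from target ID to edge value, both stored as strings, and it uses Double edge values even though the distance scenario does not need them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal edge: target vertex ID plus edge value.
class Edge<I, E> {
  final I targetId;
  final E value;

  Edge(I targetId, E value) { this.targetId = targetId; this.value = value; }
}

// Hypothetical minimal vertex that only carries its outgoing edges.
class EdgeVertex<I, E> {
  final List<Edge<I, E>> edges = new ArrayList<>();

  void addEdge(Edge<I, E> edge) { edges.add(edge); }
}

// Hypothetical stand-in for the output object's edge map: target ID -> edge value.
class VertexResult {
  private final Map<String, String> edges = new HashMap<>();

  Map<String, String> getEdges() { return edges; }
}

class EdgeOutput {
  // Copies every edge of `vertex` into the result's edge map.
  static <I, E> void copyEdges(EdgeVertex<I, E> vertex, VertexResult result) {
    for (Edge<I, E> edge : vertex.edges) {            // #1 iterate over the edges
      Map<String, String> edges = result.getEdges();  // #2 access the edge map
      edges.put(String.valueOf(edge.targetId),        // #3 target ID -> edge value
          String.valueOf(edge.value));
    }
  }
}
```

Because the map is keyed by target ID, a vertex with two edges to the same target would keep only the last one, which matches the one-edge-per-pair graphs used in this chapter.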

This concludes accessing data in Gora. Although not covered here, Giraph also provides an implementation of an edge-based output format for writing to Gora. Note also that even though the output format implementations and helper classes in the Giraph code base, such as GVertexResult, should cover most of your needs, you may still need to customize them. In that case, the existing implementations serve as good guides for writing your own.

Summary

In previous chapters, you saw how to use HDFS as the basic storage for data input and output. Giraph, however, provides a flexible API that allows the extension to a variety of storage systems. In this chapter, you explored table-based and in-memory storage systems.

  • Table-based storage systems have become quite popular because they allow you to store and query semistructured data.
  • Giraph provides vertex-based and edge-based APIs for accessing data in Hive tables, one of several table-based storage systems it supports. Typically, you transform a single Hive table row into a Vertex or an Edge object.
  • In-memory storage systems are another popular class of systems that allow fast access to data. Apache Gora provides an in-memory data model that abstracts the details of the underlying storage and allows you to plug it into different storage systems for persistence.
  • Giraph further hides many of the complexities of reading data from and writing data to the Gora in-memory model. You can use the GoraVertexInputFormat and GoraEdgeInputFormat APIs to convert generic Gora objects to Giraph Vertex and Edge objects, and the GoraVertexOutputFormat and GoraEdgeOutputFormat APIs to convert them back.

By now you should be familiar with accessing data from a variety of data stores. Giraph provides interfaces to more systems than covered in this chapter. You should take a look at the code base and documentation for more information. You continue in the next chapter with more advanced features—in particular, tuning and performance.
