HDFS and Hive in Python

This book is about Python for geospatial development, so in this section, you will learn how to use Python for HDFS operations and Hive queries. Several Python wrapper libraries exist for Hadoop, but none has emerged as the standard choice, and some, such as Snakebite, do not appear ready to run on Python 3. In this section, you will learn how to use two libraries, PyHive and PyWebHDFS. You will also learn how you can use the Python subprocess module to execute HDFS and Hive commands.

To get PyHive, you can use conda and the following command:

conda install -c blaze pyhive

You may also need to install the sasl library:

conda install -c blaze sasl

The previous libraries will give you the ability to run Hive queries from Python. You will also want to be able to move files to HDFS. To do so, you can install pywebhdfs:

conda install -c conda-forge pywebhdfs

The preceding command will install the library. As always, you can use pip install or any other method instead.

With the libraries installed, let's first look at pywebhdfs.

The documentation for pywebhdfs is located at: http://pythonhosted.org/pywebhdfs/

To make a connection in Python, you need to know the location of your HDFS server. If you have followed this chapter, particularly the configuration changes in /etc/hosts, you can connect using the following code:

from pywebhdfs.webhdfs import PyWebHdfsClient as h
hdfs=h(host='sandbox.hortonworks.com',port='50070',user_name='raj_ops')

The previous code imports PyWebHdfsClient as h. It then creates the connection to the HDFS file system running in the container. The container is mapped to sandbox.hortonworks.com, and HDFS is on port 50070. Since the examples have been using the raj_ops user, the code does so as well.

The functions now available on the hdfs variable are similar to your standard terminal commands, but with different names: mkdir is now make_dir and ls is list_dir. To delete a file or directory, you will use delete_file_dir. The make and delete commands will return True if they are successful.

Let's look at the root directory of our HDFS file system using Python:

ls=hdfs.list_dir('/')

The previous code issued the list_dir command (the ls equivalent) and assigned the result to ls. The result is a dictionary with all the files and folders in the directory.

To see a single record, you can use the following code:

ls['FileStatuses']['FileStatus'][0]

The previous code gets to the individual records by using the dictionary keys FileStatuses and FileStatus.

To get the keys in a dictionary, you can use .keys(): ls.keys() contains only 'FileStatuses', and ls['FileStatuses'].keys() contains only 'FileStatus'.

The output of ls['FileStatuses']['FileStatus'][0] is shown as follows:

{'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 16404, 'group': 'hadoop', 'length': 0, 'modificationTime': 1510325976603, 'owner': 'yarn', 'pathSuffix': 'app-logs', 'permission': '777', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}

Each file or directory contains several pieces of data, but most importantly the type, owner, and permissions.
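As a minimal sketch of pulling out just those fields, the following code walks a list_dir() result and collects the name, type, owner, and permission of each entry. The helper name summarize_listing is hypothetical, and the sample dictionary simply reuses the single record shown above so the code can run without a cluster:

```python
def summarize_listing(listing):
    """Return (name, type, owner, permission) for each entry of a list_dir() result."""
    entries = listing['FileStatuses']['FileStatus']
    return [(e['pathSuffix'], e['type'], e['owner'], e['permission'])
            for e in entries]

# Stand-in for hdfs.list_dir('/'), built from the record shown above.
sample = {'FileStatuses': {'FileStatus': [
    {'accessTime': 0, 'blockSize': 0, 'childrenNum': 1, 'fileId': 16404,
     'group': 'hadoop', 'length': 0, 'modificationTime': 1510325976603,
     'owner': 'yarn', 'pathSuffix': 'app-logs', 'permission': '777',
     'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'}]}}

for name, kind, owner, permission in summarize_listing(sample):
    print(permission, owner, kind, '/' + name)
```

With a live connection, you would pass the ls variable from the earlier example instead of sample.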

The first step to running a Hive query example is to move our data files from the local machine to HDFS. Using Python, you can accomplish this using the following code:

hdfs.make_dir('/samples',permission=755)
with open('/home/pcrickard/sample.csv') as f:
    d=f.read()
hdfs.create_file('/samples/sample.csv',d)

The previous code creates a directory called samples with the permissions 755. In Linux, permissions are based on read (4), write (2), and execute (1) for three types of users: owner, group, and other. So, permissions of 755 mean that the owner has read, write, and execute permissions (4+2+1=7), and that the group and others have read and execute (4+1=5).
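The arithmetic above can be checked with a few lines of Python. The following is a small illustrative sketch, not part of pywebhdfs; the function name describe_permission is hypothetical. It converts each digit into the familiar rwx notation:

```python
def describe_permission(mode):
    """Convert a permission such as 755 or '644' into rwx notation."""
    parts = []
    for digit in str(mode).zfill(3):  # one digit each for owner, group, other
        value = int(digit)
        parts.append(
            ('r' if value & 4 else '-') +  # read = 4
            ('w' if value & 2 else '-') +  # write = 2
            ('x' if value & 1 else '-')    # execute = 1
        )
    return ''.join(parts)

print(describe_permission(755))    # rwxr-xr-x
print(describe_permission('644'))  # rw-r--r--
```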

Next, the code opens and reads the CSV file we want to transfer to HDFS and assigns it to the variable d. The code then creates the sample.csv file in the samples directory, passing the contents of d.

To verify that the file was created, you can read the contents of the file using the following code:

hdfs.read_file('/samples/sample.csv')

The output of the previous code will be the contents of the CSV file as a string, which confirms that the file was created successfully.

Or, you can use the following code to get the status and details of the file:

hdfs.get_file_dir_status('/samples/sample.csv')

The previous code will return the details only if the file or directory exists. If it does not, the code will raise a FileNotFound error, so you may want to wrap the call in a try...except block. For the file just created, the details are as follows:

{'FileStatus': {'accessTime': 1517929744092, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 22842, 'group': 'hdfs', 'length': 47, 'modificationTime': 1517929744461, 'owner': 'raj_ops', 'pathSuffix': '', 'permission': '755', 'replication': 1, 'storagePolicy': 0, 'type': 'FILE'}}
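One way to wrap the status call in a try...except block is sketched below. The helper name get_status_or_none is hypothetical. The import assumes the FileNotFound exception lives in pywebhdfs.errors, and the fallback class exists only so the sketch can be run on a machine without pywebhdfs installed:

```python
try:
    # Assumed location of the exception raised for missing paths.
    from pywebhdfs.errors import FileNotFound
except ImportError:
    # Stand-in so the sketch still runs without pywebhdfs.
    class FileNotFound(Exception):
        pass

def get_status_or_none(client, path):
    """Return the FileStatus dictionary for path, or None if it does not exist."""
    try:
        return client.get_file_dir_status(path)['FileStatus']
    except FileNotFound:
        return None
```

With the connection from earlier, get_status_or_none(hdfs, '/samples/sample.csv') would return the inner dictionary shown above, while a path that does not exist would give None instead of raising an error.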

With the data file transferred to HDFS, you can move on to querying the data with Hive.


The documentation for PyHive is located at: https://github.com/dropbox/PyHive

Using pyhive, the following code will create a table:

from pyhive import hive
c=hive.connect('sandbox.hortonworks.com').cursor()
c.execute('CREATE TABLE FromPython (age int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ","')

The previous code imports the hive module from pyhive. It creates a connection and gets the cursor. Lastly, it executes a Hive statement. Once you have the connection and the cursor, you can make your SQL queries by passing them to the .execute() method. To load the data from the CSV in HDFS into the table and then to select all, you would use the following code:

c.execute("LOAD DATA INPATH '/samples/sample.csv' OVERWRITE INTO TABLE FromPython")
c.execute("SELECT * FROM FromPython")
result=c.fetchall()

The preceding code uses the execute() method two more times, first to load the data and then to select all rows. Using fetchall(), the rows are assigned to the result variable and will look like they do in the following output:

[(40, ' Paul'), (23, ' Fred'), (72, ' Mary'), (16, ' Helen'), (16, ' Steve')]
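Notice the leading space in each name. This is presumably because the CSV file has a space after each comma and the table keeps it as part of the field, which is an assumption based on the output rather than something confirmed in the text. A minimal sketch for cleaning the rows in Python, using a copy of the output above in place of a live query, looks like this:

```python
# Copy of the fetchall() output shown above.
result = [(40, ' Paul'), (23, ' Fred'), (72, ' Mary'), (16, ' Helen'), (16, ' Steve')]

# Strip surrounding whitespace from the name in each (age, name) row.
cleaned = [(age, name.strip()) for age, name in result]
print(cleaned)
```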

Working with pyhive is just like working with psycopg2, the Python library for connecting to PostgreSQL. Most database wrapper libraries are very similar in that you make a connection, get a cursor, and then execute statements. Results can be retrieved by grabbing all, one, or next (iterable).
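To illustrate the shared pattern without needing a Hive server, the following sketch uses sqlite3 from the Python standard library as a stand-in. The table and rows are borrowed from the example above, and sqlite3 is only a substitute for demonstration. The connect, cursor, execute, and fetch steps are the part that should carry over to pyhive and psycopg2:

```python
import sqlite3

# Make a connection and get a cursor (in-memory database as a stand-in).
conn = sqlite3.connect(':memory:')
c = conn.cursor()

# Execute statements.
c.execute('CREATE TABLE FromPython (age int, name text)')
c.executemany('INSERT INTO FromPython VALUES (?, ?)',
              [(40, 'Paul'), (23, 'Fred'), (72, 'Mary')])

# Retrieve one row, then all remaining rows.
c.execute('SELECT * FROM FromPython ORDER BY age')
first = c.fetchone()
rest = c.fetchall()

# Retrieve rows by iterating over the cursor.
c.execute('SELECT name FROM FromPython ORDER BY age')
names = [row[0] for row in c]

conn.close()
print(first, rest, names)
```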
