© Raju Kumar Mishra and Sundar Rajan Raman 2019
Raju Kumar Mishra and Sundar Rajan Raman, PySpark SQL Recipes, https://doi.org/10.1007/978-1-4842-4335-0_3

3. IO in PySpark SQL

Raju Kumar Mishra (1) and Sundar Rajan Raman (2)
(1) Bangalore, Karnataka, India
(2) Chennai, Tamil Nadu, India
 
Reading data from different file formats and saving results to different data sinks is an inevitable part of the data scientist’s job. In this chapter, we are going to learn the following recipes. Through these recipes, we will learn how to read data from different types of data sources and how to save the results of the analysis to different data sinks.
  • Recipe 3-1. Read a CSV file

  • Recipe 3-2. Read a JSON file

  • Recipe 3-3. Save a DataFrame as a CSV file

  • Recipe 3-4. Save a DataFrame as a JSON file

  • Recipe 3-5. Read ORC files

  • Recipe 3-6. Read a Parquet file

  • Recipe 3-7. Save a DataFrame as an ORC file

  • Recipe 3-8. Save a DataFrame as a Parquet file

  • Recipe 3-9. Read data from MySQL

  • Recipe 3-10. Read data from PostgreSQL

  • Recipe 3-11. Read data from Cassandra

  • Recipe 3-12. Read data from MongoDB

  • Recipe 3-13. Save a DataFrame to MySQL

  • Recipe 3-14. Save a DataFrame to PostgreSQL

  • Recipe 3-15. Save a DataFrame to MongoDB

  • Recipe 3-16. Read data from Apache Hive

Recipe 3-1. Read a CSV File

Problem

You want to read a CSV (comma-separated value) file.

Solution

CSV files (see Figure 3-1) are one of the most used and most popular file types. A data scientist or data engineer will encounter CSV files in their day-to-day experiences.
Figure 3-1. A sample CSV file

The CSV file shown in Figure 3-1 is named swimmerData.csv. The table has four columns: id, Gender, Occupation, and swimTimeInSecond. The first three columns contain data of the String datatype. The last column uses the float or double datatype.

We can read a CSV file using the spark.read.csv() function. Here, spark is an object of the SparkSession class. This function has many arguments, but we are going to discuss only the more important ones. The path argument specifies the location of the CSV file to be read. A PySpark SQL DataFrame has a tabular structure similar to an RDBMS table, so every DataFrame has a schema. We can specify the schema explicitly using the second argument of the csv() function, which is named schema.

The name CSV is somewhat misleading, because these files may use any character as the field separator. We specify the field separator using the sep argument. The header argument tells the function whether the first line of the file is a header. If header is left at its default value of None, the first line is treated as data; if the file has a header, we must set header to True. The function can also infer the schema from the data if the inferSchema argument is set to True.
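The arguments discussed above can be gathered in a small helper. The following is a minimal sketch rather than part of the recipe: the names csv_reader_kwargs and read_delimited are hypothetical, and the spark argument is assumed to be an existing SparkSession. The keyword names sep, header, schema, and inferSchema are the ones accepted by spark.read.csv().

```python
def csv_reader_kwargs(sep=",", has_header=True, schema=None):
    """Build keyword arguments for spark.read.csv() (hypothetical helper)."""
    kwargs = {"sep": sep, "header": has_header}
    if schema is None:
        # No schema supplied: ask PySpark SQL to infer one from the data.
        kwargs["inferSchema"] = True
    else:
        # An explicit schema takes the place of inference.
        kwargs["schema"] = schema
    return kwargs


def read_delimited(spark, path, sep=",", has_header=True, schema=None):
    """Read a delimited text file into a DataFrame.

    `spark` is assumed to be an existing SparkSession.
    """
    return spark.read.csv(path, **csv_reader_kwargs(sep, has_header, schema))


# Arguments for a pipe-separated file with a header line and no explicit schema.
print(csv_reader_kwargs(sep="|"))
```

In the PySpark shell, a call such as read_delimited(spark, 'swimmerData.csv') would then be one way to read the file used in this recipe.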

How It Works

Let’s start by creating the schema of the DataFrame.

Step 3-1-1. Creating the Schema of the DataFrame

Our DataFrame has four columns. First, we are going to define the columns. We define each column using StructField(). PySpark SQL has its own datatypes, all of which are defined in the submodule named pyspark.sql.types. We import everything from pyspark.sql.types as follows.
In [1]: from pyspark.sql.types import *
After importing the required submodule, we are going to define the first column of the DataFrame.
In [2]: idColumn = StructField("id",StringType(),True)
Let’s look at the arguments of StructField(). The first argument is the column name. We provide the column name as id. The second argument is the datatype of the elements of the column. The datatype of the first column is StringType(). If some ID is missing then some element of a column might be null. The last argument, whose value is True, tells you that this column might have null values or missing data.
In [3]: genderColumn = StructField("Gender",StringType(),True)
In [4]: OccupationColumn = StructField("Occupation",StringType(),True)
In [5]: swimTimeInSecondColumn = StructField("swimTimeInSecond",DoubleType(),True)
We have created a StructField for each column. Now we have to create the schema of the full DataFrame. We can create it using a StructType object, as the following lines show.
In [6]: columnList = [idColumn, genderColumn, OccupationColumn, swimTimeInSecondColumn]
In [7]: swimmerDfSchema = StructType(columnList)
swimmerDfSchema is the full schema of our DataFrame.
In [8]: swimmerDfSchema
Here is the output:
Out[8]: StructType(List(StructField(id,StringType,true),
                        StructField(Gender,StringType,true),
                        StructField(Occupation,StringType,true),
                        StructField(swimTimeInSecond,DoubleType,true)))

The schema of the four columns of the DataFrame can be observed by using the swimmerDfSchema variable.
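As an alternative to building StructField objects one by one, the schema argument of the reader also accepts a DDL-formatted string in Spark 2.3 and later. The following is a minimal sketch of that option; the helper name ddl_schema is hypothetical, and the column names and types are those of swimmerData.csv.

```python
def ddl_schema(columns):
    """Join (name, type) pairs into a DDL-formatted schema string."""
    return ", ".join("{} {}".format(name, dtype) for name, dtype in columns)


# Column names and SQL types matching the four columns of swimmerData.csv.
swimmer_columns = [
    ("id", "STRING"),
    ("Gender", "STRING"),
    ("Occupation", "STRING"),
    ("swimTimeInSecond", "DOUBLE"),
]
swimmer_ddl = ddl_schema(swimmer_columns)
print(swimmer_ddl)

# With an existing SparkSession named spark, the string could be passed as:
# swimmerDf = spark.read.csv('data/swimmerData.csv', header=True, schema=swimmer_ddl)
```

The StructType approach remains the more explicit choice when nullability has to be controlled per column.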

Step 3-1-2. Reading a CSV File

We have created our schema. We are now going to read the CSV file.
In [9]: swimmerDf = spark.read.csv('data/swimmerData.csv',
    ...:                           header=True, schema=swimmerDfSchema
    ...: )
In [10]: swimmerDf.show(4)
Here is the output:
+---+------+-----------+----------------+
| id|Gender| Occupation|swimTimeInSecond|
+---+------+-----------+----------------+
|id1|  Male| Programmer|           16.73|
|id2|Female|    Manager|           15.56|
|id3|  Male|    Manager|           15.15|
|id4|  Male|RiskAnalyst|           15.27|
+---+------+-----------+----------------+
only showing top 4 rows
In [13]: swimmerDf.printSchema()
Here is the output:
root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- swimTimeInSecond: double (nullable = true)

We read the file and created the DataFrame.

Step 3-1-3. Reading a CSV File Using the Argument inferSchema Value as True

Now we are going to read the same file, swimmerData.csv. This time we are not going to provide the schema. We will let PySpark SQL infer the schema of the DataFrame. If we set the value of the inferSchema argument to True in the csv() function, PySpark SQL will try to infer the schema.
In [14]: swimmerDf = spark.read.csv('swimmerData.csv',
                                    header=True, inferSchema=True)
In [15]: swimmerDf.show(4)
+---+------+-----------+----------------+
| id|Gender| Occupation|swimTimeInSecond|
+---+------+-----------+----------------+
|id1|  Male| Programmer|           16.73|
|id2|Female|    Manager|           15.56|
|id3|  Male|    Manager|           15.15|
|id4|  Male|RiskAnalyst|           15.27|
+---+------+-----------+----------------+
only showing top 4 rows
The DataFrame called swimmerDf has been created with the inferSchema argument set to True. It is a good idea to check the schema that was inferred, so let’s print it using the printSchema() function.
In [16]:  swimmerDf.printSchema()
Here is the output:
root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- swimTimeInSecond: double (nullable = true)

PySpark SQL has inferred the schema correctly.

Recipe 3-2. Read a JSON File

Problem

You want to read a JSON (JavaScript Object Notation) file.

Solution

We have been given a JSON data file called corrData.json. The first few lines of the file look like this:
{"iv1":5.5,"iv2":8.5,"iv3":9.5}
{"iv1":6.13,"iv2":9.13,"iv3":10.13}
{"iv1":5.92,"iv2":8.92,"iv3":9.92}
{"iv1":6.89,"iv2":9.89,"iv3":10.89}
{"iv1":6.12,"iv2":9.12,"iv3":10.12}

Imagine this data in tabular form. We can find that the data has three columns called iv1, iv2, and iv3. Each column has decimal or floating point values.

The spark.read.json() function will read JSON files. Like the csv() function, there are many arguments in the json() function. The first argument is path and it determines the location of the file to be read. The second argument is schema and it determines the schema of the DataFrame to be created.

How It Works

Let’s read our corrData.json file.
In [1]: corrData = spark.read.json(path='corrData.json')
In [2]: corrData.show(6)
Here is the output:
+----+----+-----+
| iv1| iv2|  iv3|
+----+----+-----+
| 5.5| 8.5|  9.5|
|6.13|9.13|10.13|
|5.92|8.92| 9.92|
|6.89|9.89|10.89|
|6.12|9.12|10.12|
|6.32|9.32|10.32|
+----+----+-----+
only showing top 6 rows

We have created the DataFrame successfully.

Whenever we do not provide the schema of a DataFrame explicitly, it is better to check the schema of the newly created DataFrame to ensure that everything is as expected.
In [3]: corrData.printSchema()
Here is the output:
root
 |-- iv1: double (nullable = true)
 |-- iv2: double (nullable = true)
 |-- iv3: double (nullable = true)

Everything is as expected, as the datatype of each column is double, that is, DoubleType(). So, while reading a JSON file, if we do not provide the schema, PySpark SQL will infer it.
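By default, spark.read.json() expects one complete JSON object per line, which is how corrData.json is laid out. The sketch below uses only the Python standard library to illustrate that layout with the first three records of the file. It is an illustration of the file format, not of how PySpark SQL parses the data internally.

```python
import json

# The first records of corrData.json, one JSON object per line.
json_lines = """\
{"iv1":5.5,"iv2":8.5,"iv3":9.5}
{"iv1":6.13,"iv2":9.13,"iv3":10.13}
{"iv1":5.92,"iv2":8.92,"iv3":9.92}
"""

# Each line parses independently, so each line becomes one row of the table.
records = [json.loads(line) for line in json_lines.splitlines() if line.strip()]

# The keys of the objects play the role of column names.
columns = sorted({key for record in records for key in record})
print(columns)
print(records[0])
```

A file that holds a single JSON document spread over several lines does not follow this layout; for such files the json() function offers the multiLine argument.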

Recipe 3-3. Save a DataFrame as a CSV File

Problem

You want to save the contents of a DataFrame to a CSV file.

Solution

Whenever we want to write or save a DataFrame to an external storage system, we can use the DataFrameWriter class and the methods defined in it. We can access DataFrameWriter using DataFrame.write. So, if we want to save our DataFrame as a CSV file, we have to use the DataFrame.write.csv() function.

Similar to the spark.read.csv() function, the DataFrame.write.csv() function has many arguments. Let’s discuss three of them: path, sep, and header. The path argument defines the directory where the DataFrame will be written. We can specify the data field separator using the sep argument. If the value of the header argument is True, the column names of the DataFrame will be written as the first line of the CSV file.

We are going to write the corrData DataFrame into the csvFileDir directory. We created the corrData DataFrame in Recipe 3-2.

Note

You can read more about the DataFrameWriter class from the following link

https://spark.apache.org/docs/2.3.0/api/python/_modules/pyspark/sql/readwriter.html#DataFrameWriter

How It Works

The following line of code will write the DataFrame to a CSV file.
corrData.write.csv(path='csvFileDir', header=True,sep=',')
Let’s see what’s inside the csvFileDir directory. We can check using the bash ls command as follows:
csvFileDir$ ls
Here is the output:
part-00000-eb3df2e6-8098-488d-be22-5e9db4a5cb08-c000.csv
 _SUCCESS
We can see there are two files in the directory. Let’s first discuss the second file, called _SUCCESS. This file simply indicates that the write operation succeeded. The actual contents of the DataFrame are inside the part-00000-eb3df2e6-8098-488d-be22-5e9db4a5cb08-c000.csv file. Now let’s investigate the contents of that file.
$ head -5 part-00000-eb3df2e6-8098-488d-be22-5e9db4a5cb08-c000.csv
Here is the output:
iv1,iv2,iv3
5.5,8.5,9.5
6.13,9.13,10.13
5.92,8.92,9.92
6.89,9.89,10.89

Now it is confirmed that the contents have been written to a CSV file.
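Because csv() writes a directory rather than a single file, and because a DataFrame with several partitions produces one part file per partition, it can be handy to collect the data files programmatically. The following is a minimal sketch using only the Python standard library; the helper name list_part_files is hypothetical.

```python
import glob
import os


def list_part_files(output_dir, extension="csv"):
    """Return the data files in a Spark output directory, in sorted order.

    Marker files such as _SUCCESS are skipped because they do not match
    the part-* pattern.
    """
    pattern = os.path.join(output_dir, "part-*." + extension)
    return sorted(glob.glob(pattern))


# Example: list the CSV part files written to csvFileDir, if it exists.
for part_file in list_part_files("csvFileDir"):
    print(part_file)
```

Note that, by default, the write fails if the target directory already exists; passing mode='overwrite' to the csv() function replaces the existing contents instead.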

Recipe 3-4. Save a DataFrame as a JSON File

Problem

You want to save the contents of a DataFrame as a JSON file.

Solution

In order to save a DataFrame as a JSON file, we are going to use the DataFrameWriter class function called json(). We are going to save the swimmerDf DataFrame as a JSON file.

How It Works

The following line will easily save our DataFrame as a JSON file in the jsonData directory.
In [1]: swimmerDf.write.json(path='jsonData')
Now let’s visualize the contents of the jsonData directory.
$ ls
Here is the output:
part-00000-51e76de8-127f-4549-84a8-7ea972632a4d-c000.json
 _SUCCESS

The file called part-00000-51e76de8-127f-4549-84a8-7ea972632a4d-c000.json contains the swimmerDf DataFrame data.

To confirm that the data has been written in the JSON format, we are going to use the head shell command to print the first four records.
$ head -4 part-00000-51e76de8-127f-4549-84a8-7ea972632a4d-c000.json
Here is the output:
{"id":"id1","Gender":"Male","Occupation":"Programmer","swimTimeInSecond":16.73}
{"id":"id2","Gender":"Female","Occupation":"Manager","swimTimeInSecond":15.56}
{"id":"id3","Gender":"Male","Occupation":"Manager","swimTimeInSecond":15.15}
{"id":"id4","Gender":"Male","Occupation":"RiskAnalyst","swimTimeInSecond":15.27}

PySpark SQL saved the DataFrame’s contents into the JSON file correctly.

Recipe 3-5. Read ORC Files

Problem

You want to read an ORC (Optimized Row Columnar) file.

Solution

The ORC file format was mainly developed for Apache Hive to make Hive queries faster on datasets in the ORC format. See Figure 3-2.
Figure 3-2. A sample ORC file

The table data shown in Figure 3-2 is in an ORC file in the duplicateData directory. We have to read it.

We are going to read the ORC file using the spark.read.orc() function. Remember that spark is the object of the SparkSession class.

Note

More about ORC files can be found from the following link

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC

How It Works

We are going to read our data from the ORC file inside the duplicateData directory.
In [1]: duplicateDataDf = spark.read.orc(path='duplicateData')
In [2]: duplicateDataDf.show(6)
Here is the output:
+---+---+-----+
|iv1|iv2|  iv3|
+---+---+-----+
| c1| d2|  9.8|
| c1| d2| 8.36|
| c1| d2| 9.06|
| c1| d2|11.15|
| c1| d2| 6.26|
| c2| d2| 8.74|
+---+---+-----+
only showing top 6 rows

Recipe 3-6. Read a Parquet File

Problem

You want to read a Parquet file.

Solution

Apache Parquet is an open source file format developed for Apache Hadoop. It uses a columnar data storage format and provides efficient data encoding and compression.

We have a Parquet file in a directory called temperatureData. We can read the Parquet file using the spark.read.parquet() function of PySpark SQL.

Note

You can read more about Parquet files from the following links

https://parquet.apache.org/documentation/latest/

https://en.wikipedia.org/wiki/Apache_Parquet

How It Works

We have been given data in a Parquet file. The name of the data directory is temperatureData.
In [2]: tempDf = spark.read.parquet('temperatureData')
In [3]: tempDf.show(6)
Here is the output:
+----+-------------+
| day|tempInCelsius|
+----+-------------+
|day1|         12.2|
|day2|         13.1|
|day3|         12.9|
|day4|         11.9|
|day5|         14.0|
|day6|         13.9|
+----+-------------+
only showing top 6 rows

We can see that we have the DataFrame.

Recipe 3-7. Save a DataFrame as an ORC File

Problem

You want to save a DataFrame as an ORC file.

Solution

We are going to save the swimmerDf DataFrame to an ORC file. We are going to save it in a directory called orcData. We are going to use the orc() function of the DataFrameWriter class to save the DataFrame as an ORC file.

How It Works

The following line of code will save our swimmerDf DataFrame content to an ORC file in the orcData directory.
In [1]: swimmerDf.write.orc(path='orcData')
Let’s see what’s inside the orcData directory.
$ ls
Here is the output:
part-00000-4252c3c8-5f7d-48b2-8bc0-48426cb8d2e4-c000.snappy.orc  _SUCCESS

Recipe 3-8. Save a DataFrame as a Parquet File

Problem

You want to save a DataFrame as a Parquet file.

Solution

We created the duplicateDataDf DataFrame in Recipe 3-5. We are going to save it using the parquet() function of the DataFrameWriter class.

How It Works

Using the following line of code, we can write the contents of the DataFrame into a Parquet file in the parqData directory.
In [1]: duplicateDataDf.write.parquet(path='parqData')
In order to see the contents of the parqData directory, we are going to use the ls command.
$ ls
Here is the output:
part-00000-d44eb594-46da-495a-88e9-934ca1eff270-c000.snappy.parquet _SUCCESS

Data has been written to the file called part-00000-d44eb594-46da-495a-88e9-934ca1eff270-c000.snappy.parquet.

Recipe 3-9. Read Data from MySQL

Problem

You want to read a table of data from MySQL.

Solution

MySQL is one of the most popular relational database management systems. We often have to fetch data from MySQL in order to analyze it with PySpark SQL. In order to connect to MySQL, we need a MySQL JDBC connector. The following command line will start the PySpark shell with a MySQL JDBC connector.
pyspark --driver-class-path  ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

How It Works

The admission data is contained in the ucbdata table of the pysparksqlbook database of MySQL. We can read this data using the following line of code.
In [1]: dbURL = "jdbc:mysql://localhost/pysparksqlbook"
The ucbdata table has been created by the root user.
In [2]: ucbDataFrame = spark.read.format("jdbc").options(url=dbURL, database='pysparksqlbook', dbtable='ucbdata', user="root", password="").load()
The options() function is used to set different options. In the options() function, we set the url, the database, the table (through the dbtable option), and the user and password of the database.
In [3]: ucbDataFrame.show()
Here is the output:
+--------+------+----------+---------+
|   admit|gender|department|frequency|
+--------+------+----------+---------+
|Admitted| Male |         A|      512|
|Rejected| Male |         A|      313|
|Admitted|Female|         A|       89|
|Rejected|Female|         A|       19|
|Admitted| Male |         B|      353|
|Rejected| Male |         B|      207|
|Admitted|Female|         B|       17|
|Rejected|Female|         B|        8|
|Admitted| Male |         C|      120|
|Rejected| Male |         C|      205|
|Admitted|Female|         C|      202|
|Rejected|Female|         C|      391|
|Admitted| Male |         D|      138|
|Rejected| Male |         D|      279|
|Admitted|Female|         D|      131|
|Rejected|Female|         D|      244|
|Admitted| Male |         E|       53|
|Rejected| Male |         E|      138|
|Admitted|Female|         E|       94|
|Rejected|Female|         E|      299|
+--------+------+----------+---------+
only showing top 20 rows
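The JDBC options used in this recipe can also be kept in a dictionary and unpacked into options(), which makes them easy to reuse. The following is a minimal sketch under stated assumptions: the helper names jdbc_options and read_jdbc_table are hypothetical, the spark argument is assumed to be a SparkSession started with the MySQL connector, and the optional driver entry is only needed when the driver class has to be named explicitly.

```python
def jdbc_options(url, table, user, password, driver=None):
    """Collect JDBC data source options in a dictionary (hypothetical helper)."""
    options = {"url": url, "dbtable": table, "user": user, "password": password}
    if driver is not None:
        # Fully qualified class name of the JDBC driver, if it must be named.
        options["driver"] = driver
    return options


def read_jdbc_table(spark, options):
    """Load a table through JDBC; `spark` is assumed to be a SparkSession."""
    return spark.read.format("jdbc").options(**options).load()


# Options for the ucbdata table used in this recipe.
ucb_options = jdbc_options(
    url="jdbc:mysql://localhost/pysparksqlbook",
    table="ucbdata",
    user="root",
    password="",
)
print(sorted(ucb_options))
```

In the PySpark shell, read_jdbc_table(spark, ucb_options) would then play the role of the read shown above. In practice the password would normally come from a configuration file or an environment variable rather than from the source code.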

Recipe 3-10. Read Data from PostgreSQL

Problem

You want to read a table of data from a PostgreSQL database.

Solution

We have the firstverticaltable table in PostgreSQL. We can see the data using the select command as follows.
pysparksqldb=#  select * from firstverticaltable;
Here is the output:
  iv1|  iv2|  iv3
-----+-----+-----
    9|11.43|10.25
10.26| 8.35| 9.94
 9.84| 9.28| 9.22
11.77|10.18|11.02
(4 rows)
We have to read the firstverticaltable table using PySpark SQL. We know that in order to read data from PostgreSQL, we need a PostgreSQL JDBC connector. The following command will serve that purpose.
$pyspark --driver-class-path  ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar  --packages org.postgresql:postgresql:42.2.4

Now in the PySpark shell, we can write commands to read data from the PostgreSQL database.

How It Works

Let’s read the data table from PostgreSQL and create the DataFrames.

First we are going to define dbURL.
In [1]: dbURL = "jdbc:postgresql://localhost/pysparksqldb?user=postgres&password="
In [2]: verticalDfOne = spark.read.format("jdbc").options(url=dbURL, database='pysparksqldb', dbtable='firstverticaltable').load()
Again here, we are setting different options in the options() function as in the previous recipe.
In [3]: verticalDfOne.show()
Here is the output:
+-----+-----+-----+
|  iv1|  iv2|  iv3|
+-----+-----+-----+
|  9.0|11.43|10.25|
|10.26| 8.35| 9.94|
| 9.84| 9.28| 9.22|
|11.77|10.18|11.02|
+-----+-----+-----+

We have read the firstverticaltable table and created the verticalDfOne DataFrame.

Recipe 3-11. Read Data from Cassandra

Problem

You want to read a table of data from a Cassandra database.

Solution

Cassandra is a popular NoSQL database. Figure 3-3 shows a data table in Cassandra. The students table is in the pysparksqlbook keyspace.
Figure 3-3. The students table in Cassandra

We have to read the students table using PySpark SQL. But we need a Cassandra connector. Therefore, we have to start our PySpark shell using the following command.
$ pyspark  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Note

You can read more about connecting PySpark SQL to Cassandra on the following web page

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md

How It Works

The following lines will read data tables from the Cassandra database.
In [1]: studentsDf = spark.read.format("org.apache.spark.sql.cassandra").options( keyspace="pysparksqlbook", table="students").load()
After reading data from Cassandra, let’s print it to verify that we have created the DataFrame.
In [2]: studentsDf.show()
Here is the output:
+---------+------+-------+
|studentid|gender|   name|
+---------+------+-------+
|      si3|     F|  Julie|
|      si2|     F|  Maria|
|      si1|     M|  Robin|
|      si6|     M|William|
|      si4|     M|    Bob|
+---------+------+-------+

Recipe 3-12. Read Data from MongoDB

Problem

You want to read a collection of data from MongoDB.

Solution

In the context of MongoDB, a collection is a table. In this case, we have a collection called restaurantSurvey in the pysparksqlbook database in MongoDB. Using find() on a collection will return all the documents in that collection. A document is what MongoDB calls a record.
> db.restaurantSurvey.find().pretty().limit(5)
Here is the output:
{
    "_id" : ObjectId("5ba7e6a259acc01fedb4d78a"),
    "Gender" : "Male",
    "Vote" : "Yes"
}
{
    "_id" : ObjectId("5ba7e6a259acc01fedb4d78b"),
    "Gender" : "Male",
    "Vote" : "Yes"
}
{
    "_id" : ObjectId("5ba7e6a259acc01fedb4d78c"),
    "Gender" : "Male",
    "Vote" : "No"
}
{
    "_id" : ObjectId("5ba7e6a259acc01fedb4d78d"),
    "Gender" : "Male",
    "Vote" : "DoNotKnow"
}
{
    "_id" : ObjectId("5ba7e6a259acc01fedb4d78e"),
    "Gender" : "Male",
    "Vote" : "Yes"
}

We have printed the first five records from our restaurantSurvey collection. We have to read this collection data using PySpark SQL and create a DataFrame.

Reading from MongoDB using PySpark SQL requires an extra package, the MongoDB connector. We can use the following command to start the PySpark shell with the connector included.
$pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/pysparksqlbook.restaurantSurvey?readPreference=primaryPreferred" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

How It Works

We have the restaurant survey data in the restaurantSurvey collection of the pysparksqlbook database in MongoDB. The following code will read our data from MongoDB.
In [1]: surveyDf = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/pysparksqlbook.restaurantSurvey").load()
We are setting one option. That is the URI of the collection in MongoDB.
In [2]: surveyDf.show(5)
Here is the output:
+------+---------+--------------------+
|Gender|     Vote|                 _id|
+------+---------+--------------------+
|  Male|      Yes|[5ba7e6a259acc01f...|
|  Male|      Yes|[5ba7e6a259acc01f...|
|  Male|       No|[5ba7e6a259acc01f...|
|  Male|DoNotKnow|[5ba7e6a259acc01f...|
|  Male|      Yes|[5ba7e6a259acc01f...|
+------+---------+--------------------+
only showing top 5 rows

We have read our data.
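The URI passed to the connector names the host, the database, and the collection. The following is a minimal sketch that assembles such a URI and wraps the read shown above; the helper names mongo_collection_uri and read_collection are hypothetical, and the spark argument is assumed to be a SparkSession started with the MongoDB connector package.

```python
def mongo_collection_uri(host, database, collection):
    """Build a MongoDB connection URI that names a database and a collection."""
    return "mongodb://{}/{}.{}".format(host, database, collection)


def read_collection(spark, uri):
    """Load a MongoDB collection as a DataFrame.

    `spark` is assumed to be a SparkSession started with the MongoDB connector.
    """
    return (spark.read
            .format("com.mongodb.spark.sql.DefaultSource")
            .option("uri", uri)
            .load())


# URI of the restaurantSurvey collection used in this recipe.
survey_uri = mongo_collection_uri("127.0.0.1", "pysparksqlbook", "restaurantSurvey")
print(survey_uri)
```

Building the URI from its parts avoids small slips, such as a stray space between the database and collection names.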

Recipe 3-13. Save a DataFrame to MySQL

Problem

You want to save the contents of a DataFrame to MySQL.

Solution

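In order to save a DataFrame to MySQL, we are going to use the DataFrameWriter class, which we access through DataFrame.write. We select the jdbc format, set the connection options, and call the save() function.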

Remember that we have to start the PySpark shell with the MySQL connector.
$pyspark --driver-class-path  ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

How It Works

Step 3-13-1. Creating a DataFrame

Let’s create a DataFrame. In order to create a DataFrame, we need the Row class. The Row class is in the pyspark.sql submodule. Let’s import the Row class now.
In [1]: from pyspark.sql import Row
We are creating a DataFrame on the fly using the following command.
In [2]: ourDf = spark.createDataFrame([
    ...:              Row(id=1, value=10.0),
    ...:              Row(id=2, value=42.0),
    ...:              Row(id=3, value=32.0)
    ...:              ])
In [3]: ourDf.show()
Here is the output:
+--+-----+
|id|value|
+--+-----+
| 1| 10.0|
| 2| 42.0|
| 3| 32.0|
+--+-----+

So the ourDf DataFrame is ready to be saved.

Step 3-13-2. Saving the DataFrame into a MySQL Database

We are going to define the database URL first and then save the DataFrame into a MySQL database.
In [4]: dbURL = "jdbc:mysql://localhost/pysparksqlbook"
In [5]: ourDf.write.format("jdbc").options(url = dbURL, database ='pysparksqlbook', dbtable ='mytab', user="root", password="").save()
After saving the DataFrame, let’s check if it has been saved properly. We have saved our DataFrame in the mytab table, which is inside the pysparksqlbook database.
mysql> use pysparksqlbook;
Here is the output:
Database changed
Using the select command, we can view the contents of the mytab table.
mysql> select * from mytab;
Here is the output:
+--+-----+
|id|value|
+--+-----+
| 1|   10|
| 3|   32|
| 2|   42|
+--+-----+
3 rows in set (0.00 sec)

The DataFrame has been saved to MySQL successfully.

Recipe 3-14. Save a DataFrame to PostgreSQL

Problem

You want to save a DataFrame into PostgreSQL.

Solution

Saving DataFrame content to PostgreSQL requires a PostgreSQL JDBC connector.

We can use the DataFrameWriter class, accessed through DataFrame.write, together with the jdbc format and the save() function. In order to use the PostgreSQL JDBC connector, we have to start the PySpark shell using the following command.
$pyspark --driver-class-path  ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar  --packages org.postgresql:postgresql:42.2.4

How It Works

Step 3-14-1. Creating a DataFrame

We are going to create a dummy DataFrame.
In [1]: from pyspark.sql import Row
In [2]: ourDf = spark.createDataFrame([
   ...:                   Row(iv1=1.2, iv2=10.0),
   ...:                   Row(iv1=1.3, iv2=42.0),
   ...:                   Row(iv1=1.5, iv2=32.0)
   ...:                   ])
In [3]: ourDf.show()
Here is the output:
+---+----+
|iv1| iv2|
+---+----+
|1.2|10.0|
|1.3|42.0|
|1.5|32.0|
+---+----+

Step 3-14-2. Saving a DataFrame into a PostgreSQL Database

We are going to define the database URL first and then save the DataFrame into the PostgreSQL database.
In [4]: dbURL = "jdbc:postgresql://localhost/pysparksqldb?user=postgres&password="
We are saving the DataFrame contents to the mytab table.
In [5]: ourDf.write.format("jdbc").options(url=dbURL, database='pysparksqldb', dbtable='mytab').save()
Let’s check if it has been saved into the PostgreSQL database.
postgres=# \c pysparksqldb
pysparksqldb=# select * from mytab;
Here is the output:
iv1|iv2
---+---
1.3| 42
1.2| 10
1.5| 32
(3 rows)

Recipe 3-15. Save DataFrame Contents to MongoDB

Problem

You want to save DataFrame contents as a collection in MongoDB.

Solution

In order to save the DataFrame to MongoDB, we have to use the connector associated with it.

We are going to use the following line to start PySpark with a MongoDB connector.
$ pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/pysparksqlbook?readPreference=primaryPreferred" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.0

Note

You can read more about writing data to MongoDB using PySpark from the following link.

https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/

How It Works

In Recipe 3-14, we used a DataFrame called ourDf. Let’s print its contents.
In [1]: ourDf.show()
Here is the output:
+---+----+
|iv1| iv2|
+---+----+
|1.2|10.0|
|1.3|42.0|
|1.5|32.0|
+---+----+
We are going to write the contents of our ourDf DataFrame to a collection called mytab in the pysparksqlbook database. The following commands will perform this task. The dbUrl variable holds the MongoDB URI; here it is assumed to be the same URI that was used when starting the shell.
In [2]: dbUrl = "mongodb://127.0.0.1/pysparksqlbook"
In [3]: ourDf.write.format("com.mongodb.spark.sql.DefaultSource").option("database",
    ...: "pysparksqlbook").option("collection", "mytab").option("uri", dbUrl).save()
We have written the DataFrame to MongoDB. Let’s check our data in the mytab collection, which is inside pysparksqlbook.
> use pysparksqlbook;
Here is the output:
switched to db pysparksqlbook
> db.mytab.find().pretty()
Here is the output:
{ "_id" : ObjectId("5bb51e00f17eb43aee74cb8f"), "iv1" : 1.2, "iv2" : 10 }
{ "_id" : ObjectId("5bb51e00f17eb43aee74cb91"), "iv1" : 1.3, "iv2" : 42 }
{ "_id" : ObjectId("5bb51e00f17eb43aee74cb90"), "iv1" : 1.5, "iv2" : 32 }

Finally, we have our DataFrame content in MongoDB.

Recipe 3-16. Read Data from Apache Hive

Problem

You want to read a table of data from Apache Hive.

Solution

We have a table called filamenttable in Hive, and we have to read its data using PySpark SQL. Let’s go through the whole process. First we are going to create the table in Hive and load data into it. We will create the table in the apress database of Hive. We can display all the databases in Hive using show.
hive> show databases;
Here is the output:
OK
apress
default
Time taken: 3.275 seconds, Fetched: 2 row(s)
The database is called apress, so we have to specify this database using the use command.
hive> use apress;
Here is the output:
OK
Time taken: 0.125 seconds
After using the database, we are going to create a table named filamenttable using the following command.
hive> create table filamenttable (
    >   filamenttype string,
    >   bulbpower string,
    >   lifeinhours float
    > )
    > row format delimited
    > fields terminated by ',';
We have created a Hive table with three columns. The first column is filamenttype, whose values are of string type. The second column of our table is bulbpower, with a datatype of string. The third column is lifeinhours, of float type. Now we can confirm that the table has been created using the show command.
hive> show tables;
Here is the output:
OK
filamenttable
Time taken: 0.118 seconds, Fetched: 1 row(s)
The required table has been created successfully. Let’s load the data into the table we created. We are loading data into Hive from a local directory. The data is being loaded using the load clause with local. This tells Hive that the data is being loaded from a local directory, not from HDFS.
hive> load data local inpath 'filamentData.csv' overwrite into table filamenttable;
Here is the output:
Loading data to table apress.filamenttable
OK
Time taken: 5.39 seconds
After the data load, we can query over the table. We can display certain rows using select and limit to limit the number of rows.
hive> select * from filamenttable limit 5;
Here is the output:
OK
filamentA    100W    605.0
filamentB    100W    683.0
filamentB    100W    691.0
filamentB    200W    561.0
filamentA    200W    530.0
Time taken: 0.532 seconds, Fetched: 5 row(s)

We have displayed some rows of the filamenttable table. We have to read this table data using PySpark SQL.

We can read the table from Hive using PySpark SQL and the spark.table() function.

How It Works

In the table() function, we have to provide the name of the table. The table name is provided in the format <databaseName>.<tableName>. In our case, the database’s name is apress and the table’s name is filamenttable. Therefore, the argument value to the table() function will be apress.filamenttable.
In [1]: FilamentDataFrame = spark.table('apress.filamenttable')
In [2]: FilamentDataFrame.show(5)
Here is the output:
+------------+---------+-----------+
|filamenttype|bulbpower|lifeinhours|
+------------+---------+-----------+
|   filamentA|     100W|      605.0|
|   filamentB|     100W|      683.0|
|   filamentB|     100W|      691.0|
|   filamentB|     200W|      561.0|
|   filamentA|     200W|      530.0|
+------------+---------+-----------+
only showing top 5 rows

And finally we created a DataFrame from the table in Apache Hive.
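The same read can be wrapped in small helpers, and an equivalent result can be obtained by passing a select statement to spark.sql(). The following is a minimal sketch; the helper names are hypothetical, and the spark argument is assumed to be a SparkSession with Hive support (outside the PySpark shell, such a session is built by calling enableHiveSupport() on SparkSession.builder).

```python
def qualified_table_name(database, table):
    """Return the <databaseName>.<tableName> form expected by spark.table()."""
    return "{}.{}".format(database, table)


def select_all_query(database, table):
    """Build a query that reads the whole table, for use with spark.sql()."""
    return "SELECT * FROM " + qualified_table_name(database, table)


def read_hive_table(spark, database, table):
    """Read a Hive table as a DataFrame.

    `spark` is assumed to be a SparkSession with Hive support enabled.
    """
    return spark.table(qualified_table_name(database, table))


# Names used in this recipe.
print(qualified_table_name("apress", "filamenttable"))
print(select_all_query("apress", "filamenttable"))
```

Using spark.sql() is convenient when only some columns or rows are needed, because the selection can be expressed directly in the query.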
