Tabular formats – CSV

In this section, we will cover text data in a tabular format: CSV. The following topics will be covered:

  • Saving data in CSV format
  • Loading CSV data
  • Testing

Saving CSV files is even more involved than saving JSON or plain text because we need to specify whether we want to retain the header of our data in the CSV file.

First, we will create a DataFrame:

test("should save and load CSV with header") {
//given
import spark.sqlContext.implicits._
val rdd = spark.sparkContext
.makeRDD(List(UserTransaction("a", 100), UserTransaction("b", 200)))
.toDF()
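
The snippet assumes a UserTransaction case class, a FileName value, and a SparkSession named spark, none of which are shown in this section. A minimal sketch of that scaffolding, assuming ScalaTest's FunSuite style (which matches the test(...) syntax used here) and with illustrative names and values, could look like this:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Assumed record type that is written to and read back from CSV.
case class UserTransaction(userId: String, amount: Int)

class CsvTest extends FunSuite with BeforeAndAfterEach {
  // Assumed output path used by save() and by the subsequent read.
  val FileName = "transactions.csv"

  // Local SparkSession for the test; master and appName are illustrative.
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("csv-test")
    .getOrCreate()
}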

Then, we will write it out in the CSV format. We also need to specify, via the header option, that we don't want to include a header:

//when
rdd.coalesce(1)
  .write
  .format("csv")
  .option("header", "false")
  .save(FileName)

In the variant of the test that keeps the header, the same option is set to true at write time:

//when
rdd.coalesce(1)
  .write
  .format("csv")
  .option("header", "true")
  .save(FileName)

Also, unlike in previous Spark versions, we don't need to add any additional dependency to support CSV.
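
This is because the CSV data source has been built into the spark-sql module since Spark 2.0 (before that, the separate spark-csv package was required). A minimal build.sbt sketch, with illustrative version numbers, could look like this:

// build.sbt (sketch): the built-in CSV source ships with spark-sql,
// so no extra CSV dependency is required.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.8",
  "org.scalatest"    %% "scalatest" % "3.0.8" % "test"
)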

We will then read the file back. The read options should mirror the write options, so we need to specify whether the file has a header or not:

val fromFile = spark.read.option("header", "false").csv(FileName)
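
The assertions of the test are not shown in this section; a minimal //then block comparing the loaded rows with what was written could look like the following (these exact assertions are an assumption, not the original listing):

//then (sketch): with header set to false, CSV values are loaded as strings
val loaded = fromFile.collect().map(row => (row.getString(0), row.getString(1)))
assert(loaded.toList == List(("a", "100"), ("b", "200")))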

Let's start the test and check the output:

+---+---+
|_c0|_c1|
+---+---+
|  a|100|
|  b|200|
+---+---+

In the preceding output, we can see that the data is loaded, but we lost our schema. _c0 and _c1 are the aliases that Spark created for column 0 (_c0) and column 1 (_c1).

So, if we want to retain that schema information, we need to specify the header option both at write time and at read time:

val fromFile = spark.read.option("header", "true").csv(FileName)

We will specify that the header should retain our information. In the following output, we can see that the schema information was preserved throughout the write and read operations:

+------+------+
|userId|amount|
+------+------+
|     a|   100|
|     b|   200|
+------+------+

Let's see what happens if we write with the header and read without it. Our test should fail in this case, as sketched below.
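
A sketch of that combination (this exact listing is not shown here, but it follows the same pattern as the other snippets in this section) would be:

//when (sketch): header is written, but ignored at read time
rdd.coalesce(1)
  .write
  .format("csv")
  .option("header", "true")
  .save(FileName)

val fromFile = spark.read.option("header", "false").csv(FileName)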

With that combination, our test fails because we don't have a schema when reading without headers: the first record, which was the header row, is treated as an ordinary column value.

Let's try a different situation, where we write without the header and read with the header:

//when
rdd.coalesce(1)
  .write
  .format("csv")
  .option("header", "false")
  .save(FileName)

val fromFile = spark.read.option("header", "true").csv(FileName)

Our test will fail again because, this time, the first data record is treated as the header record.

Let's set both the read and write operations to use the header, and test our code after uncommenting the cleanup logic that deletes the output file after each test:

import scala.reflect.io.Path // assumed import for the Path helper used below

override def afterEach() {
  // Delete the CSV output directory after every test run.
  val path = Path(FileName)
  path.deleteRecursively()
}

Like JSON files, CSV files carry the schema (through the header), but with less overhead. Therefore, CSV can be an even better choice than JSON.

In the next section, we'll see how we can use a fully schema-based format with Spark.
