Stream creation

There are several ways to create a stream, described in the Spark documentation (https://spark.apache.org/docs/2.1.1/structured-streaming-programming-guide.html), including socket-based, Kafka-based, and file-based streams. In this chapter, we will use file-based streams, that is, streams that point to a directory and deliver every new file that appears in it.
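For orientation only, here is a minimal sketch of how the other two source types are typically created; the host, port, broker address, and topic name are placeholders rather than values used by this chapter, and the Kafka source additionally requires the external spark-sql-kafka connector on the classpath:

// Sketch of alternative streaming sources; host, port, broker, and topic are hypothetical
val socketStream = spark.readStream
  .format("socket")
  .option("host", "localhost")   // hypothetical socket server
  .option("port", "9999")
  .load()

val kafkaStream = spark.readStream
  .format("kafka")                                       // needs the spark-sql-kafka package
  .option("kafka.bootstrap.servers", "localhost:9092")   // hypothetical broker address
  .option("subscribe", "events")                          // hypothetical topic name
  .load()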

Moreover, our application will read CSV files; thus, we will connect the stream input with the Spark CSV parser. We also need to configure the parser with the input data schema, which we exported from the model-training application. Let's load the schema first:

import java.io.File
import org.apache.spark.sql.types.{DataType, StructField, StructType}

def loadSchema(srcFile: File): StructType = {
  // Parse the exported JSON schema and mark every field as nullable
  StructType(
    DataType.fromJson(scala.io.Source.fromFile(srcFile).mkString)
      .asInstanceOf[StructType]
      .map {
        case StructField(name, dtype, _, metadata) => StructField(name, dtype, nullable = true, metadata)
      })
}
val inputSchema = Chapter8Library.loadSchema(new File(modelDir, "inputSchema.json"))

The loadSchema method modifies the loaded schema by marking all the loaded fields as nullable. This is a necessary step to allow input data to contain missing values in any column, not only in columns that contained missing values during model training.
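For completeness, the following sketch shows one way the inputSchema.json file could have been produced in the model-training application; the trainingData DataFrame name and the modelDir directory are assumptions for illustration, not code from the original application:

import java.io.{File, PrintWriter}

// Hypothetical export step in the model-training application:
// serialize the training DataFrame's schema to JSON so the streaming
// application can load it later.
val schemaJson = trainingData.schema.json
val writer = new PrintWriter(new File(modelDir, "inputSchema.json"))
try writer.write(schemaJson) finally writer.close()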

In the next step, we will directly configure a CSV parser and the input stream to read CSV files from a given data folder:

val inputDataStream = spark.readStream
  .schema(inputSchema)
  .option("timestampFormat", "MMM-yyyy")
  .option("nullValue", null)
  .csv(s"${dataDir}/*.csv")

The CSV parser needs only minor configuration to set up the format of timestamp features and the representation of missing values. At this point, we can even explore the structure of the stream:

inputDataStream.schema.printTreeString()

The output is the tree representation of the input schema, with every field reported as nullable, matching the schema we loaded earlier.

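Note that the returned object is a regular DataFrame backed by a streaming source. As a quick sanity check (a small hedged addition, not part of the original text), we can ask Spark whether the frame is indeed streaming:

// Returns true for DataFrames created via spark.readStream
println(inputDataStream.isStreaming)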