Exploring data

The first step is to load the data. For multiple files, the SparkContext method wholeTextFiles provides the functionality we need. It reads each file as a single record and returns it as a key-value pair, where the key contains the location of the file and the value holds the file content. We can reference the input files directly via the wildcard pattern data/subject*. This is useful when loading files from a local filesystem, and it is especially important when loading files from HDFS.

val path = s"${sys.env.get("DATADIR").getOrElse("data")}/subject*"
val dataFiles = sc.wholeTextFiles(path)
println(s"Number of input files: ${dataFiles.count}")

Since the column names are not part of the input data, we define a variable to hold them:

val allColumnNames = Array(
  "timestamp", "activityId", "hr") ++ Array(
  "hand", "chest", "ankle").flatMap(sensor =>
    Array(
      "temp",
      "accel1X", "accel1Y", "accel1Z",
      "accel2X", "accel2Y", "accel2Z",
      "gyroX", "gyroY", "gyroZ",
      "magnetX", "magnetY", "magnetZ",
      // Four orientation columns per sensor, matching offsets 13 to 16 in ignoredColumns
      "orient", "orientX", "orientY", "orientZ").
    map(name => s"${sensor}_${name}"))

We simply defined the first three column names, and then the column names for each of the three position sensors. We also prepare a list of column indexes which are useless for modeling, including the timestamp and the orientation data:

val ignoredColumns =  
  Array(0,  
    3 + 13, 3 + 14, 3 + 15, 3 + 16, 
    20 + 13, 20 + 14, 20 + 15, 20 + 16, 
    37 + 13, 37 + 14, 37 + 15, 37 + 16) 
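
The index arithmetic above can also be derived rather than typed by hand. The following is only a sketch: the names firstSensorColumn, columnsPerSensor, orientationOffsets, and derivedIgnoredColumns are hypothetical, and the block size of 17 columns per sensor is an assumption inferred from the offsets 3, 20, and 37 used in ignoredColumns.

```scala
// Hypothetical names, not part of the original code
val firstSensorColumn = 3          // columns 0-2 are timestamp, activityId, hr
val columnsPerSensor = 17          // assumption inferred from the offsets 3, 20, 37
val orientationOffsets = 13 to 16  // orientation columns inside one sensor block

// Start column of each of the three sensor blocks: 3, 20, 37
val sensorStarts = (0 until 3).map(i => firstSensorColumn + i * columnsPerSensor)

// Column 0 (timestamp) plus the orientation columns of every sensor
val derivedIgnoredColumns: Array[Int] =
  (0 +: sensorStarts.flatMap(start => orientationOffsets.map(start + _))).toArray

println(derivedIgnoredColumns.mkString(", "))
```

Deriving the indexes this way makes the layout assumption explicit, so a change in the number of columns per sensor needs to be made in only one place.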

The next step is to process the content of the referenced files and create an RDD which we use as input for data exploration and modeling. Since we are expecting to iterate over the data several times and perform different transformations, we are going to cache the data in memory:

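val rawData = dataFiles.flatMap { case (path, content) =>
  // Split each file into individual lines
  content.split("\n")
}.map { row =>
  // Parse every cell first ("NaN" marks a missing value), then attach
  // the column index and drop the ignored columns
  row.split(" ").map(_.trim).
  map(v => if (v.toUpperCase == "NAN") Double.NaN else v.toDouble).
  zipWithIndex.
  collect {
    case (cell, idx) if !ignoredColumns.contains(idx) => cell
  }
}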
rawData.cache() 
 
println(s"Number of rows: ${rawData.count}") 

The output is as follows:

In this case, for each key-value pair we extract its content and split it on line boundaries. Then we split each line on the file delimiter, which is a space between features. Since the files contain only numeric values and the string value NaN as a marker for missing values, we can simply transform all values into Double, leaving Double.NaN as the representation of a missing value.
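
To see the parsing step in isolation, here is a minimal sketch that runs on a single line with plain Scala collections, without Spark. The helpers parseCell and parseRow and the sample line are hypothetical and exist only for illustration; the set of ignored indexes is shortened to just the timestamp column.

```scala
// Hypothetical helper: turn one cell into a Double, mapping "NaN" to Double.NaN
def parseCell(cell: String): Double =
  if (cell.trim.toUpperCase == "NAN") Double.NaN else cell.trim.toDouble

// Hypothetical helper: parse a space-delimited row and drop the ignored columns
def parseRow(row: String, ignoredIdx: Set[Int]): Array[Double] =
  row.split(" ")
    .map(parseCell)
    .zipWithIndex
    .collect { case (value, idx) if !ignoredIdx.contains(idx) => value }

// Made-up sample line: timestamp, activityId, hr, and two sensor values
val sample = "8.38 0 104 NaN 2.5"
val parsed = parseRow(sample, Set(0))

println(parsed.mkString(", "))
```

Note that the cells are converted to numbers before zipWithIndex is applied, so the collect step only has to match on (value, index) pairs.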

We can see that our input data has 977,972 rows. During loading, we also skipped the timestamp column and the columns which are marked as invalid in the dataset description (see the ignoredColumns array).

The RDD interface follows the design principles of functional programming, the same principles adopted by the Scala programming language. This shared concept brings a uniform API for manipulating data structures; on the other hand, it is always good to know when an operation is invoked on a local object (array, list, sequence) and when it triggers a distributed operation (RDD).

To keep our view of the dataset consistent, we also need to filter column names based on the list of ignored columns which was prepared in previous steps:

import org.apache.spark.utils.Tabulizer._
val columnNames = allColumnNames.
  zipWithIndex.
  filter { case (_, idx) => !ignoredColumns.contains(idx) }.
  map { case (name, _) => name }

println(s"Column names:${table(columnNames, 4, None)}")

The output is as follows:

It is always good to get rid of data which is useless for modeling. The motivation is to reduce memory pressure during computation and modeling. Good candidates for removal are, for example, columns which contain random IDs, timestamps, constant columns, or columns which duplicate information already present in the dataset.
Intuitively, too, modeling ID terms makes little sense, since a random ID carries no information about the target. Feature selection is a hugely important topic and one that we will spend a great deal of time on later in the book.
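
As a small illustration of one of these removal candidates, the sketch below finds constant columns in a local collection of rows. The function constantColumns and the toy rows are hypothetical and the values are made up; on a real RDD the same check would have to be expressed as a distributed computation.

```scala
// Toy rows standing in for parsed data; the values are made up
val rows: Seq[Array[Double]] = Seq(
  Array(1.0, 7.0, 0.5),
  Array(2.0, 7.0, 0.1),
  Array(3.0, 7.0, 0.9))

// Hypothetical helper: a column is constant when it holds exactly one distinct value
def constantColumns(data: Seq[Array[Double]]): Seq[Int] =
  if (data.isEmpty) Seq.empty
  else data.head.indices.filter(i => data.map(_(i)).distinct.size == 1)

val constant = constantColumns(rows)

println(s"Constant column indexes: ${constant.mkString(", ")}")
```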

Now let's look at the distribution of the individual activities in our dataset. We are going to use the same trick as in the previous chapter; however, we would also like to see the actual names of the activities instead of a purely numeric representation. Hence, we first define a mapping between an activity number and its name:

val activities = Map( 
  1 -> "lying", 2 -> "sitting", 3 -> "standing", 4 -> "walking",  
  5 -> "running", 6 -> "cycling", 7 -> "Nordic walking",  
  9 -> "watching TV", 10 -> "computer work", 11 -> "car driving", 
 12 -> "ascending stairs", 13 -> "descending stairs",  
 16 -> "vacuum cleaning", 17 -> "ironing", 
 18 -> "folding laundry", 19 -> "house cleaning", 
 20 -> "playing soccer", 24 -> "rope jumping", 0 -> "other") 
 

Then we compute the number of individual activities in the data with the help of the Spark method reduceByKey:

val dataActivityId = rawData.map(l => l(0).toInt)

val activityIdCounts = dataActivityId.
map(n => (n, 1)).
reduceByKey(_ + _)

val activityCounts = activityIdCounts.
  collect.
  sortBy { case (activityId, count) =>
    -count
  }.map { case (activityId, count) =>
    // Look up the label in the activities map defined earlier
    (activities(activityId), count)
  }

println(s"Activities distribution:${table({activityCounts})}")

The command computes the count of individual activities, translates the activity number to its label, and sorts the result in descending order based on counts:

The same counts can also be visualized as activity frequencies, as shown in Figure 2.

Figure 2: Frequencies of different activities in input data.
It is always good to think about the order of the individual transformations which are applied to the data. In the preceding example, we applied the sortBy transformation after collecting all data locally with the help of the Spark collect action. In this context, it makes perfect sense, since we know that the result of the collect action is reasonably small (at most 19 entries, one per label in the activities map) and sortBy is applied to the local collection. On the other hand, putting sortBy before the collect action would invoke the Spark RDD transformation and schedule the sort as a distributed Spark task.
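
To make the counting and the local sort concrete without a cluster, here is a sketch on plain Scala collections. The sample IDs and the shortened labels map are made up for illustration, and groupBy stands in as a local analogue of the map-and-reduceByKey pair; it is not the Spark API itself.

```scala
// Made-up activity IDs standing in for the first column of rawData
val ids = Seq(4, 1, 4, 17, 4, 1)
// Shortened, hypothetical stand-in for the activities map
val labels = Map(1 -> "lying", 4 -> "walking", 17 -> "ironing")

// Local analogue of map(n => (n, 1)).reduceByKey(_ + _)
val idCounts: Map[Int, Int] =
  ids.groupBy(identity).map { case (id, group) => (id, group.size) }

// Sort by descending count on the small local result, then translate IDs to labels
val labelCounts: Seq[(String, Int)] =
  idCounts.toSeq
    .sortBy { case (_, count) => -count }
    .map { case (id, count) => (labels(id), count) }

labelCounts.foreach { case (label, count) => println(s"$label: $count") }
```

Everything after the counting step here operates on a handful of pairs, which is why sorting locally after collect is the cheaper choice in the Spark version as well.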