We start by defining a case class that represents the same data as our JSON payload. Create a new package, coinyser, and then a class, coinyser.HttpTransaction, in src/main/scala:
package coinyser

case class HttpTransaction(date: String,
                           tid: String,
                           price: String,
                           `type`: String,
                           amount: String)
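One detail of the class above is worth spelling out: type is a reserved word in Scala, so the field name has to be wrapped in backticks wherever it appears in source code. The following minimal sketch repeats the case class (without the package declaration, so that it compiles on its own) and shows how the field can be read and overridden; the object name BacktickSketch and the value names are only illustrative.

```scala
// Same shape as coinyser.HttpTransaction, repeated so the snippet is self-contained.
case class HttpTransaction(date: String,
                           tid: String,
                           price: String,
                           `type`: String,
                           amount: String)

// Hypothetical helper object, used only for this illustration.
object BacktickSketch {
  val tx: HttpTransaction =
    HttpTransaction("1532365695", "70683282", "7740.00", "0", "0.10041719")

  // Reading the field requires backticks, because `type` is a keyword.
  val txType: String = tx.`type`

  // The same applies when the field is used as a named argument.
  val changed: HttpTransaction = tx.copy(`type` = "1")

  def main(args: Array[String]): Unit = {
    println(txType)
    println(changed)
  }
}
```

The backticks are purely a source-level escape: the field is still named type, which is what allows it to line up with the "type" key of the JSON objects.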
HttpTransaction has the same attribute names and types (all String) as the JSON objects. The first step is to implement a function that transforms a JSON string into a Dataset[HttpTransaction]. For this purpose, let's create a new test class, coinyser.BatchProducerSpec, in src/test/scala:
package coinyser

import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.{Matchers, WordSpec}

class BatchProducerSpec extends WordSpec with Matchers with SharedSparkSession {
  // Expected values, defined outside the test so that other tests can reuse them
  val httpTransaction1 =
    HttpTransaction("1532365695", "70683282", "7740.00", "0", "0.10041719")
  val httpTransaction2 =
    HttpTransaction("1532365693", "70683281", "7739.99", "0", "0.00148564")

  "BatchProducer.jsonToHttpTransactions" should {
    "create a Dataset[HttpTransaction] from a Json string" in {
      val json =
        """[{"date": "1532365695", "tid": "70683282", "price": "7740.00",
          |  "type": "0", "amount": "0.10041719"},
          | {"date": "1532365693", "tid": "70683281", "price": "7739.99",
          |  "type": "0", "amount": "0.00148564"}]""".stripMargin

      val ds: Dataset[HttpTransaction] =
        BatchProducer.jsonToHttpTransactions(json)

      // collect() forces evaluation; the assertion ignores element order
      ds.collect() should contain theSameElementsAs
        Seq(httpTransaction1, httpTransaction2)
    }
  }
}
Our test extends SharedSparkSession. This trait provides an implicit SparkSession that can be shared across several tests.
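The call BatchProducer.jsonToHttpTransactions(json) in the test passes no session argument, which suggests (this is an assumption at this point) that the function will take the SparkSession as an implicit parameter. The following sketch shows that mechanism in plain Scala, without Spark: Session, SharedSession, and Producer are hypothetical stand-ins for SparkSession, a trait such as SharedSparkSession, and BatchProducer, and are not part of any library.

```scala
// Hypothetical stand-in for SparkSession, so that the sketch runs without Spark.
final case class Session(name: String)

// Hypothetical stand-in for a trait that exposes an implicit session to its subclasses.
trait SharedSession {
  implicit def session: Session = Session("shared-test-session")
}

// Hypothetical stand-in for a producer whose function needs a session.
object Producer {
  // The second parameter list is filled in by the compiler
  // whenever an implicit Session is in scope at the call site.
  def describe(json: String)(implicit session: Session): String =
    s"parsing ${json.length} characters with ${session.name}"
}

object ImplicitSessionSketch extends SharedSession {
  // No session is passed explicitly; the one inherited from SharedSession is used.
  val message: String = Producer.describe("[]")

  def main(args: Array[String]): Unit = println(message)
}
```

With this pattern, every test in a class that mixes in the trait can call session-dependent functions without repeating the session argument.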
The two HttpTransaction instances that we expect to find in the Dataset are defined outside the test body because we will reuse them in another test later on. Inside the test, we first define a string containing a JSON array with two transactions that we extracted from Bitstamp's endpoint.
The call to jsonToHttpTransactions, which we are going to implement, returns a Dataset[HttpTransaction]. However, Spark's Dataset is lazy: at this stage, nothing has been processed yet. To materialize the Dataset, we need to force its evaluation by calling collect(). The return type of collect() here is Array[HttpTransaction], so we can use ScalaTest's contain theSameElementsAs assertion, which compares the elements regardless of their order.
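The laziness described above can be observed with a small standalone sketch. It assumes that Spark SQL is on the classpath and creates its own local SparkSession instead of relying on SharedSparkSession; the object name LazyDatasetSketch and the accumulator are illustrative choices, not part of the project. An accumulator records how many times the function passed to map has run, and its value is read before collect() is called.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical helper object, used only for this illustration.
object LazyDatasetSketch {

  // Returns the number of map calls observed before collect(), and the collected values.
  def run(): (Long, Array[Int]) = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lazy-dataset-sketch")
      .getOrCreate()
    import spark.implicits._

    try {
      // Counts how many times the mapping function is actually executed.
      val calls = spark.sparkContext.longAccumulator("map calls")

      // Declaring the transformation only builds a plan; the function does not run yet.
      val doubled: Dataset[Int] = Seq(1, 2, 3).toDS().map { n =>
        calls.add(1)
        n * 2
      }
      val callsBeforeCollect: Long = calls.sum

      // collect() is an action: it runs the plan and brings the rows back to the driver.
      val collected: Array[Int] = doubled.collect()

      (callsBeforeCollect, collected)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, values) = run()
    println(s"map calls before collect(): $before")
    println(s"collected values: ${values.mkString(", ")}")
  }
}
```

Because collect() copies the whole Dataset into the driver's memory, it suits small test fixtures like ours; for large Datasets it is usually safer to aggregate or take a sample first.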