Deserializing live transactions

The JSON payload that we receive when we subscribe to live transactions is slightly different from the one we had in the REST endpoint for fetching batch transactions.

We are going to need to deserialize it to a case class before we can transform it to the same Transaction case class that we used in the previous chapter. For this, first create a new case class, coinyser.WebsocketTransaction in src/main/scala:

package coinyser

case class WebsocketTransaction(amount: Double,
buy_order_id: Long,
sell_order_id: Long,
amount_str: String,
price_str: String,
timestamp: String,
price: Double,
`type`: Int,
id: Int)

The names and types of the attributes correspond to the JSON attributes.

After that, we can write a unit test for a new function, deserializeWebsocketTransaction. Create a new class, coinyser.StreamingProducerSpec, in src/test/scala:

package coinyser

import java.sql.Timestamp
import coinyser.StreamingProducerSpec._
import org.scalactic.TypeCheckedTripleEquals
import org.scalatest.{Matchers, WordSpec}

class StreamingProducerSpec extends WordSpec with Matchers with TypeCheckedTripleEquals {
"StreamingProducer.deserializeWebsocketTransaction" should {
"deserialize a valid String to a WebsocketTransaction" in {
val str =
"""{"amount": 0.045318270000000001, "buy_order_id": 1969499130,
|"sell_order_id": 1969495276, "amount_str": "0.04531827",
|"price_str": "6339.73", "timestamp": "1533797395",
|"price": 6339.7299999999996, "type": 0, "id":
71826763}"""
.stripMargin
StreamingProducer.deserializeWebsocketTransaction(str) should
===(SampleWebsocketTransaction)
}
}
}

object StreamingProducerSpec {
val SampleWebsocketTransaction = WebsocketTransaction(
amount = 0.04531827, buy_order_id = 1969499130, sell_order_id =
1969495276, amount_str = "0.04531827", price_str = "6339.73",
timestamp = "1533797395", price = 6339.73, `type` = 0, id =
71826763)
}

The test is straightforward—we define a sample JSON string, call the function under test, and make sure the deserialized object SampleWebsocketTransaction contains the same values.

Now we need to implement the function. Add a new val mapper: ObjectMapper and a new function deserializeWebsocketTransaction to the StreamingProducer object:

package coinyser

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone

import cats.effect.IO
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.pusher.client.Client
import com.pusher.client.channel.SubscriptionEventListener
import com.typesafe.scalalogging.StrictLogging

object StreamingProducer extends StrictLogging {

def subscribe(pusher: Client)(onTradeReceived: String => Unit):
IO[Unit] =
...

val mapper: ObjectMapper = {
val m = new ObjectMapper()
m.registerModule(DefaultScalaModule)
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
m.setDateFormat(sdf)
}

def deserializeWebsocketTransaction(s: String): WebsocketTransaction
= {
mapper.readValue(s, classOf[WebsocketTransaction])
}
}

For this part of the project, we use the Jackson Java library to deserialize/serialize JSON objects. It is the library that is used under the hood by Spark when it reads/writes dataframe from/to JSON. Hence, it is available without adding any more dependencies.

We define a constant, mapper: ObjectMapper, which is the entry point of Jackson for serializing/deserializing classes. We configure it to write timestamps in a format that is compatible with what Spark can parse. This will be necessary later on when we read the Kafka topic using Spark. Then the function’s implementation calls readValue to deserialize the JSON into WebsocketTransaction.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset