Implementing processOneBatch

Here is the implementation of BatchProducer.processOneBatch. As is often the case, the implementation is much shorter than its test:

import java.time.Instant
import cats.effect.IO
import org.apache.spark.sql.Dataset
import scala.concurrent.duration._

val WaitTime: FiniteDuration = 59.minutes
val ApiLag: FiniteDuration = 5.seconds

def processOneBatch(fetchNextTransactions: IO[Dataset[Transaction]],
                    transactions: Dataset[Transaction],
                    saveStart: Instant,
                    saveEnd: Instant)(implicit appCtx: AppContext)
  : IO[(Dataset[Transaction], Instant, Instant)] = {
  // Brings the implicit Timer needed by IO.sleep and currentInstant into scope
  import appCtx._
  // Keep only the transactions in [saveStart, saveEnd)
  val transactionsToSave = filterTxs(transactions, saveStart, saveEnd)
  for {
    _ <- BatchProducer.save(transactionsToSave,
                            appCtx.transactionStorePath)
    _ <- IO.sleep(WaitTime)
    beforeRead <- currentInstant
    // The next save window ends ApiLag before the time we just read
    end = beforeRead.minusSeconds(ApiLag.toSeconds)
    nextTransactions <- fetchNextTransactions
  } yield (nextTransactions, saveEnd, end)
}

We first filter the transactions using a filterTxs function that we will define shortly. Then, using a for comprehension, we chain several IO values:

  1. Save the filtered transactions, using the save function that we implemented earlier
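  2. Wait 59 minutes, using the implicit Timer that was brought into scope by import appCtx._
  3. Get the current time, using a currentInstant function that we will define shortly, and subtract ApiLag (5 seconds) from it to obtain end
  4. Fetch the next transactions using the fetchNextTransactions argument

The comprehension then yields the fetched transactions together with saveEnd and end. These become the transactions, saveStart and saveEnd arguments of the next call, so each saved window starts exactly where the previous one ended.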
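To see how the returned tuple chains successive calls, here is a pure Scala sketch that models only the window arithmetic. It assumes that each call's result is passed back as the transactions, saveStart and saveEnd arguments of the next call; IO, Spark and the real clock are replaced by a list of clock readings, and the names BatchWindows, nextWindow and windows are hypothetical.

```scala
import java.time.Instant
import scala.concurrent.duration._

// A pure model of the processOneBatch loop (hypothetical names throughout).
// Assumption: IO, Spark and the real clock are replaced by plain values, so
// only the save-window arithmetic remains.
object BatchWindows {
  val ApiLag: FiniteDuration = 5.seconds

  // One iteration: the next window starts at the previous saveEnd and ends
  // ApiLag before the clock reading, like `yield (nextTransactions, saveEnd, end)`.
  def nextWindow(saveEnd: Instant, beforeRead: Instant): (Instant, Instant) =
    (saveEnd, beforeRead.minusSeconds(ApiLag.toSeconds))

  // Walks over the clock readings (one per wake-up) and returns every saved
  // window, starting with the initial one.
  def windows(firstStart: Instant, firstEnd: Instant,
              clockReadings: List[Instant]): List[(Instant, Instant)] =
    clockReadings.scanLeft((firstStart, firstEnd)) {
      case ((_, saveEnd), beforeRead) => nextWindow(saveEnd, beforeRead)
    }
}
```

With an initial window of [0 s, 3600 s) and clock readings at 7200 s and 10800 s, the saved windows are [0, 3600), [3600, 7195) and [7195, 10795): each one starts where the previous one ended, with no gap and no overlap.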

Here is the implementation of the helper function, filterTxs:

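import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.TimestampType

def filterTxs(transactions: Dataset[Transaction],
              fromInstant: Instant, untilInstant: Instant): Dataset[Transaction] = {
  // The $"..." column syntax comes from the session that owns the Dataset
  import transactions.sparkSession.implicits._
  transactions.filter(
    // Casting epoch seconds to TimestampType yields a comparable timestamp literal
    ($"timestamp" >=
      lit(fromInstant.getEpochSecond).cast(TimestampType)) &&
    ($"timestamp" <
      lit(untilInstant.getEpochSecond).cast(TimestampType)))  // end excluded
}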

We do not need to pass an implicit SparkSession, as one is already available from the transactions Dataset. We only keep transactions in the half-open interval [fromInstant, untilInstant): the start instant is included and the end instant is excluded, so that consecutive batches do not overlap when we loop over processOneBatch.
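The half-open interval is easier to see without Spark. The following sketch applies the same predicate to a plain Seq; the Tx case class and the HalfOpenFilter object are hypothetical stand-ins for Transaction and filterTxs that keep only an id and a timestamp.

```scala
import java.time.Instant

object HalfOpenFilter {
  // Hypothetical stand-in for Transaction: only the timestamp matters here.
  final case class Tx(id: Int, timestamp: Instant)

  // Keeps transactions with fromInstant <= timestamp < untilInstant, the same
  // half-open interval as filterTxs. Unlike the Spark version, it compares full
  // Instant values instead of whole epoch seconds.
  def filterTxs(txs: Seq[Tx], fromInstant: Instant, untilInstant: Instant): Seq[Tx] =
    txs.filter { tx =>
      !tx.timestamp.isBefore(fromInstant) && tx.timestamp.isBefore(untilInstant)
    }
}
```

Splitting transactions at 0, 10, 20 and 30 seconds into the windows [0, 20) and [20, 40) puts the transaction at 20 seconds in the second window only, so concatenating both results gives back every transaction exactly once.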

Here is the definition of currentInstant:

import java.util.concurrent.TimeUnit

def currentInstant(implicit timer: Timer[IO]): IO[Instant] =
  timer.clockRealTime(TimeUnit.SECONDS) map Instant.ofEpochSecond

We read the current time from the Timer type class instead of querying the system clock directly. As we saw while writing the integration test, this lets us substitute a fake timer that simulates the clock.
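The benefit of reading time through an implicit type class can be shown with the standard library alone. This sketch uses a hypothetical SimpleClock trait, loosely modelled on the role Timer[IO] plays here (it is not the cats-effect API), with a system-backed instance and a fake one whose time moves only when advance is called. ClockDemo.currentInstant is likewise a hypothetical, IO-free counterpart of currentInstant.

```scala
import java.time.Instant
import java.util.concurrent.TimeUnit

// Hypothetical minimal clock type class: code asks an implicit clock for the
// time instead of reading the system clock directly. Not the cats-effect API.
trait SimpleClock {
  def realTime(unit: TimeUnit): Long
}

object SimpleClock {
  // Production instance backed by the system clock.
  val system: SimpleClock = new SimpleClock {
    def realTime(unit: TimeUnit): Long =
      unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  }

  // Test instance: time only moves when advance is called.
  final class Fake(startMillis: Long) extends SimpleClock {
    private var nowMillis = startMillis
    def advance(millis: Long): Unit = nowMillis += millis
    def realTime(unit: TimeUnit): Long =
      unit.convert(nowMillis, TimeUnit.MILLISECONDS)
  }
}

object ClockDemo {
  // Same shape as currentInstant, without IO.
  def currentInstant(implicit clock: SimpleClock): Instant =
    Instant.ofEpochSecond(clock.realTime(TimeUnit.SECONDS))
}
```

A test can create a fake clock, pass it to currentInstant, and advance it by 59 minutes without actually waiting, which is the same idea as supplying a fake Timer to processOneBatch.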
