Using the IO Monad

We mentioned earlier that our function, unsafeSave, has a side effect, which is to write to a file. But as functional programmers, we try to only write pure functions that have no side effects. However, at the end of the program, you still want this side effect to happen; otherwise, there would be no point in running it!

A common way of solving this dilemma is to use a parametrized type that encapsulates the side effect so that it can be run later, when we explicitly ask for it. A good candidate for that is the cats.effect.IO class in the cats-effect library (see https://typelevel.org/cats-effect/datatypes/io.html).

Here is an example that you can try in a Scala Console:

import cats.effect.IO

val io = IO{ println("Side effect!"); 1 }
// io: cats.effect.IO[Int] = …
io.unsafeRunSync()
// Side effect!
// res1: Int = 1

We can observe that nothing happened when we declared the io variable. At this point, the block passed to the IO constructor is only registered and will be executed later. The actual execution only happens when we call unsafeRunSync(). Our io variable is a pure, immutable value, and hence preserves referential transparency.
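To picture why nothing happens at declaration time, we can compare IO with a plain Scala function value. The following is only an analogy using the standard library, not how cats.effect.IO is implemented; the names DeferredDemo, deferred, and runs are hypothetical and exist only for this illustration:

```scala
object DeferredDemo {
  // Counts how many times the side effect has actually run.
  var runs = 0

  // A zero-argument function only describes the work.
  // Creating it prints nothing and leaves runs at 0.
  val deferred: () => Int = () => { runs += 1; println("Side effect!"); 1 }
}
```

Calling DeferredDemo.deferred() plays the role of unsafeRunSync(): the message is printed and runs is incremented only at that point, and again on every further call.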

IO is a monad, and as such we can use map, flatMap, and for comprehensions to compose side effects:

val program = for {
  a <- io
  b <- io
} yield a + b
// program: cats.effect.IO[Int] = …
program.unsafeRunSync()
// Side effect!
// Side effect!
// res2: Int = 2

We can reuse the io variable many times; the side effect that it encapsulates will be run as many times as necessary at the end of the world when we call unsafeRunSync().
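To make the composition more concrete, here is a minimal sketch of a deferred-effect type written with the standard library only. MiniIO and MiniIODemo are hypothetical names; this is a simplified stand-in under our own assumptions, not the actual implementation of cats.effect.IO (for instance, it is not stack-safe and has no error handling). It shows that map and flatMap merely build a larger description, and that a for comprehension is sugar for nested flatMap and map calls:

```scala
// Hypothetical, simplified stand-in for cats.effect.IO; not the library's implementation.
final class MiniIO[A](run: () => A) {
  // Executes the description. Each call runs the wrapped block again.
  def unsafeRunSync(): A = run()

  // map and flatMap only build a bigger description; nothing runs here.
  def map[B](f: A => B): MiniIO[B] =
    new MiniIO(() => f(run()))

  def flatMap[B](f: A => MiniIO[B]): MiniIO[B] =
    new MiniIO(() => f(run()).unsafeRunSync())
}

object MiniIO {
  // The by-name parameter captures the block without evaluating it.
  def apply[A](body: => A): MiniIO[A] = new MiniIO(() => body)
}

object MiniIODemo {
  // Counts how many times the side effect has actually run.
  var runs = 0

  val io: MiniIO[Int] = MiniIO { runs += 1; 1 }

  // The for comprehension ...
  val program: MiniIO[Int] = for {
    a <- io
    b <- io
  } yield a + b

  // ... is sugar for nested flatMap and map calls.
  val desugared: MiniIO[Int] = io.flatMap(a => io.map(b => a + b))
}
```

Defining program and desugared leaves runs at 0. Each call to unsafeRunSync() on either of them runs the wrapped block twice, once for a and once for b.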

If we had used scala.concurrent.Future instead of cats.effect.IO, the side effect would have run only once. This is because a Future starts executing as soon as it is created and then memoizes its result. The behavior of Future may be desirable in some cases, but in some other cases, you really want your effects to be performed as many times as you define them in your code. The approach of IO also avoids shared state and memory leaks.
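The behavior of Future can be checked with a small experiment that uses only the standard library. The FutureRunsOnce object and its demo method are hypothetical names introduced for this sketch, and the five-second timeout is an arbitrary choice:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureRunsOnce {
  // Returns (result of the composed Future, number of times the side effect ran).
  def demo(): (Int, Int) = {
    val counter = new AtomicInteger(0)

    // The body is scheduled as soon as the Future is created.
    val future: Future[Int] = Future { counter.incrementAndGet() }

    // Using the same Future twice reuses its stored result.
    val sum: Future[Int] = for {
      a <- future
      b <- future
    } yield a + b

    (Await.result(sum, 5.seconds), counter.get())
  }
}
```

FutureRunsOnce.demo() returns (2, 1): the sum is 2, as with the IO program, but the counter was incremented only once.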

IO values can also be run in parallel. They can effectively replace scala.concurrent.Future:

import cats.effect.IO
import cats.implicits._
import scala.concurrent.ExecutionContext.Implicits.global

val io = IO{ Thread.sleep(100); Thread.currentThread().getName }
val program = (io, io, io).parMapN((a, b, c) => s"$a $b $c")
program.unsafeRunSync()
// res2: String = ForkJoinPool-1-worker-5 ForkJoinPool-1-worker-3 ForkJoinPool-1-worker-1

The IO block returns the current thread's name as a string. We create a program of the IO[String] type using parMapN to indicate that we want to execute the IO values in the tuple in parallel. The output of unsafeRunSync shows that the program was executed in three different threads.

Going back to our transaction saving, all we have to do to make our unsafeSave function safe is to wrap it in IO:

def save(transactions: Dataset[Transaction], path: URI): IO[Unit] =
  IO(unsafeSave(transactions, path))

Alternatively, you can inline unsafeSave and change the integration test to call save instead:

BatchProducer.save(sourceDS, uri).unsafeRunSync()

We can now save transactions while controlling side effects and keeping our functions pure.
