Concurrent Coordination
We can get quite far with parMapN and friends, but complex concurrent programs require coordination between different parts that extends beyond returning values. The Cats Effects standard library provides useful tools for communication between concurrent processes. There is also Ref and Deferred, which are part of the kernel and the most basic tools on which many others are built.
Creating Concurrent Tools
We'll use Ref as an example of a concurrent tool. All the others work in the same way.
The simplest way to create a Ref is to use IO.ref.
val ref: IO[Ref[IO, Int]] = IO.ref(1)
The type looks a bit complicated. Unpacking it we have:
- an
IO[Stuff], meaning anIOthat producesStuffwhen run; and StuffisRef[IO, Int], meaning aRefthat stores anIntand works withIO.
You'll have to get used to these kind of types when using Cats Effect.
We can also construct a Ref by calling the apply method on the companion object. In this case we have to specify the effect type (which is always IO, for us) to help out type inference.
val ref2 = Ref[IO].of(1)
We could also write out the full type, as below, but this quickly gets tedious.
val ref3: IO[Ref[IO, Int]] = Ref.of(1)
Exercise: Putting Tools to Use
Complete the challenge in code/src/main/scala/parallelism/02-tools.scala, which gets you to use some of the tools provided by Cats Effect.
-
This exercise is focusing on the difference between description and action. The code in
firstuses a description twice, so it gets two differentRefs. The code inseconduses the sameReftwice, which is usually what you want. -
The following code will do the job.
def generate(ref: Ref[IO, Int]) = smallRandomSleep
.map(_ => random.nextInt(10))
.flatMap(v => ref.getAndUpdate(a => a + v))
.replicateA_(100)
def collector(ref: Ref[IO, Int]) =
IO.sleep(1.second)
.flatMap(_ => ref.get)
.flatMap(v => IO.println(s"Value is $v"))
val run =
ref.flatMap { r =>
(
generate(r),
generate(r),
generate(r),
generate(r),
generate(r),
collector(r)
).parTupled.void
}