Job Manager

Our final challenge for this section returns to the Job Manager. In this case we're going to use the concurrent tools we've just learned about to visualize the progress of a running job. Below you'll see an example of the output you should end up with.

There are many different ways you could solve this problem. Part of the challenge is coming up with a way to solve it, using the available tools. Here's an example implementation that uses just a Semaphore.

// Evaluate a Stage, creating an IO that runs the Stage correctly.
//
// While the stage runs, its progress is drawn on `canvas`, appended after the
// `results` of the stages that came before it. The IO produces the value
// computed by the stage paired with the Result marking the stage as complete.
def evalStage(
    stage: Stage,
    results: List[Result],
    canvas: Canvas
): IO[(Int, Result)] =
  stage match {
    case Stage.Sequential(work) =>
      // A sequential stage is a single unit of work: show 0 of 1, then run it.
      draw(results :+ Result.StageInProgress(0, 1), canvas)
        .flatMap(_ => work)
        .map(i => (i, Result.StageComplete(1)))
    case Stage.Parallel(repeats, work) =>
      // The semaphore starts with no permits. Every copy of `work` releases
      // one permit when it finishes, so each successful acquire in `monitor`
      // corresponds to exactly one more finished copy.
      Semaphore[IO](0).flatMap { s =>
        // Redraws the progress each time a copy finishes. `count` is the
        // number of completions that have already been drawn.
        def monitor(count: Int): IO[Unit] =
          // `>=` rather than `==` so the loop also stops when `repeats` is
          // not positive instead of waiting for a permit that never arrives.
          if count >= repeats then IO.unit
          else
            // After the acquire, `count + 1` copies have finished, so that is
            // the number to display. `>>` takes its argument by name, which
            // keeps the recursive call from being evaluated eagerly.
            s.acquire *>
              draw(
                results :+ Result.StageInProgress(count + 1, repeats),
                canvas
              ) >> monitor(count + 1)

        // Run all the copies in parallel, each signalling its completion.
        val parallelWork =
          List
            .fill(repeats)(work.flatTap(_ => s.release))
            .parSequence

        // Show the initial 0 of `repeats`, then run the work and the monitor
        // side by side; the stage's value is the sum over all the copies.
        draw(results :+ Result.StageInProgress(0, repeats), canvas) *>
          (parallelWork, monitor(0)).parMapN((values, _) =>
            (values.sum, Result.StageComplete(repeats))
          )
      }
  }

// Evaluate the Job, creating an IO that runs all the stages in the correct
// order and returns the sum of the values computed by each stage.
//
// The fold carries the running total together with the Results of the stages
// finished so far; the canvas is redrawn after every stage and once more at
// the end with the overall total.
def eval(job: Job, canvas: Canvas): IO[Int] =
  job.stages
    .foldLeftM[IO, (Int, List[Result])]((0, List.empty[Result])) {
      case ((total, finished), stage) =>
        evalStage(stage, finished, canvas).flatMap { case (value, result) =>
          val updated = finished :+ result
          // Show the newly completed stage before moving on to the next one.
          draw(updated, canvas).map(_ => (total + value, updated))
        }
    }
    .flatMap { case (sum, finished) =>
      draw(finished :+ Result.Complete(sum), canvas).map(_ => sum)
    }

// Open an 800x200 canvas, then five times over: evaluate a random job on it
// and pause for five seconds before starting the next one.
val run =
  Frame.default
    .withSize(800, 200)
    .canvas()
    .flatMap { canvas =>
      val oneRound =
        randomJob.flatMap(eval(_, canvas)) *> IO.sleep(5.seconds)

      oneRound.replicateA_(5)
    }