Introduction

When writing concurrent programs, have you ever wondered how the underlying mechanism works under the hood? I had the same question. This article is a journey exploring an alternative approach to the monadic style employed by libraries such as ZIO Fiber and cats-effect Fiber.

Concepts

Before starting, two concepts are important:

  1. Coroutine

  2. Event Loop

Coroutine

A coroutine, according to Wikipedia, allows an execution to be suspended and later resumed from where it left off. In the code snippet below, the coroutine Gen yields values at lines 3, 5, and 7, and the main thread passes a value back to the coroutine with the send method at line 17; the Gen instance then prints those values to the console at lines 4, 6, and 8.

class Gen extends Coroutine[String, Int] {
    override def generate(): Unit = {
        val received1 = `yield`(1)
        println(s"Message sent from the caller: ${received1}")
        val received2 = `yield`(2)
        println(s"Message sent from the caller: ${received2}")
        val received3 = `yield`(3)
        println(s"Message sent from the caller: ${received3}")
    }
}

@main def run(): Unit =  {
    val gen = new Gen()
    while (gen.hasMoreElements()) {
        val yielded = gen.nextElement()
        println(s"Caller receives a value: ${yielded}")
        gen.send(s"Caller sends str ${yielded}")
    }
}

Thus, a coroutine can be viewed as a generalized subroutine that supports cooperative multitasking. The higher-level workflow between the coroutine(s) and the main thread is roughly sketched in the following image.

Coroutine cooperates with the main thread

Event loop

Besides the coroutine component, the system needs a way to manage the execution of submitted coroutines. The simplest setup is to create an event loop that picks a coroutine from its backlog and executes it until the coroutine suspends or completes. Control then returns to the event loop, which picks the next coroutine to run, repeating the same operation until there are no more pending tasks. The pseudocode could look something like this:

SET all coroutines TO event loop's backlog somewhere in the system
WHILE event loop's backlog size > 0 DO
  GET a coroutine from event loop's backlog
  EXECUTE the coroutine
  IF running == coroutine state THEN
    PUT the coroutine back to the event loop's backlog
  ELSE IF done == coroutine state THEN
    PASS
  END IF
END WHILE

A Scala version of this loop, with some details omitted, is shown below.

def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
    @tailrec
    def fnWhile(fetchTask: => Task[_, _]...): Any = {

        val (newTask, ...) = fetchTask
        val (_, newTask1) = execute(newTask)
        newTask1.state() match {
            case State.Ready | State.Running =>
                val (_, newTaskQueue1) = newTaskQueue.add(newTask1)
                fnWhile(newTaskQueue1.fetch())
            case State.Stopped =>
                if (0 != newTaskQueue.size()) fnWhile(newTaskQueue.fetch()) else ()
        }
    }
    fnWhile(taskQueue.fetch())
}
scheduler.taskQueues.foreach { taskQueue =>
    val callable = new Callable[Any] {
      @throws(classOf[RuntimeException])
      override def call(): Any = consume(taskQueue)
    }
    executors.submit(callable)
}
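
To make the loop concrete without the omitted details, here is a minimal single-threaded sketch. It assumes a plain Scala Iterator as a stand-in for a coroutine, where each next() call plays the role of running until the next suspension point. SimpleTask and runLoop are hypothetical names used only for illustration; they are not part of the repository.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

// Hypothetical stand-in for a coroutine: each call to steps.next() "runs"
// the task until its next suspension point and returns the yielded value.
final case class SimpleTask(name: String, steps: Iterator[Int])

// Runs the tasks round-robin until the backlog is empty and returns
// the trace of (task name, yielded value) in execution order.
def runLoop(tasks: Seq[SimpleTask]): Vector[(String, Int)] = {
    @tailrec
    def loop(backlog: Queue[SimpleTask], trace: Vector[(String, Int)]): Vector[(String, Int)] =
        backlog.dequeueOption match {
            case None => trace // no more pending tasks
            case Some((task, rest)) =>
                if (task.steps.hasNext) {
                    // Execute one step, then put the unfinished task back.
                    val value = task.steps.next()
                    loop(rest.enqueue(task), trace :+ (task.name -> value))
                } else loop(rest, trace) // the task is done, so drop it
        }
    loop(Queue.from(tasks), Vector.empty)
}

@main def eventLoopDemo(): Unit = {
    val trace = runLoop(Seq(
        SimpleTask("a", Iterator(1, 2)),
        SimpleTask("b", Iterator(10))
    ))
    println(trace)
}
```

Unlike the snippet above, this sketch never blocks and uses no threads; it only illustrates the fetch, execute, and re-queue cycle.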

Prerequisite

[!CAUTION] The code in the repository has only been tested against the specific versions of Scala and GraalVM listed in this section. Other versions may or may not work as expected. Also, at the time of writing, build tools such as sbt, Maven, and Gradle did not work, so some manual steps are necessary.

To follow along, two pieces of software must be installed before proceeding: GraalVM and Scala.

GraalVM provides the Continuation API, based on the Truffle framework, which allows a program to suspend, resume, and serialize its state to external storage.

After installation, configure and export the related environment variables.

export JAVA_HOME=/path/to/graalvm-espresso-dir
export SCALA_HOME=/path/to/scala-3.3.6
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$PATH
java --version  # Check that the output contains information like (build 21-espresso-24.2.0, mixed mode)
scala --version # Check if output contains information like version 3.3.6

Structure Layout

The folder layout is similar to the conventional layout used by build tools such as sbt and Maven.

concurrent4s
├── concurrent
│   ├── libs
│   │   └── *.jar
│   ├── src
│   │   └── main
│   │       └── scala
│   │           └── concurrent
│   │               └── *.scala
│   └── target
│       └── classes
│           └── concurrent
│               └── *.tasty and *.class
├── continuations
│   ├── libs
│   │   └── *.jar
│   ├── src
│   │   └── main
│   │       └── scala
│   │           └── continuations
│   │               └── *.scala
│   └── target
│       └── classes
│           └── continuations
│               └── *.tasty and *.class
└── Makefile

Before starting, please execute the following commands:

# Create concurrent directories
mkdir -p concurrent/src/{main,test}/scala/concurrent
mkdir -p concurrent/target/{classes,test-classes}
mkdir concurrent/libs

# Create continuations directories
mkdir -p continuations/src/main/scala/continuations
mkdir -p continuations/target/classes
mkdir continuations/libs

Dependencies

The required dependency jar can be installed with the Makefile's install-continuations-libraries target. Alternatively, use the curl command below to download the jar file manually.

# continuations dependencies
curl -s -L -O --create-dirs --output-dir ./continuations/libs \
    https://repo1.maven.org/maven2/org/graalvm/espresso/continuations/24.2.2/continuations-24.2.2.jar

High-Level Overview

The diagram below sketches a high-level overview of the relationships between the components this project uses.

Class Diagram

Components

Task

Coroutine

Although GraalVM provides the Continuation API, the API itself does not contain any Coroutine class. Fortunately, it offers a Generator class, which provides several critical methods: generate(), emit(), hasMoreElements(), and nextElement(). The first method is where a developer fills in the program's logic; inside it, the code can call emit() to produce a value for the caller and then suspend execution. In the caller function, the developer can use the last two methods to check whether the Generator object has more values to emit and to retrieve them.

Therefore, the first thing is to create a Coroutine class. The steps include:

  1. Adding the send() method (at lines 3 and 4) for storing the value sent from the caller side, which drives the Coroutine instance at runtime

  2. Creating the yield() method, which emits a value to the caller (at line 6) and then returns the value sent back from the caller (at line 7), if any

abstract class Coroutine[S, E] extends Generator[E] {
    var value: Option[S] = None
    def send(value: Option[S]): Unit = this.value = value
    def send(value: S): Unit = send(Option(value))
    def `yield`(element: E): Option[S] = {
        emit(element)
        value
    }
}

Compile Coroutine

Back to the Task component: it is merely a wrapper around the coroutine mentioned above, plus the self-defined states shown below.

enum State {
    case Ready, Stopped, Running
}

The Task state transitions from Ready to Running, and then to Stopped.

With these states, the system can determine whether to emit (at lines 5 and 11 below) or to send (at line 9) values during program execution.

override def execute(value: Option[S] = None): (Option[E], Task[S, E]) =
    internalState match {
        case State.Ready =>
            if (coroutine.hasMoreElements())
                (Option(coroutine.nextElement()), copy(internalState = State.Running))
            else
                (None, copy(internalState = State.Running))
        case State.Running =>
            value.foreach(coroutine.send(_))
            if (coroutine.hasMoreElements())
                (Option(coroutine.nextElement()), copy(internalState = State.Running))
            else {
                signal.foreach(_.offer(Stopped(name)))
                (None, copy(internalState = State.Stopped))
            }
        case State.Stopped => (None, this)
    }
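
The transitions encoded in execute can be condensed into a small pure function. This is only a sketch: Phase is a hypothetical copy of the State enum, redefined so the example is self-contained, and hasMore stands in for coroutine.hasMoreElements().

```scala
// Hypothetical mirror of the State enum, redefined to keep the sketch standalone.
enum Phase {
    case Ready, Running, Stopped
}

// Next phase of a task, given whether the wrapped coroutine still has elements.
def transition(current: Phase, hasMore: Boolean): Phase =
    current match {
        case Phase.Ready   => Phase.Running // first execution always moves to Running
        case Phase.Running => if (hasMore) Phase.Running else Phase.Stopped
        case Phase.Stopped => Phase.Stopped // terminal state
    }

@main def transitionDemo(): Unit = {
    // hasMore observations for a coroutine that yields two elements.
    val phases = Seq(true, true, false).scanLeft(Phase.Ready)(transition)
    println(phases.mkString(" -> "))
}
```

Note that, as in execute, a Ready task moves to Running even when the coroutine has nothing to emit; the Stopped signal is only sent on the following execution.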

Compile Task

Worker

When a task is delegated to a worker, the worker passes it with the send() method (line 5 below) to its corresponding task queue through a shared channel tx, a LinkedBlockingQueue, which is thread-safe and whose queuing methods are atomic. The relationship between Worker and Task Queue is one-to-one.

final case class Worker[T](
  name: String,
  tx: LinkedBlockingQueue[T]
) {
  final def send(task: T): T = { tx.put(task); task }
  ... // other methods
}
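
The shared-channel idea can be tried in isolation. The sketch below assumes a trimmed-down MiniWorker (a hypothetical name) that keeps only the send() path, and a plain thread acting as the consumer side of the queue.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, trimmed-down Worker: only the send() path described above.
final case class MiniWorker[T](name: String, tx: LinkedBlockingQueue[T]) {
    def send(task: T): T = { tx.put(task); task }
}

@main def workerDemo(): Unit = {
    // The same queue instance serves as the worker's tx and the consumer's rx.
    val channel = new LinkedBlockingQueue[String]()
    val worker = MiniWorker("w0", channel)
    val consumer = new Thread(new Runnable {
        // take() blocks until the worker has put a task on the channel.
        override def run(): Unit = println(s"received: ${channel.take()}")
    })
    consumer.start()
    worker.send("task-1")
    consumer.join()
}
```

Because put() and take() are thread-safe, no extra locking is needed between the producing and the consuming thread.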

Compile Worker

Task Queue

The task queue serves two purposes:

  1. Fetch a Task from rx (at line 3), a channel shared with the Worker, or from an internal list (at line 4)

    1.1. Check whether the internal list, i.e., inner, contains any Tasks (at line 11)

    1.2. Otherwise, try fetching with a timeout from the shared queue rx (at line 12)

    1.3. If that fails, block on rx indefinitely (at line 14) until a new Task is available

  2. Add an unfinished Task back (at line 19) for later execution

final case class TaskQueue[T](
    name: String,
    rx: LinkedBlockingQueue[T],
    inner: Seq[T] = Seq.empty[T]
) {
    final def fetch(
        timeout: Duration = 0.seconds,
        blockingFetch: => T = rx.take()
    ): (T, TaskQueue[T]) = {
        val task =
            inner.headOption.getOrElse {
                tryFetch(timeout) match { // tryFetch executes rx.poll(0, TimeUnit.SECONDS) by default
                    case Some(taskInQueue) => taskInQueue
                    case None              => blockingFetch
                }
            }
       (task, copy(inner = inner.drop(1)))
    }
    final def add(task: T): (T, TaskQueue[T]) =
        (task, copy(inner = task +: inner))
    ... // other methods
}
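
The fetch order (internal list first, then a non-blocking poll, then a blocking take) can be exercised with a reduced version. MiniTaskQueue and the body of tryFetch below are assumptions made for this sketch; the original tryFetch is among the omitted methods.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical reduced TaskQueue; tryFetch is an assumed helper wrapping rx.poll.
final case class MiniTaskQueue[T](
    rx: LinkedBlockingQueue[T],
    inner: Seq[T] = Seq.empty[T]
) {
    // poll returns null when nothing arrives in time, hence the Option(...) wrapper.
    def tryFetch(timeoutMillis: Long = 0L): Option[T] =
        Option(rx.poll(timeoutMillis, TimeUnit.MILLISECONDS))

    def fetch(): (T, MiniTaskQueue[T]) = {
        val task = inner.headOption
            .orElse(tryFetch())   // only evaluated when inner is empty
            .getOrElse(rx.take()) // blocks only when both sources are empty
        (task, copy(inner = inner.drop(1)))
    }

    def add(task: T): (T, MiniTaskQueue[T]) =
        (task, copy(inner = task +: inner))
}

@main def taskQueueDemo(): Unit = {
    val rx = new LinkedBlockingQueue[String]()
    rx.put("from-rx")
    val (_, q1) = MiniTaskQueue[String](rx).add("pending")
    val (first, q2) = q1.fetch()  // served from the internal list
    val (second, _) = q2.fetch()  // served from the shared channel
    println(s"$first, $second")
}
```

Since both orElse and getOrElse take their argument by name, the blocking take() is reached only when the internal list and the poll both come up empty.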

Compile Task Queue

Scheduler

This component is fairly self-explanatory: it manages how tasks are run. Specifically, scheduling uses the Least-Loaded (LL) strategy.

Least-Loaded (LL) Scheduler

This project uses the least-loaded scheduling strategy, which assigns a task to the least loaded worker. The primary reason is that the scheduling strategy employed by, e.g., Rust’s Tokio work-stealing scheduler is very complicated[1]. The Least-Loaded strategy is simple yet effective, and can address the issue of starvation[2].

For the LL strategy to work, two functions are required:

  1. Calculate the range of the next batch

When computing the next batch’s range, first keep the value of the current round (at line 5), and find the length of the worker list (at line 6).

Second, find the next multiple of the batch size (lines 7 to 8). Line 7 adds batch size - 1 to the worker length, which ensures the resulting number, multiple, is at least as large as the next multiple of the batch size and smaller than the one after it. Line 8 then subtracts multiple modulo batch size from multiple to obtain that next multiple. For instance, with 22 workers and batch size 8, multiple is 29, which is larger than the next multiple, 24, but smaller than the following one, 32; subtracting 29 % 8 = 5 gives 24.

Third, calculate the next batch’s range. Line 9 makes sure the start value wraps around once it reaches the next multiple. Line 10 picks the minimum of the worker length and next + batch size, so the end of the range is clamped to the worker length when next + batch size exceeds it. Again with 22 workers and batch size 8, when next + batch size reaches 24, the logic picks the worker length 22 instead.

Therefore, with 22 workers and a batch size of 8, the successive batch ranges are (0, 8), (8, 16), (16, 22), and then start over from (0, 8) again.

final case class LeastLoadedScheduler[T](
   currentRound: Int = 0
) ... {
    final def nextBatch(): (Range, LeastLoadedScheduler[T]) = {
        val prev = currentRound
        val workerLength = workers.length
        val multiple = workerLength + (batchSize - 1)
        val nextMultipleOf = multiple - (multiple % batchSize)
        val next = (prev * batchSize) % nextMultipleOf
        val end = Math.min(next + batchSize, workerLength)
        (Range(next, end), copy(currentRound = currentRound + 1))
    }
}

  2. Pick the least loaded worker

To find the least loaded worker, the logic first checks whether the worker length is no larger than the batch size. If so, the entire worker list is used (at line 3); otherwise, the next batch range is calculated (at line 5), and the logic slices the worker list based on that range.

Second, choose the worker with the minimum queue size (at line 8), which is the size of its LinkedBlockingQueue.

override def leastLoaded(): (Worker[T], LeastLoadedScheduler[T]) = {
    val (tmpWorkers, ...) =
        if (workers.length <= batchSize) (workers, this)
        else {
            val (range, ...) = nextBatch()
            (workers.slice(range.start, range.end), ...)
        }
    (tmpWorkers.minBy(_.size()), ...)
}
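
Both functions can be checked in isolation with a standalone sketch. Here batchRange reproduces the nextBatch arithmetic with explicit parameters in place of the elided fields, and LoadedWorker is a hypothetical stand-in whose pending count plays the role of the queue size; neither name comes from the repository.

```scala
// Standalone version of the nextBatch arithmetic; the parameters stand in
// for the fields elided in the snippet above (assumption).
def batchRange(round: Int, workerLength: Int, batchSize: Int): Range = {
    val multiple = workerLength + (batchSize - 1)
    val nextMultipleOf = multiple - (multiple % batchSize)
    val next = (round * batchSize) % nextMultipleOf    // wraps around
    val end = Math.min(next + batchSize, workerLength) // clamps the last batch
    Range(next, end)
}

// Hypothetical worker stand-in: `pending` plays the role of the queue size.
final case class LoadedWorker(name: String, pending: Int) {
    def size(): Int = pending
}

// Picks the least loaded worker within the batch selected for the given round.
def pickLeastLoaded(workers: Vector[LoadedWorker], round: Int, batchSize: Int): LoadedWorker = {
    val candidates =
        if (workers.length <= batchSize) workers
        else {
            val range = batchRange(round, workers.length, batchSize)
            workers.slice(range.start, range.end)
        }
    candidates.minBy(_.size())
}

@main def leastLoadedDemo(): Unit = {
    // 22 workers, batch size 8: (0, 8), (8, 16), (16, 22), then (0, 8) again.
    (0 to 3).foreach { round =>
        val r = batchRange(round, workerLength = 22, batchSize = 8)
        println(s"round $round -> (${r.start}, ${r.end})")
    }
    val workers = Vector(3, 1, 4, 0, 2).zipWithIndex.map { case (load, i) => LoadedWorker(s"w$i", load) }
    (0 to 2).foreach(round => println(pickLeastLoaded(workers, round, batchSize = 2).name))
}
```

Restricting the search to one batch per round keeps each scheduling decision cheap, at the cost of picking the least loaded worker within the batch rather than across all workers.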

Compile Scheduler

Runtime

Finally, it is time to orchestrate the entire flow. The Runtime object performs the following functions:

  1. At the beginning, it creates a scheduler (at line 5) and a queue (at line 6) shared by all tasks for signalling when a task finishes its execution

    final def apply(): Runtime = {
        val nrOfThreads = java.lang.Runtime.getRuntime().availableProcessors()
        apply(
            nrOfThreads,
            LeastLoadedScheduler[Task[_, _]](nrOfThreads, 8),
            new LinkedBlockingQueue[Stopped]()
       )
    }

  2. The Runtime instance then launches a list of threads (lines 17 to 23), each equipped with a task queue (at line 20) that

    2.1. Fetches a task (at line 5)

    2.2. Executes the task (at line 6), and

    2.3. Determines whether to continue this operation (at line 12)

    final def start(execute: Task[_, _] => (Any, Task[_, _])): Unit = {
        def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
            @tailrec
            def fnWhile(fetchTask: => (Task[_, _], ...)): Any = {
                val (newTask, ...) = fetchTask
                val (..., newTask1) = execute(newTask)
                newTask1.state() match {
                    case State.Ready | State.Running =>
                        val (..., newTaskQueue1) = newTaskQueue.add(newTask1)
                        fnWhile(newTaskQueue1.fetch())
                    case State.Stopped =>
                        if (!newTaskQueue.isEmpty()) fnWhile(newTaskQueue.fetch()) else ()
                }
            }
            fnWhile(taskQueue.fetch())
        }
        scheduler.taskQueues.foreach { taskQueue =>
            val callable = new Callable[Any] {
                @throws(classOf[RuntimeException])
                override def call(): Any = consume(taskQueue)
            }
            executors.submit(callable)
        }
    }

  3. In the end, the Runtime

    3.1. Passes tasks (at line 11) to the Scheduler instance

    3.2. Waits for each Task's Stopped signal (at line 4), and

    3.3. Terminates the entire flow when all scheduled tasks are stopped (at line 6)

    final def spawn(runtime: Runtime, tasks: Task[_, _]*): Unit = {
        @tailrec
        def fnWhile(rt: Runtime): Unit = {
            val stopped = rt.signal.take()
            val newRt = rt.copy(totalScheduledTasks = rt.totalScheduledTasks - 1)
            if (0 != newRt.totalScheduledTasks) fnWhile(newRt)
        }
        val newRuntime = tasks.toSeq
            .foldLeft(runtime) { (rt, task) =>
               ...
               rt.schedule(newTask)
            }
        fnWhile(newRuntime)
    }
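
The wait-for-all-signals step can be sketched on its own. The example below assumes a hypothetical Done signal in place of Stopped and a plain fixed thread pool in place of the Runtime's executors; awaitAll is an illustrative name.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}
import scala.annotation.tailrec

// Hypothetical signal type mirroring Stopped(name) in the text.
final case class Done(name: String)

// Blocks until `remaining` signals have been taken and returns
// the task names in arrival order.
@tailrec
def awaitAll(
    signal: LinkedBlockingQueue[Done],
    remaining: Int,
    seen: Vector[String] = Vector.empty
): Vector[String] =
    if (remaining == 0) seen
    else awaitAll(signal, remaining - 1, seen :+ signal.take().name)

@main def signalDemo(): Unit = {
    val signal = new LinkedBlockingQueue[Done]()
    val pool = Executors.newFixedThreadPool(2)
    val names = Seq("t1", "t2", "t3")
    names.foreach { n =>
        // Each "task" only reports completion on the shared queue.
        pool.submit(new Runnable { override def run(): Unit = signal.put(Done(n)) })
    }
    val finished = awaitAll(signal, names.size)
    pool.shutdown()
    println(s"all stopped: ${finished.sorted.mkString(", ")}")
}
```

One deliberate difference from spawn: this sketch checks the remaining count before calling take(), so it returns immediately when no tasks were scheduled instead of blocking.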

Compile Runtime

An Example

Now it is time to see the program in action.

First, create several coroutines, and wrap them in Task objects.

val co1 = new Coroutine[String, String] {
    override def generate(): Unit = {
        `yield`("x")
        `yield`("y")
        `yield`("z")
    }
}

val co2 = new Coroutine[String, Int] {
    override def generate(): Unit = {
        `yield`(1)
        `yield`(2)
        `yield`(3)
    }
}

val co3 = new Coroutine[String, Int] {
    override def generate(): Unit = {
        `yield`(99)
        `yield`(98)
        `yield`(97)
    }
}

val co4 = new Coroutine[String, Char] {
    override def generate(): Unit = {
        `yield`('a')
        `yield`('b')
        `yield`('c')
    }
}

val t1 = Task(co1)
val t2 = Task(co2)
val t3 = Task(co3)
val t4 = Task(co4)

Second, configure the Scheduler and Runtime objects with the necessary options, such as the number of workers and the batch size.

val nrOfThreads = 3
val batchSize = 2
val scheduler = LeastLoadedScheduler[Task[_, _]](nrOfThreads, batchSize)
val signal = new LinkedBlockingQueue[Stopped]()
val runtime = Runtime(nrOfThreads, scheduler, signal)

Third, verify the result based on each task’s state.

runtime.start { task =>
    val (result, newTask) = task.execute()
    task.state() match {
        case Task.State.Ready =>
            // assert statement ...
        case Task.State.Running =>
            // assert statement ...
        case Task.State.Stopped =>
    }
    (result, newTask)
}

Fourth, spawn and execute the tasks.

Runtime.spawn(runtime, t1, t2, t3, t4)

Testing Dependencies

Before running the program, the following dependencies must be installed.

curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
    https://repo1.maven.org/maven2/org/scala-lang/modules/scala-xml_2.13/1.2.0/scala-xml_2.13-1.2.0.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
    https://repo1.maven.org/maven2/org/scala-lang/scala3-library_sjs1_3/3.7.2/scala3-library_sjs1_3-3.7.2.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
    https://oss.sonatype.org/content/groups/public/org/scalatest/scalatest-app_3/3.2.17/scalatest-app_3-3.2.17.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
    https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.16/scala-library-2.13.16.jar

Test Our Program

Conclusion

We revisited some concurrency primitives that are necessary for complex software, and we learned about their internal mechanisms by writing our own. Hopefully, understanding this machinery will both help you internalize ideas that can improve software performance and assist you in visualizing the structure of, and relationships between, the necessary components. With this mindset, you may approach problems from different angles and have a better sense of the trade-offs involved.

References

[1]. Announcing Nio

[2]. Least-Loaded (LL) Scheduler