When writing concurrent programs, have you ever wondered how the machinery works under the hood? I had the same question. This post records my exploration of an alternative approach to the monadic style of operations employed by libraries such as ZIO Fiber and cats-effect Fiber.
Before starting, two concepts are important: the coroutine and the event loop.
A coroutine, according to Wikipedia, allows execution to be suspended and later resumed from where it left off. In the code snippet below, the coroutine Gen yields values at lines 3, 5, and 7, and the main thread passes a message back to the coroutine with the send method at line 17; the Gen instance then prints those messages to the console at lines 4, 6, and 8.
class Gen extends Coroutine[String, Int] {
  override def generate(): Unit = {
    val received1 = `yield`(1)
    println(s"Message sent from the caller: ${received1}")
    val received2 = `yield`(2)
    println(s"Message sent from the caller: ${received2}")
    val received3 = `yield`(3)
    println(s"Message sent from the caller: ${received3}")
  }
}

@main def run(): Unit = {
  val gen = new Gen()
  while (gen.hasMoreElements()) {
    val yielded = gen.nextElement()
    println(s"Caller receives a value: ${yielded}")
    gen.send(s"Caller sends str ${yielded}")
  }
}
Thus, a coroutine can be viewed as a generalized subroutine that favors cooperative multitasking. A high-level workflow between the coroutine(s) and the main thread is roughly sketched in the following image.
Besides the coroutine itself, the system needs a way to manage the execution of the submitted coroutines. The simplest setup is an event loop that picks a coroutine from its backlog and executes it until the coroutine suspends or completes. Control then returns to the event loop, which picks the next coroutine to run, repeating the same operation until no tasks are pending. The pseudocode could look something like this:
SET all coroutines TO event loop's backlog somewhere in the system
WHILE event loop's backlog size > 0 DO
    GET a coroutine from event loop's backlog
    EXECUTE the coroutine
    IF running == coroutine state THEN
        PUT the coroutine back to the event loop's backlog
    ELSE IF done == coroutine state THEN
        PASS
    END IF
END WHILE
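Before turning to the project's own Scala code, the pseudocode can be tried out without GraalVM. The following is a minimal sketch, assuming plain Scala iterators as stand-ins for coroutines: each call to step() plays the role of running until the next suspension point. The names IterCoroutine and runEventLoop are hypothetical and do not appear in the repository.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a coroutine: step() runs the body up to its next
// suspension point and reports whether any work remains.
final class IterCoroutine(val name: String, steps: Iterator[String]) {
  def step(log: mutable.Buffer[String]): Boolean = {
    if (steps.hasNext) log.append(s"$name:${steps.next()}")
    steps.hasNext // true => still running, false => done
  }
}

// Round-robin event loop: take one coroutine, run one step, and put it back
// at the end of the backlog if it has not finished.
def runEventLoop(coroutines: Seq[IterCoroutine]): Seq[String] = {
  val log = mutable.ArrayBuffer.empty[String]
  val backlog = mutable.Queue.from(coroutines)
  while (backlog.nonEmpty) {
    val co = backlog.dequeue()
    val stillRunning = co.step(log)
    if (stillRunning) backlog.enqueue(co)
  }
  log.toSeq
}

@main def eventLoopDemo(): Unit = {
  val trace = runEventLoop(Seq(
    new IterCoroutine("a", Iterator("1", "2", "3")),
    new IterCoroutine("b", Iterator("x", "y"))
  ))
  println(trace.mkString(", ")) // a:1, b:x, a:2, b:y, a:3
}
```

The interleaved trace shows the cooperative part: a coroutine only gives up control when its step returns, so a step that never returns would stall the whole loop.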
The Scala version, with some details omitted, is shown below.
First, the code fetches a task, i.e., a coroutine, from its corresponding task queue at line 4.
Second, the code executes that task at line 5.
Third, the code checks the task's state and acts accordingly from line 6 to line 12: if the task is in the Ready or Running state, the code places the task back in the task queue and continues by fetching the next task to run; if the task has finished, the code either fetches the next task to run or exits when the task queue is empty.
def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
  @tailrec
  def fnWhile(fetchTask: => Task[_, _]...): Any = {
    val (newTask, ...) = fetchTask
    val (_, newTask1) = execute(newTask)
    newTask1.state() match {
      case State.Ready | State.Running =>
        val (_, ...) = newTaskQueue.add(newTask1)
        fnWhile(newTaskQueue1.fetch())
      case State.Stopped =>
        if (0 != newTaskQueue.size()) fnWhile(newTaskQueue.fetch()) else ()
    }
  }
  fnWhile(taskQueue.fetch())
}
scheduler.taskQueues.foreach { taskQueue =>
  val callable = new Callable[Any] {
    @throws(classOf[RuntimeException])
    override def call(): Any = consume(taskQueue)
  }
  executors.submit(callable)
}
[!CAUTION] The code in the repository is tested only against the specific versions of Scala and GraalVM listed in this section. Other versions may or may not work as expected. Also, at the time of writing, build tools such as sbt, Maven, and Gradle did not work, so some manual steps are necessary.
To follow along, two pieces of software must be installed before proceeding: GraalVM and Scala.
Scala 3.3.6
GraalVM provides a Continuation API, built on the Truffle framework, that allows a program to suspend, resume, and serialize its state to external storage.
After installation, configure and export the related environment variables.
export JAVA_HOME=/path/to/graalvm-espresso-dir
export SCALA_HOME=/path/to/scala-3.3.6
export PATH=$JAVA_HOME/bin:$SCALA_HOME/bin:$PATH
java --version # Check that the output contains information like (build 21-espresso-24.2.0, mixed mode)
scala --version # Check that the output contains information like version 3.3.6
The folder layout is similar to the conventional layout used by build tools such as sbt and Maven.
concurrent4s
├── concurrent
│   ├── libs
│   │   └── *.jar
│   ├── src
│   │   └── main
│   │       └── scala
│   │           └── concurrent
│   │               └── *.scala
│   └── target
│       └── classes
│           └── concurrent
│               └── *.tasty and *.class
├── continuations
│   ├── libs
│   │   └── *.jar
│   ├── src
│   │   └── main
│   │       └── scala
│   │           └── continuations
│   │               └── *.scala
│   └── target
│       └── classes
│           └── continuations
│               └── *.tasty and *.class
└── Makefile
Before starting, please execute the following commands:
# Create concurrent directories
mkdir -p concurrent/src/{main,test}/scala/concurrent
mkdir -p concurrent/target/{classes,test-classes}
mkdir concurrent/libs
# Create continuations directories
mkdir -p continuations/src/main/scala/continuations
mkdir -p continuations/target/classes
mkdir continuations/libs
The required jar can be installed with the Makefile's install-continuations-libraries target. Otherwise, use the curl command below to download the jar file manually.
# continuations dependencies
curl -s -L -O --create-dirs --output-dir ./continuations/libs \
  https://repo1.maven.org/maven2/org/graalvm/espresso/continuations/24.2.2/continuations-24.2.2.jar
The diagram below sketches a high-level overview of the relationships between the components this project uses.
Task is the model upon which all other components operate
Worker serves as a proxy sitting between Scheduler and Task Queue
Task Queue stores tasks to be executed
Scheduler’s responsibility is to schedule tasks
Runtime assembles all components together, hosting an environment for the necessary operations
Although GraalVM provides a Continuation API, the API itself does not contain a Coroutine class. Fortunately, the API offers a Generator class, which provides several critical methods: generate(), emit(), hasMoreElements(), and nextElement(). The first method is where a developer fills in the program's logic; inside it, the code can call emit() to produce a value for the caller and then suspend execution. In the caller function, the developer can use the last two methods to check whether the Generator object has more values to emit and to retrieve them.
Therefore, the first step is to create a Coroutine class. The steps include:
Adding the send() method (at lines 3 and 4) for storing the value sent from the caller side, which drives the Coroutine instance at runtime
Creating the yield() method, which emits a value (at line 6) when there are one or more elements to produce, and passes the value sent from the caller (at line 7) back to the Coroutine, if any
abstract class Coroutine[S, E] extends Generator[E] {
  var value: Option[S] = None
  def send(value: Option[S]): Unit = this.value = value
  def send(value: S): Unit = send(Option(value))
  def `yield`(element: E): Option[S] = {
    emit(element)
    value
  }
}
Add the Coroutine code to the continuations/src/main/scala/continuations directory.
Compile the code with the Makefile's compile-continuations target, or execute:
scalac \
-d ./continuations/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar \
  `find ./continuations/src/main/scala -name '*.scala'`
Back to the Task component: it is merely a wrapper around the coroutine described above, plus the self-defined states shown below:
Ready is the initial state when a Task is created
Running denotes that the coroutine is currently executing
Stopped means the coroutine is neither in the Ready state nor in the Running state, e.g., the coroutine has finished its execution
enum State {
  case Ready, Stopped, Running
}
A Task's state transitions from Ready to Running, and then to Stopped.
With these states, the system can determine whether to emit values (at lines 5 and 11 below) or to send them (at line 9) during program execution.
override def execute(value: Option[S] = None): (Option[E], Task[S, E]) =
  internalState match {
    case State.Ready =>
      if (coroutine.hasMoreElements())
        (Option(coroutine.nextElement()), copy(internalState = State.Running))
      else
        (None, copy(internalState = State.Running))
    case State.Running =>
      value.foreach(coroutine.send(_))
      if (coroutine.hasMoreElements())
        (Option(coroutine.nextElement()), copy(internalState = State.Running))
      else {
        signal.foreach(_.offer(Stopped(name)))
        (None, copy(internalState = State.Stopped))
      }
    case State.Stopped => (None, this)
  }
Add the Task code to the concurrent/src/main/scala/concurrent/ directory.
Compile the code with the Makefile's compile-concurrent target, or execute:
scalac \
-d ./concurrent/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes \
  `find ./concurrent/src/main/scala -name '*.scala'`
When a task is delegated to a worker, the worker passes the task with its send() method (at line 5 of the following listing) to the corresponding task queue through a shared channel tx, a LinkedBlockingQueue, which is thread-safe and whose queuing methods are atomic. The relationship between Worker and Task Queue is one-to-one.
final case class Worker[T](
  name: String,
  tx: LinkedBlockingQueue[T]
) {
  final def send(task: T): T = { tx.put(task); task }
  ... // other methods
}
Add the Worker code to the concurrent/src/main/scala/concurrent/ directory.
Compile the code with the Makefile's compile-concurrent target, or execute:
scalac \
-d ./concurrent/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes \
  `find ./concurrent/src/main/scala -name '*.scala'`
The task queue serves two purposes:
Fetch a Task from rx (at line 3), a channel shared with the Worker, or from an internal list (at line 4)
1.1. Check whether the internal list, i.e., inner, contains any Tasks (at line 11)
1.2. Otherwise, try fetching from the shared queue rx with a timeout (at line 12)
1.3. Failing that, block on rx indefinitely (at line 14) until a new Task is available
Add an unfinished Task back (at line 19) for later execution
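The lookup order in steps 1.1 to 1.3 relies on two LinkedBlockingQueue methods with different blocking behaviour: poll(timeout, unit) returns null if nothing arrives in time, whereas take() waits until an element is available. The sketch below is a standalone illustration of that order and is separate from the project's TaskQueue listing; fetchFrom and its "inner"/"poll"/"take" labels are hypothetical names used only here.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical helper mirroring the three-step lookup: local list first,
// then a timed poll on the shared queue, then a blocking take.
// Returns the item, a label saying where it came from, and the remaining list.
def fetchFrom[T](inner: List[T], rx: LinkedBlockingQueue[T], timeoutMs: Long): (T, String, List[T]) =
  inner match {
    case head :: tail => (head, "inner", tail)
    case Nil =>
      // poll returns null when nothing arrives within the timeout
      Option(rx.poll(timeoutMs, TimeUnit.MILLISECONDS)) match {
        case Some(task) => (task, "poll", Nil)
        case None       => (rx.take(), "take", Nil) // blocks until a producer puts
      }
  }

@main def fetchDemo(): Unit = {
  val rx = new LinkedBlockingQueue[String]()
  rx.put("from-worker")
  println(fetchFrom(List("requeued"), rx, 0L)) // served from the local list
  println(fetchFrom(Nil, rx, 0L))              // served by the timed poll
  // The queue is now empty, so the next call waits for this producer thread.
  val producer = new Thread(() => { Thread.sleep(50); rx.put("late") })
  producer.start()
  println(fetchFrom(Nil, rx, 0L)._1)           // late
  producer.join()
}
```

A zero timeout makes the poll a cheap non-blocking check, so a consumer thread only parks in take() when there is truly nothing to do.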
final case class TaskQueue[T](
  name: String,
  rx: LinkedBlockingQueue[T],
  inner: Seq[T] = Seq.empty[T]
) {
  final def fetch(
    timeout: Duration = 0.seconds,
    blockingFetch: => T = rx.take()
  ): (T, TaskQueue[T]) = {
    val task =
      inner.headOption.getOrElse {
        tryFetch(timeout) match { // by default, tryFetch executes rx.poll(0, TimeUnit.SECONDS)
          case Some(taskInQueue) => taskInQueue
          case None => blockingFetch
        }
      }
    (task, copy(inner = inner.drop(1)))
  }
  final def add(task: T): (T, TaskQueue[T]) =
    (task, copy(inner = task +: inner))
  ... // other methods
}
Add the Task Queue code to the concurrent/src/main/scala/concurrent/ directory.
Compile the code with the Makefile's compile-concurrent target, or execute:
scalac \
-d ./concurrent/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes \
  `find ./concurrent/src/main/scala -name '*.scala'`
The name is fairly self-explanatory: this component manages how a task is run. Specifically, the scheduler uses the Least-Loaded (LL) strategy, which assigns a task to the least loaded worker. The primary reason for this choice is that the scheduling strategy employed by, e.g., the work-stealing scheduler in Rust's Tokio is very complicated[1]. The Least-Loaded strategy is simple yet effective, and can address the issue of starvation[2].
For the LL strategy to work, two functions are required: one that computes the next batch's range, and one that picks the least loaded worker within it.
To compute the next batch's range, first keep the value of the current round (at line 5) and find the length of the worker list (at line 6).
Second, round the worker length up to the next multiple of the batch size (lines 7 to 8). Line 7 adds batch size - 1 to the worker length, which guarantees that the result, multiple, is at least as large as the next multiple of the batch size and smaller than the one after it. Line 8 then subtracts multiple modulo the batch size to obtain that next multiple. For instance, with 22 workers and a batch size of 8, multiple is 29, which is larger than the next multiple, 24, but smaller than the one after that, 32; subtracting 29 % 8 = 5 gives 24.
Third, calculate the next batch's range. Line 9 makes sure the next value wraps around once it reaches the computed multiple. Line 10 picks the minimum of the worker length and next + batch size, so the end of the range is clamped to the worker length whenever next + batch size exceeds it. Again with 22 workers and a batch size of 8, when next + batch size reaches 24, the logic picks the worker length, 22, instead.
Therefore, with 22 workers and a batch size of 8, the successive batch ranges are (0, 8), (8, 16), (16, 22), and then (0, 8) again.
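As a quick check of the 22-worker example, the arithmetic can be restated as a pure function and run on its own. This is only a sketch for verifying the numbers: batchRange is a hypothetical helper that takes the round, worker count, and batch size as parameters, and it is separate from the project's nextBatch listing (the line numbers cited in the explanation refer to that listing, not to this sketch).

```scala
// Hypothetical standalone restatement of the range arithmetic.
def batchRange(round: Int, workerLength: Int, batchSize: Int): Range = {
  val multiple = workerLength + (batchSize - 1)          // 22 + 7 = 29
  val nextMultipleOf = multiple - (multiple % batchSize) // 29 - 5 = 24
  val start = (round * batchSize) % nextMultipleOf       // wraps around at 24
  val end = math.min(start + batchSize, workerLength)    // clamped to 22
  Range(start, end)
}

@main def batchRangeDemo(): Unit =
  (0 to 3).foreach { round =>
    val r = batchRange(round, workerLength = 22, batchSize = 8)
    println(s"round $round -> (${r.start}, ${r.end})")
  }
// round 0 -> (0, 8)
// round 1 -> (8, 16)
// round 2 -> (16, 22)
// round 3 -> (0, 8)
```

When the worker count is already a multiple of the batch size (say 16 workers and a batch size of 8), the same formula yields 16 as the wrap-around point, so no empty trailing batch is produced.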
final case class LeastLoadedScheduler[T](
  currentRound: Int = 0
) ... {
  final def nextBatch(): (Range, LeastLoadedScheduler[T]) = {
    val prev = currentRound
    val workerLength = workers.length
    val multiple = workerLength + (batchSize - 1)
    val nextMultipleOf = multiple - (multiple % batchSize)
    val next = (prev * batchSize) % nextMultipleOf
    val end = Math.min(next + batchSize, workerLength)
    (Range(next, end), copy(currentRound = currentRound + 1))
  }
}
In order to find the least loaded worker, the logic first checks whether the worker length is no larger than the batch size. If so, the entire worker list is used (at line 3); otherwise, the next batch range is calculated (at line 5), and the logic slices the worker list according to that range.
Second, it chooses the worker with the minimum queue size (at line 8), which is the size of its LinkedBlockingQueue.
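The selection step can also be tried in isolation. The following is a minimal sketch, assuming a simplified stand-in for the project's Worker (DemoWorker, a hypothetical name) whose load is the size of its LinkedBlockingQueue; pickLeastLoaded is likewise hypothetical and takes the batch bounds as plain integers. It is separate from the project's leastLoaded listing.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for the project's Worker: a name plus its queue.
final case class DemoWorker(name: String, tx: LinkedBlockingQueue[String]) {
  def size(): Int = tx.size()
}

// Pick the worker with the fewest queued items inside [start, end).
// minBy throws on an empty slice, so the range must cover at least one worker.
def pickLeastLoaded(workers: Seq[DemoWorker], start: Int, end: Int): DemoWorker =
  workers.slice(start, end).minBy(_.size())

@main def leastLoadedDemo(): Unit = {
  val workers = Seq("w0" -> 3, "w1" -> 1, "w2" -> 2, "w3" -> 0).map { case (name, n) =>
    val q = new LinkedBlockingQueue[String]()
    (1 to n).foreach(i => q.put(s"task-$i"))
    DemoWorker(name, q)
  }
  println(pickLeastLoaded(workers, 0, 3).name) // w1, because w3 is outside the batch
}
```

Restricting the search to one batch keeps each scheduling decision cheap, at the cost of sometimes missing an idle worker that sits in another batch, as w3 does here.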
override def leastLoaded(): (Worker[T], LeastLoadedScheduler[T]) = {
  val (tmpWorkers, ...) =
    if (workers.length <= batchSize) (workers, this)
    else {
      val (range, ...) = nextBatch()
      (workers.slice(range.start, range.end), ...)
    }
  (tmpWorkers.minBy(_.size()), ...)
}
Add the Scheduler code to the concurrent/src/main/scala/concurrent/ directory.
Compile the code with the Makefile's compile-concurrent target, or execute:
scalac \
-d ./concurrent/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes \
  `find ./concurrent/src/main/scala -name '*.scala'`
Finally, it is time to orchestrate the entire flow. The Runtime object performs the following functions:
At the beginning, it creates a scheduler (at line 5) and a queue (at line 6) shared by all tasks, which signals when a task has finished its execution
final def apply(): Runtime = {
  val nrOfThreads = java.lang.Runtime.getRuntime().availableProcessors()
  apply(
    nrOfThreads,
    LeastLoadedScheduler[Task[_, _]](nrOfThreads, 8),
    new LinkedBlockingQueue[Stopped]()
  )
}
The Runtime instance then launches a list of threads (lines 17 to 23), each equipped with a task queue (at line 20), that
2.1. Fetches a task (at line 5)
2.2. Executes the task (at line 6), and
2.3. Determines whether to continue this operation (at line 12)
final def start(execute: Task[_, _] => (Any, Task[_, _])): Unit = {
  def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
    @tailrec
    def fnWhile(fetchTask: => (Task[_, _], ...)): Any = {
      val (newTask, ...) = fetchTask
      val (..., newTask1) = execute(newTask)
      newTask1.state() match {
        case State.Ready | State.Running =>
          val (..., newTaskQueue1) = newTaskQueue.add(newTask1)
          fnWhile(newTaskQueue1.fetch())
        case State.Stopped =>
          if (!newTaskQueue.isEmpty()) fnWhile(newTaskQueue.fetch()) else ()
      }
    }
    fnWhile(taskQueue.fetch())
  }
  scheduler.taskQueues.foreach { taskQueue =>
    val callable = new Callable[Any] {
      @throws(classOf[RuntimeException])
      override def call(): Any = consume(taskQueue)
    }
    executors.submit(callable)
  }
}
In the end, the Runtime
3.1. Passes tasks (at line 11) to the Scheduler instance
3.2. Waits for each Task's Stopped signal (at line 4), and
3.3. Terminates the entire flow when all scheduled tasks have stopped (at line 6)
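The waiting logic in steps 3.2 and 3.3 is a countdown over a blocking queue: the main thread calls take() once per scheduled task and stops when the counter reaches zero. Below is a self-contained sketch of that pattern under simplified assumptions. Done, awaitAll, and runAndWait are hypothetical names, a fixed thread pool stands in for the project's workers, and each "task" only emits its completion signal. It is separate from the project's spawn listing.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}
import scala.annotation.tailrec

// Hypothetical signal type; the project uses its own Stopped message.
final case class Done(name: String)

// Block until `remaining` completion signals have been received.
// The counter is checked before take(), so zero scheduled tasks return at once.
@tailrec
def awaitAll(signal: LinkedBlockingQueue[Done], remaining: Int, seen: List[String] = Nil): List[String] =
  if (remaining == 0) seen.reverse
  else awaitAll(signal, remaining - 1, signal.take().name :: seen)

// Submit one trivial job per name; each job only reports its completion.
def runAndWait(names: Seq[String]): Set[String] = {
  val signal = new LinkedBlockingQueue[Done]()
  val pool = Executors.newFixedThreadPool(2)
  try {
    names.foreach(n => pool.execute(() => signal.put(Done(n))))
    awaitAll(signal, names.length).toSet
  } finally pool.shutdown()
}

@main def signalDemo(): Unit =
  println(runAndWait(Seq("t1", "t2", "t3")).toSeq.sorted.mkString(", ")) // t1, t2, t3
```

Signals may arrive in any order, which is why the demo sorts them before printing; only the count matters for deciding when to terminate.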
final def spawn(runtime: Runtime, tasks: Task[_, _]*): Unit = {
  @tailrec
  def fnWhile(rt: Runtime): Unit = {
    val stopped = rt.signal.take()
    val newRt = rt.copy(totalScheduledTasks = rt.totalScheduledTasks - 1)
    if (0 != newRt.totalScheduledTasks) fnWhile(newRt)
  }
  val newRuntime = tasks.toSeq
    .foldLeft(runtime) { (rt, task) =>
      ...
      rt.schedule(newTask)
    }
  fnWhile(newRuntime)
}
Add the Runtime code to the concurrent/src/main/scala/concurrent/ directory.
Compile the code with the Makefile's compile-concurrent target, or execute:
scalac \
-d ./concurrent/target/classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes \
  `find ./concurrent/src/main/scala -name '*.scala'`
Now it is time to see the program in action.
First, create several coroutines, and wrap each of them in a Task object.
val co1 = new Coroutine[String, String] {
  override def generate(): Unit = {
    `yield`("x")
    `yield`("y")
    `yield`("z")
  }
}
val co2 = new Coroutine[String, Int] {
  override def generate(): Unit = {
    `yield`(1)
    `yield`(2)
    `yield`(3)
  }
}
val co3 = new Coroutine[String, Int] {
  override def generate(): Unit = {
    `yield`(99)
    `yield`(98)
    `yield`(97)
  }
}
val co4 = new Coroutine[String, Char] {
  override def generate(): Unit = {
    `yield`('a')
    `yield`('b')
    `yield`('c')
  }
}
val t1 = Task(co1)
val t2 = Task(co2)
val t3 = Task(co3)
val t4 = Task(co4)
Second, configure the Scheduler and Runtime objects with the necessary options, such as the number of workers, the batch size, and so on.
val nrOfThreads = 3
val batchSize = 2
val scheduler = LeastLoadedScheduler[Task[_, _]](nrOfThreads, batchSize)
val signal = new LinkedBlockingQueue[Stopped]()
val runtime = Runtime(nrOfThreads, scheduler, signal)
Third, verify the result based on each task's state.
runtime.start { task =>
  val (result, newTask) = task.execute()
  task.state() match {
    case Task.State.Ready =>
      // assert statement ...
    case Task.State.Running =>
      // assert statement ...
    case Task.State.Stopped =>
  }
  (result, newTask)
}
Fourth, spawn and execute the tasks.
Runtime.spawn(runtime, t1, t2, t3, t4)
Before running the program, the following dependencies must be installed.
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
https://repo1.maven.org/maven2/org/scala-lang/modules/scala-xml_2.13/1.2.0/scala-xml_2.13-1.2.0.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
https://repo1.maven.org/maven2/org/scala-lang/scala3-library_sjs1_3/3.7.2/scala3-library_sjs1_3-3.7.2.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
https://oss.sonatype.org/content/groups/public/org/scalatest/scalatest-app_3/3.2.17/scalatest-app_3-3.2.17.jar
curl -s -L -O --create-dirs --output-dir ./concurrent/libs \
  https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.16/scala-library-2.13.16.jar
Add the RuntimeSpec code to the concurrent/src/test/scala/concurrent/ directory.
Compile the code with the Makefile's compile-test-concurrent target, or execute:
scalac \
-d ./concurrent/target/test-classes \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes:./concurrent/libs/scalatest-app_3-3.2.17.jar:./concurrent/target/classes \
  `find ./concurrent/src/test/scala -name '*.scala'`
Test the code with the Makefile's run-test-concurrent target, or execute:
java \
--experimental-options \
--java.Continuum=true \
-cp ./:./continuations/libs/continuations-24.2.2.jar:./continuations/target/classes:./concurrent/libs/scala3-library_sjs1_3-3.7.2.jar:./concurrent/libs/scala-library-2.13.16.jar:./concurrent/libs/scalatest-app_3-3.2.17.jar:./concurrent/libs/scala-xml_2.13-1.2.0.jar:./concurrent/target/classes:./concurrent/target/test-classes \
  org.scalatest.tools.Runner -R concurrent/target/test-classes -o -w concurrent
We revisited some concurrency primitives that are necessary for complex software, and we learned about their internal mechanisms by writing our own. Hopefully, understanding this machinery under the hood will both help you internalize ideas that can improve software performance and assist you in visualizing the structure of, and relationships between, the necessary components. With this mindset, you may approach problems from different angles and make better-informed trade-offs.
[1]. Announcing Nio