These are merely notes; they may contain misconceptions or inaccurate explanations.
In a very simplified setup, async can be viewed as a combination of an event loop and coroutines.
This project requires GraalVM Espresso 24.0 and Scala 3.3.6. Other versions may not work.
In general, a coroutine[1] allows an execution to suspend and later resume from where it left off. Take the following code as an example:
1. Create a coroutine programme, Gen, whose logic yields values and may receive values from the caller at lines 3, 5, and 7
1.1. line 3 produces the value 1, and then returns control to the main function
1.2. line 5 produces the value 2, and then returns control to the main function
1.3. line 7 produces the value 3, and then returns control to the main function
2. Instruct the main programme how to obtain values from the coroutine programme, i.e., Gen, and how to provide values for Gen to consume (lines 13 to 18)
2.1. line 13 instantiates the Gen object
2.2. the while loop (line 14) checks whether the Gen instance has more elements to produce
2.3. the Gen instance executes nextElement() (line 15), producing a value based on the logic at lines 3, 5, and 7; the value is stored in the yielded variable
2.4. the main function passes a value to the Gen instance through its send method (line 17)

 1: class Gen extends Coroutine[String, Int] {
 2:   override def generate(): Unit = {
 3:     val received1 = `yield`(1)
 4:     println(s"Message sent from the caller: ${received1}")
 5:     val received2 = `yield`(2)
 6:     println(s"Message sent from the caller: ${received2}")
 7:     val received3 = `yield`(3)
 8:     println(s"Message sent from the caller: ${received3}")
 9:   }
10: }
11:
12: @main def run(): Unit = {
13:   val gen = new Gen()
14:   while (gen.hasMoreElements()) {
15:     val yielded = gen.nextElement()
16:     println(s"Caller receives a value: ${yielded}")
17:     gen.send(s"Caller sends str ${yielded}")
18:   }
19: }

In the simplest event loop[2] setup, the event loop picks a task from its backlog and executes it until the task suspends or completes. Control then returns to the event loop, which picks the next task to run, until no pending tasks remain.
1. A task is fetched from the task queue (line 5)
2. The task, which holds logic such as the Gen object, is executed (line 6)
3. Based on the task’s state (lines 7 ~ 12), the event loop decides
3.1. to place the task back in the task queue and fetch the next task to run (lines 9 ~ 10), or
3.2. to fetch the next task to run if the current task has finished (line 12)

 1: def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
 2:   @tailrec
 3:   def fnWhile(fetchTask: => Task[_, _]...): Any = {
 4:
 5:     val (newTask, ...) = fetchTask
 6:     val (_, newTask1) = execute(newTask)
 7:     newTask1.state() match {
 8:       case State.Ready | State.Running =>
 9:         val (_, ...) = newTaskQueue.add(newTask1)
10:         fnWhile(newTaskQueue1.fetch())
11:       case State.Stopped =>
12:         if (0 != newTaskQueue.size()) fnWhile(newTaskQueue.fetch()) else ()
13:     }
14:   }
15:   fnWhile(taskQueue.fetch())
16: }
17: scheduler.taskQueues.foreach { taskQueue =>
18:   val callable = new Callable[Any] {
19:     @throws(classOf[RuntimeException])
20:     override def call(): Any = consume(taskQueue)
21:   }
22:   executors.submit(callable)
23: }

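The fetch, execute, and requeue cycle above can also be tried without GraalVM. The following is only a minimal single-threaded sketch under stated assumptions: SketchTask and EventLoopSketch are hypothetical names that do not appear in the project, and an Iterator stands in for the coroutine, so each call to step() plays the role of "run until the next suspension".

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

// Hypothetical stand-in for a Task: the Iterator replaces the coroutine,
// and every element it returns marks one suspension point.
final case class SketchTask(name: String, steps: Iterator[String]) {
  // Runs the task up to its next suspension; None means the task has completed.
  def step(): Option[String] = if (steps.hasNext) Some(steps.next()) else None
}

object EventLoopSketch {
  // Runs all tasks round-robin until none is pending and returns the execution trace.
  def run(tasks: Seq[SketchTask]): Vector[String] = {
    @tailrec
    def loop(queue: Queue[SketchTask], trace: Vector[String]): Vector[String] =
      queue.dequeueOption match {
        case None => trace // no pending tasks: the loop ends
        case Some((task, rest)) =>
          task.step() match {
            case Some(value) => // task suspended: put it back and fetch the next one
              loop(rest.enqueue(task), trace :+ s"${task.name}:$value")
            case None => // task completed: drop it and fetch the next one
              loop(rest, trace)
          }
      }
    loop(Queue(tasks: _*), Vector.empty)
  }
}
```

With a task A that suspends twice and a task B that suspends once, the trace interleaves the two: A:a1, B:b1, A:a2. The project's version differs in that tasks arrive through a blocking queue and each task queue runs on its own thread.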
Unlike languages such as Python, many programming languages that run on the JVM cannot suspend and resume an execution. GraalVM Espresso 24.0 achieves this effect through its Continuation API[3]: When a continuation is suspended, the stack is unwound and copied onto the heap as ordinary Java objects. When a continuation is resumed, those objects are put back onto the stack, ….
GraalVM’s Continuation API already offers a class called Generator, which provides the following functions:
emit(E element) produces a value and suspends the execution
generate() is the main block that defines what the Generator executes
hasMoreElements() resumes the execution and checks whether the Generator has produced another value
nextElement() returns the element produced by the Generator
The developer’s responsibility is to fill in the actual logic in generate() and to emit(E element) values to the caller. The main program then uses hasMoreElements() to check whether the Generator has produced more values, and calls nextElement() to obtain the actual value.
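For readers without Espresso at hand, the calling protocol described above can be emulated on any JVM. The sketch below is an assumption-laden stand-in, not the Espresso class: ThreadBackedGenerator and OneTwoThree are hypothetical names, and a daemon thread plays the role of the suspended stack, whereas real continuations copy stack frames to the heap and need no extra thread.

```scala
import java.util.concurrent.SynchronousQueue

// Hypothetical stand-in for the Generator described above; NOT the Espresso class.
abstract class ThreadBackedGenerator[E] {
  private val produced = new SynchronousQueue[Option[E]]() // generator -> caller; None marks the end
  private val resume = new SynchronousQueue[Boolean]()     // caller -> generator: continue past emit
  private var pending: Option[Option[E]] = None            // value fetched but not yet consumed
  private var started = false
  private val worker = new Thread(() => { generate(); produced.put(None) })
  worker.setDaemon(true)

  // Filled in by the developer, like generate() in the text above.
  protected def generate(): Unit

  // Hands a value to the caller, then blocks ("suspends") until the caller asks for more.
  protected final def emit(element: E): Unit = {
    produced.put(Some(element))
    resume.take()
  }

  // Resumes the generator if no value is pending and reports whether it produced one.
  final def hasMoreElements(): Boolean = {
    if (pending.isEmpty) {
      if (started) resume.put(true) else { started = true; worker.start() }
      pending = Some(produced.take())
    }
    pending.exists(_.isDefined)
  }

  // Returns the pending value and clears it, so the next check resumes the generator.
  final def nextElement(): E = {
    if (!hasMoreElements()) throw new NoSuchElementException("generator is exhausted")
    val element = pending.get.get
    pending = None
    element
  }
}

// Example generator: emits three values, suspending after each one.
final class OneTwoThree extends ThreadBackedGenerator[Int] {
  override protected def generate(): Unit = { emit(1); emit(2); emit(3) }
}
```

A caller drives it with the same loop as in the text: while hasMoreElements() is true, call nextElement(). The generator body only advances when the caller asks for the next value, which is the suspend and resume behaviour the Continuation API provides natively.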
With this logic, we can create a class Coroutine that extends Generator and adds a send(value: S) function, which supplies a value from the caller to the Generator object when necessary. The Coroutine therefore adds the following functionality:
Provide a send(value: S) function (lines 3 and 4) through which the caller function or programme supplies a value
Wrap the emit(element) function (line 6) inside a yield(element: E) method (line 5) that returns the sent value after the execution is resumed

1: abstract class Coroutine[S, E] extends Generator[E] {
2:   var value: Option[S] = None
3:   def send(value: Option[S]): Unit = this.value = value
4:   def send(value: S): Unit = send(Option(value))
5:   def `yield`(element: E): Option[S] = {
6:     emit(element)
7:     value
8:   }
9: }

A Task is merely a wrapper (line 1) that holds the Coroutine described above (line 2), which the developer provides; it executes the Coroutine’s code and reacts based on the current state (line 4). In addition, it is equipped with a queue (line 5) for signaling the end of its execution to an external controller; the queue is assigned later, before the task is scheduled.

1: final case class DefaultTask[S, E](
2:   coroutine: Coroutine[S, E],
3:   ...
4:   internalState: Task.State = Task.State.Ready,
5:   signal: Option[LinkedBlockingQueue[Stopped]] = None
6: ) ...

The Task’s execute() method checks the state (line 2) and processes the logic accordingly. For instance, when the Coroutine has just been created, the task executes the Coroutine (lines 4 to 7); afterwards, it passes the value provided at runtime to the Coroutine (line 9) until the Coroutine is Stopped. When Stopped, the Task notifies the external controller with a Stopped signal (line 13).

 1: override def execute(value: Option[S] = None): (Option[E] ...) =
 2:   internalState match {
 3:     case State.Ready =>
 4:       if (coroutine.hasMoreElements())
 5:         (Option(coroutine.nextElement()), ...)
 6:       else
 7:         (None, ...)
 8:     case State.Running =>
 9:       value.foreach(coroutine.send(_))
10:       if (coroutine.hasMoreElements())
11:         (Option(coroutine.nextElement()), ...)
12:       else {
13:         signal.foreach(_.offer(Stopped(name)))
14:         (None, ...)
15:       }
16:     case State.Stopped =>
17:       (None, this)
18:   }

The Worker acts as a proxy that sits between the Scheduler and a TaskQueue. Through the method send(task: T) (line 5), it passes a Task to its corresponding task queue over a shared channel called tx (line 3), which is a LinkedBlockingQueue. The relationship between Worker and TaskQueue is 1 to 1.

1: final case class Worker[T](
2:   name: String,
3:   tx: LinkedBlockingQueue[T]
4: ) {
5:   final def send(task: T): T = { tx.put(task); task }
6:   ...
7: }

The TaskQueue stores tasks to be executed, and it serves two purposes:
1. Fetch a Task from an internal list (line 4) or from rx (line 3), a LinkedBlockingQueue shared with the Worker
1.1. Check whether the internal list contains any task (line 8)
1.2. Otherwise, try fetching from the shared queue rx with a timeout (line 9)
1.3. If that also yields nothing, block on rx indefinitely until a new Task is available (line 11)
2. Add an unfinished Task back (line 16) for later execution

 1: final case class TaskQueue[T](
 2:   name: String,
 3:   rx: LinkedBlockingQueue[T],
 4:   inner: Seq[T] = Seq.empty[T]
 5: ) {
 6:   final def fetch(..., blockingFetch: => T = rx.take()): (T, ...) = {
 7:     val task =
 8:       inner.headOption.getOrElse {
 9:         tryFetch(timeout) match {
10:           case Some(taskInQueue) => taskInQueue
11:           case None => blockingFetch
12:         }
13:       }
14:     (task, ...)
15:   }
16:   final def add(task: T): (T, ...) =
17:     (task, copy(inner = task +: inner))
18: }

As its name suggests, the Scheduler’s primary responsibility is to schedule a task. The scheduling mechanism is based on least-loaded[4] worker logic, where the scheduler
calculates the range of workers to consider per batch (lines 6 and 7); for example, with 20 workers and a batch size of 8, and assuming currentRound advances by one per call, successive calls cover the indices 0 until 8, 8 until 16, and 16 until 20 before wrapping around

1: final def nextBatch(): (Range, ...) = {
2:   val prev = currentRound
3:   val workerLength = workers.length
4:   val multiple = workerLength + (batchSize - 1)
5:   val nextMultipleOf = multiple - (multiple % batchSize)
6:   val next = (prev * batchSize) % nextMultipleOf
7:   val end = Math.min(next + batchSize, workerLength)
8:   (Range(next, end), ...)
9: }

picks the least-loaded Worker within that range (line 8)

1: override def leastLoaded(): (Worker[T], ...) = {
2:   val (tmpWorkers, ...) =
3:     if (workers.length <= batchSize) (workers, ...)
4:     else {
5:       val (range, sched) = nextBatch()
6:       (workers.slice(range.start, range.end), ...)
7:     }
8:   (tmpWorkers.minBy(_.size()), ...)
9: }

schedules the Task to that Worker (line 3)

1: override def schedule(task: T) ... {
2:   val (worker, ...) = leastLoaded()
3:   worker.send(task)
4:   ...
5: }

Finally, the Runtime assembles all components, orchestrating the entire flow.
1. At the beginning, it creates a scheduler and a queue, shared by all tasks, through which a task signals that it has finished its execution

1: final def apply(): Runtime = {
2:   val nrOfThreads = java.lang.Runtime.getRuntime().availableProcessors()
3:   apply(
4:     nrOfThreads,
5:     LeastLoadedScheduler[Task[_, _]](nrOfThreads, 8),
6:     new LinkedBlockingQueue[Stopped]()
7:   )
8: }

2. It then launches a list of threads (lines 17 to 23), each equipped with a task queue (line 20), that
2.1. fetches a Task (line 5),
2.2. executes the Task (line 6), and
2.3. determines whether to continue this logic (line 12)

 1: final def start(execute: Task[_, _] => (Any, Task[_, _])): Unit = {
 2:   def consume(taskQueue: TaskQueue[Task[_, _]]): Any = {
 3:     @tailrec
 4:     def fnWhile(fetchTask: => (Task[_, _], ...)): Any = {
 5:       ... fetchTask
 6:       ... execute(newTask)
 7:       newTask1.state() match {
 8:         case State.Ready | State.Running =>
 9:           ...
10:           fnWhile(newTaskQueue1.fetch())
11:         case State.Stopped =>
12:           if (!newTaskQueue.isEmpty()) fnWhile(newTaskQueue.fetch()) else ()
13:       }
14:     }
15:     fnWhile(taskQueue.fetch())
16:   }
17:   scheduler.taskQueues.foreach { taskQueue =>
18:     val callable = new Callable[Any] {
19:       @throws(classOf[RuntimeException])
20:       override def call(): Any = consume(taskQueue)
21:     }
22:     executors.submit(callable)
23:   }
24: }

3. Next, it passes a task to the Scheduler for scheduling

1: final def schedule(task: Task[_, _]): Runtime = copy(
2:   scheduler = scheduler.schedule(task),
3:   totalScheduledTasks = totalScheduledTasks + 1
4: )

4. In the end, it waits for each Task’s Stopped signal (line 4), and terminates the entire flow once all scheduled tasks have stopped (line 6)

 1: final def spawn(runtime: Runtime, tasks: Task[_, _]*): Unit = {
 2:   @tailrec
 3:   def fnWhile(rt: Runtime): Unit = {
 4:     val stopped = rt.signal.take()
 5:     val newRt = rt.copy(totalScheduledTasks = rt.totalScheduledTasks - 1)
 6:     if (0 != newRt.totalScheduledTasks) fnWhile(newRt)
 7:   }
 8:   val newRuntime = tasks.toSeq
 9:     .foldLeft(runtime) { (rt, task) =>
10:       ...
11:       rt.schedule(newTask)
12:     }
13:   fnWhile(newRuntime)
14: }

[1]. Coroutine
[2]. A Curious Course on Coroutines and Concurrency - Part 7 Let’s Build an Operating System
[3]. Continuation API