Introduction

When writing concurrent programs, have you ever wondered how the machinery works under the hood? I had the same question, and this post documents the journey of exploring it. The code can be found on Codeberg.

Concepts

Before starting, two concepts are important:

  1. Coroutine

  2. Event Loop

Coroutine

A coroutine, according to Wikipedia, is a routine whose execution can be suspended and later resumed from where it left off. In the code snippet below (which assumes import inspect and from typing import Any, Generator), the coroutine gen suspends at the yield on line 4; the caller resumes it with next at lines 13, 18, and 20, and passes a value back in with the send method at line 15, which the gen instance then prints to the console.

  1: def gen(beg: int, end: int) -> Generator[int, Any, str]:
  2:     idx = beg
  3:     while idx < end:
  4:         received = yield idx
  5:         if received:
  6:             print("received:", received)
  7:         idx += 1
  8:     return f"final result is {idx}"
  9:
 10: gen_fn = gen(0, 3)
 11: inspect.isgenerator(gen_fn) 
 12: True 
 13: next(gen_fn)
 14: 0
 15: gen_fn.send("a param from caller!")
 16: received: a param from caller!
 17: 1
 18: next(gen_fn) 
 19: 2
 20: next(gen_fn)
 21: ---------------------------------------------------------------------------
 22: StopIteration                             Traceback (most recent call last)
 23: Cell In[...], line 1
 24: ----> 1 next(gen_fn)
 25:
 26: StopIteration: final result is 3
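
Note in the traceback that the generator's return value rides on the StopIteration exception. A minimal sketch of how a caller can capture it (this pattern reappears later in Task.execute):

    def run_to_completion(g):
        try:
            while True:
                next(g)            # keep resuming the coroutine
        except StopIteration as si:
            return si.value        # the generator's `return` value

    print(run_to_completion(gen(0, 3)))  # final result is 3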

Thus, a coroutine can be viewed as a generalized subroutine suited to cooperative multitasking. A high-level workflow between coroutines and the main thread is roughly sketched in the following image.

Coroutine cooperates with the main thread
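
To make the cooperation concrete, here is a minimal sketch that interleaves two instances of gen by hand, with the main thread deciding who runs next:

    a, b = gen(0, 2), gen(10, 12)
    for coro in (a, b, a, b):   # the main thread picks who resumes next
        print("yielded:", next(coro))
    # yielded: 0 / yielded: 10 / yielded: 1 / yielded: 11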

Event Loop

Besides coroutines, the system needs a way to manage the execution of the coroutines submitted to it. The simplest setup is an event loop that picks a coroutine from its backlog and executes it until it suspends or completes. Control then returns to the event loop, which picks the next coroutine to run, repeating the same operation until no tasks remain pending. The pseudocode could be something like this:

SET all coroutines TO event loop's backlog somewhere in the system
WHILE event loop's backlog size > 0 DO
  GET a coroutine from event loop's backlog
  EXECUTE the coroutine
  IF running == coroutine state THEN
    PUT the coroutine back to the event loop's backlog
  ELSE IF done == coroutine state THEN
    PASS
  END IF
END WHILE

or, in Python, like the abridged snippet below:

    class Scheduler: # https://dabeaz.com/coroutines/Coroutines.pdf#page=58
        def __init__(self):
            self.ready = Queue()
            ...
        def schedule(self, task):
            self.ready.put(task)
        def mainloop(self):
            while self.taskmap: # {task_id: task, ...}
                task = self.ready.get()
                result = task.run()
                self.schedule(task)
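
As abridged, mainloop requeues every task unconditionally and would never drain taskmap; the full version in the cited slides drops a task once its generator raises StopIteration. A self-contained, runnable sketch along those lines (the Task class and the new and exit methods here are assumptions for this sketch, loosely following the slides):

    from queue import Queue

    class Task:
        taskid = 0
        def __init__(self, target):
            Task.taskid += 1
            self.tid = Task.taskid
            self.target = target          # the generator to drive
        def run(self):
            return next(self.target)      # resume until the next yield

    class Scheduler:
        def __init__(self):
            self.ready = Queue()
            self.taskmap = {}             # {task_id: task, ...}
        def new(self, target):
            task = Task(target)
            self.taskmap[task.tid] = task
            self.schedule(task)
        def schedule(self, task):
            self.ready.put(task)
        def exit(self, task):
            del self.taskmap[task.tid]    # forget a finished task
        def mainloop(self):
            while self.taskmap:
                task = self.ready.get()
                try:
                    task.run()
                    self.schedule(task)   # suspended: requeue for another round
                except StopIteration:
                    self.exit(task)       # completed: remove from the backlog

    def countdown(name, n):
        while n > 0:
            print(name, n)
            yield
            n -= 1

    sched = Scheduler()
    sched.new(countdown("a", 3))
    sched.new(countdown("b", 2))
    sched.mainloop()  # a 3 / b 2 / a 2 / b 1 / a 1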

High-Level Overview

The diagram below sketches the high-level relationships between the components this project uses.

Class Diagram

Components

Task

Task itself is merely a wrapper that validates and holds the Generator provided by the developer. It also carries a queue, assigned later before the task is scheduled, used to signal the Runtime when its execution ends.

The execute method checks the generator's state and processes it accordingly: when the generator has just been created, execute primes it with next; once suspended, it passes the value provided at runtime into the generator with send, until the generator raises StopIteration. At that point, the Task notifies the Runtime with a Stopped signal.

    def execute(self, value: any = None) -> any:
        result: any = None
        match inspect.getgeneratorstate(self.generator):
            case "GEN_CREATED":
                result = next(self.generator)  # prime the freshly created generator
                ...
            case "GEN_SUSPENDED":
                try:
                    # resume the generator with the value provided at runtime
                    result = self.generator.send(value)
                    ...
                except StopIteration as si:
                    ...
                    result = si.value  # the generator's return value
                    self.signal.put(Stopped(id(self), self.name))
            case ...:
                ...
        return result
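
To see the state machine in action, a hypothetical driving session might look like this (the Task constructor and name attribute are assumptions here; gen is the coroutine from the Concepts section):

    from queue import Queue

    task = Task(gen(0, 3))      # assume the constructor sets self.generator and self.name
    task.signal = Queue()       # normally attached by the Runtime before scheduling

    print(task.execute())       # GEN_CREATED: primes the generator -> 0
    print(task.execute("hi"))   # GEN_SUSPENDED: sends "hi" in, gen prints it -> 1
    print(task.execute())       # -> 2
    print(task.execute())       # StopIteration: -> "final result is 3", Stopped signal sent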

Worker

This component acts as a proxy that passes a scheduled Task to the corresponding Task Queue through a shared queue object, together with a counter that records the number of tasks stored in that Task Queue. The relationship between Worker and Task Queue is one-to-one.

    class Worker:
        tx: Queue  # sending end, shared with the paired TaskQueue as its rx
        size: Int  # counter of queued tasks; the scheduler reads size.value
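
How the scheduler side might use a Worker, as a sketch: the counter type is an assumption here (anything with a .value attribute and a lock fits, e.g. multiprocessing.Value), since least_loaded_worker below reads worker.size.value, and dispatch is a hypothetical helper:

    from multiprocessing import Value
    from queue import Queue

    worker = Worker()
    worker.tx = Queue()           # shared with the paired TaskQueue as its rx
    worker.size = Value("i", 0)   # load counter compared by the scheduler

    def dispatch(worker, task) -> None:
        with worker.size.get_lock():
            worker.size.value += 1  # record one more task in flight
        worker.tx.put(task)         # hand the task to the paired TaskQueue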

Task Queue

The purpose of the task queue:

  1. Fetch a Task (the fetch method at line 6):

    1.1. Check whether the internal queue inner already holds any Tasks (line 7)

    1.2. Otherwise, try fetching from rx without blocking (line 14)

    1.3. Failing that, block on rx indefinitely (line 17) until a new Task is available

  2. Add an unfinished Task back for later execution (line 19)

  1: class TaskQueue:
  2:     def __init__(... rx: Queue, inner: deque | None = None):
  3:         self.rx: Queue = rx
  4:         self.inner: deque = inner if inner is not None else deque() # avoid a shared mutable default
  5:         ...
  6:     def fetch(self) -> Task:
  7:         if len(self.inner) > 0:
  8:             return self.inner.popleft()
  9:         (task, error) = self.try_fetch()
 10:         if not error:
 11:             return task
 12:         return self.blocking_fetch()
 13:     def try_fetch(self) -> Tuple[Task, Empty]:
 14:         ...task: Task = self.rx.get_nowait()
 15:         return task, ... # on queue.Empty, return (None, the exception)
 16:     def blocking_fetch(self) -> Task:
 17:         return self.rx.get()
 18:     def add(self, task: Task) -> Task:
 19:         self.inner.append(task)
 20:         return task
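
A quick sanity check of the fetch order, assuming the elided constructor parameters are optional and that any object can stand in for a Task:

    from queue import Queue

    rx = Queue()
    tq = TaskQueue(rx=rx)

    rx.put("task-from-worker")   # arrives through the shared queue
    tq.add("requeued-task")      # goes onto the internal deque

    print(tq.fetch())  # requeued-task: inner is drained first
    print(tq.fetch())  # task-from-worker: then the non-blocking rx path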

Scheduler

This is fairly self-explanatory: this component decides where a task gets run. Specifically, the scheduling uses a Least-Loaded (LL) strategy.

Least-Loaded (LL) Scheduler

This project adopts the least-loaded scheduling strategy, which assigns a task to the least loaded worker. The primary reason is that work-stealing schedulers, such as the one employed by Rust's Tokio, are very complicated[1]. The least-loaded strategy is simple yet effective, and it can address the issue of starvation[2].

For the LL strategy to work, two functions are required:

  1. Calculate the range of each batch

Whilst computing the next batch's range, first record the current round's value (line 2) and the length of the worker list (line 4).

Second, find the next multiple of the batch size at or above the worker length (lines 5 to 6). Line 5 adds batch size - 1 to the worker length, which guarantees that multiple is at least as large as the next multiple and smaller than the one after it. Line 6 then subtracts the remainder multiple % batch_size to obtain the desired multiple. For instance, with 22 workers and a batch size of 8, multiple is 29, which is larger than the next multiple, 24, but smaller than the multiple after that, 32.

Third, calculate the next batch's range. Line 7 makes sure next wraps around once it would exceed the computed multiple, and line 8 takes the minimum of the worker length and next + batch size, so the end of the range is capped at the worker length when next + batch size exceeds it. Again with 22 workers and a batch size of 8, when next + batch size reaches 24, the logic picks the worker length, 22, instead.

Therefore, with 22 workers and a batch size of 8, the successive ranges are (0, 8), (8, 16), (16, 22), and then (0, 8) again; a standalone check of this rotation appears at the end of this section.

  1: def next_range(self) -> Tuple[int, int]:
  2:     prev = self.init_next_batch # in __init__(): self.init_next_batch: int = 0
  3:     self.init_next_batch += 1
  4:     worker_len = len(self.workers)
  5:     multiple = worker_len + self.batch_size - 1
  6:     next_multiple_of = multiple - (multiple % self.batch_size)
  7:     next = (prev * self.batch_size) % next_multiple_of
  8:     end = min(next + self.batch_size, worker_len)
  9:     return next, end
  2. Pick the least loaded worker

To find the least loaded worker, the logic first checks whether the worker count is at most the batch size: if so, the entire worker list is considered (line 4); otherwise, the next batch range is calculated (line 6), and the candidate workers are sliced from the list using that range (line 7).

Second, it chooses the worker with the minimum queue size (lines 10 to 15).

  1: def least_loaded_worker(self) -> Worker:
  2:     workers: list[Worker] = []
  3:     if len(self.workers) <= self.batch_size:
  4:         workers = self.workers
  5:     else:
  6:         next, end = self.next_range()
  7:         workers = self.workers[next:end]
  8:     target_idx: int = 0
  9:     min_qsize: float = math.inf # math.inf is a float; any real queue size compares smaller
 10:     for idx, worker in enumerate(workers):
 11:         queue_size = worker.size.value
 12:         if queue_size < min_qsize:
 13:             min_qsize = queue_size
 14:             target_idx = idx
 15:     return workers[target_idx]
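
A standalone check of the rotation described above; the free-function form below only mirrors next_range for illustration:

    from typing import Tuple

    def next_range(prev: int, worker_len: int, batch_size: int) -> Tuple[int, int]:
        multiple = worker_len + batch_size - 1
        next_multiple_of = multiple - (multiple % batch_size)  # 24 for 22 workers, batch 8
        start = (prev * batch_size) % next_multiple_of         # rotate past the last batch
        end = min(start + batch_size, worker_len)              # cap at the worker count
        return start, end

    for round_no in range(4):
        print(next_range(round_no, worker_len=22, batch_size=8))
    # (0, 8), (8, 16), (16, 22), (0, 8)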

Runtime

Finally, it is time to orchestrate the entire flow. The Runtime object performs the following functions (an end-to-end usage sketch follows this list):

  1. At the beginning, it is given a scheduler, and a queue that is shared with all tasks for signalling when a task completes its execution

      scheduler = Scheduler(worker_size=3, batch_size=2)
      runtime = Runtime(scheduler=scheduler, task_signal=Queue())
  2. The Runtime instance then launches a list of threads (lines 10 to 18), each equipped with a task queue (line 13), that

    2.1. Fetches a task (line 3)

    2.2. Executes the task (line 4), and

    2.3. Decides from the task's state whether to requeue it (lines 5 to 9)

      1: def start(self) -> None:
      2:     def consume(task_queue: TaskQueue) -> any:
      3:         while task := task_queue.fetch():
      4:             _ = task.execute()
      5:             match task.state:
      6:                 case State.Running:
      7:                     task_queue.add(task)
      8:                 case State.Stop: 
      9:                     pass
     10:     for idx, task_queue in enumerate(self.scheduler.task_queues):
     11:         thread = threading.Thread(
     12:             target=consume,
     13:             args=(task_queue,),
     14:             daemon=True,
     15:         )
     16:         self.threads.append(thread)
     17:     for thread in self.threads:
     18:         thread.start()
  3. In the end, the Runtime

    3.1. Passes each task to the Scheduler instance (line 5)

    3.2. Waits for each Task's Stopped signal (line 8), and

    3.3. Terminates the entire flow once all scheduled tasks have stopped (line 10)

      1: def spawn(self, tasks: list[Task]) -> None:
      2:     self.total_tasks_length = len(tasks)
      3:     for task in tasks:
      4:         task.signal = self.task_signal
      5:         self.scheduler.schedule(task)
      6: def join(self) -> None:
      7:     remaining_tasks = self.total_tasks_length
      8:     while _ := self.task_signal.get():
      9:         remaining_tasks -= 1
     10:         if 0 == remaining_tasks:
     11:             break
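
Putting it all together, an end-to-end run might look like the sketch below; the Task constructor argument is an assumption, while the rest mirrors the snippets above:

    from queue import Queue

    def ticker(name: str, count: int):
        for i in range(count):
            print(name, "tick", i)
            yield               # suspension point: hand control back to the worker

    scheduler = Scheduler(worker_size=3, batch_size=2)
    runtime = Runtime(scheduler=scheduler, task_signal=Queue())

    runtime.start()             # one daemon consumer thread per task queue
    runtime.spawn([Task(ticker("a", 3)), Task(ticker("b", 2))])
    runtime.join()              # block until every task has signalled Stopped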

References

[1] A Curious Course on Coroutines and Concurrency, Part 7: Let's Build an Operating System

[2] Nio