When writing concurrent programs, have you ever wondered how the underlying mechanism works? I had the same question, and this post documents my journey of exploration. The code can be found on Codeberg.
Before starting, two concepts are important: the coroutine and the event loop.
A coroutine, according to Wikipedia, allows execution to be suspended and later resumed from where it left off.

In the snippet below, the coroutine gen suspends at the yield expression on line 7, handing idx back to the caller. The caller drives it with next() on lines 16, 21, and 23, and with send() on line 18. The send method also passes a value into the coroutine. That value becomes the result of the yield expression on line 7, so the gen instance can print it to the console on line 9. Once the loop ends, the generator returns, and the final next() call on line 23 raises StopIteration carrying the return value.

```python
 1: import inspect
 2: from typing import Any, Generator
 3:
 4: def gen(beg: int, end: int) -> Generator[int, Any, str]:
 5:     idx = beg
 6:     while idx < end:
 7:         received = yield idx
 8:         if received:
 9:             print("received:", received)
10:         idx += 1
11:     return f"final result is {idx}"
12:
13: gen_fn = gen(0, 3)
14: inspect.isgenerator(gen_fn)
15: True
16: next(gen_fn)
17: 0
18: gen_fn.send("a param from caller!")
19: received: a param from caller!
20: 1
21: next(gen_fn)
22: 2
23: next(gen_fn)
24: ---------------------------------------------------------------------------
25: StopIteration Traceback (most recent call last)
26: Cell In[...], line 1
27: ----> 1 next(gen_fn)
28:
29: StopIteration: final result is 3
```

Thus, a coroutine can be viewed as a generalized subroutine suited to cooperative multitasking. A higher-level workflow between the coroutine(s) and the main thread can be roughly sketched with the following image.
Besides coroutines themselves, the entire system needs a way to manage the execution of submitted coroutines. The simplest setup is an event loop that picks a coroutine from its backlog and executes it until the coroutine suspends or completes. Control then returns to the event loop. The loop picks the next coroutine to run and repeats the same operation until no pending tasks remain. The pseudocode could look like this:
```text
SET all coroutines TO event loop's backlog somewhere in the system
WHILE event loop's backlog size > 0 DO
    GET a coroutine from event loop's backlog
    EXECUTE the coroutine
    IF coroutine state == running THEN
        PUT the coroutine back to the event loop's backlog
    ELSE IF coroutine state == done THEN
        PASS
    END IF
END WHILE
```
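As a rough illustration, the pseudocode maps almost line by line onto Python generators. A generator that yields is still running, while one that raises StopIteration is done. This is a minimal sketch under that assumption. The names run_event_loop and countdown are illustrative and not part of the project.

```python
from collections import deque
from typing import Any, Generator


def run_event_loop(coroutines: list[Generator[Any, None, Any]]) -> list[Any]:
    """Round-robin event loop over generator-based coroutines.

    Each coroutine runs until its next `yield` (it suspends) or until it
    returns (it is done). Return values are collected in completion order.
    """
    backlog = deque(coroutines)  # the event loop's backlog
    results = []
    while backlog:  # WHILE backlog size > 0
        coro = backlog.popleft()  # GET a coroutine from the backlog
        try:
            next(coro)  # EXECUTE it until it suspends...
        except StopIteration as stop:
            results.append(stop.value)  # ...or completes: drop it from the backlog
        else:
            backlog.append(coro)  # still running: PUT it back
    return results


def countdown(name: str, n: int) -> Generator[None, None, str]:
    while n > 0:
        print(f"{name}: {n}")
        n -= 1
        yield  # hand control back to the event loop
    return f"{name} done"


print(run_event_loop([countdown("a", 2), countdown("b", 1)]))
```

Running it prints a: 2, b: 1, a: 1, then ['b done', 'a done'], because b finishes first. Unlike the pseudocode's PASS, the sketch also keeps each coroutine's return value.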
The scheduler from David Beazley's course [1] expresses the same loop as a class, where:
First, the program retrieves a task from the queue on line 9.
Second, it executes the task on line 10.
Third, it reschedules the task on line 11 by putting it back into the queue. Later, the code can be refined to check the task state and decide whether to keep running the task.
```python
 1: class Scheduler:  # https://dabeaz.com/coroutines/Coroutines.pdf#page=58
 2:     def __init__(self):
 3:         self.ready = Queue()
 4:         ...
 5:     def schedule(self, task):
 6:         self.ready.put(task)
 7:     def mainloop(self):
 8:         while self.taskmap:  # {task_id: task, ...}
 9:             task = self.ready.get()
10:             result = task.run()
11:             self.schedule(task)
```

The diagram below gives a higher-level overview of the relationships between the components this project uses.
Task is the model upon which all other components operate
Worker serves as a proxy sitting between Scheduler and Task Queue
Task Queue stores tasks to be executed
Scheduler’s responsibility is to schedule tasks
Runtime assembles all components together, hosting an environment for the necessary operations
Task itself is merely a wrapper that checks and holds the generator provided by the developer. It also carries a queue for signaling the end of its execution to Runtime. This queue is assigned later, just before the task is scheduled.
The execute method checks the generator state and acts accordingly. When the generator has just been created (lines 4 to 5), execute starts it with next(). When the generator is suspended (lines 7 to 9), execute uses send() to pass it the value provided at runtime, until the generator raises StopIteration. On StopIteration (lines 11 to 14), the Task returns the generator's return value and notifies the Runtime with a Stopped signal.
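The match in execute is driven by inspect.getgeneratorstate. It reports one of GEN_CREATED, GEN_RUNNING, GEN_SUSPENDED, or GEN_CLOSED. The short sketch below uses an illustrative echo generator, not taken from the project. It walks one generator through the states execute reacts to, including the StopIteration whose value execute returns.

```python
import inspect
from typing import Generator


def echo() -> Generator[int, int, str]:
    received = yield 0  # suspend here; send() supplies `received`
    return f"got {received}"


g = echo()
print(inspect.getgeneratorstate(g))  # GEN_CREATED: not started yet
print(next(g))  # runs to the first yield and prints 0
print(inspect.getgeneratorstate(g))  # GEN_SUSPENDED: paused at the yield
try:
    g.send(42)  # resumes; the generator returns instead of yielding again
except StopIteration as stop:
    print(stop.value)  # got 42
print(inspect.getgeneratorstate(g))  # GEN_CLOSED: finished
```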
```python
 1: def execute(self, value: Any = None) -> Any:
 2:     result: Any = None
 3:     match inspect.getgeneratorstate(self.generator):
 4:         case "GEN_CREATED":
 5:             result = next(self.generator)
 6:             ...
 7:         case "GEN_SUSPENDED":
 8:             try:
 9:                 result = self.generator.send(value)
10:                 ...
11:             except StopIteration as si:
12:                 ...
13:                 result = si.value
14:                 self.signal.put(Stopped(id(self), self.name))
15:         case ...:
16:             ...
17:     return result
```

Worker acts as a proxy that passes the scheduled Task to the corresponding Task Queue through a shared queue object. It also holds a counter that records the number of tasks stored in that Task Queue. Worker and Task Queue have a one-to-one relationship.
```python
class Worker:
    tx: Queue
    size: Int
```

The purpose of Task Queue is to:
1. Fetch a Task (line 6):
1.1. Check whether the internal queue inner contains any Tasks (line 7)
1.2. Try fetching from rx without blocking (line 14)
1.3. Fall back to a blocking fetch from rx, waiting indefinitely until a new Task is available (line 17)
2. Add an unfinished Task back to inner for later execution (line 19)
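The fetch order can be tried out in isolation. The sketch below is a standalone function. The name fetch and the use of strings as stand-ins for Task are illustrative. It follows steps 1.1 to 1.3 with a plain try/except around queue.Queue.get_nowait, which raises queue.Empty when nothing is available. The project itself uses a (task, error) tuple at that step.

```python
import queue
from collections import deque


def fetch(inner: deque, rx: queue.Queue) -> object:
    """Local deque first, then a non-blocking get, then a blocking get."""
    if inner:  # 1.1: a task that was put back for later execution
        return inner.popleft()
    try:
        return rx.get_nowait()  # 1.2: raises queue.Empty if rx has nothing
    except queue.Empty:
        return rx.get()  # 1.3: block until another thread puts a task


local_tasks: deque = deque(["requeued"])
incoming: queue.Queue = queue.Queue()
incoming.put("fresh")
print(fetch(local_tasks, incoming))  # requeued: the local deque is checked first
print(fetch(local_tasks, incoming))  # fresh: taken from rx without blocking
```

Only the last step can block. A call made while both the deque and rx are empty waits until another thread puts a task, which is what lets a consumer thread idle cheaply.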
```python
 1: class TaskQueue:
 2:     def __init__(... rx: Queue, inner: deque | None = None):
 3:         self.rx: Queue = rx
 4:         self.inner: deque = inner if inner is not None else deque()  # a fresh deque per instance
 5:         ...
 6:     def fetch(self) -> Task:
 7:         if len(self.inner) > 0:
 8:             return self.inner.popleft()
 9:         (task, error) = self.try_fetch()
10:         if not error:
11:             return task
12:         return self.blocking_fetch()
13:     def try_fetch(self) -> Tuple[Task, Empty]:
14:         ...task: Task = self.rx.get_nowait()
15:         return task, ...  # if error return None, empty
16:     def blocking_fetch(self) -> Task:
17:         return self.rx.get()
18:     def add(self, task: Task) -> Task:
19:         self.inner.append(task)
20:         return task
```

Scheduler is fairly self-explanatory: it decides which worker each task runs on. Specifically, scheduling uses the Least-Loaded (LL) strategy.
This project uses the least-loaded strategy, which assigns each task to the least loaded worker. The primary reason is that alternatives are considerably more complicated, for example the work-stealing scheduler used by Rust's Tokio[1]. The least-loaded strategy is simple yet effective, and it can address the issue of starvation[2].
For the LL strategy to work, two functions are required: next_range and least_loaded_worker.
While searching for the next batch's range, next_range first keeps the value of the current round and advances the counter for the next call (lines 2 to 3). It then finds the length of the worker list (line 4).
Second, it finds the smallest multiple of the batch size that is not less than the worker length (line 5). Python's % operator returns a non-negative result for a positive divisor. As a result, -worker_len % self.batch_size is exactly the distance from the worker length up to that multiple. For instance, with 22 workers and a batch size of 8, -22 % 8 is 2, so next_multiple_of is 24. An equivalent formulation first adds batch size - 1 to the worker length, which gives 29. That number is at least as large as the next multiple, 24, and smaller than the one after it, 32. Subtracting the remainder 29 % 8 = 5 then yields 24.
Third, it calculates the next batch's range. Line 6 makes the start value wrap around once it reaches next_multiple_of. Line 7 takes the minimum of the worker length and next + batch size, so the end of the range is clamped to the worker length when next + batch size exceeds it. Again with 22 workers and a batch size of 8, when next + batch size reaches 24, the logic picks the worker length 22 instead.
Therefore, with 22 workers and a batch size of 8, the sequence of batch ranges is (0, 8), (8, 16), (16, 22), and then it starts over from (0, 8).
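The arithmetic can be checked without the surrounding class. batch_ranges below is a hypothetical helper that replays the arithmetic of next_range for a given number of rounds. It uses start instead of next to avoid shadowing Python's built-in next.

```python
def batch_ranges(worker_len: int, batch_size: int, rounds: int) -> list[tuple[int, int]]:
    """Replay next_range's arithmetic for `rounds` calls, without a class."""
    # Smallest multiple of batch_size that is >= worker_len. Python's %
    # is non-negative for a positive divisor, so -worker_len % batch_size
    # is the distance from worker_len up to that multiple.
    next_multiple_of = worker_len + (-worker_len % batch_size)
    ranges = []
    for prev in range(rounds):
        start = (prev * batch_size) % next_multiple_of  # wrap around
        end = min(start + batch_size, worker_len)  # clamp the last batch
        ranges.append((start, end))
    return ranges


print(batch_ranges(22, 8, 4))  # [(0, 8), (8, 16), (16, 22), (0, 8)]
```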
```python
1: def next_range(self) -> Tuple[int, int]:
2:     prev = self.init_next_batch  # in __init__(), self.init_next_batch: int = 0
3:     self.init_next_batch += 1
4:     worker_len = len(self.workers)
5:     next_multiple_of = worker_len + (-worker_len % self.batch_size)
6:     next = (prev * self.batch_size) % next_multiple_of
7:     end = min(next + self.batch_size, worker_len)
8:     return next, end
```

To find the least loaded worker, the logic first checks whether the worker list is no longer than the batch size. If so, the entire worker list is used (line 4). Otherwise, the next batch range is calculated (line 6), and the workers within that range are selected (line 7).
Second, it chooses the worker with the minimum queue size (lines 8 to 15).
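The selection loop in least_loaded_worker behaves like Python's built-in min with a key function. Like the loop's strict < comparison, min returns the first of several equally loaded workers. The sketch below illustrates this with hypothetical stand-ins. Size mimics the counter read through worker.size.value, and FakeWorker replaces the project's Worker.

```python
from dataclasses import dataclass


@dataclass
class Size:
    value: int  # hypothetical stand-in for the counter behind worker.size.value


@dataclass
class FakeWorker:
    name: str
    size: Size


def least_loaded(workers: list[FakeWorker]) -> FakeWorker:
    # min() returns the first minimal element, so ties go to the earliest
    # worker in the batch, just like the strict `<` in the original loop.
    return min(workers, key=lambda worker: worker.size.value)


batch = [FakeWorker("w0", Size(3)), FakeWorker("w1", Size(1)), FakeWorker("w2", Size(1))]
print(least_loaded(batch).name)  # w1
```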
```python
 1: def least_loaded_worker(self) -> Worker:
 2:     workers: list[Worker] = []
 3:     if len(self.workers) <= self.batch_size:
 4:         workers = self.workers
 5:     else:
 6:         next, end = self.next_range()
 7:         workers = self.workers[next:end]
 8:     target_idx: int = 0
 9:     min_qsize: float = math.inf  # larger than any real queue size
10:     for idx, worker in enumerate(workers):
11:         queue_size = worker.size.value
12:         if queue_size < min_qsize:
13:             min_qsize = queue_size
14:             target_idx = idx
15:     return workers[target_idx]
```

Finally, it is time to orchestrate the entire flow. The Runtime object performs the following functions:
1. At the beginning, a scheduler and a queue are created. The queue is shared with all tasks so that each task can signal when it finishes execution.
```python
scheduler = Scheduler(worker_size=3, batch_size=2)
runtime = Runtime(scheduler=scheduler, task_signal=Queue())
```

2. The Runtime instance then creates and starts one thread per task queue (lines 10 to 18), handing each thread its task queue (line 13). Each thread:
2.1. Fetches a task (line 3)
2.2. Executes the task (line 4), and
2.3. Determines whether to keep running it, putting it back into the task queue while it is still running (lines 5 to 9)
```python
 1: def start(self) -> None:
 2:     def consume(task_queue: TaskQueue) -> None:
 3:         while task := task_queue.fetch():
 4:             _ = task.execute()
 5:             match task.state:
 6:                 case State.Running:
 7:                     task_queue.add(task)
 8:                 case State.Stop:
 9:                     pass
10:     for task_queue in self.scheduler.task_queues:
11:         thread = threading.Thread(
12:             target=consume,
13:             args=(task_queue,),
14:             daemon=True,
15:         )
16:         self.threads.append(thread)
17:     for thread in self.threads:
18:         thread.start()
```

3. In the end, the Runtime:
3.1. Passes the tasks to the Scheduler instance (line 5)
3.2. Waits for the tasks' Stopped signals (line 8), and
3.3. Finishes the entire flow once all scheduled tasks have stopped (lines 10 to 11)
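To see how the pieces fit together, here is a compact, self-contained sketch of the same flow. It is not the project's code. MiniTask, MiniRuntime, and running_sum are hypothetical names. Each worker is a daemon thread with its own queue.Queue inbox and a local deque. Placement simply picks the worker with the fewest unfinished tasks rather than searching batch by batch with next_range. As a small variation, each worker drains its inbox on every iteration, so newly scheduled tasks interleave with the ones it is already running.

```python
import queue
import threading
from collections import deque
from typing import Any, Generator


class MiniTask:
    """Hypothetical stand-in for Task: a named generator plus a signal queue."""

    def __init__(self, name: str, gen: Generator[Any, None, Any]) -> None:
        self.name = name
        self.gen = gen
        self.signal: queue.Queue = queue.Queue()  # replaced by the runtime in spawn
        self.done = False

    def execute(self) -> None:
        try:
            next(self.gen)  # run until the next yield
        except StopIteration as stop:
            self.done = True
            self.signal.put((self.name, stop.value))  # plays the role of Stopped


class MiniRuntime:
    """Hypothetical runtime: one daemon thread and one inbox queue per worker."""

    def __init__(self, worker_size: int) -> None:
        self.inboxes = [queue.Queue() for _ in range(worker_size)]
        self.load = [0] * worker_size  # unfinished tasks per worker
        self.lock = threading.Lock()  # guards self.load
        self.signal: queue.Queue = queue.Queue()  # shared by all tasks
        self.pending = 0  # signals join still has to wait for
        for idx in range(worker_size):
            threading.Thread(target=self._consume, args=(idx,), daemon=True).start()

    def _consume(self, idx: int) -> None:
        inbox: queue.Queue = self.inboxes[idx]
        local: deque = deque()  # tasks this worker keeps running
        while True:
            while not inbox.empty():  # this thread is the only reader of inbox
                local.append(inbox.get_nowait())
            if not local:
                local.append(inbox.get())  # idle: block until a task arrives
            task = local.popleft()
            task.execute()
            if task.done:
                with self.lock:
                    self.load[idx] -= 1
            else:
                local.append(task)  # still running: run it again later

    def spawn(self, tasks: list[MiniTask]) -> None:
        self.pending += len(tasks)
        for task in tasks:
            task.signal = self.signal
            with self.lock:  # least loaded: fewest unfinished tasks
                idx = min(range(len(self.load)), key=self.load.__getitem__)
                self.load[idx] += 1
            self.inboxes[idx].put(task)

    def join(self) -> dict[str, Any]:
        results: dict[str, Any] = {}
        while self.pending > 0:
            name, value = self.signal.get()  # wait for one "stopped" signal
            results[name] = value
            self.pending -= 1
        return results


def running_sum(n: int) -> Generator[int, None, int]:
    total = 0
    while n > 0:
        total += n
        n -= 1
        yield total  # suspension point: the worker may switch tasks here
    return total


runtime = MiniRuntime(worker_size=2)
runtime.spawn([MiniTask(f"t{n}", running_sum(n)) for n in (1, 2, 3)])
print(sorted(runtime.join().items()))  # [('t1', 1), ('t2', 3), ('t3', 6)]
```

join returns once every task has put its signal. Because the worker threads are daemons, they do not keep the process alive afterwards, mirroring daemon=True in start.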
```python
 1: def spawn(self, tasks: list[Task]) -> None:
 2:     self.total_tasks_length = len(tasks)
 3:     for task in tasks:
 4:         task.signal = self.task_signal
 5:         self.scheduler.schedule(task)
 6: def join(self) -> None:
 7:     stopped_tasks = self.total_tasks_length
 8:     while _ := self.task_signal.get():
 9:         stopped_tasks -= 1
10:         if 0 == stopped_tasks:
11:             break
```

[1] A Curious Course on Coroutines and Concurrency, Part 7: Let's Build an Operating System
[2] Nio