This merely serves as note-taking; it may contain misconceptions or inaccurate explanations.
It can be viewed as a combination of an event loop plus coroutines in a very simplified setup.
In general, it allows a program to suspend and resume from where it left off. With the following Python code, it can be observed that the generator gen is suspended at line 4 when the code at lines 13 and 18 is executed. Apart from that, it can also receive an external value, i.e., received, at line 4 while line 15 is executed. (The snippet relies on import inspect and from typing import Any, Generator.)
1: def gen(beg: int, end: int) -> Generator[int, Any, str]:
2:     idx = beg
3:     while idx < end:
4:         received = yield idx
5:         if received:
6:             print("received:", received)
7:         idx += 1
8:     return f"final result is {idx}"
9:
10: gen_fn = gen(0, 3)
11: inspect.isgenerator(gen_fn)
12: True
13: next(gen_fn)
14: 0
15: gen_fn.send("a param from caller!")
16: received: a param from caller!
17: 1
18: next(gen_fn)
19: 2
20: next(gen_fn)
21: ---------------------------------------------------------------------------
22: StopIteration                             Traceback (most recent call last)
23: Cell In[...], line 1
24: ----> 1 next(gen_fn)
25:
26: StopIteration: final result is 3

An event loop picks up a task from its backlog and executes that task until the task suspends or completes. Control is then returned to the event loop. Next, the event loop picks up the next task to run, until there are no more pending tasks.
class Scheduler:  # https://dabeaz.com/coroutines/Coroutines.pdf#page=58
    def __init__(self):
        self.ready = Queue()
        ...
    def schedule(self, task):
        self.ready.put(task)
    def mainloop(self):
        while self.taskmap:  # {task_id: task, ...}
            task = self.ready.get()
            result = task.run()
            self.schedule(task)
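To make the loop concrete, here is a small, self-contained sketch in the spirit of those slides; the names SimpleTask, SimpleScheduler and countdown are made up here. Two generators are scheduled and the main loop drives them round-robin until both are finished.

from queue import Queue

class SimpleTask:
    _next_id = 0
    def __init__(self, target):
        SimpleTask._next_id += 1
        self.tid = SimpleTask._next_id
        self.target = target          # the generator being driven
    def run(self):
        return next(self.target)      # resume until the next yield

class SimpleScheduler:
    def __init__(self):
        self.ready = Queue()
        self.taskmap = {}             # {task_id: task, ...}
    def new(self, target):
        task = SimpleTask(target)
        self.taskmap[task.tid] = task
        self.schedule(task)
        return task.tid
    def schedule(self, task):
        self.ready.put(task)
    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            try:
                task.run()
            except StopIteration:
                del self.taskmap[task.tid]   # finished: forget the task
                continue
            self.schedule(task)              # suspended: run it again later

def countdown(name, n):
    while n > 0:
        print(name, n)
        n -= 1
        yield                                # suspend, hand control back to the loop

sched = SimpleScheduler()
sched.new(countdown("foo", 3))
sched.new(countdown("bar", 2))
sched.mainloop()                             # foo 3, bar 2, foo 2, bar 1, foo 1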
Task itself is merely a wrapper that checks and holds the Generator provided by the developer. A queue, used to communicate with the Runtime and signal the end of its execution, is attached to it; the queue is assigned later, before the Task is scheduled.
The execute method checks the generator state and processes it accordingly. For instance, when the generator has just been created, it starts the generator; when the generator is suspended, it passes the value provided at runtime to the generator, until the generator hits StopIteration. On reaching StopIteration, the Task notifies the Runtime with a Stopped signal.
def execute(self, value: Any = None) -> Any:
    result: Any = None
    match inspect.getgeneratorstate(self.generator):
        case "GEN_CREATED":
            # first run: advance the generator to its first yield
            result = next(self.generator)
            ...
        case "GEN_SUSPENDED":
            try:
                # resume the generator with the value supplied by the caller
                result = self.generator.send(value)
                ...
            except StopIteration as si:
                ...
                # the generator returned; its return value rides on StopIteration
                result = si.value
                self.signal.put(Stopped(id(self), self.name))
        case ...:
            ...
    return result
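For reference, execute relies on fields that are not shown above (generator, name, state, signal) and on the State and Stopped types. A minimal sketch of the wrapper, with those shapes assumed here rather than taken from the project, could look like this:

import inspect
from dataclasses import dataclass
from enum import Enum, auto
from queue import Queue
from typing import Generator, Optional

class State(Enum):          # assumed: only the two states the loops match on
    Running = auto()
    Stopped = auto()

@dataclass
class Stopped:              # assumed shape of the signal message put on the queue
    task_id: int
    name: str

class Task:
    def __init__(self, generator: Generator, name: str = "") -> None:
        assert inspect.isgenerator(generator)   # "checks and holds the Generator"
        self.generator = generator
        self.name = name
        self.state = State.Running
        # communication channel back to the Runtime; assigned later,
        # just before the Task is scheduled
        self.signal: Optional[Queue] = None

The elided branches in execute presumably also flip state to State.Stopped once StopIteration is hit, which is what the consumption loop shown later matches on.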
The Worker component acts as a proxy that passes the scheduled Task to the corresponding Task Queue through a shared queue object, plus a counter that records the number of tasks stored in the Task Queue. The relationship between Worker and Task Queue is one-to-one.

class Worker:
    tx: Queue
    size: Int
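The fields above are only declarations. A sketch of how send might work, assuming Int is a shared counter exposing a value field (multiprocessing.Value would match the worker.size.value access seen later), is:

from queue import Queue

class Worker:
    def __init__(self, tx: Queue, size) -> None:
        self.tx = tx          # queue shared with exactly one TaskQueue
        self.size = size      # shared counter, e.g. multiprocessing.Value("i", 0)

    def send(self, task) -> None:
        with self.size.get_lock():
            self.size.value += 1   # one more task now sits in the paired TaskQueue
        self.tx.put(task)          # proxy the Task over to that TaskQueue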
The Task Queue serves two purposes:
Fetch a Task
Add the unfinished Task back for later execution
while task := task_queue.fetch():
    _ = task.execute()
    match task.state:
        case State.Running:
            task_queue.add(task)
        case State.Stopped:
            ...
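The Task Queue type itself is not listed. A minimal sketch that is consistent with the fetch and add calls above, and that shares the queue and counter with its Worker (both assumptions here), might be:

from queue import Queue

class TaskQueue:
    def __init__(self, rx: Queue, size) -> None:
        self.rx = rx          # the same queue object the paired Worker writes to
        self.size = size      # the same shared counter the Worker increments

    def fetch(self):
        task = self.rx.get()           # block until a Task arrives
        with self.size.get_lock():
            self.size.value -= 1       # one Task left the queue
        return task

    def add(self, task) -> None:
        with self.size.get_lock():
            self.size.value += 1
        self.rx.put(task)              # put an unfinished Task back for later

Note that the while task := ... loop above only terminates if fetch eventually returns a falsy value, e.g. a sentinel; in this setup the consumer threads are daemons, so the Runtime simply exits once all Stopped signals arrive.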
As its name suggests, the Scheduler's primary responsibility is to schedule the task. The scheduling mechanism is based on least-loaded-worker logic, where the scheduler first calculates the range of workers for the current batch
prev = self.init_next_batch
self.init_next_batch += 1
worker_len = len(self.workers)
# round the worker count up to the next multiple of batch_size
next_multiple_of = worker_len + (-worker_len % self.batch_size)
# start index of this batch, wrapping around once the end is reached
next = (prev * self.batch_size) % next_multiple_of
# exclusive end index, clamped to the actual number of workers
end = min(next + self.batch_size, worker_len)
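For example, with worker_len = 3 and batch_size = 2: next_multiple_of = 4, and successive calls produce the ranges [0, 2), [2, 3), [0, 2), [2, 3), ..., cycling over the workers one batch at a time.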
then picks the least loaded Worker

target_idx: int | None = None
min_qsize: float = math.inf
for idx, worker in enumerate(workers):
    queue_size = worker.size.value
    if queue_size < min_qsize:
        min_qsize = queue_size
        target_idx = idx
return workers[target_idx]
and finally schedules the Task

worker = self.least_loaded_worker()
worker.send(task)

Finally, the Runtime steps in, assembling all components together and orchestrating the entire flow.
At the beginning, it creates the scheduler
scheduler = Scheduler(worker_size=3, batch_size=2)
runtime = Runtime(scheduler=scheduler)

It then launches a list of threads, each equipped with a Task Queue, that fetch and execute the Tasks
def consume(task_queue: TaskQueue) -> Any:
    while task := task_queue.fetch():
        _ = task.execute()
        match task.state:
            case State.Running:
                task_queue.add(task)
            case ...: ...

for idx, task_queue in enumerate(self.scheduler.task_queues):
    thread = threading.Thread(
        target=consume,
        args=(task_queue,),
        daemon=True,
    )
    self.threads.append(thread)

Next, it passes the tasks to the Scheduler for scheduling
self.total_tasks_length = len(tasks)
for task in tasks:
    task.signal = self.task_signal
    self.scheduler.schedule(task)

In the end, it waits for the Stopped signal of every Task, and then terminates the entire flow
stopped_tasks = self.total_tasks_length
while _ := self.task_signal.get():
    stopped_tasks -= 1
    if 0 == stopped_tasks:
        break
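Putting it all together, a rough end-to-end sketch is shown below; the Task constructor and the single runtime.run entry point are names assumed here for illustration, not necessarily the project's actual API.

# hypothetical glue code: Task(...) and runtime.run(...) are assumed names
scheduler = Scheduler(worker_size=3, batch_size=2)
runtime = Runtime(scheduler=scheduler)

tasks = [Task(gen(0, 3), name=f"task-{i}") for i in range(6)]  # wrap the generators
runtime.run(tasks)  # launch the consumer threads, schedule the tasks, wait for Stopped signals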
[1]. A Curious Course on Coroutines and Concurrency - Part 7: Let's Build an Operating System
[2]. Nio