This is the final part of the series, where we will create a task queueing system that is more or less like asyncio's BaseEventLoop. To reduce the complexity of the program, we will not deal with timed tasks or socket events.
(I had initially planned to do sockets as well, but I have another project in mind to do the same with C++ coroutines. Wait for that!)
Previous parts of the series:
A Task Queue
A tasking system in its simplest form is a queue of callbacks that need to be called. On top of that, it provides some APIs for the user to add tasks or to wait until a task finishes.
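To make the "queue of callbacks" idea concrete, here is a minimal sketch. The names `callbacks`, `add_callback` and `run_all` are hypothetical and exist only for this illustration; they are not part of the TaskLoop built in this post.

```python
from collections import deque

# Hypothetical minimal callback queue, for illustration only.
callbacks = deque()
results = []

def add_callback(fn, *args):
    """Queue fn together with the arguments it should be called with."""
    callbacks.append((fn, args))

def run_all():
    """Call the queued callbacks in the order they were added."""
    while callbacks:
        fn, args = callbacks.popleft()
        fn(*args)

add_callback(results.append, "first")
add_callback(results.append, "second")
run_all()
print(results)  # ['first', 'second']
```

Everything that follows is a refinement of this shape: a richer object in the queue, and a way to wait for one particular piece of work.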
What we are aiming for is something like this:
```python
def task1():
    print("Done with task1")

def my_gen():
    print("Starting coro/gen my_gen")
    yield None
    print("Finished coro/gen my_gen")

if __name__ == "__main__":
    print("Starting test")
    print("-------------")
    # Instantiate a run loop instance
    loop = TaskLoop()
    # Submit a task to the run loop
    loop.call_soon(task1)
    loop.run_once()
    # Start a coroutine and finish it
    loop.run_until_complete(my_gen())
    # Stop the run loop
    loop.stop()
```
The example shows how the task loop works with a regular function and a coroutine:
- call_soon takes only regular functions as its argument. If it is handed a generator function, it raises an exception.
- run_until_complete takes only Future objects and coroutines, and runs the loop until the task has run to completion.
- run_once executes the first task in the task queue, if there is one.
- stop stops the task loop from executing any further tasks from the task queue.
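The split between "regular function" and "coroutine" in the first two bullets can be checked with the standard inspect module. This is only a sketch of the distinction; `regular` and `gen_func` are made-up names.

```python
import inspect

def regular():
    return "plain result"

def gen_func():
    yield None

gen_obj = gen_func()  # calling a generator function returns a generator object

print(inspect.isgeneratorfunction(regular))   # False
print(inspect.isgeneratorfunction(gen_func))  # True
print(inspect.isgenerator(gen_obj))           # True
print(inspect.isgeneratorfunction(gen_obj))   # False
```

Note that a generator function and the generator object it returns are different things, so each needs its own check.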
Introducing Handles
Well, I lied about the queue of tasks in the previous section. It is actually a queue of Handle objects.
A Handle is nothing but a stored function object along with its arguments, so that it can be invoked at a later time.
```python
class Handle(object):
    """
    A wrapper for holding a callback and the arguments it takes.
    """
    def __init__(self, fn, loop, args):
        """
        Constructor.
        Arguments:
            fn   : The callback function. Has to be a regular function.
            loop : The loop which would call the handler.
            args : The argument pack the callback expects.
        """
        self._cb = fn
        self._args = args
        self._loop = loop
        # Set to True if the handler is cancelled.
        # Cancelled handlers are not executed by the loop.
        self._cancelled = False
        # Set to True once the handler has finished execution.
        self._finished = False

    def __call__(self):
        """
        The function call operator.
        """
        assert not self._finished
        try:
            self._cb(*self._args)
        finally:
            # Mark the handle as finished whether the callback
            # returned normally or raised.
            self._finished = True

    def cancel(self):
        """
        Cancel the handle.
        """
        assert not self._cancelled
        if not self._finished:
            self._cancelled = True

    def is_cancelled(self):
        """
        Return True if the handle has been cancelled.
        """
        return self._cancelled
```
As can be safely deduced, a handle stores only regular functions. Futures and coroutines will not be stored in it.
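As a rough illustration of deferred invocation and cancellation, here is a pared-down sketch. `SimpleHandle` is a hypothetical stand-in for the Handle class above, and the skip-if-cancelled check is just one way a loop could honour the cancelled flag.

```python
class SimpleHandle:
    """Hypothetical, pared-down stand-in for the Handle class above."""

    def __init__(self, fn, args):
        self._cb = fn
        self._args = args
        self.cancelled = False

    def __call__(self):
        # Invoke the stored callback with the stored arguments
        self._cb(*self._args)

log = []
pending = [
    SimpleHandle(log.append, ("kept",)),
    SimpleHandle(log.append, ("dropped",)),
]
pending[1].cancelled = True  # cancel before the loop gets to it

for handle in pending:
    if not handle.cancelled:  # a loop would skip cancelled handles
        handle()

print(log)  # ['kept']
```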
So you ask, how does the task loop work with futures and coroutines then? Let's look at the implementation of run_until_complete.
run_until_complete decoded
Let's dive into its implementation directly:
```python
def run_until_complete(self, coro_or_future):
    """
    Run the event loop until the future
    is ready with result or exception.
    """
    if self._loop_running:
        raise RuntimeError("loop already running")

    # Transform into a future or task (Task inherits from Future)
    task_or_fut = transform_future(coro_or_future, self)
    assert task_or_fut is not None

    # Add a callback to stop the loop once the future is ready
    task_or_fut.add_done_callback(self._stop_loop_fut)

    # Any exception raised by run_forever propagates to the caller
    self.run_forever()
```
This looks a bit different from what we achieved in the previous part of this series. The difference is because of transform_future, which is basically what asyncio.ensure_future is. It takes either a coroutine (a generator object) or a Future. If it is a Future, it returns it unchanged (it behaves like an identity function); otherwise, if it is a coroutine, it wraps it in a Task object and returns that task.
Once the future is ready, _stop_loop_fut gets called, which raises StopLoopException to notify the main loop.
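The identity-or-wrap behaviour of asyncio.ensure_future can be checked directly against the standard library. This sketch uses modern `async def` coroutines rather than the generator-based ones in this series; `work` and `main` are made-up names.

```python
import asyncio

async def work():
    return 42

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # A Future is handed back unchanged
    same = asyncio.ensure_future(fut)
    # A coroutine object is wrapped in a Task
    task = asyncio.ensure_future(work())

    result = await task
    fut.set_result(None)  # resolve the future so nothing is left pending
    return same is fut, isinstance(task, asyncio.Task), result

checks = asyncio.run(main())
print(checks)  # (True, True, 42)
```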
```python
import inspect

def transform_future(supposed_future_coro, loop):
    """
    Loop works best with futures or with Tasks
    which wrap a generator object.
    So try to introspect the type of the
    `supposed_future_coro` argument and do the best
    possible conversion of it.
    1. If it is already a Future, return it as it is.
    2. If it is a generator, wrap it in a Task
       object and return the task instance.

    The loop object is needed for creating the task
    so that it can add a callback.
    """
    if isinstance(supposed_future_coro, Future):
        # It is a future object. Nothing to do.
        if loop != supposed_future_coro._loop:
            raise RuntimeError("loop provided is not equal to the loop of future")
        return supposed_future_coro
    elif inspect.isgenerator(supposed_future_coro):
        # Create a task object
        t = loop.add_task(supposed_future_coro)
        return t
    else:
        raise TypeError("Cannot create an asynchronous execution unit from the provided arg")
```
In the above source, the task creation logic is handled inside the loop's add_task method, but all it really does is create a Task object.
The constructor of the Task object adds its step method to the loop, which calls it later.
```python
class Task(Future):
    """
    Wraps a generator yielding a Future object
    and abstracts away the handling of the future API.
    This version adds the loop to the task.
    """
    def __init__(self, loop, gen):
        super().__init__(loop)
        assert loop is not None
        self._loop = loop
        self.gen = gen
        # Schedule the first step on the loop
        self._loop.call_soon(self.step)
```
And from here on, everything goes as we have already seen in the previous posts. The only change is that calling the Task's step method is now automated: it is done by the task loop.
The complete implementation of the task loop is:
```python
import inspect
from collections import deque

# Handle and transform_future are shown earlier in this post.
# Future, Task, StopLoopException and NotRegularFunction are
# assumed to come from the rest of the series' source.


class TaskLoop(object):
    """
    A minimal task loop that executes queued handles.
    """
    def __init__(self):
        # A deque of handles.
        # CPython implementation of deque is thread-safe.
        self._ready = deque()
        self._loop_running = False

    def total_tasks(self):
        """
        Number of handles waiting in the ready queue.
        """
        return len(self._ready)

    def run_once(self):
        """
        Execute a task from the ready queue
        """
        if len(self._ready) == 0:
            # There are no tasks to be executed
            return
        # Take the oldest handle first (FIFO), so that
        # the first task in the queue is the one executed
        hndl = self._ready.popleft()
        if hndl.is_cancelled():
            # Cancelled handlers are not executed
            return
        # Call the handler which invokes the
        # wrapped function
        hndl()

    def run_forever(self):
        """
        This is the main loop which runs forever.
        TODO: Currently just runs infinitely. Should run
        only till some work is there.
        """
        self._loop_running = True
        while True:
            try:
                self.run_once()
            except StopLoopException:
                self._loop_running = False
                break
            except Exception as e:
                print("Error in run_forever: {}".format(str(e)))

    def run_until_complete(self, coro_or_future):
        """
        Run the event loop until the future
        is ready with result or exception.
        """
        if self._loop_running:
            raise RuntimeError("loop already running")

        # Transform into a future or task (Task inherits from Future)
        task_or_fut = transform_future(coro_or_future, self)
        assert task_or_fut is not None

        # Add a callback to stop the loop once the future is ready
        task_or_fut.add_done_callback(self._stop_loop_fut)

        # Any exception raised by run_forever propagates to the caller
        self.run_forever()

    def add_task(self, gen_obj):
        """
        Creates a task object and returns it
        """
        t = Task(self, gen_obj)
        return t

    def call_soon(self, cb, *args):
        """
        Add the callback to the loop's ready queue
        for it to be executed.
        """
        if inspect.isgeneratorfunction(cb):
            raise NotRegularFunction()

        hndl = Handle(cb, self, args)
        self._ready.append(hndl)
        return hndl

    def stop(self):
        """
        Stop the loop as soon as possible
        """
        self.call_soon(self._stop_loop)

    def _stop_loop(self):
        """
        Raise the stop loop exception
        """
        raise StopLoopException("stop the loop")

    def _stop_loop_fut(self, fut):
        """
        Done callback: raise the stop loop exception
        once the future is ready.
        """
        assert fut is not None
        raise StopLoopException("stop the loop")
```
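To see the pieces cooperate without the rest of the series' source, here is a condensed, self-contained sketch. `StopLoop`, `MiniFuture`, `MiniTask` and `MiniLoop` are hypothetical, simplified stand-ins, not the classes from the repository. Two simplifications are assumed: every `yield` just reschedules the next step (the real Task expects yielded Future objects), and the loop exits when its queue is empty instead of spinning forever.

```python
from collections import deque
import inspect

class StopLoop(Exception):
    """Hypothetical stand-in for StopLoopException."""

class MiniFuture:
    def __init__(self, loop):
        self._loop = loop
        self._callbacks = []
        self.done = False
        self.result = None

    def add_done_callback(self, cb):
        self._callbacks.append(cb)

    def set_result(self, value):
        self.result = value
        self.done = True
        for cb in self._callbacks:
            cb(self)

class MiniTask(MiniFuture):
    def __init__(self, loop, gen):
        super().__init__(loop)
        self.gen = gen
        loop.call_soon(self.step)  # first step is scheduled on construction

    def step(self):
        try:
            next(self.gen)
        except StopIteration as stop:
            self.set_result(stop.value)      # generator finished
        else:
            self._loop.call_soon(self.step)  # it yielded: schedule the next step

class MiniLoop:
    def __init__(self):
        self._ready = deque()

    def call_soon(self, cb, *args):
        if inspect.isgeneratorfunction(cb):
            raise TypeError("call_soon expects a regular function")
        self._ready.append((cb, args))

    def run_until_complete(self, gen):
        task = MiniTask(self, gen)
        task.add_done_callback(self._stop)
        try:
            while self._ready:
                cb, args = self._ready.popleft()
                cb(*args)
        except StopLoop:
            pass  # raised by the done callback once the task has finished
        return task.result

    def _stop(self, fut):
        raise StopLoop()

trace = []

def my_gen():
    trace.append("start")
    yield None
    trace.append("finish")
    return "done"

loop = MiniLoop()
loop.call_soon(trace.append, "plain callback")
value = loop.run_until_complete(my_gen())
print(trace, value)  # ['plain callback', 'start', 'finish'] done
```

The plain callback runs first because it was queued first, and the generator is then driven to completion purely by the loop re-queuing its step.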
The complete source code for the series can be found at
https://github.com/arun11299/PyCoroutines
Hope you enjoyed it!