Sunday 4 March 2018

Designing Async Task Dispatch Library From Scratch (Part-3)

This is the final part of the series, where we will create a task queueing system that is more or less like asyncio's BaseEventLoop. To keep the program simple, we will not deal with timed tasks or socket events.
(I had initially planned to do sockets as well, but I have another project in mind to do the same with C++ coroutines. Wait for that!)

Previous parts of the series:



A Task Queue

A tasking system in its simplest form is a queue of callbacks that need to be called. On top of that, it provides some APIs for the user to add tasks or to wait until a task finishes.
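
To make the "queue of callbacks" idea concrete before any loop class enters the picture, here is a minimal sketch. The names `ready`, `submit` and `drain` are made up for this illustration and are not part of the library built in this series; the sketch also assumes callbacks should run in the order they were submitted.

```python
from collections import deque

# Hypothetical names for illustration only.
ready = deque()      # pending (callback, args) pairs
results = []

def submit(cb, *args):
    """Queue a callback with its arguments for later execution."""
    ready.append((cb, args))

def drain():
    """Run queued callbacks in submission (FIFO) order until none remain."""
    while ready:
        cb, args = ready.popleft()
        cb(*args)

submit(results.append, "first")
submit(results.append, "second")
drain()
print(results)
```

Everything else in this post (handles, futures, tasks) is layered on top of this basic shape.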

What we are aiming for is something like this:

def task1():
    print ("Done with task1")

def my_gen():
    print ("Starting coro/gen my_gen")
    yield None
    print ("Finished coro/gen my_gen")

if __name__ == "__main__":
    print ("Starting test")
    print ("-------------")

    #Instantiate a run loop instance
    loop = TaskLoop()

    #Submit a task to the run loop
    loop.call_soon(task1)
    loop.run_once()

    #start a coroutine and finish it
    loop.run_until_complete(my_gen())

    #Stop the run loop
    loop.stop()


The example shows how the task loop works with a regular function and a coroutine:

  • call_soon accepts only regular functions as its argument. Passing a generator function raises an exception.
  • run_until_complete accepts only Future objects and coroutines, and runs the loop until the task has run to completion.
  • run_once executes the first task in the task queue, if there is one.
  • stop asks the task loop to stop executing tasks from the task queue. It does so by queueing a callback, so the loop stops when that callback is run.
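
The first two bullets depend on telling a regular function, a generator function and a generator object apart. The implementation later in this post uses the standard inspect module for that; the sketch below shows what the two checks report. The names `regular`, `gen_func` and `gen_obj` are just placeholders for this example.

```python
import inspect

def regular():
    return 1

def gen_func():
    yield 1

# Calling a generator function does not run it; it returns a generator object.
gen_obj = gen_func()

checks = {
    "regular is a generator function": inspect.isgeneratorfunction(regular),
    "gen_func is a generator function": inspect.isgeneratorfunction(gen_func),
    "gen_func is a generator object": inspect.isgenerator(gen_func),
    "gen_obj is a generator object": inspect.isgenerator(gen_obj),
}
for label, value in checks.items():
    print(label, "->", value)
```

So `my_gen` is a generator function, while `my_gen()` is the generator object that gets handed to run_until_complete.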

Introducing Handles

Well, I lied about the queue of tasks in the previous section. It is actually a queue of Handle objects.
A Handle is nothing but a stored function object along with its arguments so that it can be invoked at a later time.

class Handle(object):
    """
    A wrapper for holding a callback and the
    arguments it takes.
    """
    def __init__(self, fn, loop, args):
        """
        Constructor.
        Arguments:
            fn   : The callback function. Has to be a regular function.
            loop : The loop which would call the handler.
            args : The argument pack the callback expects.
        """
        self._cb = fn
        self._args = args
        self._loop = loop
        #Set to true if the handler is cancelled
        #Cancelled handlers are not executed by the loop
        self._cancelled = False
        #Set to true if the handler has finished execution
        self._finished = False

    def __call__(self):
        """
        The function call operator.
        """
        assert not self._finished
        try:
            self._cb(*self._args)
        finally:
            #Mark the handle finished whether the callback
            #returned normally or raised an exception
            self._finished = True

    def cancel(self):
        """
        Cancel the handle
        """
        assert self._cancelled == False
        if not self._finished:
            self._cancelled = True
        return

    def is_cancelled(self):
        """
        Returns True if the handle has been cancelled
        """
        return self._cancelled

As can be safely deduced, a handle stores only regular functions. Futures and coroutines are not stored in it.
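
The "function stored together with its arguments" part of a handle is the same idea as functools.partial from the standard library. The sketch below is only an analogy: partial has no notion of cancellation or of having finished, which Handle adds on top. The `greet` function is a placeholder for this example.

```python
from functools import partial

def greet(greeting, name):
    return "{}, {}!".format(greeting, name)

# Store the function together with its arguments now...
deferred = partial(greet, "Hello", "world")

# ...and invoke it later without passing anything else.
message = deferred()
print(message)

# partial exposes what it stored, much like Handle keeps _cb and _args.
print(deferred.func is greet, deferred.args)
```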

So, you ask, how does the task loop work with futures/coroutines then? Let's look at the implementation of run_until_complete.

run_until_complete decoded

Let's dive into its implementation directly:

    def run_until_complete(self, coro_or_future):
        """
        Run the event loop until the future
        is ready with result or exception.
        """

        if self._loop_running:
            raise RuntimeError("loop already running")

        #Transform into a future or task(inherits from Future)
        task_or_fut = transform_future(coro_or_future, self)
        assert task_or_fut is not None

        #Add a callback to stop loop once the future is ready
        task_or_fut.add_done_callback(self._stop_loop_fut)

        #Returns once _stop_loop_fut raises StopLoopException
        self.run_forever()


This looks a bit different from what we achieved in the previous part of this series. The difference is because of transform_future, which is basically what asyncio.ensure_future is. It takes either a generator object (our coroutine) or a Future. If it is a future, it returns it as it is (it behaves like an identity function); if it is a coroutine, it wraps it in a Task object and returns that task.
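
For comparison, this is how the real asyncio.ensure_future behaves for the two kinds of input. It is a sketch against the standard library rather than the library built in this series, and it uses a native `async def` coroutine (the `answer` function is a placeholder) instead of a plain generator.

```python
import asyncio

async def answer():
    return 42

loop = asyncio.new_event_loop()
try:
    # A Future that is passed in comes back unchanged (identity behaviour).
    fut = loop.create_future()
    same = asyncio.ensure_future(fut, loop=loop)

    # A coroutine object is wrapped in a Task bound to the given loop.
    task = asyncio.ensure_future(answer(), loop=loop)

    result = loop.run_until_complete(task)
finally:
    loop.close()

print(same is fut, isinstance(task, asyncio.Task), result)
```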

Once the future is ready, _stop_loop_fut gets called, which raises StopLoopException to notify the main loop.

import inspect

def transform_future(supposed_future_coro, loop):
    """
    Loop works best with futures or with Tasks
    which wrap a generator object.
    So try to introspect the type of
    `supposed_future_coro` argument and do the best
    possible conversion of it.
    1. If it is already a Future, then return it as it is.
    2. If it is a generator, then wrap it in a Task object and
       return the task instance.

    loop object is for creating the task so that it
    can add a callback.
    """
    if isinstance(supposed_future_coro, Future):
        #It is a future object. Nothing to do.
        if loop != supposed_future_coro._loop:
            raise RuntimeError("loop provided is not equal to the loop of future")
        return supposed_future_coro
    elif inspect.isgenerator(supposed_future_coro):
        #Create a task object
        return loop.add_task(supposed_future_coro)
    else:
        raise TypeError("Cannot create an asynchronous execution unit from the provided arg")

In the above source, the task creation logic is handled inside the loop's add_task method, but all it really does is create a Task object.

The constructor of the Task object adds its step method to the loop, which gets called later:

class Task(Future):
    """
    Wraps a generator yielding a Future
    object and abstracts away the handling of future API

    This version adds loop to the task
    """
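    def __init__(self, loop, gen):
        """
        Arguments:
            loop : The loop which drives this task.
            gen  : The generator object to run.
        """
        assert (loop is not None)
        super().__init__(loop)
        self._loop = loop
        self.gen = gen
        #Schedule the first step; the loop calls it later
        self._loop.call_soon(self.step)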

And from here on, everything works as we have already seen in the previous posts. The only change is that calling the Task's step method is now done automatically by the task loop.
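
To see what "done automatically by the loop" amounts to, here is a stripped-down sketch. `MiniTask`, `call_soon`, `ready` and `worker` are hypothetical stand-ins written for this example; the real Task from the earlier parts also deals with futures and results, which are left out here.

```python
from collections import deque

# Hypothetical stand-ins, not the classes from the series.
ready = deque()
trace = []

def call_soon(cb, *args):
    ready.append((cb, args))

class MiniTask:
    def __init__(self, gen):
        self.gen = gen
        self.done = False
        # Scheduling the first step from the constructor means
        # nobody has to call step() by hand.
        call_soon(self.step)

    def step(self):
        try:
            next(self.gen)          # run the generator up to its next yield
        except StopIteration:
            self.done = True        # the generator has finished
        else:
            call_soon(self.step)    # more work left, so reschedule

def worker():
    trace.append("start")
    yield
    trace.append("end")

task = MiniTask(worker())
while ready:                        # the "loop" drains the ready queue
    cb, args = ready.popleft()
    cb(*args)

print(trace, task.done)
```

Creating the task is enough to get it running; every later step is queued by the previous one.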

The complete implementation of the task loop is:

import inspect
from collections import deque

class TaskLoop(object):
    """
    A minimal run loop which executes handles
    from a ready queue.
    """
    def __init__(self):
        #A deque of handles
        #deque appends and pops are thread-safe
        self._ready = deque()
        self._loop_running = False

    def total_tasks(self):
        """
        Number of handles waiting in the ready queue
        """
        return len(self._ready)

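    def run_once(self):
        """
        Execute a task from the ready queue
        """
        if len(self._ready) == 0:
            #There are no tasks to be executed
            return

        #popleft takes the oldest handle, so tasks
        #run in the order they were added
        hndl = self._ready.popleft()
        if hndl.is_cancelled():
            #Cancelled handlers are not executed
            return
        #Call the handler which invokes the
        #wrapped function
        hndl()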

    def run_forever(self):
        """
        This is the main loop which runs forever.

        TODO: Currently just runs infinitely. Should run
        only till some work is there.
        """
        self._loop_running = True

        while True:
            try:
                self.run_once()
            except StopLoopException:
                self._loop_running = False
                break
            except Exception as e:
                print ("Error in run_forever: {}".format(str(e)))

        return

    def run_until_complete(self, coro_or_future):
        """
        Run the event loop until the future
        is ready with result or exception.
        """

        if self._loop_running:
            raise RuntimeError("loop already running")

        #Transform into a future or task(inherits from Future)
        task_or_fut = transform_future(coro_or_future, self)
        assert task_or_fut is not None

        #Add a callback to stop loop once the future is ready
        task_or_fut.add_done_callback(self._stop_loop_fut)

        #Returns once _stop_loop_fut raises StopLoopException
        self.run_forever()

    def add_task(self, gen_obj):
        """
        Creates a task object and returns it
        """
        t = Task(self, gen_obj)
        return t

    def call_soon(self, cb, *args):
        """
        Add the callback to the loop's ready queue
        for it to be executed.
        """
        if inspect.isgeneratorfunction(cb):
            raise NotRegularFunction()

        hndl = Handle(cb, self, args)
        self._ready.append(hndl)
        return hndl

    def stop(self):
        """
        Stop the loop as soon as possible
        """
        self.call_soon(self._stop_loop)

    def _stop_loop(self):
        """
        Raise the stop loop exception
        """
        raise StopLoopException("stop the loop")

    def _stop_loop_fut(self, fut):
        """
        Done callback for a future: raise the
        stop loop exception
        """
        assert fut is not None
        raise StopLoopException("stop the loop")
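
The stop mechanism is worth isolating: the loop is ended by a queued callback that raises an exception, which then unwinds out of the `while True`. The sketch below shows just that pattern, assuming callbacks run in the order they were queued. `StopLoop` is a hypothetical stand-in for StopLoopException.

```python
from collections import deque

class StopLoop(Exception):
    """Hypothetical stand-in for the post's StopLoopException."""

ready = deque()
ran = []

def stop():
    raise StopLoop("stop the loop")

# Two callbacks, then the stop sentinel, then one more callback.
ready.append(lambda: ran.append("a"))
ready.append(lambda: ran.append("b"))
ready.append(stop)
ready.append(lambda: ran.append("never runs"))

while True:
    cb = ready.popleft()
    try:
        cb()
    except StopLoop:
        break                # the exception unwinds out of the loop

print(ran, len(ready))
```

Anything queued after the stop callback stays in the queue, which is why stop means "as soon as possible" and not "immediately".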


The complete source code for the series can be found at
https://github.com/arun11299/PyCoroutines

Hope you enjoyed it!
