Saturday, 6 January 2018

Designing Async Task Dispatch Library From Scratch (Part-1)

What the series is about

  • Understanding how generators work
  • Understanding how future objects work
  • Creating a task for asynchronous execution
  • Creating a simple but almost complete tasking and networking library

What the series is _not_ about

  • Understanding basic usage of generators. The reader is expected to have used them before.
  • Creating a library rivalling asyncio or anything else. I am only doing this to understand the whole thing better from a systems programmer's perspective.

The 'yield' Keyword

Let's take the simple generator below as an example:

def my_gen():
    ret = yield 5
    return ret

A few questions that come to my mind are:

  • What happens when the 'yield' expression gets executed?
  • What do we get as the return value of this generator-function?
  • What more is there to this generator-function other than just yielding 5?
To answer the above questions and to get enlightened, we need to know how a generator-function is different from a normal function.

Difference between a generator-function and a regular function

NOTE: Generators are iterators (and therefore iterable). Iteration as a concept is not something we will be looking at in this post.

A few of the most important differences are listed below. There are more, but they are out of scope for the current topic.

  1. Suspension and resume capability

          Unlike a regular function, a generator-function can be suspended partway through its execution and resumed from the same point. That means all of its state (local variables and the current position in the code) is saved on suspension so that it can be reused when the generator-function is resumed. It's the presence of a yield expression inside a function that provides this capability. The yield is basically the suspension and resumption point.

  2. Returns a generator on calling a generator-function

          Unlike a regular function, which returns some value (or nothing) when executed, calling a generator-function returns a generator object without running any of its body. This generator object is what we will make use of, or abuse, to control how the generator-function executes.

  3. Suspension and resumption of generator-function can be controlled

          Using the generator object, the client can control when to resume the generator-function and what value to send into it. We will see a lot more on this in the next section.
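
A minimal sketch of points 1 and 2, using only the standard library. The names counter_gen and log are made up for this illustration:

```python
import types

log = []  # records which parts of the generator body have actually run

def counter_gen():
    log.append("started")
    yield 1
    log.append("resumed")
    yield 2

g = counter_gen()                            # calling the function runs none of its body
print(isinstance(g, types.GeneratorType))    # we got a generator object back
print(log)                                   # still empty at this point

first = next(g)                              # runs up to the first yield, then suspends
print(first, log)

second = next(g)                             # resumes right after the first yield
print(second, log)
```

The log only grows when next is called, which shows that the body runs in slices between suspension points rather than all at once.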

Deconstructing a generator-function

Let's take this hello-world example:

def my_gen():
    val = yield "hello, world!"
    return val

There are two ways to work with this generator-function. Let's see the hard way first:

# Create the generator object (nothing in my_gen has run yet)
g = my_gen()

# Get the yielded value
res = next(g)
print("my_gen yielded result: {}".format(res))

try:
    next(g)
except StopIteration as e:
    print ("my_gen generator function returned: {}".format(e.value))

This would print:

my_gen yielded result: hello, world!
my_gen generator function returned: None

Hmm... what's going on? The assignment clearly did not work, as the return value is printed as "None"!
Now is a good time to explain how to control a generator-function using the generator object.

A generator object has three methods that matter for this discussion (a fourth, close, is not covered here):

  • __next__ (normally invoked through the built-in next)
  • send
  • throw
The next call resumes the function and runs it up to the next 'yield' expression, evaluating whatever expression is on its right-hand side. The result of that expression is passed to the caller of next, not to the variable on the left-hand side of the assignment.

The throw method is what the client/scheduler program uses to raise an exception inside the generator-function, at the point where it is suspended.
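
A small sketch of throw in action. The generator careful_gen is a made-up example, not part of the library being built in this series:

```python
def careful_gen():
    try:
        yield "waiting"
    except ValueError as exc:
        # The exception passed to throw() surfaces at the suspended yield
        yield "recovered from: {}".format(exc)

g = careful_gen()
first = next(g)                              # run to the first yield
second = g.throw(ValueError("bad input"))    # raise ValueError inside the generator
print(first)
print(second)
```

Like next and send, throw returns whatever the generator yields next, so the client gets to see how the generator reacted to the exception.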

The send method is what makes the generator behave as a coroutine, thanks to PEP 342.
Like next, it resumes the function and runs it to the next yield, but as the name suggests it also sends a value in: that value becomes the result of the yield expression the generator was suspended at, which is what gets assigned to the variable on the left-hand side.
Does that mean it is completely under my control what the variable val gets set to? Yup!!

See this in action:

g = my_gen()
next(g) # Yields "hello, world!"
try:
    g.send(42) # Sends in 42, which becomes the value of the yield expression (so val = 42), and resumes my_gen
except StopIteration as e:
    print ("my_gen returned: {}".format(e.value))

It prints:
my_gen returned: 42

NOTE: In the above example I am using the built-in function next instead of calling the method directly. The built-in operates on any iterator, which the generator is, and calls its __next__ method.
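
To make the note concrete, here is a short sketch comparing the two spellings on two fresh generators (g1 and g2 are just illustrative names):

```python
def my_gen():
    val = yield "hello, world!"
    return val

g1 = my_gen()
g2 = my_gen()

via_builtin = next(g1)          # the built-in next()
via_method = g2.__next__()      # the method that next() ends up calling

print(via_builtin == via_method)

# A generator is its own iterator: iter() hands back the same object
is_own_iterator = iter(g1) is g1
print(is_own_iterator)
```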

Yeah, fetching the return value of a generator-function is pretty weird. Agreed. It is set as the value field of the StopIteration exception.  Remember this.

Isn't this pretty freakishly powerful ? In fact this ability to "send" values to the generator is what makes it a coroutine and is used for creating a tasking system like asyncio.

So, let's reiterate what is happening with the above simple generator-function:

  1. Create the generator by calling the function first. That gives us a generator object to work with; none of the function body has run yet.
  2. Calling next on the generator object runs the function up to the next yield expression and returns the value of the expression on its right-hand side. At this point the function is suspended. It needs to be resumed by calling one of the 3 methods next/send/throw.
  3. In this suspended state, control is back with the client. Say we want to pass some value to the generator: if the yield is used on the right-hand side of an assignment, the sent value gets assigned to the variable on the left; otherwise the sent value is simply discarded and send behaves like a regular next call.
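
The same steps with two yields instead of one, as a sketch. The generator echo_twice is hypothetical; it is only here to show that send both delivers a value and returns the next yielded value:

```python
def echo_twice():
    first = yield "ready"
    second = yield "got {}".format(first)
    return (first, second)

g = echo_twice()        # step 1: create the generator object
a = next(g)             # step 2: run to the first yield
b = g.send("A")         # step 3: first = "A", then run to the second yield

try:
    g.send("B")         # second = "B", then the function returns
except StopIteration as e:
    result = e.value    # the return value travels on the exception

print(a, b, result)
```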

Adding a pictorial view if it helps at all:



As can be seen in the above picture, I have divided the generator function into two blocks.
The client/scheduler code dealing with the generator does the following:
a. Call the generator function to get the generator object.

b. Do a next on it, which calls the __next__ method of the generator, which in turn runs to the first yield expression inside the generator function and returns the value of the RHS expression, or None if no RHS expression is present. In our example, it returns 5.

c. Now we want to set the value of res to something meaningful. This is done by the send method, which places the value 42 in res and then resumes the generator just as __next__ would.

d. Since no more yields are present, it raises a StopIteration exception carrying the return value of the generator function in its value attribute.
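
Steps a to d can also be observed from Python with inspect.getgeneratorstate from the standard library. This is a sketch that assumes the generator in the picture has the shape res = yield 5:

```python
import inspect

def my_gen():
    res = yield 5
    return res

g = my_gen()                                   # a. create the generator object
states = [inspect.getgeneratorstate(g)]        # not started yet

yielded = next(g)                              # b. run to the first yield
states.append(inspect.getgeneratorstate(g))    # suspended at the yield

try:
    g.send(42)                                 # c. res = 42 and resume
except StopIteration as e:                     # d. no more yields
    returned = e.value
states.append(inspect.getgeneratorstate(g))    # finished

print(yielded, returned, states)
```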


I still don't see the point

So, how are all these freaky things going to make me any more useful as a programmer, you ask?
Let's see a small example using some database driver that you wrote. The task is to fetch some rows from the DB and do something with the rows.

The client of your basic DB driver would probably write something like this:

class DDBResultFetcher(object):
    def __init__(self, rows_cb):
        self.instance = None
        self.is_connected = False
        self._cb = rows_cb
    
    def connect(self):
        if not self.instance:
            # Keep the driver on self so the connect() call below can use it
            self.instance = MyDBDriver.get_correct_instance()
        
        if self.connected:
            return
        
        self.instance.connect()
        self.is_connected = True
    
    @property
    def connected(self):
        return self.is_connected
    
    def on_rows_cb(self, rows):
        self._cb(rows)
    
if __name__ == "__main__":
    
    def manage_rows(rows):
        for row in rows:
            #Do something with the rows
            ....
            
    db_f = DDBResultFetcher(manage_rows)
    db_f.connect()
    
    db_f.run()
    
    ##Additional code to check if the results have been completely fetched
    ##or not
    ......
           
Even though I have not added many error or exception checks and the handling of many cases is still missing, the code a user has to write is quite a lot, error prone, and can become unmanageable.

Here is what a generator/coroutine based DB driver could make my code look like:

def fetch_results_from_db():
    instance = MyDBDriver.get_correct_instance()
    
    if not instance.connected():
        instance.connect()
    
    while True:
        rows = yield instance
        if not rows:
            break
        # Do something with the rows
        ...

My user code is so much simpler now! The onus is now on the library writer to provide mechanisms to interact with this generator-function to use the yield suspension and resumption to send the rows obtained from the DB.

How to do that ? Wait for the next instalment of the series...
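
In the meantime, here is a toy sketch of the idea and nothing more. FakeDriver, fetch_batch and run are hypothetical names invented for this illustration; they are not the API of MyDBDriver or of the library built later in the series:

```python
class FakeDriver:
    """Hypothetical stand-in for a DB driver; hands out rows in batches."""

    def __init__(self, batches):
        self._batches = list(batches)

    def fetch_batch(self):
        # Return the next batch, or an empty list once everything is consumed
        return self._batches.pop(0) if self._batches else []

processed = []

def fetch_results(instance):
    while True:
        rows = yield instance       # suspend until the library sends rows in
        if not rows:
            break
        processed.extend(rows)      # "do something with the rows"

def run(gen_func, instance):
    """Hypothetical library-side loop that drives the user's generator."""
    gen = gen_func(instance)
    driver = next(gen)              # run to the first yield; the driver comes back
    try:
        while True:
            driver = gen.send(driver.fetch_batch())
    except StopIteration:
        pass                        # the generator saw an empty batch and finished

run(fetch_results, FakeDriver([[1, 2], [3]]))
print(processed)
```

The user function only ever sees rows arriving at the yield; when and how they are fetched is entirely up to the loop in run.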

NOTE: My intention here is not to propagate the idea that "free functions are better than classes". No, it's just an example.


Bonus Section: But how does it work behind the scenes?

We will look here at some disassembled output and some C code to gain a slightly better understanding of what is going on. It's for those people who can't sleep without seeing a bit of assembly or C/C++ code.




Or (the easier way)

In [7]: dis.dis(gen_obj.gi_code)
  2           0 LOAD_CONST               1 (5)
              3 YIELD_VALUE
              4 STORE_FAST               0 (x)

  3           7 LOAD_FAST                0 (x)
             10 RETURN_VALUE


There are lots of resources on the internet that explain what the above output means, so I will only briefly mention the significance of each column.
As can be seen, there are 5 columns:

  • Column 1 : The line number in the Python source file that the group of bytecode instructions corresponds to.
  • Column 2 : The offset in the bytecode (gen_obj.gi_code.co_code) where the opcode is present.
  • Column 3 : The opcode name.
  • Column 4 : The opcode's argument, typically an index into one of the code object's tables (constants, local variable names and so on). It's a bit more involved, but out of scope for this discussion.
  • Column 5 : The constant or variable name the argument resolves to. This is only there to help mere humans.
Now, with this much information at hand, let's take a look at the disassembly output again.
  • Load the constant value 5.
  • Yield. This is where the current frame is suspended along with all its context; the frame is kept in the generator's gi_frame attribute (gi_code is only the code object).
  • On next or send, resume the saved frame and store the sent value (None in the case of next or send(None)) into the variable named 'x'.
  • Read the value in variable 'x' and return it.
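
To reproduce this yourself, something like the following should work. The generator below is my assumption of what produced the listing above (a constant 5 and a local named x); the exact opcodes and offsets differ between CPython versions, so expect the output to look somewhat different on a newer interpreter:

```python
import dis

def my_gen():
    x = yield 5
    return x

gen_obj = my_gen()

# gi_code is the same code object as the function's __code__
print(gen_obj.gi_code is my_gen.__code__)

# Collect just the opcode names instead of printing the full table
opnames = [ins.opname for ins in dis.get_instructions(gen_obj.gi_code)]
print(opnames)
```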

More Fun with GDB

Now we know that the stack frame for a generator function is persisted on the heap so that it can be loaded and resumed again.

NOTE: From whatever I could understand from the CPython code, all stack frames are allocated on the heap. But of course nothing should be written based on that assumption.

We will see now what happens when I am sending a value to a generator. Things we are expecting to happen are:
  • Load the frame associated with the generator.
  • Send the value ( ? )
  • Run the frame till next yield or throw StopIteration exception
  • In case of yield, go back to the previous frame of execution.
For this, I have put a breakpoint on the _PyGen_Send function, which can be found in Objects/genobject.c in the CPython source. It internally calls gen_send_ex, which is present in the same source file.

The source code we are debugging:

def my_gen():
    x = yield

g = my_gen()
next(g)
g.send(42)


Let's see what happens in gen_send_ex. We will only look at the relevant portions of the code.


static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc)
{
    PyThreadState *tstate = PyThreadState_GET();
    PyFrameObject *f = gen->gi_frame;
    PyObject *result;
     .
     .
     .
}

We are getting the current thread state and the frame associated with the generator object.


    if (f->f_lasti == -1) {
        if (arg && arg != Py_None) {
            char *msg = "can't send non-None value to a "
                        "just-started generator";
            if (PyCoro_CheckExact(gen))
                msg = "can't send non-None value to a "
                      "just-started coroutine";
            PyErr_SetString(PyExc_TypeError, msg);
            return NULL;
        }
    } else {
        /* Push arg onto the frame's value stack */
        result = arg ? arg : Py_None;
        Py_INCREF(result);
        *(f->f_stacktop++) = result;
    }

Here we are checking the index of the last executed instruction in the generator function's frame (f->f_lasti). If it's set to -1, the generator has not started running yet, i.e. it has not moved to its first yield, and only None may be sent to it.
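
The same check is easy to trigger from Python. A small sketch, using the generator we are debugging:

```python
def my_gen():
    x = yield
    return x

g = my_gen()

# Sending a real value before the first yield is rejected by the check above
try:
    g.send(42)
except TypeError as exc:
    message = str(exc)
print(message)

# Sending None to a fresh generator is allowed and behaves like next(g)
first = g.send(None)

# Now the generator is suspended at the yield, so a value can be sent
try:
    g.send(42)
except StopIteration as e:
    returned = e.value
print(first, returned)
```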

Let's check the state of our generator stack frame (the variable 'f'):

(gdb) p *f
$18 = {
  ob_base = {
    ob_base = {
      _ob_next = 0x7fcf85618d48,
      _ob_prev = 0x7fcf855825a8,
      ob_refcnt = 1,
      ob_type = 0x88cc20 
    },
    ob_size = 20
  },
  f_back = 0x0,
  f_code = 0x7fcf86660100,
  f_builtins = 0x7fcf86784b48,
  f_globals = 0x7fcf8673ac98,
  f_locals = 0x0,
  f_valuestack = 0x2916478,
  f_stacktop = 0x2916478,
  f_trace = 0x0,
  f_exc_type = 0x0,
  f_exc_value = 0x0,
  f_exc_traceback = 0x0,
  f_gen = 0x7fcf855825a8,
  f_lasti = 3,
  f_lineno = 1,
  f_iblock = 0,
  f_executing = 0 '\000',
  f_blockstack = {{
      b_type = -875836469,
      b_handler = -875836469,
      b_level = -875836469
    } },
  f_localsplus = {0x0}
}

Look at the f_lasti and f_lineno fields. f_lasti is 3 rather than -1, so the generator has already started; 3 is the bytecode offset of the YIELD_VALUE instruction, which means the frame is suspended at the yield on the first line of the body of my_gen.

Since our generator has already reached the yield (because we already executed next on the generator before sending any value), let's focus on the else portion:

    ...
    } else {
        /* Push arg onto the frame's value stack */
        result = arg ? arg : Py_None;
        Py_INCREF(result);
        *(f->f_stacktop++) = result;
    }

Hmm... it's just putting the passed value on top of the frame's value stack! Now that should be followed by a STORE, right? Let's check the disassembly:

In [3]: dis.dis(my_gen)
  2           0 LOAD_CONST               0 (None)
              3 YIELD_VALUE
              4 STORE_FAST               0 (x)
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

Yup, that looks about correct and as expected. The sent value is getting stored in variable x.

From here, Python should execute the generator's frame and do some bookkeeping so that it can get back to the previous frame (the one that called next/send on the generator) once the generator yields again or finishes.

    /* Generators always return to their most recent caller, not
     * necessarily their creator. */
    Py_XINCREF(tstate->frame);
    assert(f->f_back == NULL);
    f->f_back = tstate->frame;

    gen->gi_running = 1;
    result = PyEval_EvalFrameEx(f, exc);
    gen->gi_running = 0;

The current frame (tstate->frame) is saved in the generator frame's f_back field, and PyEval_EvalFrameEx then continues with the execution of the generator's frame.
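
The comment "Generators always return to their most recent caller" can be seen from Python as well. The sketch below relies on sys._getframe, which is a CPython implementation detail, and the function names are made up for the illustration:

```python
import sys

def who_resumed_me():
    while True:
        # While the generator runs, f_back of its frame is whoever resumed it
        yield sys._getframe().f_back.f_code.co_name

def first_caller(gen):
    return next(gen)

def second_caller(gen):
    return next(gen)

g = who_resumed_me()
seen = [first_caller(g), second_caller(g)]
print(seen)
```

The generator was created at module level, yet each resumption reports the function that called next on it, which is the f_back assignment in the C code at work.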

I have skipped over lots of details here, but this should be a good start for anyone interested in digging deeper.

Part-2 About Futures, Executors and Tasks

