Sunday, 25 November 2018

Coro-Async : A lite C++ coroutine networking library | Introduction

I have been working on and off on a small networking library in C++ for the past few weeks. The purpose of this library was mainly to get to know the C++ coroutine feature as implemented in Clang.

The features currently supported by the library are:
  1. TCP socket stream support. Yup, no UDP for now.
  2. IPv4 only. Will have IPv6 soon.
  3. Performing asynchronous accept, connect, read, write on the socket by awaiting.
  4. Yielding a task for a time period.
  5. Awaiting on the task which itself is a coroutine.
Things missing:
  1. UDP support. Not planning to support anything else.
  2. Low level socket option setting. For now you have to get the native handle to set them.
  3. IPv6 addressing support.
  4. A few things currently might not be thread safe if you are running `io_service` from multiple threads.
  5. Allocation and continuation related optimizations.
  6. And many other things that a proper and complete networking library needs.
All the code shown here can be found at:

In this post I will not be covering the basics of the coroutines, but will be providing code samples showing how coroutines can ease writing concurrent programs (The trouble is for the library implementors :) ). A later post will delve more into the details of its implementation.

"Hello World" of network programming

NOTE: We will be ignoring error checks in the code for the sake of conciseness.

What we will be seeing here is a concurrent, single threaded (for the most part) echo server.

The main:

int main() {
    io_service ios{};
    coro_acceptor acceptor{ios};

    std::error_code ec{};
    acceptor.open("127.0.0.1", 8080, ec);
    
    server_run(acceptor);

    std::thread thr{[&] { ios.run(); }};
    thr.join();
    return 0;
}

There is some thread-related code which could be abstracted away, but I left it the way ASIO does it.

What's going on here is:
  1. Create an instance of io_service which basically runs a scheduler and a reactor behind the scenes (Only when its `run` method is called).
  2. Create an instance of coro_acceptor which is basically a wrapper over the async version of accept but provides an API returning types satisfying the Awaitable interface.
  3. Opening the acceptor basically means binding the port with an IP address. We will be ignoring any failures here.
  4. Pass the acceptor to a coroutine function "server_run". We will see this later.
  5. Run the io_service on a separate thread so as to not block the current/main thread.

The server_run coroutine:

coro_task_auto<void> server_run(coro_acceptor& acc)
{
  while ( true )
  {
    auto result = co_await acc.accept();
    if (result.is_error())
    {
      std::cerr << "Accept failed: " << result.error().message() << '\n';
      co_return;
    }
    handle_client(std::move(result.result()));
  }
  co_return;
}

What's going on here is:
  1. The signature of the function indicates that it returns an object of type "coro_task_auto" which is the return object type of the coroutine promise. The template parameter type is void, indicating that the function does not generate any interesting value. You can see that we do not return anything with "co_return".
  2. We await on accept in an infinite while loop. But, instead of blocking on "accept" here, we would be yielding the control back so that some other task can continue.
  3. All awaited socket operations return a "Result" type. There are many variants of this result type which the client does not need to know about, hence the auto. The result could either be an error or the actual result value. For the "accept" case, the result value would be a valid client-side socket.
  4. Once a valid socket is obtained, hand it off to the "handle_client" coroutine and repeat.
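
To make the "either an error or a value" idea from point 3 concrete, here is a minimal sketch. The names `simple_result` and `parse_port` are hypothetical and are not the library's actual types; the sketch also assumes T is default constructible, which the real result types may not require.

```cpp
#include <cassert>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical stand-in for the library's result types: holds either an
// error code or a value. Requires T to be default constructible.
template <typename T>
class simple_result
{
public:
  // Build a successful result that owns `value`.
  explicit simple_result(T value) : value_(std::move(value)) {}
  // Build a failed result; the value stays default constructed.
  explicit simple_result(std::error_code ec) : ec_(ec) {}

  bool is_error() const noexcept { return static_cast<bool>(ec_); }
  const std::error_code& error() const noexcept { return ec_; }

  // Only meaningful when is_error() is false.
  T& result() noexcept
  {
    assert(!is_error());
    return value_;
  }

private:
  std::error_code ec_{};
  T value_{};
};

// Example producer: parse a TCP port, reporting failures through the result.
inline simple_result<int> parse_port(const std::string& text)
{
  const auto invalid = std::make_error_code(std::errc::invalid_argument);
  if (text.empty() || text.size() > 5) return simple_result<int>{invalid};

  int port = 0;
  for (char c : text) {
    if (c < '0' || c > '9') return simple_result<int>{invalid};
    port = port * 10 + (c - '0');
  }
  if (port > 65535) {
    return simple_result<int>{std::make_error_code(std::errc::result_out_of_range)};
  }
  return simple_result<int>{port};
}
```

The caller follows the same pattern as server_run: check is_error() first, and only then touch result().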

The handle_client coroutine:

coro_task_auto<void> handle_client(coro_socket client)
{
  char buf[6]; // for only "Hello!"
  auto bref = as_buffer(buf);
  co_await client.read(6, bref);
  co_await client.write(6, bref);
  client.close();
  co_return;
}

What's going on here is:
  1. The explanation for the function signature is as explained previously.
  2. The buffer is kept fixed to 6 bytes as we know we would be only handling "Hello!". I guess it's ok for the sample.
  3. You will have noticed that the buffer is allocated on the stack, but for something to be used in an asynchronous way we usually allocate it on the heap, right? Well, yes, here too you need something like that, but that is taken care of when the coroutine allocates its frame object (an allocation the compiler might elide). The compiler can determine how much memory needs to be allocated to keep all the objects created inside the coroutine alive across suspension points.
  4. We issue the special coroutine-friendly awaitable read and write calls. These don't block. If a read or write cannot be completed immediately, the coroutine is suspended till the underlying socket is ready. So, during this time, some other operation, for example "accept", can progress.

That's basically it! The complete code is as short as:

#include <iostream>
#include <thread>
#include "coro_async.hpp"

using namespace coro_async;

coro_task_auto<void> handle_client(coro_socket client)
{
  char buf[6]; // for only "Hello!"
  auto bref = as_buffer(buf);
  co_await client.read(6, bref);
  co_await client.write(6, bref);
  client.close();
  co_return;
}

coro_task_auto<void> server_run(coro_acceptor& acc)
{
  while ( true )
  {
    auto result = co_await acc.accept();
    if (result.is_error())
    {
      std::cerr << "Accept failed: " << result.error().message() << '\n';
      co_return;
    }
    handle_client(std::move(result.result()));
  }
  co_return;
}

int main() {
  io_service ios{};
  coro_acceptor acceptor{ios};

  std::error_code ec{};
  acceptor.open("127.0.0.1", 8080, ec);
    
  server_run(acceptor);

  std::thread thr{[&] { ios.run(); }};
  thr.join();
  return 0;
}

Now, I do completely understand the uselessness of the above code, but it's still something, right ?


Sleeping Beauty

No idea why I chose this heading, but I have no better alternative yet.

If you want to sleep without blocking the thread, then you can do this:

using namespace coro_async;
using namespace std::chrono_literals;

coro_task_auto<void> waiter(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "start sleep" << std::endl;
  co_await s.yield_for(2s);
  std::cout << "end sleep" << std::endl;
  co_return;
}

What's going on here is:

  1. We create an instance of coro_scheduler, which basically exposes APIs that return an Awaitable. "yield_for" returns an awaitable which suspends the coroutine and schedules a timed callback on the io_service scheduler so as to resume the coroutine after the specified time.
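
The "timed callback" part can be pictured with a small single-threaded timer queue. This is only a sketch under my own assumptions: `mini_timer_loop` and `call_after` are hypothetical names, and the real io_service scheduler is certainly more involved. Where the sketch invokes a plain callback, a coroutine library would resume the suspended coroutine.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical single-threaded timer queue; not the coro-async scheduler.
class mini_timer_loop
{
public:
  using clock = std::chrono::steady_clock;

  // Register `fn` to run once `delay` has elapsed from now.
  void call_after(clock::duration delay, std::function<void()> fn)
  {
    assert(fn);  // an empty callback makes no sense here
    timers_.push(entry{clock::now() + delay, next_seq_++, std::move(fn)});
  }

  // Run every pending callback in deadline order, sleeping until each is due.
  void run()
  {
    while (!timers_.empty()) {
      entry next = timers_.top();
      timers_.pop();
      std::this_thread::sleep_until(next.deadline);
      next.fn();  // a coroutine library would resume the coroutine here
    }
  }

private:
  struct entry
  {
    clock::time_point deadline;
    unsigned long seq;  // tie-breaker: equal deadlines run in insertion order
    std::function<void()> fn;
  };

  // Orders the priority queue so that the earliest deadline is on top.
  struct later
  {
    bool operator()(const entry& a, const entry& b) const
    {
      if (a.deadline != b.deadline) return a.deadline > b.deadline;
      return a.seq > b.seq;
    }
  };

  std::priority_queue<entry, std::vector<entry>, later> timers_;
  unsigned long next_seq_ = 0;
};
```

Callbacks registered with a shorter delay run first, regardless of the order in which they were registered.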

Waiting for other tasks

There are sure to be scenarios where you have to wait for a task to finish. The task itself could be a coroutine which could suspend again on different kinds of awaitables.

Let's see an example:

coro_task_auto<void> sleeper(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "start sleep" << std::endl;
  co_await s.yield_for(2s);
  std::cout << "end sleep" << std::endl;
  co_return;
}

coro_task_auto<int> deep_thinking(io_service& ios)
{
  coro_scheduler s{ios};
  co_await s.yield_for(2s);
  co_return 42;
}

coro_task_auto<void> waiter(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "Wait for completion" << std::endl;
  co_await s.wait_for([&ios] () -> auto { return sleeper(ios); });
  std::cout << "Wait completed" << std::endl;

  std::cout << "Do some deep thinking" << std::endl;
  auto result = co_await s.wait_for([&ios]() -> auto { return deep_thinking(ios); });
  std::cout << "Answer after deep thinking: " << result.result() << std::endl;
  co_return;
}

Nothing much to explain here except for the lambda weirdness. It is required because the handler passed to "wait_for" must be callable with no arguments.
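
The same adaptation shows up outside of coroutines too. The sketch below uses a hypothetical `run_handler` that, like the handler requirement described above, only accepts a callable taking no arguments; the lambda captures the argument so that the call site does not have to pass it.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for an API that only accepts a nullary callable.
inline std::string run_handler(const std::function<std::string()>& handler)
{
  assert(handler);  // must hold a callable
  return handler();
}

// A function that needs an argument, so it cannot be passed directly.
inline std::string greet(const std::string& name)
{
  return "hello " + name;
}

// Capture the argument in a lambda to get a callable with no parameters.
inline std::string greet_through_handler(const std::string& name)
{
  return run_handler([&name]() { return greet(name); });
}
```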


A damn simple Redis client

This is it:

coro_task_auto<void> run_command(io_service& ios, std::string cmd)
{
  coro_connector conn{ios};
  coro_scheduler s{ios};

  auto result = co_await conn.connect("127.0.0.1", 6379);
  if (result.is_error())
  {
    std::cout << "Connection to redis failed: " << result.error().message() << std::endl;
    co_return;
  }
  std::cout << "connected to redis" << std::endl;
  auto& sock = result.result();

  auto cmd_buf = as_buffer(cmd);
  co_await sock.write(cmd.length(), cmd_buf);

  std::cout << "command written to redis" << std::endl;
  co_return;
}
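
The post does not say what the command string should contain. Redis accepts commands encoded in its serialization protocol (RESP) as an array of bulk strings, so one way to build `cmd` is a small helper like the one below. `to_resp` is a hypothetical name and not part of the library.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Encode a command and its arguments as a RESP array of bulk strings:
// "*<count>\r\n" followed by "$<length>\r\n<bytes>\r\n" for every part.
inline std::string to_resp(const std::vector<std::string>& parts)
{
  assert(!parts.empty());  // a command needs at least its name
  std::string out = "*" + std::to_string(parts.size()) + "\r\n";
  for (const auto& part : parts) {
    out += "$" + std::to_string(part.size()) + "\r\n";
    out += part;
    out += "\r\n";
  }
  return out;
}
```

With this, something like run_command(ios, to_resp({"SET", "k", "v"})) would send a well-formed SET command.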

Have fun!

Monday, 16 April 2018

Designing a kind of Shared Pointer in C++

What is this about

  • Designing my own variant of policy based shared_ptr class for fun.
  • Understanding the standard library implementation while codifying my design.

What is this not about

  • A replacement for std::shared_ptr type.
  • Understanding the implementation of shared_ptr in the standard libraries.
  • Tutorial on implementing a shared_ptr and thereby teaching some of the advanced concepts.

Introduction

I am aware of the implementation details of libstdc++'s shared_ptr to some extent. To be honest it is quite daunting, even though the basic idea behind reference counted storage seems quite easy.
So, one free evening I thought I would write my own version of shared_ptr to see how it feels to write one. I had a few design choices in mind before I started with its implementation:
  • Should have a policy based reference counting mechanism. The standard implementations we have today generally make use of atomic counters even when we are working in a single threaded program. So, it is not a zero cost abstraction per se.
  • Will be the make_shared equivalent i.e. both the type and the reference count variables would be stored together in one single allocation.
  • No deleter support! Only Allocator support is there. No particular reason for this. Just wanted to avoid additional complexity of it.
  • No aliasing constructor. It's not a low hanging fruit anyway.
  • No covariance support.
  • No adoption of any other pointer types.
  • No atomic ref count policy support :D (Got lazy, but I do plan to complete it)
  • And some more which I forgot now :) 
NOTE : I won't be surprised if someone finds bugs or UB in the implementation. As said before, it is written for the purpose of understanding (read: running away from boredom).

I will be presenting 4 versions of the design, each version incrementally better than its previous with additional features.

All the code presented here can be found at Cpp-Shared-Pointer
The code has been compiled and tested with clang 5 on Macbook.

Version-1

The first version just focuses on getting the shared_ptr constructor, control_block interface correct. There is no support for allocators and weak_ptr.

Control Block

This is where the object and its associated reference counters are stored. The shared_ptr class performs just one allocation for this control_block. There are no separate allocations for the object and the reference counters.

/*!
 * The control block object where the
 * allocated object resides along with
 * its reference count.
 */
template <
  // The allocated object type.
  typename T,
  // The ref count policy to be used.
  // Single threaded Policy or Multi Threaded Policy
  typename RefCntPolicy>
struct control_block;

This structure can be specialised on the basis of RefCntPolicy. For that purpose, we will define two additional structs used for selecting the appropriate control_block at compile time.

// Reference Counting policies
struct single_threaded_t {};
struct multi_threaded_t {};


Version-1 (and all other versions) supports only the single threaded control_block specialization.

/*!
 * Control Block specialized for Single Thread.
 */
template <typename T>
struct control_block<T, single_threaded_t>
{
  using storage_type = typename std::aligned_storage<sizeof(T), alignof(T)>::type;

  storage_type value_buf;
  size_t ref_cnt = 0;

  /**
   * Calls destructor of the embedded object.
   */
  ~control_block()
  {
    (reinterpret_cast<T*>(&value_buf))->~T();
  }
};


There are a bunch of free functions for incrementing / decrementing / fetching the reference count which one can look up in the complete source. In later versions these free functions have been turned into member functions.
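
For readers who do not want to open the source, the free functions could look roughly like the sketch below. The names add_ref, decr_ref and get_type_ptr appear in the snippets in this post; get_ref and all of the bodies are my assumptions, and the `sketch` namespace is only there to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

namespace sketch {

struct single_threaded_t {};

template <typename T, typename RefCntPolicy>
struct control_block;

// Same layout as the Version-1 control block (destructor left out for brevity).
template <typename T>
struct control_block<T, single_threaded_t>
{
  typename std::aligned_storage<sizeof(T), alignof(T)>::type value_buf;
  std::size_t ref_cnt = 0;
};

// Increment the (non-atomic) reference count.
template <typename T>
void add_ref(control_block<T, single_threaded_t>* cb) noexcept
{
  ++cb->ref_cnt;
}

// Decrement the reference count and return the new value.
template <typename T>
std::size_t decr_ref(control_block<T, single_threaded_t>* cb) noexcept
{
  assert(cb->ref_cnt > 0);
  return --cb->ref_cnt;
}

// Fetch the current reference count.
template <typename T>
std::size_t get_ref(const control_block<T, single_threaded_t>* cb) noexcept
{
  return cb->ref_cnt;
}

// View the embedded storage as a pointer to T.
template <typename T>
T* get_type_ptr(control_block<T, single_threaded_t>* cb) noexcept
{
  return reinterpret_cast<T*>(&cb->value_buf);
}

} // namespace sketch
```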

The shared_ptr class design:

template <typename T, typename RefCntPolicy>
class shared_ptr
{
public:
  /**
   * Create a shared pointer from the Args.
   */
  template <typename... Args,
           typename Cond = std::enable_if_t<
                             std::is_constructible<T, Args...>::value ||
                             std::is_convertible<T, Args...>::value>>
  explicit shared_ptr(Args&&... args);
  .
  .
  .
public: // Pointer like semantics
  /**
   */
  T* operator->() noexcept
  {
    return detail::get_type_ptr(ctrl_blk_);
  }
  .
  .

  T& operator*() noexcept
  {
    return *(detail::get_type_ptr(ctrl_blk_));
  }
  .
  .
  .
private:
  /// The control block where the pointed type actually lives
  detail::control_block<T, RefCntPolicy>* ctrl_blk_ = nullptr;
};

Constructing the object inside control block is pretty easy using placement new:

template <typename T, typename RCP>
template <typename... Args, typename Cond>
shared_ptr<T, RCP>::shared_ptr(Args&&... args)
{
  ctrl_blk_ = new detail::control_block<T, RCP>{};

  // Construct the object and increment ref count
  detail::construct(ctrl_blk_, std::forward<Args>(args)...);
  detail::add_ref(ctrl_blk_);
}

/**
 * Construct the object in its aligned storage area.
 */
template <typename T, typename... Args>
void construct(
    control_block<T, single_threaded_t>* cb,
    Args&&... args)
{
  new (&cb->value_buf) T{std::forward<Args>(args)...};
}


This, together with the other helper functions, is basically all that is needed to implement a shared pointer! Oh, and the destructor:

template <typename T, typename RCP>
shared_ptr<T, RCP>::~shared_ptr()
{
  // moved from? not in control any more
  if (!ctrl_blk_) return;

  auto ref_cnt = detail::decr_ref(ctrl_blk_);
  if (ref_cnt == 0) {
    delete ctrl_blk_;
  }
}

For the complete implementation : https://github.com/arun11299/Cpp-Shared-Pointer/blob/master/ver_1/shared_ptr.hpp
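
To see how copying and moving interact with the count, here is a deliberately simplified sketch of the same idea. `mini_shared` is a hypothetical class, not the one in the repository: it stores the object as a plain member instead of aligned storage and uses a static `make` function instead of a forwarding constructor.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical, simplified counterpart of the Version-1 design: a single
// allocation holds both the object and its non-atomic reference count.
template <typename T>
class mini_shared
{
  struct block
  {
    template <typename... Args>
    explicit block(Args&&... args) : value{std::forward<Args>(args)...} {}

    T value;
    std::size_t ref_cnt = 1;
  };

public:
  // Create the object and its count in one allocation.
  template <typename... Args>
  static mini_shared make(Args&&... args)
  {
    return mini_shared(new block{std::forward<Args>(args)...});
  }

  // Copying shares the block and bumps the count.
  mini_shared(const mini_shared& other) noexcept : blk_(other.blk_)
  {
    if (blk_) ++blk_->ref_cnt;
  }

  // Moving steals the block; the source no longer controls anything.
  mini_shared(mini_shared&& other) noexcept : blk_(other.blk_)
  {
    other.blk_ = nullptr;
  }

  // Copy-and-swap covers both copy and move assignment.
  mini_shared& operator=(mini_shared other) noexcept
  {
    std::swap(blk_, other.blk_);
    return *this;
  }

  // The last owner deletes the block, which also destroys the object.
  ~mini_shared()
  {
    if (blk_ && --blk_->ref_cnt == 0) delete blk_;
  }

  T& operator*() const noexcept { return blk_->value; }
  T* operator->() const noexcept { return &blk_->value; }
  std::size_t use_count() const noexcept { return blk_ ? blk_->ref_cnt : 0; }

private:
  explicit mini_shared(block* b) noexcept : blk_(b) {}

  block* blk_ = nullptr;
};
```

The null check in the destructor plays the same role as the "moved from?" check in the destructor shown above.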


Version-2

This is where things get serious (and ugly...)
Additions in this version:
  1. Support for Allocator
  2. Support for Deleter (but not used actually and removed in later version)
  3. Type erasure for control_block since Allocator and Deletor are not part of the control block type.

Since Allocator (and Deletor, but I am going to ignore it for the rest of the post) is not part of the type, I need to make it part of my constructor template. And as per my design, if someone wants to use this shared_ptr, they should be able to use it like:

struct MyType {
  explicit MyType(int, int) {}
  .....
};

arnml::shared_ptr<MyType> mtyp_sp{ 10, 42 };


So, we need some way to:
  1. Pass Allocator.
  2. Pass a variadic number of arguments for the construction of, for example, MyType.
This resulted in the real pain point of the design: writing constructors using SFINAE. It took me some time to get it correct. It is probably broken for most use cases in Version-2 and Version-3, and more or less fixed in Version-4.

Since it is broken in this version, I don't want to waste time explaining the constructors here. Instead we will see the change in the design of control_block.

Since we need to store Allocator (Sorry, no EBO technique used) and not make it part of the Type, we need to use some kind of type erasure technique. So, I create an abstract base class :D

/*!
 */
template <typename T>
struct control_block_base
{
  using storage_type = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
  /// The storage buffer to create the type `T`
  storage_type value_buf;

  /// Create a control object derived impl
  template <typename Allocator, typename Deletor, typename RefCntPolicy>
  static control_block_base* create(Allocator&& allocator, Deletor&& deletor);

  /**
   * Calls destructor of the embedded object.
   */
  virtual ~control_block_base()
  {
    (reinterpret_cast<T*>(&value_buf))->~T();
  }
  
  .
  .
  .
  ///
  virtual void add_ref()   = 0;
  ///
  virtual size_t get_ref() const = 0;
  ///
  virtual size_t dec_ref() = 0;
  ///
  virtual void destruct() noexcept = 0;
};

And the derived specialised control_block:

/*!
 * Control Block specialized for Single Thread.
 */
template <typename T, typename Allocator, typename Deletor>
struct control_block<T, Allocator, Deletor, single_threaded_t>
  : control_block_base<T>
{
  ///
  using allocator_type = typename std::allocator_traits<
    std::decay_t<Allocator>>::template rebind_alloc<control_block>;
.
.
.
.
};

Not much to explain here. You ask, how do we use this in the shared_ptr class? Of course, as a pointer to the base class. You ask, then how do we create the correct specialized derived class instance of the control_block? See:
/**
 * Construct the control block.
 */
template <typename T>
template <typename Allocator, typename Deletor, typename RCP>
control_block_base<T>*
control_block_base<T>::create(
    Allocator&& allocator,
    Deletor&& deletor)
{
  using DerSelf = typename detail::control_block<
                                      T,
                                      std::decay_t<Allocator>,
                                      std::decay_t<Deletor>,
                                      RCP
                                      >;

  typename std::allocator_traits<
    std::decay_t<Allocator>>::template rebind_alloc<DerSelf> rb_alloc;

  auto address = static_cast<DerSelf*>(rb_alloc.allocate(1));
  auto ctrl_blk = new (address) DerSelf{std::forward<Allocator>(allocator),
                                        std::forward<Deletor>(deletor)};

  return static_cast<control_block_base<T>*>(ctrl_blk);
}
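
The snippet above only shows creation. The other half of the type erasure is that the block must later be destroyed and deallocated through the same allocator type, which only the derived class knows. Below is a sketch of both halves under my own assumptions: `erased_block_base`, `erased_block`, `create_block` and `counting_allocator` are hypothetical names, the Deletor is dropped, and allocators with fancy pointer types are not handled.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>

// Base class: hides the allocator type from whoever holds the pointer.
struct erased_block_base
{
  std::size_t ref_cnt = 1;
  // Destroy and deallocate this block through the allocator that created it.
  virtual void destruct() noexcept = 0;
protected:
  ~erased_block_base() = default;
};

template <typename T, typename Allocator>
struct erased_block final : erased_block_base
{
  using allocator_type =
      typename std::allocator_traits<Allocator>::template rebind_alloc<erased_block>;

  template <typename... Args>
  explicit erased_block(const Allocator& a, Args&&... args)
    : alloc(a), value{std::forward<Args>(args)...} {}

  void destruct() noexcept override
  {
    allocator_type rebound(alloc);  // copy the allocator out before *this dies
    this->~erased_block();
    std::allocator_traits<allocator_type>::deallocate(rebound, this, 1);
  }

  Allocator alloc;
  T value;
};

// Allocate and construct the derived block, but hand back only the base pointer.
template <typename T, typename Allocator, typename... Args>
erased_block_base* create_block(const Allocator& alloc, Args&&... args)
{
  using block = erased_block<T, Allocator>;
  using rebound_alloc = typename block::allocator_type;
  using traits = std::allocator_traits<rebound_alloc>;

  rebound_alloc rebound(alloc);
  block* mem = traits::allocate(rebound, 1);
  try {
    return new (mem) block(alloc, std::forward<Args>(args)...);
  } catch (...) {
    traits::deallocate(rebound, mem, 1);  // do not leak if T's constructor throws
    throw;
  }
}

// Demo allocator that counts live allocations through a shared counter.
template <typename U>
struct counting_allocator
{
  using value_type = U;

  explicit counting_allocator(int* counter) noexcept : live(counter) {}
  template <typename V>
  counting_allocator(const counting_allocator<V>& other) noexcept : live(other.live) {}

  U* allocate(std::size_t n)
  {
    ++*live;
    return static_cast<U*>(::operator new(n * sizeof(U)));
  }
  void deallocate(U* p, std::size_t) noexcept
  {
    --*live;
    ::operator delete(p);
  }

  int* live;
};

template <typename U, typename V>
bool operator==(const counting_allocator<U>& a, const counting_allocator<V>& b) noexcept
{
  return a.live == b.live;
}

template <typename U, typename V>
bool operator!=(const counting_allocator<U>& a, const counting_allocator<V>& b) noexcept
{
  return !(a == b);
}
```

A shared_ptr built this way only ever stores an erased_block_base*, and calls destruct() when the count drops to zero.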


Version-3

This was mostly about fixing the shared_ptr constructors. One thing which I knew but only experienced this time is how frustrating it can be to write constructors taking forwarding references! Making use of SFINAE to control overloading makes it worse!!

We will see all the constructors and the need for them in Version-4.



Version-4

This version has following additions:
  • Fixed all the constructor issues (as far as I could make it run for my small set of tests).
  • Added support for weak_ptr

Constructors not constructive enough

  • The constructor for incomplete types. 

/**
 * The default constructor.
 */
explicit shared_ptr(std::nullptr_t)
{
  //...
}

This is needed to construct a shared_ptr to an incomplete type. The nullptr_t is there to differentiate between default construction for a complete type and default construction for an incomplete type. (I am sure there are other solutions for this.)
For eg:
struct A {
    A() : ptr(nullptr) {}     // (1)
    arnml::shared_ptr<A> ptr;
};

arnml::shared_ptr<A> sp_a{};  // (2)


The difference is that, for (1) there is no control_block created, but for (2), the control block is allocated and the object of type A is created inside the control block value buffer.


  • The default constructor.

/**
 * For default constructible types
 */
explicit shared_ptr()
    : shared_ptr(std::allocator<T>{})
{
  //...
}

This we have already seen. Note that it uses the default std::allocator and uses forwarding constructor which we will see later.

  • Forwarding constructor. Accepts only arguments used to create the wrapped object of type T.

  /**
   * Forwarding constructor.
   * Create a shared pointer from the Args.
   */
  template <typename First, typename... Args,
            typename Allocator=std::allocator<T>,
            typename Cond = std::enable_if_t<
                              !std::is_same<std::remove_reference_t<T>, shared_ptr>::value &&
                              arnml::detail::is_allocator<Allocator>::value &&
                              !arnml::detail::is_allocator<First>::value
                            >
           >
  explicit shared_ptr(First&& f, Args&&... args)
    : shared_ptr(Allocator{}, std::forward<First>(f), std::forward<Args>(args)...)
  {
    //....
  }

And all my problems started. Just look at the SFINAE condition. This constructor will only accept arguments (at least one) that are used to create an instance of type T. Otherwise it will fail in compilation at a static_assert which I have added in the control_block creation function. This is still _not_ correct! The allocator here is probably redundant. You can only pass default allocator with it. Then, how to pass my own allocator ? Wait for it.

  • Constructor (sink) for default constructible type. (Check that the default constructor above forwards to this.)
  /**
   * Constructor for default constructible types.
   */
  template <typename Allocator=std::allocator<T>,
            typename = std::enable_if_t<detail::is_allocator<Allocator>::value>
           >
  explicit shared_ptr(Allocator&& alloc=Allocator{});



  • Constructor (sink) for Allocator and object construction args.
  /**
   */
  template <typename... Args, typename Allocator=std::allocator<T>,
           typename Cond = std::enable_if_t<
                              !std::is_same<std::remove_reference_t<T>, shared_ptr>::value  &&
                              arnml::detail::is_allocator<Allocator>::value>>
  explicit shared_ptr(Allocator&& alloc, Args&&... args);


And then there are the copy constructor, move constructor, copy assignment operator and move assignment operator.

The thing to learn here is that this is terrible. I will probably never try to do this in some large codebase. Why is it bad (do I really need to say)? I will point out a few reasons:

  1. SFINAE based overloading is bad for maintenance unless it involves only very small compile time checks.
  2. Bug prone. There are quite a lot of bugs or unhandled conditions for sure in the above code. It is difficult and time consuming to identify them.
  3. Fixing one overload can break another.
  4. Compilation time is important. The more nasty template stuff you do, the more you pay.
It doesn't mean you shouldn't do this. There might be legit cases where you want to do this. But make sure you think it through many times.
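
The `is_allocator` trait used in the conditions above is not shown in this post. Below is one possible way to write such a trait, together with a toy class showing how it steers overload selection. Everything here is an assumption for illustration: the `sketch` namespace, the detection rule (has a value_type and an allocate member) and the `picker` class are mine, not the repository's.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

namespace sketch {

// C++14 stand-in for std::void_t.
template <typename...> struct make_void { using type = void; };
template <typename... Ts> using void_t = typename make_void<Ts...>::type;

// Primary template: not an allocator.
template <typename A, typename = void>
struct is_allocator : std::false_type {};

// Selected when A has a value_type and an allocate(n) member function.
template <typename A>
struct is_allocator<A, void_t<typename A::value_type,
                              decltype(std::declval<A&>().allocate(std::size_t{1}))>>
  : std::true_type {};

struct picker
{
  // Selected when the first argument is an allocator.
  template <typename Allocator, typename... Args,
            std::enable_if_t<is_allocator<std::decay_t<Allocator>>::value, int> = 0>
  explicit picker(Allocator&&, Args&&...) : used_allocator_overload(true) {}

  // Selected for plain construction arguments. The is_same test keeps this
  // overload from hijacking copies of picker itself.
  template <typename First, typename... Args,
            std::enable_if_t<!is_allocator<std::decay_t<First>>::value &&
                             !std::is_same<std::decay_t<First>, picker>::value, int> = 0>
  explicit picker(First&&, Args&&...) : used_allocator_overload(false) {}

  bool used_allocator_overload;
};

static_assert(is_allocator<std::allocator<int>>::value, "std::allocator qualifies");
static_assert(!is_allocator<std::string>::value, "a container is not an allocator");
static_assert(!is_allocator<std::allocator<int>&>::value,
              "reference types must be decayed before testing");

} // namespace sketch
```

The last static_assert is the kind of trap mentioned in the list above: with a forwarding reference, an lvalue allocator deduces a reference type, so the condition has to decay it first (or the trait has to do so internally).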

weak_ptr

Nothing fancy about the weak_ptr. It holds a reference to the control_block created by the shared_ptr instance.

The control_block is changed to include the new weak reference count and its associated APIs.

template <typename T, typename Allocator>
struct control_block<T, Allocator, single_threaded_t>
  : control_block_base<T>
{
  ///
  using allocator_type = typename std::allocator_traits<
    std::decay_t<Allocator>>::template rebind_alloc<control_block>;

  /// Reference counter for single threaded applications
  size_t ref_cnt_ = 0;
  /// Weak ref count
  size_t weak_ref_cnt_ = 0;
.
.
.
.


Few things to note about the behaviour of weak_ptr:
  1. If the shared reference count or the strong reference drops down to 0, but the weak reference count is not 0, then only the destructor of type T is called. The control_block is not deallocated because it has the weak reference count.
  2. In the above case, the control block is freed only when the weak reference count reaches 0.
  3. To access the embedded object of type T within control_block, the lock method must be called on the weak_ptr. It creates a new instance of shared_ptr which is valid if and only if there is a strong reference count.
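
The three rules can be sketched with a standalone control block and a few free functions. This is not the Version-4 code: `weak_aware_block`, `make_block`, `release_strong`, `release_weak` and `try_lock` are hypothetical names, and the functions return flags only so that the behaviour is easy to observe.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical control block with a strong and a weak count.
template <typename T>
struct weak_aware_block
{
  typename std::aligned_storage<sizeof(T), alignof(T)>::type value_buf;
  std::size_t ref_cnt = 0;
  std::size_t weak_ref_cnt = 0;
};

// Allocate the block and construct the object with one strong reference.
// (A production version would free the block if T's constructor throws.)
template <typename T, typename... Args>
weak_aware_block<T>* make_block(Args&&... args)
{
  auto* cb = new weak_aware_block<T>{};
  new (&cb->value_buf) T{std::forward<Args>(args)...};
  cb->ref_cnt = 1;
  return cb;
}

// Drop one strong reference. The object is destroyed when the last strong
// reference goes away; the block itself is freed only if no weak references
// remain. Returns true if the block was freed.
template <typename T>
bool release_strong(weak_aware_block<T>* cb)
{
  assert(cb->ref_cnt > 0);
  if (--cb->ref_cnt > 0) return false;
  reinterpret_cast<T*>(&cb->value_buf)->~T();
  if (cb->weak_ref_cnt > 0) return false;
  delete cb;
  return true;
}

// Drop one weak reference. Returns true if the block was freed.
template <typename T>
bool release_weak(weak_aware_block<T>* cb)
{
  assert(cb->weak_ref_cnt > 0);
  if (--cb->weak_ref_cnt > 0 || cb->ref_cnt > 0) return false;
  delete cb;
  return true;
}

// What lock() boils down to: succeed only while a strong reference exists,
// and take a new strong reference on success.
template <typename T>
T* try_lock(weak_aware_block<T>* cb)
{
  if (cb->ref_cnt == 0) return nullptr;
  ++cb->ref_cnt;
  return reinterpret_cast<T*>(&cb->value_buf);
}
```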

Example:

  struct A
  {
    arnml::weak_ptr<A> ptr;
  };

  arnml::shared_ptr<A> a{};
  a->ptr = a;

Another Example (using the example from cppreference):

  auto observe = [](arnml::weak_ptr<int> weak) {
    if (auto observ = weak.lock()) {
      std::cout << "observe locked to get shared ptr\n";
    } else {
      std::cout << "observe failed to get a shared ptr\n";
    }
  };

  arnml::weak_ptr<int> wp;
  observe(wp);

  {
    arnml::shared_ptr<int> sp_int{42};
    wp = sp_int;
    std::cout << "weak ptr initialized with shared_ptr\n";
    observe(wp);
  }



For the complete implementation : https://github.com/arun11299/Cpp-Shared-Pointer/tree/master/ver_4

Saturday, 31 March 2018

Rust: First impressions from a C++ developer

So, I too started learning Rust 2 weeks back (yay!!), whenever I got free time. And all the time that I spent learning it has been worthwhile.
This is not going to be a deep technical post, but just my impressions about Rust from where I come from (C++).

My Impressions

Getting Started
  • Rust is a sufficiently complex language in that it is not as easy as Go or Python to just pick up and start. Having said that, one can still code something useful in a day, but a language which claims to be a real systems language needs to be understood a lot better.
  • So, for me it would be the regular steps of learning the language and start coding basic data structures and algorithms from scratch and then start contributing to some project or create my own.
  • There are some pain points which I will come down to later.
Borrowing & Ownership
  • Even though I have been programming in C++ since before C++11 came into picture and understand and make use of its sophisticated / advanced features, I am still not comfortable enough to say that I am an expert in C++ with a straight face. So, coming from this background I did not have any difficulty in understanding the Borrowing and Ownership concepts. Those are the things we strive for in our modern C++ code. Who doesn't love RAII ?
  • But striving to achieve is lot different than being enforced by the compiler! 
  • It is really amazing when access to an invalid memory address is caught at compile time. I was really happy. With C++, there are quite a lot of simple cases which even today cannot be detected by the "-fsanitize=XXX" options.
  • This is such a huge win and advantage! I must admit that early in my career, I did return a reference to a local variable and that got passed through code review as well! (The changes were quite big)

Complexity
  • This again depends on where you are coming from. If one is coming from a Node/JS/Python or even plain C background, they might get overwhelmed with the concepts.
  • And one of the main reasons is that the compiler won't budge unless you do the right thing!! :)
  • For comparison, it's easier for an average C programmer to write in C++ because they continue to write in C!! They do get overwhelmed if made to code in true C++ style because that's when they understand the concepts involved in getting you Zero Cost Abstractions.
  • So, for me it is not at all as complex as C++. But still, I have a lot left to learn.

Concurrency / Threading / Network library
  • It would have been awesome if coroutines were there. But we will be getting them soon I suppose, as LLVM has them.
  • But then, it has Go like Channels! That itself is a very simple abstraction to get your things done quickly.
  • Working with the socket level abstractions was also pretty easy. I am yet to dig into more details of it and then into the MIO and Tokio libraries.
  • Not much to report here as I do not have much experience with the provided abstractions.

Lifetimes
  • This is still something which I don't think I understand to its fullest, especially Variance and Subtyping. In general, Covariant, Contravariant and Invariant types are kind of easy to understand, but at a deeper level, how to use the relevant markers is still something I do not understand.
  • I ran into this especially while trying to implement the Vector data structure as laid out in the nomicon book. We will see more about it a bit later.
  • Otherwise lifetimes as a concept for linear types are pretty cool, and might give some headaches if one is going to use references a lot in uncommon ways which prohibit lifetime elision.

Allocation
Most of what I will be putting here are questions to the fellow Rust developers. I did not find answers to those when I needed it. So don't consider it as something "you can't do in Rust". It is something which "I didn't get how to use in Rust".
  • As a systems programmer, I do want to have access to the base allocator and write my own Allocators.
  • While writing my Vector library, the nomicon book says to use "alloc" crate, but that can't be used in stable release. Then I rustup'ed to nightly release and still could not use it! The version was 1.26.
  • I want to use Allocators in stable release. How ?
  • I know there are examples on how to (de)allocate using Vec. I am somehow not comfortable with that. Is it really Zero Cost?
  • What I have done:

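        /// Allocate uninitialized storage for `count` elements by borrowing
        /// Vec's allocation and then leaking the Vec.
        fn allocate(count: usize) -> *mut T
        {
            let mut v: Vec<T> = Vec::with_capacity(count);
            let ptr = v.as_mut_ptr();
            // Prevent Vec's destructor from freeing the buffer.
            mem::forget(v);
            ptr
        }
        /// Rebuild the Vec from its raw parts so that dropping it frees the
        /// buffer. `ptr`, `len` and `cap` must satisfy the safety contract
        /// of `Vec::from_raw_parts`.
        fn deallocate(ptr: *mut T, len: usize, cap: usize)
        {
            unsafe {
                mem::drop(Vec::from_raw_parts(ptr, len, cap));
            }
        }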
    

  • This is really not something I would like to continue doing.

Exceptions

  • To be honest, I am glad they are not there. I clearly know about when to use exceptions and when to use return codes and blah blah.... But still. Writing exception safe code in C++ is still a challenge, especially when you are designing data structures.
  • Result<T, E> is something I prefer compared to exceptions.
  • It's been hard for me while writing libraries to support both exceptions and error_code for the users who might prefer one over the other. I will probably start using Expected or Outcome whenever it's available.
  • Do I even need to mention the pattern matching support? It's awesome!! Took me back to my days when I used to be an Erlang developer.


Overall
  • It's a YES!!
  • There are still a lot of things I really liked but have not talked about, for eg: Traits, Enums on steroids, Pattern Matching, Functional Programming Style and generic programming. All that maybe for another day.
  • But sadly, it's still going to be a language for my side projects.
  • C++ would still be my go to language for the foreseeable future. With all the new features and language additions coming (creeping) in, there is barely enough time to switch.
  • Rust obviously has huge potential. But still not quite there to replace C/C++ completely. But I really hope C dies and see people (mostly average software developers) write more structured code.

Sunday, 4 March 2018

Designing Async Task Dispatch Library From Scratch (Part-3)

This is the final part of the series, where we will just be creating a task queueing system which will be more or less like the asyncio BaseEventLoop. To reduce the complexity of the program, we will not be dealing with timed tasks or socket events.
(I had initially planned to do sockets as well, but I have another project in mind to do the same with C++ coroutines. Wait for that!)

Previous parts of the series:



A Task Queue

A tasking system in its simplest form is a queue of callbacks which need to be called. On top of that, it provides some APIs to the user to add tasks or to wait till a task finishes.

What we are aiming for is something like below:

def task1():
    print ("Done with task1")

def my_gen():
    print ("Starting coro/gen my_gen")
    yield None
    print ("Finished coro/gen my_gen")

if __name__ == "__main__":
    print ("Starting test")
    print ("-------------")

    #Instantiate a run loop instance
    loop = TaskLoop()

    #Submit a task to the run loop
    loop.call_soon(task1)
    loop.run_once()

    #start a coroutine and finish it
    loop.run_until_complete(my_gen())

    #Stop the run loop
    loop.stop()


The example basically shows how the task loop works with a regular function and a coroutine:

  • call_soon API only takes regular functions as its argument. For anything else, it will throw an exception.
  • run_until_complete API takes only future objects and coroutines and will wait until the task is run to its completion.
  • run_once will execute the first task in the task queue if present.
  • stop will stop the task loop from executing any tasks from the task queue.
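
The split between call_soon and run_until_complete rests on telling a regular function apart from a generator function, and both apart from the generator object that calling a generator function returns. Here is a minimal sketch of that check using the standard inspect module. The helper name classify is made up for illustration and is not part of the task loop.

```python
import inspect

def task1():
    print("Done with task1")

def my_gen():
    yield None

def classify(obj):
    """Hypothetical helper: report how a task loop could treat obj."""
    if inspect.isgenerator(obj):
        # A generator object, e.g. the result of calling my_gen()
        return "generator object"
    if inspect.isgeneratorfunction(obj):
        # A function containing yield which has not been called yet
        return "generator function"
    if callable(obj):
        return "regular function"
    return "unsupported"

print(classify(task1))     # regular function
print(classify(my_gen))    # generator function
print(classify(my_gen()))  # generator object
```

Note that my_gen and my_gen() are different things: the first is what call_soon rejects, the second is what run_until_complete accepts.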

Introducing Handles

Well, I lied about the queue of tasks in the previous section. It is actually a queue of Handle objects.
A Handle is nothing but a stored function object along with its arguments so that it can be invoked at a later time.

class Handle(object):
    """
    A wrapper for holding a callback and the
    arguments it takes.
    """
    def __init__(self, fn, loop, args):
        """
        Constructor.
        Arguments:
            fn   : The callback function. Has to be a regular function.
            loop : The loop which would call the handler.
            args : The argument pack the callback expects.
        """
        self._cb = fn
        self._args = args
        self._loop = loop
        #Set to true if the handler is cancelled
        #Cancelled handlers are not executed by the loop
        self._cancelled = False
        #Set to true if the handler has finished execution
        self._finished = False

    def __call__(self):
        """
        The function call operator.
        """
        assert self._finished == False
        try:
            self._cb(*self._args)
        finally:
            #Mark the handle as finished whether the
            #callback returned normally or raised
            self._finished = True

    def cancel(self):
        """
        Cancel the handle
        """
        assert self._cancelled == False
        if not self._finished:
            self._cancelled = True
        return

    def is_cancelled(self):
        """
        """
        return self._cancelled

As can be safely deduced, a handle stores only regular functions. Futures and coroutines will not be stored in it.

So you ask, how does the task loop then work with futures/coroutines ? Let's look at the implementation of run_until_complete.

run_until_complete decoded

Let's dive into its implementation directly:

    def run_until_complete(self, coro_or_future):
        """
        Run the event loop until the future
        is ready with result or exception.
        """

        if self._loop_running:
            raise RuntimeError("loop already running")

        #Transform into a future or task(inherits from Future)
        task_or_fut = transform_future(coro_or_future, self)
        assert task_or_fut is not None

        is_new_task = not isinstance(task_or_fut, Future)
        #Add a callback to stop loop once the future is ready
        task_or_fut.add_done_callback(self._stop_loop_fut)

        try:
            self.run_forever()
        except Exception as e:
            raise e


This looks a bit different from what we achieved in the previous part of this series. The difference is because of transform_future. This is basically what asyncio.ensure_future is. It takes in either a generator object (our coroutine) or a Future. If it's a future, it returns it as is (behaves like an identity function); if it is a coroutine, it wraps it inside a Task object and returns that task.

Once the future is ready, _stop_loop_fut will get called which would throw StopLoopException to notify the main loop.
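
The post does not show the definition of StopLoopException, so here is a minimal sketch of the idea under the assumption that it is a plain Exception subclass. It uses a bare deque of callables instead of the real loop and Handle classes, just to show how an exception raised from inside a queued callback unwinds out and ends the loop.

```python
from collections import deque

class StopLoopException(Exception):
    """Assumed definition: the post does not show this class."""

ready = deque()
executed = []

def stop_loop():
    # Raising from inside a queued callback is the stop signal
    raise StopLoopException("stop the loop")

ready.append(lambda: executed.append("task1"))
ready.append(stop_loop)
ready.append(lambda: executed.append("never runs"))

while True:
    try:
        if ready:
            ready.popleft()()
    except StopLoopException:
        # The sentinel exception propagates out of the callback
        # and breaks the otherwise infinite loop
        break

print(executed)  # ['task1']
```

The third callback is still sitting in the queue when the loop exits, which is the "stop as soon as possible" behaviour.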

def transform_future(supposed_future_coro, loop):
    """
    Loop works best with futures or with Tasks
    which wraps a generator object.
    So try to introspect the type of
    `supposed_future_coro` argument and do the best
    possible conversion of it.
    1. If it is already a Future, then return it as it is.
    2. If it is a generator, then wrap it in Task object and
       return the task instance.

    loop object is for creating the task so that it
    can add a callback.
    """
    if isinstance(supposed_future_coro, Future):
        """
        It is a future object. Nothing to do.
        """
        if loop != supposed_future_coro._loop:
            raise RuntimeError("loop provided is not equal to the loop of future")
        return supposed_future_coro
    elif inspect.isgenerator(supposed_future_coro):
        #Create a task object
        t = loop.add_task(supposed_future_coro)
        return t
    else:
        raise TypeError("Cannot create an asynchronous execution unit from the provided arg")

    assert (False and "Code Not Reached")
    return None

In the above source, the task creation logic is handled inside the loop's add_task method, but all it really does is create a Task object.

The constructor of the task object adds the step method to the loop, which gets called later.

class Task(Future):
    """
    Wraps a generator yielding a Future
    object and abstracts away the handling of future API

    This version adds loop to the task
    """
    def __init__(self, loop, gen):
        """
        """
        super().__init__(loop)
        assert (loop is not None)
        self._loop = loop
        self.gen = gen
        self._loop.call_soon(self.step)

And from here on everything goes as per what we have already seen in the previous posts. The only change is that calling the step method of the Task is now automated and done by the task loop.

The complete implementation of the task loop is:

class TaskLoop(object):
    """
    """
    def __init__(self):
        #A deque of handles
        #CPython implementation of deque is thread-safe
        self._ready = deque()
        self._loop_running = False
        pass

    def total_tasks(self):
        """
        """
        return len(self._ready)

    def run_once(self):
        """
        Execute a task from the ready queue
        """
        if len(self._ready) == 0:
            #There are no tasks to be executed
            return

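        #Pop from the left so that tasks run in the order they were added
        hndl = self._ready.popleft()
        if hndl.is_cancelled():
            #Cancelled handlers are not executed by the loop
            return
        #Call the handler which invokes the
        #wrapped function
        hndl()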

    def run_forever(self):
        """
        This is the main loop which runs forever.

        TODO: Currently just runs infinitely. Should run
        only till some work is there.
        """
        self._loop_running = True

        while True:
            try:
                self.run_once()
            except StopLoopException:
                self._loop_running = False
                break
            except Exception as e:
                print ("Error in run_forever: {}".format(str(e)))

        return

    def run_until_complete(self, coro_or_future):
        """
        Run the event loop until the future
        is ready with result or exception.
        """

        if self._loop_running:
            raise RuntimeError("loop already running")

        #Transform into a future or task(inherits from Future)
        task_or_fut = transform_future(coro_or_future, self)
        assert task_or_fut is not None

        is_new_task = not isinstance(task_or_fut, Future)
        #Add a callback to stop loop once the future is ready
        task_or_fut.add_done_callback(self._stop_loop_fut)

        try:
            self.run_forever()
        except Exception as e:
            raise e

    def add_task(self, gen_obj):
        """
        Creates a task object and returns it
        """
        t = Task(self, gen_obj)
        return t

    def call_soon(self, cb, *args):
        """
        Add the callback to the loops ready queue
        for it to be executed.
        """
        if inspect.isgeneratorfunction(cb):
            raise NotRegularFunction()

        hndl = Handle(cb, self, args)
        self._ready.append(hndl)
        return hndl

    def stop(self):
        """
        Stop the loop as soon as possible
        """
        self.call_soon(self._stop_loop)

    def _stop_loop(self):
        """
        Raise the stop loop exception
        """
        raise StopLoopException("stop the loop")

    def _stop_loop_fut(self, fut):
        """
        """
        assert fut is not None
        raise StopLoopException("stop the loop")


The complete source code for the series can be found at
https://github.com/arun11299/PyCoroutines

Hope you enjoyed it!

Saturday, 3 February 2018

Designing Async Task Dispatch Library From Scratch (Part-2)

Working with Futures, Executors and Tasks

What is a Future

In layman's terms, a future is an object which can hold the value or result of some computation done asynchronously. What does that mean ?
Consider the below example:

def some_task():
    time.sleep(5)
    return 42

ret1 = some_task()              #ret1 will be 42
ret2 = async_compute(some_task) #what should be ret2 ?

ret1 is assigned its value only after 5 seconds, and during that time the thread executing the code is blocked and cannot do anything else. This is called the synchronous way of doing things. Other examples of blocking calls from your network programming 101 are accept, connect, read, write etc.

But what about ret2 ? If we are to do asynchronous programming, we cannot let the executing thread block for 5 seconds when it could be doing other tasks meanwhile. To achieve that, async_compute might very well dispatch the task to another thread. If we do that, there is no way we can return the return value of some_task to ret2 (Hold off your idea about using coroutines. We will get there).

So what async_compute can return is a placeholder where the computed result will be put at some future time. This placeholder is what is commonly known as a future!

There are different ways to get the computed result out from a future object:

  • Via a blocking call
  • Via callbacks
Blocking call example:

ret = async_compute(some_task)
assert isinstance(ret, Future)

print (ret.result()) # Will print 42

This example is still equivalent to the blocking example, as the call to result blocks until the result of the asynchronous computation is available. In our example it would block for 5 seconds.

Callback example:

def print_result(fut):
    assert isinstance(fut, Future)
    print (fut.result()) # This will not block

ret = async_compute(some_task)
assert isinstance(ret, Future)

ret.add_done_callback(print_result)

print ("Continuing with other tasks")

What we are doing here is providing a callback to the future which would get executed when the result of the asynchronous computation is made available to the future.
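
async_compute is not defined anywhere in this post, so to try both styles out, here is a rough sketch that models it with the standard library's concurrent.futures. The one-thread pool and the collected list are assumptions made for the demo, and the sleep is shortened so that it runs quickly.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def some_task():
    time.sleep(0.1)  # shortened from 5 seconds so the sketch runs quickly
    return 42

# Assumption: async_compute is modelled with a one-thread pool
_pool = ThreadPoolExecutor(max_workers=1)

def async_compute(task):
    return _pool.submit(task)

collected = []

def print_result(fut):
    # Runs once the result is available, so result() does not block here
    collected.append(fut.result())
    print(fut.result())

# Blocking style
ret = async_compute(some_task)
assert isinstance(ret, Future)
blocking_value = ret.result()  # blocks until some_task returns

# Callback style
ret = async_compute(some_task)
ret.add_done_callback(print_result)
print("Continuing with other tasks")

_pool.shutdown(wait=True)  # wait for the worker thread before exiting
```

In the callback style, "Continuing with other tasks" is printed before 42, because the main thread does not wait for the task.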

Future - Building Blocks

In this section we will see what are the requirements that a Future type should have and try to create one.

States of a future object
We will be discussing only the bare minimum states required for a future. There may be other states involved based upon the implementation and feature provision.

  • Pending - The future is constructed with this state. It basically indicates that the asynchronous computation has either not started or not finished yet.
  • Cancelled - The future is cancelled and all the completion callbacks are called. The future that we would be implementing does not have the ability to cancel the running asynchronous process though.
  • Finished - The future has the result of the asynchronous operation ready. A future is ready when the result of the async operation is available or if the async operation finished with an exception.
Client APIs of a future type
  1. add_done_callback - For calling a function when the future state becomes 'Finished' or 'Cancelled'. The function must take only one argument, which is the future object. The client code can access the result or the exception from the passed future argument. The purpose of adding done callbacks to a future is to perform some action in response to the completion of the async operation or, as many call it, chaining of operations.
     For example:
def some_long_task():
    time.sleep(10)
    return 43

def another_long_task():
    time.sleep(5)
    return 68

def after_another_long_task(fut):
    res = fut.result()
    print ("All computations finished. Result is {}".format(res))


def after_long_task(fut):
    res = fut.result() # This would not block, result is already available
    # Trigger another asynchronous computation
    fut1 = async_compute(another_long_task)
    fut1.add_done_callback(after_another_long_task)

fut = async_compute(some_long_task)
fut.add_done_callback(after_long_task)
       
  2. set_result - This API should usually be called only by the asynchronous operation which created the future. It stores the computed result in the future, sets the state to 'Finished' and calls all the added callbacks.

  3. set_exception - If the asynchronous executor encounters an exception while performing our operation, it will set the exception inside the future by calling this function. The state of the future will still be set to 'Finished' and all the added done callbacks will be called.

  4. cancel - Cancels the future by setting the state to 'Cancelled' and calls all the added done callbacks.

  5. result - Blocks until the result is available, or until the provided timeout expires.

  6. exception - Blocks until the exception is set in the future, or until the provided timeout expires.

These APIs need to be thread safe if they are to be called from different threads. We will be using a mutex + condition variable to achieve it.
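
Before going through the full class, here is the wait/notify pattern on its own, as a small sketch. Python's threading.Condition bundles the lock and the condition variable in one object. The box dictionary and the producer function are made-up names for the demo.

```python
import threading
import time

condition = threading.Condition()
box = {"ready": False, "value": None}

def producer():
    time.sleep(0.1)             # pretend to compute something
    with condition:             # take the lock before touching shared state
        box["value"] = 42
        box["ready"] = True
        condition.notify_all()  # wake up every waiting thread

t = threading.Thread(target=producer)
t.start()

with condition:
    # wait_for re-checks the predicate after every wake-up and
    # returns False if the timeout expires first
    got_it = condition.wait_for(lambda: box["ready"], timeout=5)

t.join()
print(got_it, box["value"])
```

Waiting on a predicate rather than on a bare wait() protects against waking up before the state has actually changed.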

An Implementation

# coding: utf-8



import threading
import time


# Future states:
#
# An instance of the future could be in any of these states.
# 1. PENDING : The future does not have the result/exception for the corresponding work.
# 2. RUNNING : TBD
# 3. CANCELLED : The future got cancelled before the result of the associated work was computed.
# 4. FINISHED : The associated work got finished and resulted in giving out a value or exception.




PENDING   = 'pending'
RUNNING   = 'running'
CANCELLED = 'cancelled'
FINISHED  = 'finished'

FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    FINISHED
]




class FutureCancelledError(Exception):
    """"""
    def __init__(self):
        pass


class FutureTimeoutError(Exception):
    """"""
    def __init__(self):
        pass





class Future(object):
    def __init__(self):
        """"""
        self._state = PENDING
        self._condition = threading.Condition()
        self._done_callbacks = []

        self._result = None
        self._exception = None

    def add_done_callback(self, cb):
        """
        Add the callback to be executed when the future state becomes
        cancelled/finished
        """
        with self._condition:
            if self._state not in [CANCELLED, FINISHED]:
                self._done_callbacks.append(cb)
                return
        #Call immediately if result/exception already set
        cb(self)


    def result(self, timeout=None):
        """
        Blocking call on the calling thread.

        timeout: time to wait for the result to be ready.

        Throws:
        FutureCancelledError if the state of future was CANCELLED
        or became CANCELLED later.

        FutureTimeoutError if future did not become ready before
        the timeout.
        """
        with self._condition:
            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                # Already done, return the result
                return self._result

            self._condition.wait(timeout)

            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                return self._result
            else:
                raise FutureTimeoutError()
        pass

    def exception(self, timeout=None):
        """
        Blocking call on the calling thread.
        """
        with self._condition:
            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                #Already done. Return the exception
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED]:
                raise FutureCancelledError()

            if self._state == FINISHED:
                return self._exception
            else:
                raise FutureTimeoutError()


    def done(self):
        """Future is finished"""
        with self._condition:
            return self._state in [CANCELLED, FINISHED]

    def cancelled(self):
        """ Is the future cancelled or not"""
        with self._condition:
            return self._state == CANCELLED

    def cancel(self):
        """Cancel the future if not already finished or running"""
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False
            self._set_state(CANCELLED)

            self._condition.notify_all()

        self._execute_done_callbacks()
        return True

    def set_result(self, result):
        """
        Sets the result of the work associated with this future.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED

            self._condition.notify_all()

        self._execute_done_callbacks()


    def set_exception(self, exp):
        """
        Sets the exception that occurred while performing
        the work associated with this future.
        """
        with self._condition:
            self._exception = exp
            self._state = FINISHED

            self._condition.notify_all()

        self._execute_done_callbacks()

    def _set_state(self, state):
        """
        Sets the state.
        Assumes that lock is taken
        """
        self._state = state

    def _execute_done_callbacks(self):
        for cb in self._done_callbacks:
            try:
                cb(self)
            except Exception as e:
                print ("ERROR: {}".format(str(e)))



Executors

Now that we understand futures and have an implementation ready with us, let us use it. For that, we need some way to submit our tasks to "some" thing which will return a future object instance corresponding to the submitted task.
This "some" thing is what we call an Executor. Executors are responsible for executing the tasks submitted by the client code. There are different types of executors that one can provide:

  • Thread Executor
  • Thread Pool Executor
  • Multi Process Executor
  • Queuing Executor
NOTE: There may be more varieties of executors that I am not aware of.

For demonstration purposes we will use an extremely simple executor interface.

class Executor(object):
    """
    An interface for Executor.
    A concrete implementation of this class
    is expected to override all methods of this class.
    """
    def __init__(self):
        pass

    def submit(self, task, *args, **kwargs):
        """
        Takes a task to be executed with the arguments
        it requires.
        Returns a Future object instance.
        """
        raise NotImplementedError()


With this interface in hand let us create a simple Thread executor, which will simply execute the submitted task on a new thread.

class ThreadedExecutor(Executor):
    def __init__(self):
        super().__init__()
        self._thread = threading.Thread(target=self._runner)

    def submit(self, task, *args, **kwargs):
        """
        Create a new thread and run the _runner function.
        Execute the task in the _runner and return the 
        future.
        """
        f = Future()
        t = threading.Thread(target=self._runner, args=(f, task, args, kwargs,))
        t.start()
        return f

    def _runner(self, fut, task, args, kwargs):
        v = task(*args, **kwargs)
        fut.set_result(v)
        pass


This is pretty easy to understand now. Inside the submit function we do the following:

  • Create an instance of the Future.
  • Create a new thread with _runner as the target function to run. Pass the future instance and the task to _runner.
  • Return the future. The client code now has the future.
  • On a separate thread, _runner executes the task and, once it completes, stores the result in the passed future instance. This results in the execution of all the done callbacks registered with that future, as explained in the previous section.

NOTE: Exception handling is not shown in the above example.
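
For the curious, here is one way the missing exception handling could look. This is only a sketch: SafeThreadedExecutor is a made-up name, and it uses concurrent.futures.Future from the standard library as a stand-in for our own Future class, since both offer set_result, set_exception, result and exception. (The standard library intends that class to be created by its own executors, so constructing it directly is purely for demonstration.)

```python
import threading
from concurrent.futures import Future  # stand-in for the Future class above

class SafeThreadedExecutor(object):
    """Hypothetical variant of ThreadedExecutor that reports exceptions."""
    def submit(self, task, *args, **kwargs):
        f = Future()
        t = threading.Thread(target=self._runner, args=(f, task, args, kwargs))
        t.start()
        return f

    def _runner(self, fut, task, args, kwargs):
        try:
            v = task(*args, **kwargs)
        except Exception as e:
            # Hand the failure to whoever holds the future
            fut.set_exception(e)
        else:
            fut.set_result(v)

def divide(a, b):
    return a / b

ok = SafeThreadedExecutor().submit(divide, 10, 2)
bad = SafeThreadedExecutor().submit(divide, 1, 0)

print(ok.result())      # 5.0
print(bad.exception())  # the ZeroDivisionError raised inside the thread
```

Without the try/except, an exception in the task would kill the worker thread and the future would stay pending forever.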
Now, the client code:
def my_long_running_task():
    time.sleep(100)
    return 42

def print_result(future):
    print (future.result()) #Result will not block here.

def schedule_task():
    future = ThreadedExecutor().submit(my_long_running_task)
    future.add_done_callback(print_result)

if __name__ == "__main__":
    schedule_task()


I hope the use of futures makes more sense in the context of Executors.


The 'Task' at hand

This is where we get back to coroutines and see its brilliance in action.

NOTE: What I am presenting here is basically influenced by David Beazley's talk on coroutines. You should definitely check that out.

The previous example using a future with an executor was good in that we could do asynchronous computation in a fairly straightforward manner, but it is still not easy. To get the result at the call site, i.e. where the executor was run, we either need to poll continuously to check if the future is ready with the result or get blocked by calling result on the future object. Since we hate blocking or polling, we attached a done callback to the future.

We need something like this:

def perform_async_task():
    result = <magic> ThreadedExecutor().submit(my_long_running_task)
    print (result)
 

Isn't that sweet ? It is almost like synchronous code. Assuming it works like this, if you think deeper you can see that the function, when called, gets executed in the context of 2 threads: the thread which calls the perform_async_task function and the thread which executes the my_long_running_task function.

def perform_async_task():
    result = <magic> ThreadedExecutor().submit(my_long_running_task) <-- Executor thread
    print (result)  <---------- Client calling thread

Mind blown ? Let's see how to get this actually working. What should we replace the "<magic>" tag with ? Since we are going to talk about coroutines, let's replace it with a yield for now :)

In part 1 of the series we have already seen how to control a generator function from outside, through the generator object, using the next and send functions. If you have not read it or have forgotten it, I highly recommend going through that again.

With that knowledge we know that the RHS expression of a yield statement is what is executed on next (or send, depending on the state of the generator), and the value sent using send is what gets assigned to the LHS of the assignment statement.
The thing to note here is that a statement like

result = yield ThreadedExecutor().submit(my_long_running_task)

requires 2 separate operations to finish (next and then send). Until then the coroutine is in a suspended state. This is what makes it possible to write something like the above. Let's see how it would work:

  • Consider 'G' to be the generator object that we get on calling perform_async_task, since it has a yield statement now (instead of magic).
  • future = next(G) would give me the future returned by the submit function of the Thread executor. Why ? Because that's the RHS expression of the yield.
  • future.add_done_callback(send_result) To the obtained future we will add a done callback which will get called when the executor has done its job.
  • And send_result is:
def send_result(future):
    G.send(future.result())


  • So when send_result gets called, it will send the result wrapped inside the future to the generator, which is what gets assigned to the LHS variable. Cool!!! Got it ?
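
The manual steps above can be put together into a small runnable sketch. It uses the standard library's ThreadPoolExecutor as a stand-in for our ThreadedExecutor, and the outcome list is an assumption added only so that the result can be inspected afterwards. The final send raises StopIteration because the generator has no more yields, so send_result has to swallow it.

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # stand-in for ThreadedExecutor
outcome = []

def my_long_running_task():
    time.sleep(0.1)
    return 42

def perform_async_task():
    result = yield pool.submit(my_long_running_task)
    outcome.append(result)

G = perform_async_task()

def send_result(future):
    try:
        # Resume the generator; the sent value becomes the value of yield
        G.send(future.result())
    except StopIteration:
        # No more yields: the generator ran to completion
        pass

future = next(G)                       # run up to the yield, get the future
future.add_done_callback(send_result)  # resume G when the result is ready

pool.shutdown(wait=True)  # let the worker thread finish before exiting
print(outcome)  # [42]
```

Notice that the first half of perform_async_task ran on the main thread, while the part after the yield ran on the pool's worker thread.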

Now we know what needs to be done manually to get the desired behaviour. Let's put it nicely and automate it using a class named Task.

Let's first jump into an implementation of the Task itself:

class Task(object):
    """
    Wraps a generator yielding a Future
    object and abstracts away the handling of future API
    """
    def __init__(self, gen):
        self.gen = gen

    def step(self, snd_val=None, exp=None):
        """"""
        try:
            if exp:
                self.gen.throw(exp)
            else:
                fut = self.gen.send(snd_val)
                fut.add_done_callback(self._fut_done_cb)

        except StopIteration as e:
            return e.value

    def _fut_done_cb(self, fut):
        try:
            result = fut.result()
            self.step(result, None)
        except Exception as e:
            self.step(None, e)
        pass


This is, in essence, what an asyncio Task is. It surely does more stuff, but this is the bare minimum that gets the work done.

Let's explain its working through an example.

def perform_async_task():
    result = yield ThreadedExecutor().submit(my_long_running_task)
    return result

t = Task(perform_async_task())
t.step()

## OR

Task(perform_async_task()).step()    


As one can see, we have got our asynchronous code look like a synchronous one without having any hairy callbacks and futures involved in the client code. All of those details are handled by the task.

What is really happening is:

  • The constructor of the task takes the generator object and stores it. That makes the Task little more than a wrapper over a generator.
  • Now we call the step function of the task. The first time, the step function calls the generator's send function with None, which is basically a next call on the generator.
  • This results in the execution of the RHS expression of the yield, which is the executor's submit function.
  • The submit function returns the future object associated with the asynchronous operation. This is what the fut variable holds inside the step function.
  • To the obtained future, we add a done callback which is the second member function of the Task class, _fut_done_cb. What this means is that when the asynchronous operation finishes, _fut_done_cb will get called.
  • Note that until _fut_done_cb is called, the coroutine perform_async_task is suspended. It is not blocked.
  • So when the asynchronous operation finishes, _fut_done_cb gets executed, which gets the result from the future and calls the step function again with the obtained result.
  • So, the step function executes again and calls the send function on the generator, but this time with the result of the async operation instead of None.
  • The above send actually sets the result of yield to the LHS inside perform_async_task.
  • With that, due to the absence of any further yields inside perform_async_task, send will throw a StopIteration exception.
  • The return value of perform_async_task will be returned by the step function.
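
If you want something you can paste and run, here is a sketch of the same Task driving a coroutine with two yields in a row. A few things are my additions for the demo and not part of the Task shown earlier: the standard library's ThreadPoolExecutor stands in for ThreadedExecutor, the return value is stored in a return_value attribute, a threading.Event signals completion, and the future returned by throw gets the done callback too.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # stand-in for ThreadedExecutor

class Task(object):
    def __init__(self, gen):
        self.gen = gen
        self.return_value = None           # assumption: keep the final value
        self.finished = threading.Event()  # assumption: signal completion

    def step(self, snd_val=None, exp=None):
        try:
            if exp is not None:
                fut = self.gen.throw(exp)
            else:
                fut = self.gen.send(snd_val)
            # The generator yielded another future: resume when it is done
            fut.add_done_callback(self._fut_done_cb)
        except StopIteration as e:
            # No more yields: e.value is the generator's return value
            self.return_value = e.value
            self.finished.set()

    def _fut_done_cb(self, fut):
        try:
            result = fut.result()
        except Exception as e:
            self.step(None, e)
        else:
            self.step(result, None)

def slow_add(a, b):
    time.sleep(0.05)
    return a + b

def perform_async_task():
    first = yield pool.submit(slow_add, 1, 2)
    second = yield pool.submit(slow_add, first, 10)
    return second

t = Task(perform_async_task())
t.step()                    # runs up to the first yield and returns
t.finished.wait(timeout=5)  # only needed so the script can print the result
pool.shutdown()
print(t.return_value)       # 13
```

The coroutine body reads top to bottom like synchronous code, yet the calling thread is free as soon as t.step() returns.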

Try it for yourself!




Saturday, 6 January 2018

Designing Async Task Dispatch Library From Scratch (Part-1)

What the series is about

  • Understanding how generators work
  • Understanding how a future object works
  • How to create a task for asynchronous execution
  • Creating a simple but almost complete tasking and networking library

What the series is _not_ about

  • Understanding basic usage of generators. Reader is expected to have used it before.
  • Creating a library rivalling asyncio or anything else. I am only doing this to understand the whole thing better from a system programmer's perspective.

The 'yield' Keyword

Let's take the below simple example of a generator:

def my_gen():
    ret = yield 5
    return ret

A few questions that come to my mind are:

  • What happens when the 'yield' expression gets executed ?
  • What do we get as the return value of this generator-function ?
  • What more is there to this generator-function other than just yielding 5 ?
To answer above questions and to get enlightened we need to know how a generator-function is different from a normal function.

Difference between a generator-function and a regular function

NOTE: Generators are iterable. Iterables as a concept are something we will not be looking at in this post.

A few of the very important differences are listed below. There are more differences than these, but they are out of scope for the current topic.

  1. Suspension and resume capability

     Unlike a regular function, a generator-function can be suspended in the middle of its execution and can be resumed from the same point. That means all its state is saved before it gets suspended so that it can be reused when the generator-function is resumed. It's the presence of the yield expression inside a function that provides this capability. The yield is basically the suspension and resumption point.

  2. Returns a generator on calling a generator-function

     Unlike a regular function, which returns some value (or nothing) on execution, a generator-function returns a generator object. This generator object is what we will make use of (or abuse) to control how the generator-function should execute.

  3. Suspension and resumption of generator-function can be controlled

     As said above, using the generator object one can control how to resume and what value to send to it from the client side. We will see a lot more on this in the next section.
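
These three differences can be observed directly with inspect.getgeneratorstate from the standard library. The sketch below is only for poking around: the print inside my_gen and the state_* variable names are additions for the demo.

```python
import inspect

def my_gen():
    print("started")
    ret = yield 5
    return ret

g = my_gen()  # nothing is printed: the body has not started running
state_created = inspect.getgeneratorstate(g)

value = next(g)  # prints "started", runs up to the yield and suspends
state_suspended = inspect.getgeneratorstate(g)

try:
    g.send("resumed")  # resumes from the yield; the function then returns
except StopIteration as e:
    returned = e.value
state_closed = inspect.getgeneratorstate(g)

print(state_created, state_suspended, state_closed)
```

Calling my_gen() only creates the generator object. The body starts running on the first next, and between next and send the function sits suspended at the yield with its local state intact.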

Deconstructing a generator-function

Let's take this hello-world example:

def my_gen():
    val = yield "hello, world!"
    return val

There are 2 ways that you can work with this generator-function. Let's see the hard way first:

# Prime the generator
g = my_gen()

#Get the yield value
res = next(g)
print ("my_gen yielded result: {}".format(res))

try:
    next(g)
except StopIteration as e:
    print ("my_gen generator function returned: {}".format(e.value))

This would print:

my_gen yielded result: hello, world!
my_gen generator function returned: None

Hmm...what's going on ? The assignment clearly did not work as the return value is printed as "None"!
Now is a good time to explain about controlling a generator-function using the generator object.

A generator instance has 3 methods defined on it:

  • next
  • send
  • throw
The next method will proceed to the next 'yield' statement present in the function and execute whatever expression is there on its right side. The value of that expression is passed to the caller of the next method, not to the variable on the left hand side of the assignment statement.

The throw method is what will be used by the client/scheduler program to pass exceptions to the generator-function.
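
Here is a small sketch of throw in action. The ValueError and the recovery message are made up for the demo. From inside the generator, the exception looks as if it was raised by the yield expression itself.

```python
def my_gen():
    try:
        val = yield "hello, world!"
    except ValueError as e:
        # The exception appears to be raised by the yield expression
        val = "recovered from: {}".format(e)
    return val

g = my_gen()
next(g)  # advance to the yield so the generator is suspended there
try:
    g.throw(ValueError("bad input"))
except StopIteration as e:
    returned = e.value

print(returned)  # recovered from: bad input
```

If the generator does not catch the exception, it simply propagates back out of the throw call to the client.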

The send method is what makes the generator behave as a coroutine. Thanks to PEP-342.
It is similar to next in that it proceeds to the next yield statement in the function but, as the name suggests, it also sends a value in, which becomes the result of the yield expression on the right hand side of the assignment statement.
That means it's completely under my control what value the variable val gets ? Yup!!

See this in action:

g = my_gen()
next(g) # Yields "hello, world!"
try:
    g.send(42) # Sends in a value 42 which gets assigned to val and does next(g)
except StopIteration as e:
    print ("my_gen returned: {}".format(e.value))

It prints:
my_gen returned: 42

NOTE: In the above example I am using the free function next instead of the member function. The free function operates on an iterator, which the generator is, and calls its __next__ method.

Yeah, fetching the return value of a generator-function is pretty weird. Agreed. It is set as the value field of the StopIteration exception.  Remember this.

Isn't this pretty freakishly powerful ? In fact this ability to "send" values to the generator is what makes it a coroutine and is used for creating a tasking system like asyncio.

So, let's reiterate what is happening with the above simple generator-function:

  1. Prime the generator by calling the function first. That gives us a generator object to work with.
  2. Calling next on the generator object will go to the next yield statement in the function and return the value of the expression to its right. At this point the function is suspended. It needs to be resumed by calling any of the 3 methods next/send/throw.
  3. In this suspended state, control is back with the client. Say we want to pass some value to the generator. If the yield is used on the right hand side of an assignment statement, the sent value gets assigned to the variable on the left; otherwise send behaves just like a regular next call.

Adding a pictorial view if it helps at all:



As can be seen in the above picture, I have divided the generator function into two blocks.
The client/scheduler code dealing with the generator does below things:
a. Prime the generator function to get the generator object.

b. Do a next on it, which calls the __next__ method of the generator, which in turn goes to the first yield statement inside the generator function and returns the output of the RHS expression, None if no RHS expression is present. In our example, it returns 5.

c. Now we want to set the value of res to something meaningful. This is done by the send method, which places the value 42 on res and also calls the __next__ method of the generator.

d. Since no more yields are present, it will throw a StopIteration exception having the return value of the generator function set in its value member.
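
The same four steps as runnable code. The picture is not reproduced here, so the generator function below is a guess at it based on the description: it yields 5, stores the sent value in res and returns it. The name gen_fn is made up.

```python
def gen_fn():
    # Block 1: runs on the first next() and yields 5 to the client.
    res = yield 5
    # Block 2: runs after send(); res now holds the sent value.
    return res

g = gen_fn()            # a. Prime: get the generator object, nothing runs yet
yielded = next(g)       # b. Run to the first yield
try:
    g.send(42)          # c. Resume with res = 42
except StopIteration as e:
    returned = e.value  # d. No more yields: the return value rides on StopIteration

print(yielded, returned)
```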


I still don't see the point

So, how are all these freaky things going to be of any use to me as a programmer, you ask?
Let's see a small example of some database driver that you wrote. The task is to fetch some rows from the DB and do something with the rows.

The client of your basic DB driver would probably write something like this:

class DDBResultFetcher(object):
    def __init__(self, rows_cb):
        self.instance = None
        self.is_connected = False
        self._cb = rows_cb
    
    def connect(self):
        if not self.instance:
            self.instance = MyDBDriver.get_correct_instance()
        
        if self.connected:
            return
        
        self.instance.connect()
        self.is_connected = True
    
    @property
    def connected(self):
        return self.is_connected
    
    def on_rows_cb(self, rows):
        self._cb(rows)
    
if __name__ == "__main__":
    
    def manage_rows(rows):
        for row in rows:
            #Do something with the rows
            ....
            
    db_f = DDBResultFetcher(manage_rows)
    db_f.connect()
    
    db_f.run()
    
    ##Additional code to check if the results have been completely fetched
    ##or not
    ......
           
Even though I have left out most error and exception checks, and plenty of cases are still unhandled, the code the user has to write is already quite a lot, error prone, and can become unmanageable.

Here is what a generator/coroutine based DB driver could make my code look like:

def fetch_results_from_db():
    instance = MyDBDriver.get_correct_instance()
    
    if not instance.connected():
        instance.connect()
    
    while True:
        rows = yield instance
        if not rows:
            break
        # Do something with the rows
        ...

My user code is so much simpler now! The onus is now on the library writer to provide mechanisms to interact with this generator-function to use the yield suspension and resumption to send the rows obtained from the DB.

How to do that ? Wait for the next instalment of the series...
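
Until then, here is a toy sketch of one way a library could drive such a generator-function. Everything about MyDBDriver below is a stand-in invented for this sketch (including fetch_batches and the canned rows), and the run helper is hypothetical; a real driver would get its rows from the database asynchronously.

```python
class MyDBDriver(object):
    """Stand-in for the driver; every method here is hypothetical."""
    _batches = [[1, 2], [3], []]   # An empty batch marks the end

    def __init__(self):
        self._connected = False

    @classmethod
    def get_correct_instance(cls):
        return cls()

    def connected(self):
        return self._connected

    def connect(self):
        self._connected = True

    def fetch_batches(self):
        return iter(self._batches)

processed = []

def fetch_results_from_db():
    instance = MyDBDriver.get_correct_instance()
    if not instance.connected():
        instance.connect()
    while True:
        rows = yield instance
        if not rows:
            break
        processed.extend(rows)   # "Do something with the rows"

def run(gen_fn):
    g = gen_fn()
    instance = next(g)           # Run to the first yield; we get the driver instance
    try:
        for rows in instance.fetch_batches():
            g.send(rows)         # Resume the generator with each batch of rows
    except StopIteration:
        pass                     # The generator saw an empty batch and finished

run(fetch_results_from_db)
print(processed)
```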

NOTE: My intention here is not to propagate the idea that "free functions are better than classes". No, it's just an example.


Bonus Section: But still, how does it work behind the scenes?

We will look here at some disassembled output and some C code to gain a slightly better understanding of what is going on. It's for those people who can't sleep without seeing a bit of assembly or C/C++ code.




Or (the easier way)

In [7]: dis.dis(gen_obj.gi_code)
  2           0 LOAD_CONST               1 (5)
              3 YIELD_VALUE
              4 STORE_FAST               0 (x)

  3           7 LOAD_FAST                0 (x)
             10 RETURN_VALUE


There are lots of resources on the internet where you can learn what the above output means, but I will briefly mention the significance of each column.
As can be seen, there are 5 columns:

  • Column 1 : The line number in the python source file which the bunch of assembly statements represent.
  • Column 2 : The offset in the byte code (gen_obj.gi_code.co_code) where the opcode is present.
  • Column 3 : The opcode name.
  • Column 4 : The index into the attributes of the code object. It's a bit more involved, but out of scope for this discussion.
  • Column 5 : The constants and the variable name based upon the opcode. This is only for helping mere humans.
Now, with this much information at hand, let's take a look at the disassembled output again.
  • Load a constant value '5'.
  • Yield. Here is where the current frame is saved along with all its context. The generator object holds on to it in gi_frame (gi_code is the code object it executes).
  • On issuing next or send, resume the saved frame and store the sent value ('None' in case of next or send(None)) into the variable named 'x'.
  • Read the value in variable 'x' and return it.
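
To reproduce this yourself, something like the following should work. The function name gen is an assumption (its definition is not shown above); the body is reconstructed from the disassembly: yield 5, store the result in x, return x. Offsets and the surrounding opcodes vary between CPython versions, so expect your listing to differ from the one above.

```python
import dis
import inspect

def gen():
    x = yield 5
    return x

gen_obj = gen()
dis.dis(gen_obj.gi_code)   # gi_code is the code object, same as gen.__code__

# Collect just the opcode names (column 3 in the listing).
opnames = [ins.opname for ins in dis.get_instructions(gen_obj.gi_code)]

# Watch the generator move through its states as it is driven.
states = [inspect.getgeneratorstate(gen_obj)]       # Not started yet
next(gen_obj)
states.append(inspect.getgeneratorstate(gen_obj))   # Suspended at the yield
try:
    gen_obj.send("hi")
except StopIteration:
    pass
states.append(inspect.getgeneratorstate(gen_obj))   # Finished
print(states)
```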

More Fun with GDB

Now we know that the stack frame for a generator function is persisted on heap so that it can be loaded and resumed again.

NOTE: From whatever I could understand from the CPython code, all stack frames are allocated on heap. But of course nothing should be written on that assumption.

We will see now what happens when I am sending a value to a generator. Things we are expecting to happen are:
  • Load the frame associated with the generator.
  • Send the value ( ? )
  • Run the frame till next yield or throw StopIteration exception
  • In case of yield, go back to the previous frame of execution.
For this, I have put a breakpoint on the _PyGen_Send function, which can be found in the Objects/genobject.c source of CPython. It internally calls gen_send_ex, which is present in the same source file.

The source code we are debugging:

def my_gen():
    x = yield

g = my_gen()
next(g)
g.send(42)


Let's see what happens in gen_send_ex. We will only see the relevant portions of the code.


static PyObject *
gen_send_ex(PyGenObject *gen, PyObject *arg, int exc)
{
    PyThreadState *tstate = PyThreadState_GET();
    PyFrameObject *f = gen->gi_frame;
    PyObject *result;
     .
     .
     .
}

We are getting the current thread state and the frame associated with the generator object.


    if (f->f_lasti == -1) {
        if (arg && arg != Py_None) {
            char *msg = "can't send non-None value to a "
                        "just-started generator";
            if (PyCoro_CheckExact(gen))
                msg = "can't send non-None value to a "
                      "just-started coroutine";
            PyErr_SetString(PyExc_TypeError, msg);
            return NULL;
        }
    } else {
        /* Push arg onto the frame's value stack */
        result = arg ? arg : Py_None;
        Py_INCREF(result);
        *(f->f_stacktop++) = result;
    }

Here we are checking the value of the last executed instruction on the generator function frame (f->f_lasti). If it's set to -1 it means the generator has not moved to its first yield statement.
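
This branch is easy to trigger from Python: send a non-None value before the generator has reached its first yield. A small sketch (the return statement is added here only to make the generator complete):

```python
def my_gen():
    x = yield
    return x

g = my_gen()
try:
    g.send(42)        # The generator has not reached its first yield yet
except TypeError as e:
    error = str(e)
print(error)

# None is allowed on a fresh generator and behaves like next(g).
first = g.send(None)
```

The failed send does not close the generator, which is why the send(None) afterwards still works.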

Let's check the state of our generator stack frame (which is variable 'f'):

(gdb) p *f
$18 = {
  ob_base = {
    ob_base = {
      _ob_next = 0x7fcf85618d48,
      _ob_prev = 0x7fcf855825a8,
      ob_refcnt = 1,
      ob_type = 0x88cc20 
    },
    ob_size = 20
  },
  f_back = 0x0,
  f_code = 0x7fcf86660100,
  f_builtins = 0x7fcf86784b48,
  f_globals = 0x7fcf8673ac98,
  f_locals = 0x0,
  f_valuestack = 0x2916478,
  f_stacktop = 0x2916478,
  f_trace = 0x0,
  f_exc_type = 0x0,
  f_exc_value = 0x0,
  f_exc_traceback = 0x0,
  f_gen = 0x7fcf855825a8,
  f_lasti = 3,
  f_lineno = 1,
  f_iblock = 0,
  f_executing = 0 '\000',
  f_blockstack = {{
      b_type = -875836469,
      b_handler = -875836469,
      b_level = -875836469
    } },
  f_localsplus = {0x0}
}

Check the f_lasti and f_lineno fields (highlighted in the original post). The f_lineno field says that we are at the first line of the frame, which is true: we are at the yield statement, which is the first line of our generator function my_gen. The f_lasti field is 3 rather than -1, so the generator has already started.

Since our generator has already reached the yield (because we already executed next on the generator before sending any value), let's focus on the else portion:

    ...
    } else {
        /* Push arg onto the frame's value stack */
        result = arg ? arg : Py_None;
        Py_INCREF(result);
        *(f->f_stacktop++) = result;
    }

Hmm...it's just putting the passed value on the top of the stack! Now that should result in a STORE, right? Let's check the disassembly:

In [3]: dis.dis(my_gen)
  2           0 LOAD_CONST               0 (None)
              3 YIELD_VALUE
              4 STORE_FAST               0 (x)
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

Yup, that looks about correct and as expected. The sent value is getting stored in variable x.

From here, Python should execute the current generator frame and do some bookkeeping so that it can get back to the previous frame, the one which called next/send on the generator, once the generator yields again or finishes.

    /* Generators always return to their most recent caller, not
     * necessarily their creator. */
    Py_XINCREF(tstate->frame);
    assert(f->f_back == NULL);
    f->f_back = tstate->frame;

    gen->gi_running = 1;
    result = PyEval_EvalFrameEx(f, exc);
    gen->gi_running = 0;

The current frame is saved in the generator's frame (f->f_back = tstate->frame) and PyEval_EvalFrameEx will continue with the execution of the frame.

I have skipped over lots of details here, but this should be a good start for anyone interested in digging deeper.
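
The "most recent caller" comment can be observed from Python too. This is a rough sketch that relies on sys._getframe, which is a CPython implementation detail; the helper names first_caller and second_caller are made up, and this my_gen is a variation on the one being debugged.

```python
import sys

def my_gen():
    # f_back of the running generator frame is whoever resumed us.
    x = yield sys._getframe().f_back.f_code.co_name
    yield sys._getframe().f_back.f_code.co_name

def first_caller(g):
    return next(g)

def second_caller(g):
    return g.send(42)

g = my_gen()
seen = [first_caller(g), second_caller(g)]
print(seen)

# While the generator is suspended, its frame is not linked to any caller.
print(g.gi_frame.f_back)
```

Each resumption reports a different caller, even though the generator object was created once at module level.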

Part-2 About Futures, Executors and Tasks