Sunday, 25 November 2018

Coro-Async : A lite C++ coroutine networking library | Introduction

I have been working on and off on a small networking library in C++ for the past few weeks. The purpose of this library was mainly to get to know the C++ coroutines feature as implemented in Clang.

The features currently supported by the library are:
  1. TCP socket stream support. Yup, no UDP for now.
  2. IPv4 only. Will have IPv6 soon.
  3. Performing asynchronous accept, connect, read, write on the socket by awaiting.
  4. Yielding a task for a time period.
  5. Awaiting on the task which itself is a coroutine.
Things missing:
  1. UDP support. Not planning to support anything else.
  2. Low level socket option setting. For now, you will have to get the native handle to set those.
  3. IPv6 addressing support.
  4. A few things currently might not be thread safe if you are running `io_service` from multiple threads.
  5. Allocation and continuation related optimizations.
  6. And many other things that a proper and complete networking library needs.
All the code shown here can be found at:

In this post I will not be covering the basics of coroutines, but will be providing code samples showing how coroutines can ease writing concurrent programs (the trouble is for the library implementors :) ). A later post will delve more into the details of the implementation.

"Hello World" of network programming

NOTE: We will be ignoring error checks in the code for the sake of conciseness.

What we will be seeing here is a concurrent, single threaded (for the most part) echo server.

The main:

int main() {
    io_service ios{};
    coro_acceptor acceptor{ios};

    std::error_code ec{};
    acceptor.open("127.0.0.1", 8080, ec);
    
    server_run(acceptor);

    std::thread thr{[&] { ios.run(); }};
    thr.join();
    return 0;
}

There is some thread related code here which could be abstracted away, but I have left it the way it is done for ASIO.

What's going on here is:
  1. Create an instance of io_service, which basically runs a scheduler and a reactor behind the scenes (only when its `run` method is called).
  2. Create an instance of coro_acceptor, which is basically a wrapper over the async version of accept, but provides an API returning types satisfying the Awaitable interface.
  3. Opening the acceptor basically means binding to the IP address and port. We will be ignoring any failures here.
  4. Pass the acceptor to the coroutine function "server_run". We will see this later.
  5. Run the io_service on a separate thread so as to not block the current/main thread.

The server_run coroutine:

coro_task_auto<void> server_run(coro_acceptor& acc)
{
  while ( true )
  {
    auto result = co_await acc.accept();
    if (result.is_error())
    {
      std::cerr << "Accept failed: " << result.error().message() << '\n';
      co_return;
    }
    handle_client(std::move(result.result()));
  }
  co_return;
}

What's going on here is:
  1. The signature of the function indicates that it returns an object of type "coro_task_auto", which is the return object type of the coroutine promise. The template parameter is void, indicating that the function does not generate any interesting value. You can see that we do not return anything with "co_return".
  2. We await on accept in an infinite while loop. But instead of blocking on "accept" here, we yield control back so that some other task can continue.
  3. All awaited socket operations return a "Result" type. There are many variants of this result type which the client does not need to know about, hence the auto. The result could either be an error or the actual result value. For the "accept" case, the result value would be a valid client side socket.
  4. Once a valid socket is obtained, hand it off to the "handle_client" coroutine and repeat.
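
For intuition, the client-facing surface of such a result type can be sketched roughly like this. This is only an illustration of the `is_error` / `error` / `result` interface used above; the library's actual result variants are different:

```cpp
#include <cassert>
#include <system_error>
#include <utility>
#include <variant>

// Illustrative only: a minimal result type with the same client-facing
// surface (is_error / error / result) as the objects returned by the
// awaited socket operations. The library's real variants differ.
template <typename T>
class sketch_result {
 public:
  explicit sketch_result(T value) : data_{std::move(value)} {}
  explicit sketch_result(std::error_code ec) : data_{ec} {}

  // True when the operation failed and error() holds the reason.
  bool is_error() const noexcept {
    return std::holds_alternative<std::error_code>(data_);
  }
  std::error_code error() const { return std::get<std::error_code>(data_); }
  T& result() { return std::get<T>(data_); }

 private:
  std::variant<T, std::error_code> data_;
};
```

The either-value-or-error shape is why the accept loop above can branch on `is_error()` and then move the socket out with `result()`.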

The handle_client coroutine:

coro_task_auto<void> handle_client(coro_socket client)
{
  char buf[6]; // for only "Hello!"
  auto bref = as_buffer(buf);
  co_await client.read(6, bref);
  co_await client.write(6, bref);
  client.close();
  co_return;
}

What's going on here is:
  1. The function signature is as explained previously.
  2. The buffer is kept fixed at 6 bytes as we know we will only be handling "Hello!". I guess that's OK for a sample.
  3. You may have noticed that the buffer is allocated on the stack, but for something to be used in an asynchronous way, we usually allocate it on the heap, right? Well, yes, here too you need something like that, but that is taken care of when the coroutine allocates (or might elide) its frame object. The compiler can determine how much memory needs to be allocated to keep all the objects created inside the coroutine alive between suspension points.
  4. We issue the special coroutine friendly awaitable read and write calls. These don't block. If a read or write operation cannot be completed, the coroutine is suspended until the underlying socket is ready. During this time, some other operation, for example "accept", can progress.

That's basically it! The complete code is as short as:

#include <iostream>
#include <thread>
#include "coro_async.hpp"

using namespace coro_async;

coro_task_auto<void> handle_client(coro_socket client)
{
  char buf[6]; // for only "Hello!"
  auto bref = as_buffer(buf);
  co_await client.read(6, bref);
  co_await client.write(6, bref);
  client.close();
  co_return;
}

coro_task_auto<void> server_run(coro_acceptor& acc)
{
  while ( true )
  {
    auto result = co_await acc.accept();
    if (result.is_error())
    {
      std::cerr << "Accept failed: " << result.error().message() << '\n';
      co_return;
    }
    handle_client(std::move(result.result()));
  }
  co_return;
}

int main() {
  io_service ios{};
  coro_acceptor acceptor{ios};

  std::error_code ec{};
  acceptor.open("127.0.0.1", 8080, ec);
    
  server_run(acceptor);

  std::thread thr{[&] { ios.run(); }};
  thr.join();
  return 0;
}

Now, I do completely understand the uselessness of the above code, but it's still something, right?


Sleeping Beauty

No idea why I chose this heading, but I have no better alternative yet.

If you want to sleep without blocking the thread, then you can do this:

using namespace coro_async;
using namespace std::chrono_literals;

coro_task_auto<void> waiter(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "start sleep" << std::endl;
  co_await s.yield_for(2s);
  std::cout << "end sleep" << std::endl;
  co_return;
}

What's going on here is:

  1. We create an instance of coro_scheduler, which basically exposes APIs that return an Awaitable. For "yield_for", it returns an awaitable which suspends the coroutine and schedules a timed callback with the io_service scheduler so as to resume the coroutine after the specified time.

Waiting for other tasks

There are sure to be scenarios where you have to wait for a task to finish. The task itself could be a coroutine which could suspend again on different kinds of awaitables.

Let's see an example:

coro_task_auto<void> sleeper(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "start sleep" << std::endl;
  co_await s.yield_for(2s);
  std::cout << "end sleep" << std::endl;
  co_return;
}

coro_task_auto<int> deep_thinking(io_service& ios)
{
  coro_scheduler s{ios};
  co_await s.yield_for(2s);
  co_return 42;
}

coro_task_auto<void> waiter(io_service& ios)
{
  coro_scheduler s{ios};
  std::cout << "Wait for completion" << std::endl;
  co_await s.wait_for([&ios] () -> auto { return sleeper(ios); });
  std::cout << "Wait completed" << std::endl;

  std::cout << "Do some deep thinking" << std::endl;
  auto result = co_await s.wait_for([&ios]() -> auto { return deep_thinking(ios); });
  std::cout << "Answer after deep thinking: " << result.result() << std::endl;
  co_return;
}

Nothing much to explain here except for the lambda weirdness. That is required because the handler passed to "wait_for" must be callable with no arguments.
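
The shape of that requirement can be sketched like this. `start_lazily` is hypothetical, not part of the library; it just mirrors why the coroutine invocation gets wrapped in a zero-argument lambda:

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical sketch of the nullary-handler requirement: the handler
// can only be started lazily, at the point of awaiting, if it needs no
// arguments. Anything the task needs must be captured by the lambda.
template <typename Handler,
          typename = std::enable_if_t<std::is_invocable_v<Handler>>>
auto start_lazily(Handler&& h) {
  return std::forward<Handler>(h)();  // invoked with no arguments
}
```

A handler taking parameters would fail to satisfy the constraint, which is why `ios` is captured rather than passed.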


A damn simple Redis client

This is it:

coro_task_auto<void> run_command(io_service& ios, std::string cmd)
{
  coro_connector conn{ios};
  coro_scheduler s{ios};

  auto result = co_await conn.connect("127.0.0.1", 6379);
  if (result.is_error())
  {
    std::cout << "Connection to redis failed: " << result.error().message() << std::endl;
    co_return;
  }
  std::cout << "connected to redis" << std::endl;
  auto& sock = result.result();

  auto cmd_buf = as_buffer(cmd);
  co_await sock.write(cmd.length(), cmd_buf);

  std::cout << "command written to redis" << std::endl;
  co_return;
}
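
One thing to keep in mind: the bytes written to the socket must already be in the Redis wire format. Redis accepts simple inline commands terminated by CRLF, but a proper client speaks RESP. A tiny encoder, shown only for illustration and not part of coro_async, could look like:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Not part of coro_async; shown only for illustration. Builds the RESP
// array-of-bulk-strings framing that Redis expects on the wire, e.g.
// {"SET", "k", "v"} -> "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n".
std::string encode_resp(const std::vector<std::string>& parts) {
  std::string out = "*" + std::to_string(parts.size()) + "\r\n";
  for (const auto& part : parts) {
    out += "$" + std::to_string(part.size()) + "\r\n" + part + "\r\n";
  }
  return out;
}
```

With something like this, the `cmd` string passed to run_command would be `encode_resp({"SET", "key", "value"})` rather than a raw string.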

Have fun!