C++20 Coroutines and io_uring - Part 2/3

In the first part of the series we learned about io_uring by writing a program that reads and parses hundreds of OBJ files from disk. In this second part of the series we will rewrite that program of by making use of C++20 coroutines.

Even thought required basic concepts are explained, this post is not meant to be an in-depth dive into C++ coroutines, the goal is rather to showcase of how coroutines can be used together with io_uring. If you are not familiar with C++ coroutines yet, Lewis Baker’s blog series is a great place to start.

Basic Idea

What we want to do is simple: implement a coroutine that loads and parses an OBJ file from disk using io_uring. When this coroutine is called it pushes a submission request into the submission queue and suspends execution, returning to the caller. Once the corresponding completion entry has arrived the coroutine is resumed and the OBJ can be parsed:

Task parseOBJFile(IOUring &uring, const ReadOnlyFile &file) {
  std::vector<char> buff(file.size());
  int status = co_await ReadFileAwaitable{uring, file, buff};
  // completion entry has arrived at this point
  // buffer has been filled and parsing can take place
  Result result{.status_code = 0, .file = file.path()};
  readObjFromBuffer(buff, result.result);
  co_return result;
}

This is our coroutine. It looks similar to a normal function, in fact, it’s just good old imperative synchronous code: allocate a buffer, read contents of file into buffer, parse contents of buffer, return result.

The coroutine is naturally not executed synchronously: it suspends execution and yields control to the caller at predetermined locations. Most notably, at the await expression in the second line: co_await ReadFileAwaitable{uring, file, buff}. This is the point where the submission entry is pushed to the submission queue and control returns to the caller. co_await is an operator that is applied to an awaitable and suspends execution. ReadFileAwaitable is such an awaitable, its task is to suspend the coroutine and register it to be woken up in the future. Once the coroutine is resumed execution continues right after the co_await expression.

Suspending Execution

ReadFileAwaitable is the awaitable we use used to suspend the coroutine. Don’t be scared we will explain what’s going on here:

class ReadFileAwaitable {
public:
  ReadFileAwaitable(IOUring &uring, const ReadOnlyFile &file,
                    const std::vector<char> &buf) {
    sqe_ = io_uring_get_sqe(uring.get());
    io_uring_prep_read(sqe_, file.fd(), buf.data(), buf.size(), 0);
  }

  auto operator co_await() {
    struct Awaiter {
      io_uring_sqe *entry;
      RequestData requestData;

      Awaitable(io_uring_sqe *sqe) : entry{sqe} {}
      bool await_ready() { return false; }
      void await_suspend(std::coroutine_handle<> handle) noexcept {
        requestData.handle = handle;
        io_uring_sqe_set_data(entry, &requestData);
      }
      int await_resume() { return requestData.statusCode; }
    };
    return Awaiter{sqe_};
  }

private:
  io_uring_sqe *sqe_;
};

ReadFileAwaitable creates a submission queue entry and pushes it into the submission queue. This makes sense, as we must push the request into the queue before suspending the coroutine.

Awaiter has two member variables: entry and requestData. entry is a pointer to the submission entry, we need it to be able to attach user data to it with io_uring_sqe_set_data. User data is arbitrary data (literally a void*) that we can attach to a submission request. The user data is propagated by the kernel as-is to the corresponding completion entry and its usually used to uniquely distinguish a completion entry and link it to the corresponding submission request.

ReadFileAwaitable overloads operator co_await(), which returns an awaiter: Awaiter. Awaiter implements the required member functions dictated by the standard: await_ready, await_suspend and await_resume.

await_ready returns always false. This means that the coroutine will always be suspended on co_await ReadFileAwaitable{}. We could have also inherited from std::suspend_always.

Now we must implement await_suspend. This function is called right after the coroutine has been suspended and receives the coroutine handle as parameter. This is a great place to write the user data to the submission queue entry. But what should we write as user data?

Let’s think for minute: we pushed a submission entry into the submission queue. There will be a corresponding completion entry arriving soon at the completion queue. Once the completion entry has arrived we must resume the coroutine that was just suspended. What do we need to resume the coroutine? The coroutine handle that has just been passed to us! Let’s write the coroutine handle as user data into the submission request!

The coroutine handle is stored inside a member variable of Awaiter,requestData of type RequestData:

struct RequestData {
  std::coroutine_handle<> handle;
  int statusCode{-1};
};

RequestData stores the coroutine handle and the status code of the read operation. The status code will be written to the Awaiter object later on, when the completion request has arrived. The user data is therefore a pointer to the requestData data member of the awaiter object.

Finally we can implement await_resume. await_resume is called right before the coroutine is resumed and returns the status code of the read operation. In other words, we can assume that requestData.statuCode has been initialized by the time await_resume is called.

The return value of await_resume is the result of the entire await expression, meaning that we can write:

int status = co_await ReadFileAwaitable{}
if (!status) {
    // ...
}

as if it were a normal read call.

Resuming Execution

Calling co_await ReadFileAwaitable puts the coroutine to sleep, but how do we wake it up? Easy, we wait for completion requests to arrive at the completion queue and after a completion entry has arrived we extract the coroutine handle inside it and use it to resume the coroutine:

int consumeCQEntries(IOUring &uring) {
  int processed{0};
  io_uring_cqe *cqe;
  unsigned head; // head of the ring buffer, unused
  io_uring_for_each_cqe(uring.get(), head, cqe) {
    auto *request_data = static_cast<RequestData *>(io_uring_cqe_get_data(cqe));
    // make sure to set the status code before resuming the coroutine
    request_data->statusCode = cqe->res;
    request_data->handle.resume(); // await_resume is called here
    ++processed;
  }
  io_uring_cq_advance(uring.get(), processed);
  return processed;
}

Before resuming the coroutine we have to write the status code into request_data. request_data is a pointer to the requestData data member in the Awaiter object that is returned in await_resume.

We write now a consumeCQEEntriesBlocking helper function which submits pending submission entries to the kernel and blocks until at least one completion entry arrives:

int consumeCQEntriesBlocking(IOUring &uring) {
  io_uring_submit_and_wait(uring.get(), 1); // blocks if queue empty
  return consumeCQEntries(uring);
}

We have learned the mechanics of suspending and resuming our coroutine, now we can write the client code that uses it to load the OBJ files.

Intuitively we must allocate some std::vector that contains the results returned by parseOBJFile, but what is the return type of parseOBJFile? What is the return type of a coroutine? It’s a coroutine type called Task.

Coroutine Type: Task

Task is the return type of our coroutine. We must implement it ourselves by defining the API demanded by the standard. Again, don’t worry we will go through it:

class Task {
public:
  struct promise_type {
    Result result;

    Task get_return_object() { return Task(this); }

    void unhandled_exception() noexcept {}

    void return_value(Result result) noexcept { result = std::move(result); }
    std::suspend_never initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
  };

  explicit Task(promise_type *promise)
      : handle_{HandleT::from_promise(*promise)} {}
  Task(Task &&other) : handle_{std::exchange(other.handle_, nullptr)} {}

  ~Task() {
    if (handle_) {
      handle_.destroy();
    }
  }

  Result getResult() const & {
    assert(handle_.done());
    return handle_.promise().result;
  }

  Result&& getResult() && {
    assert(handle_.done());
    return std::move(handle_.promise().result);
  }

  bool done() const { return handle_.done(); }

  using HandleT = std::coroutine_handle<promise_type>;
  HandleT handle_;
};

Task defines a promise type. The promise object is instantiated for every call to the coroutine and lives inside the coroutine frame. The promise object is used to transmit the result of the coroutine (or an exception if one is thrown inside the coroutine body). Hence, the promise type has a member Result result, which contains the final result of the parsing of an OBJ file:

struct Result {
  tinyobj::ObjReader result; // stores the actual parsed obj
  int status_code{0};        // the status code of the read operation
  std::string file;          // the file the OBJ was loaded from
};

result is initialized from inside the member function return_value, which is called when the coroutine reaches an co_return statement.

Task defines a convenience member function getResult() that extracts and returns the result from the promise.

promise_type must define a member function get_return_object which returns the actual coroutine object. In our case its an instance of Task. unhandled_exception is called when the coroutine body throws, we left it unimplemented as we are exception free (or aim to be). initial_suspend and final_suspend determine the initial and final suspension behavior of the coroutine, whether the coroutine should start or finish in a suspended state or not.

Task contains the coroutine handle handle_ and manages its lifetime: it destroys the coroutine frame in upon destruction by calling handle_.destroy(). It also defines a done() convenience member that tells whether the coroutine has finished executing.

C++20 coroutines are raw, this means that instead of being a finished cake, they are more like a bunch of flour, eggs and butter. In order to implement a coroutine you have to write a lot of supporting code and boilerplate, you must bake your own cake. For this reason it is usual to make use of a coroutine library like cppcoro by Lewis Baker, which implement generic version of our Task type, among a lot of more useful abstractions, greatly reducing the amount of boilerplate.

Putting it all together

Now we can implement the top-level function that parses the OBJ files using coroutines:

std::vector<Result> parseOBJFiles(const std::vector<ReadOnlyFile> &files) {
  IOUring uring{files.size()};
  std::vector<Task> tasks;
  tasks.reserve(files.size());
  for (const auto &file : files) {
    tasks.push_back(parseOBJFile(uring, file));
  }
  while (!allDone(tasks)) {
    // consume all entries in the submission queue
    // if the queue is empty block until the next completion arrives
    consumeCQEntriesBlocking(uring);
  }
  return gatherResults(tasks);
}

It allocates a vector of Tasks by executing the parseOBJFile coroutine for each file. Note that initial_suspend in promise_type returns std::suspend_never, this means that the parseOBJFile coroutine doesn’t start suspended but right away continues execution until co_await ReadFileAwaitable, where the coroutine is suspended.

Once all coroutines are suspended and the kernel is doing its work, we don’t have other option other than wait until some completion requests start appearing in the completion queue, consumeCQEEntriesBlocking wakes up the coroutines one-by-one as the corresponding completion entries start arriving.

allDone is a simple helper that checks if all coroutines have been fully executed to completion:

bool allDone(const std::vector<Task> &tasks) {
  return std::all_of(tasks.cbegin(), tasks.cend(),
                     [](const auto &t) { return t.done(); });
}

Upon resumption coroutines parse the OBJ file and return the result with co_return result;.

Finally we can extract the final results from the finished coroutines:

std::vector<Result> gatherResults(const std::vector<Task> &tasks) {
  std::vector<Result> results;
  results.reserve(tasks.size());
  for (auto &&t : tasks) {
    results.push_back(std::move(t).getResult());
  }
  return results;
}

At the end of the function’s block scope, all Tasks are destroyed, deallocating all coroutine frames.

Closing Thoughts

You may be asking yourself: what is the actual point of all this is compared to the implementation without coroutines? Its a fair question. One may argue that we have just added unnecessary boilerplate to an already straight-forward implementation. Its also not particularly more efficient: we are stillperforming parsing sequentially in a single thread.

Luckily we are not done yet. See, the beauty of coroutines is that they are very composable. Once you a have a coroutine-based implementation adding more awaitables or other coroutines is child’s play, they fit like lego stones.

In part 3 we will extend our coroutine-based implementation such that parsing is performed in parallel in a thread pool. This is were the real power of coroutines will come to light.


© 2023. All rights reserved.

Powered by Hydejack v9.1.6