C++20 Coroutines and io_uring - Part 3/3

You have made it to the last part of the series. In part 2 we wrote a coroutine-based program that loads and parses a list of OBJ files using coroutines and io_uring. The program still has a big disadvantage: its CPU-bound. Parsing of OBJ files, the most costly part of the algorithm, is performed sequentially on a single thread.

The root of the problem is that our coroutines are resumed on the main thread. Ideally we would like to wake them up in different threads, so that multiple parsings can be conducted in parallel.

This is exactly what we will do. We will add a second await expression inside our coroutine: co_await pool.schedule(). This will cause our coroutine to be suspended and scheduled to be resumed in a thread pool:

Task parseOBJFile(IOUring &uring, const ReadOnlyFile &file, ThreadPool &pool) {
  std::vector<char> buff(file.size());
  int status = co_await ReadFileAwaitable(uring, file, buff);
  co_await pool.schedule();
  // This is now running on a worker thread
  Result result{.status_code = 0, .file = file.path()};
  readObjFromBuffer(buff, result.result);
  co_return result;
}

ThreadPool

ThreadPool builds the core of our multi-threaded implementation.

ThreadPool wraps around a bshoshany Thread Pool’s object. Its API is very simple, you use push_task to schedule tasks to be run on the thread pool.

class ThreadPool {
public:
  auto schedule() {
    struct Awaiter : std::suspend_always {
      BS::thread_pool &tpool;
      Awaitable(BS::thread_pool &pool) : tpool{pool} {}
      void await_suspend(std::coroutine_handle<> handle) {
        tpool.push_task([handle, this]() { handle.resume(); });
      }
    };
    return Awaiter{pool_};
  }

  size_t numUnfinishedTasks() const { return pool_.get_tasks_total(); }

private:
  BS::thread_pool pool_;
};

ThreadPool defines a member function schedule() which returns an instance of Awaiter. When a coroutine co_awaits the Awaiter object it’s immediately suspended (note that Awaiter inherits from std::suspend_always) and schedules it to be resumed in a worker thread in await_suspend.

This is beautiful! By simply writing co_await pool.schedule() we can wake up the current coroutine in a different thread, greatly improving the performance of our application. Parsing of OBJ files happens in parallel.

Multi-Threaded Implementation

That’s it, now we can write the top-level function that uses our new coroutine to load and parse the OBJ files:

std::vector<Result> coroutinesThreadPool(const std::vector<ReadOnlyFile> &files) {
  IOUring uring{files.size()};
  ThreadPool pool;
  std::vector<Task> tasks;
  tasks.reserve(files.size());
  for (const auto &file : files) {
    tasks.push_back(parseOBJFile(uring, file, pool));
  }
  io_uring_submit(uring.get());
  while (pool.numUnfinishedTasks() > 0 || !allDone(tasks)) {
    // consume entries in the completion queue
    // return immediately if the queue is empty
    consumeCQEntriesNonBlocking(uring);
  }

  return gatherResults(tasks);
}

We initialize a thread pool and io_uring instance. We call parseOBJFile for each file, which will to fill up the submission queue. Later we submit the requests to the kernel using io_uring_submit().

Once the kernel is reading the files in the background, coroutines are waken up as the corresponding completion entries arrive. This happens in consumeCQEntriesNonBlocking:

int consumeCQEntriesNonBlocking(IOUring &uring) {
  io_uring_cqe *temp;
  if (io_uring_peek_cqe(uring.get(), &temp) == 0) {
    return consumeCQEntries(uring);
  }
  return 0;
}

Which uses io_uring_peek_cqe to peek for existing entries in the completion queue and exits immediately if its empty.

We continue to wait until all coroutine have finalized. Since allDone is a linear check, we include an early-exit to avoid calling it that often: pool.numUnfinishedTasks() > 0. If there are unfinished tasks scheduled in the thread pool its obvious that we are not done yet.

Closing Words

We have made it! Hopefully by know you can appreciate how cool coroutines are, once you have an existing coroutine, adding extra co_awaits is child’s play.

You can find the entire code for the blog series here.


© 2023. All rights reserved.

Powered by Hydejack v9.1.6