libcoro という C のライブラリがある。Perl Mongers にはおなじみ (だった) 協調スレッド実装である Coro.pm のバックエンドとして使われているライブラリで、作者は Coro と同じく Marc Lehmann 氏。
coro というのは Coroutine (コルーチン) の略で、要するに処理の進行を明示的に中断して別の実行コンテキストに切り替えたり、そこからさらに再開できる機構のことである。言語やプラットフォームによって Fiber と呼ばれるものとほぼ同義。
(ネイティヴ) スレッドとの違いはとどのつまり並行処理と並列処理の違いで、スレッドは同時に複数の実行コンテキストが進行し得るがコルーチンはある時点では複数の実行コンテキストのうち高々一つだけが実行され得る。 スレッドに対するコルーチンの利点は主に理解のし易さにある。スレッドの実行中断と再開は予測不可能なタイミングで起こるため、メモリその他の共有資源へのアクセスが常に競合し得る。一方コルーチンは自発的に実行を中断するまでプロセスの資源を独占しているため、コンテキスト・スイッチをまたがない限り共有資源の排他制御や同期などを考えなくて良い。
同時に一つのコルーチンしか実行されないということは、プロセッサのコア数に対して処理がスケールアウトしないことを意味する。ただしシングルスレッドのプログラムでも IO などの間はプロセッサが遊んでいるため、非同期 IO とコルーチンを組み合わせるなどして待ち時間に別の処理を行わせることで効率を高められることが多い。 また1コアでの性能に関しては、コンテキスト・スイッチの回数が減り、またスイッチング自体もユーザモードで完結するため、スレッドよりも高速である場合が多い。このため「軽量スレッド」とも呼ばれることがある。
libcoro の特徴
C で利用できるコルーチン実装は複数あって、Wikipedia にある Coroutine の記事 を見ても片手では足りない数が挙げられている。
libcoro がその中でどう特徴付けられるかというとポータビリティが挙げられる。
実装のバックエンドは Windows の Fiber や POSIX の ucontext の他、setjmp
/longjmp
に pthread
果てはアセンブラによる実装が選択でき、API は共通である。 少なくとも setjmp
/longjmp
は C90 の標準ライブラリ関数なので現代の OS であれば利用できるはずだ。
ライブラリはヘッダファイルと実装を収めたソースコードファイル1つずつからなる。 CVS レポジトリには Makefile すら含まれていない。ビルドするにはバックエンドを選択するプリプロセッサマクロを定義するだけで良い:
# CORO_SJLJ は setjmp/longjmp を使った実装
bash-3.2$ clang -DCORO_SJLJ -c -o coro.o coro.c
bash-3.2$ ar crs libcoro.a coro.o
あとは使用するプログラムとリンクするだけ:
# マクロはライブラリのコンパイル時と同じものを与える必要がある
bash-3.2$ clang++ --std=c++11 -DCORO_SJLJ -I. coro_usage.cc -L. -lcoro -static
API
シンプルなライブラリにはシンプルな API しかない。できるのは「一つのコルーチンから別のコルーチンを指定してコンテキスト・スイッチする」ことだけである。
コルーチンを一つ生成して実行するには以下の手順を踏む。ドキュメントはヘッダファイル内のコメントのみだが素晴らしく詳細である。
1. スタックを初期化する
まずコルーチンが使用する専用のスタックを確保する。スタック領域を確保して coro_stack
構造体を初期化する関数 coro_stack_alloc
を使用する。 スタックは第二引数に指定した個数のポインタが保持できる大きさになる。要するに指定した数に8倍したバイト数が確保される。通常は考えるのが面倒なので0を指定するとよしなに確保してくれる。 戻り値は確保の成否を返す。
struct coro_stack stack;
if (!coro_stack_alloc(&stack, 0)) {
perror("coro_stack_alloc");
}
2. コルーチンを作成する
コルーチン自身は coro_context
型で表現される。これを初期化する関数は coro_create
である。
第一引数に初期化したいコルーチンへのポインタを指定する。残りの引数はコルーチンとして実行すべき関数、コルーチンに渡す引数へのポインタ、確保したポインタのサイズ、そして確保したスタック領域へのポインタである。 最後の二つは前もって確保した coro_stack
の sptr
と ssze
メンバがそれぞれ対応する。malloc
やなんかで勝手に確保したメモリ領域を渡すと落ちるので注意。
coro_context context;
coro_create(
&context,
[](void *) { ... },
nullptr,
stack.sptr,
stack.ssze);
C++ ユーザに残念なお知らせ: 関数として lambda は使えるが変数はキャプチャできない。何故かというに、coro_create
の第二引数の型 coro_func
は void (*)(void *)
の typedef に過ぎないので lambda から coro_func
への型変換 operator void(*)(void *)
が必要だからである (ところでこれが合法なメンバ関数名なのはひどすぎると思う)。
coro_func
が受け取る void *
には coro_create
の第三引数が渡される。外部の環境 (というより呼出し元のコール・スタック) をコルーチン内から参照したければここに必要なものを詰めてお土産に持たせることになる。このあたりは C なので仕方がない。
3. コルーチンを実行する
コルーチンの実行を開始するにはちょっと工夫が要る。libcoro にはコンテキスト・スイッチする関数しかないので、スイッチ元となるコルーチンが要るからである。
それで実行開始と終了のために特別な「空の」コルーチンを生成する必要がある。これは coro_context
を null ポインタと0で初期化することで得られる:
coro_context empty;
coro_create(&empty, nullptr, nullptr, nullptr, 0);
これで準備ができたので、空のコルーチンから目的のコルーチンへコンテキスト・スイッチする。その際に使うのは coro_transfer
である:
coro_transfer(&empty, &context);
以降再び coro_transfer
が呼ばれるまで context
が表すコルーチンが独占的に実行される。 コルーチンの実行を終了するときは空のコルーチンへ再度コンテキスト・スイッチすれば良い。
4. リソースを開放する
処理が終わったらメモリの片付けをする。コルーチンは coro_destroy
で破棄し、対応するスタックは coro_stack_free
で開放する:
coro_destroy(context);
coro_stack_free(stack);
coro_destroy(empty); // 空のコルーチンに対応するスタックはないので coro_stack_free は不要
注意点
コルーチンへの引数が void *
なのでどうやっても型チェックは効かない。また渡したオブジェクトの寿命をコルーチンの実行終了まで保たせるのはプログラマの責任である。
外部の環境をコルーチン内から参照できないので、コンテキスト・スイッチしたいときに coro_transfer
に渡す自分自身をどうやって指定するかが問題になる。手っ取り早いのは static coro_context contexts[MAX_COROS]
でも作ってまとめておく方法である。真面目にやるなら汎用のスレッドローカルストレージに類する機構を作ってそこに入れておくのが良いと思う。 あるいはコルーチンへの引数としてコルーチン自身へのポインタを渡しても良い。この場合スイッチ先のコルーチンか、あるいはそれを決めるディスパッチャのようなものを一緒に渡す必要がある。
サンプル
お決まりの producer/consumer のサンプル・プログラムを書いた。
libcoro の上に直接並行処理プログラムを書くのはさすがに辛いものがあるので、ちょっと高水準なライブラリを書いてそれを使うことにした。C で書くには人生が短すぎるので C++ で書いた。
単純なラウンドロビン・ディスパッチャを実装してコンテキスト・スイッチする先を考えなくても良いようにした。真面目に並行処理するなら優先度つきキューやらコルーチンの実行状態表やら導入してもっとマシなスケジューリングが要るが考えたくない。 コルーチン間の通信には Coro ライクな Channel 機構を作ってそれを利用した。
自前ライブラリの実装が150行。ユーザーコードである main
が30行。標準外ライブラリへの依存はない:
#include <algorithm> | |
#include <cerrno> | |
#include <cstring> | |
#include <memory> | |
#include <queue> | |
#include <stdexcept> | |
#include "coro.h" | |
class Coro { | |
public: | |
Coro(const Coro&) = delete; | |
Coro& operator=(const Coro&) = delete; | |
Coro() | |
: context_(new coro_context), | |
stack_(new coro_stack { .sptr = nullptr, .ssze = 0 }) { | |
coro_create(context_.get(), nullptr, nullptr, stack_->sptr, stack_->ssze); | |
} | |
Coro(Coro&& other) = default; | |
Coro(coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) | |
: context_(new coro_context), stack_(new coro_stack) { | |
if (!coro_stack_alloc(stack_.get(), stack_size)) { | |
char error_str[256]; | |
strerror_r(errno, error_str, sizeof(error_str) / sizeof(error_str[0])); | |
throw std::runtime_error(error_str); | |
} | |
coro_create(context_.get(), coro, arg, stack_->sptr, stack_->ssze); | |
} | |
~Coro() { | |
if (context_) { coro_destroy(context_.get()); } | |
if (stack_) { coro_stack_free(stack_.get()); } | |
} | |
Coro& operator=(Coro&& other) = default; | |
bool operator==(const Coro& other) const noexcept { | |
return context_ == other.context_ && stack_ == other.stack_; | |
} | |
bool operator!=(const Coro& other) const noexcept { | |
return !operator==(other); | |
} | |
coro_context& Context() { return *context_; } | |
std::size_t StackSize() const { return stack_->ssze; } | |
void Transfer(Coro& next) { | |
coro_transfer(context_.get(), &next.Context()); | |
} | |
private: | |
std::unique_ptr<coro_context> context_; | |
std::unique_ptr<coro_stack> stack_; | |
}; | |
class Scheduler { | |
public: | |
class Terminator { | |
public: | |
~Terminator() { Scheduler::GetInstance().Terminate(); } | |
}; | |
Scheduler(const Scheduler&) = delete; | |
Scheduler(Scheduler&&) = delete; | |
Scheduler& operator=(const Scheduler&) = delete; | |
Scheduler& operator=(Scheduler&&) = delete; | |
void Cede() { | |
if (!Running()) { return; } | |
if (queue_.size() <= 1) { return; } | |
queue_.push(std::move(queue_.front())); | |
queue_.pop(); | |
queue_.back().Transfer(queue_.front()); | |
} | |
static Scheduler& GetInstance() { | |
static Scheduler instance; | |
return instance; | |
} | |
void Register( | |
coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) { | |
queue_.emplace(coro, arg, stack_size); | |
} | |
void Run() { | |
if (Running()) { return; } | |
if (queue_.empty()) { return; } | |
running_ = true; | |
main_.Transfer(queue_.front()); | |
} | |
bool Running() const { return running_; } | |
void Terminate() { | |
if (!Running()) { return; } | |
Coro terminated = std::move(queue_.front()); | |
queue_.pop(); | |
Coro& next = queue_.empty() ? main_ : queue_.front(); | |
if (next == main_) { running_ = false; } | |
terminated.Transfer(next); | |
} | |
private: | |
Scheduler() = default; | |
Coro main_; | |
std::queue<Coro> queue_; | |
bool running_ = false; | |
}; | |
template <typename T> | |
class Channel { | |
public: | |
Channel(std::size_t capacity = 0) : capacity_(capacity) {} | |
void Close() { closed_ = true; } | |
bool Closed() const { return closed_; } | |
bool Empty() const { return queue_.empty(); } | |
T Get() { | |
while (!Closed() && Empty()) { Scheduler::GetInstance().Cede(); } | |
if (Closed() && Empty()) { | |
throw std::runtime_error("Trying fetch data from closed channel."); | |
} | |
T value(std::move(queue_.front())); | |
queue_.pop(); | |
return value; | |
} | |
void Put(T value) { | |
while (CapacityIsSet() && queue_.size() >= capacity_) { | |
Scheduler::GetInstance().Cede(); | |
} | |
queue_.push(value); | |
} | |
private: | |
bool CapacityIsSet() const { return capacity_ > 0; } | |
bool closed_ = false; | |
std::size_t capacity_; | |
std::queue<T> queue_; | |
}; | |
#include <iostream> | |
int main() { | |
auto& sched = Scheduler::GetInstance(); | |
Channel<int> ch(5); | |
sched.Register( | |
[](void *arg) { | |
Scheduler::Terminator guard; | |
auto ch = reinterpret_cast<Channel<int> *>(arg); | |
for (int i = 0; i < 10; ++i) { | |
ch->Put(i); | |
std::cout << "Producer: " << i << std::endl; | |
} | |
ch->Close(); | |
}, | |
reinterpret_cast<void *>(&ch)); | |
sched.Register( | |
[](void *arg) { | |
Scheduler::Terminator guard; | |
auto ch = reinterpret_cast<Channel<int> *>(arg); | |
while (!(ch->Closed() && ch->Empty())) { | |
int got = ch->Get(); | |
std::cout << "Consumer: " << got << std::endl; | |
} | |
}, | |
reinterpret_cast<void *>(&ch)); | |
sched.Run(); | |
return 0; | |
} |
コメント
コメントを投稿