スキップしてメイン コンテンツに移動

libcoro で並行処理プログラムを書く

libcoro という C のライブラリがある。Perl Mongers にはおなじみ (だった) 協調スレッド実装である Coro.pm のバックエンドとして使われているライブラリで、作者は Coro と同じく Marc Lehmann 氏。

coro というのは Coroutine (コルーチン) の略で、要するに処理の進行を明示的に中断して別の実行コンテキストに切り替えたり、そこからさらに再開できる機構のことである。言語やプラットフォームによって Fiber と呼ばれるものとほぼ同義。

(ネイティヴ) スレッドとの違いはとどのつまり並行処理と並列処理の違いで、スレッドは同時に複数の実行コンテキストが進行し得るがコルーチンはある時点では複数の実行コンテキストのうち高々一つだけが実行され得る。 スレッドに対するコルーチンの利点は主に理解のし易さにある。スレッドの実行中断と再開は予測不可能なタイミングで起こるため、メモリその他の共有資源へのアクセスが常に競合し得る。一方コルーチンは自発的に実行を中断するまでプロセスの資源を独占しているため、コンテキスト・スイッチをまたがない限り共有資源の排他制御や同期などを考えなくて良い。

同時に一つのコルーチンしか実行されないということは、プロセッサのコア数に対して処理がスケールアウトしないことを意味する。ただしシングルスレッドのプログラムでも IO などの間はプロセッサが遊んでいるため、非同期 IO とコルーチンを組み合わせるなどして待ち時間に別の処理を行わせることで効率を高められることが多い。 また1コアでの性能に関しては、コンテキスト・スイッチの回数が減り、またスイッチング自体もユーザモードで完結するため、スレッドよりも高速である場合が多い。このため「軽量スレッド」とも呼ばれることがある。

libcoro の特徴

C で利用できるコルーチン実装は複数あって、Wikipedia にある Coroutine の記事 を見ても片手では足りない数が挙げられている。

libcoro がその中でどう特徴付けられるかというとポータビリティが挙げられる。

実装のバックエンドは Windows の Fiber や POSIX の ucontext の他、setjmp/longjmppthread 果てはアセンブラによる実装が選択でき、API は共通である。 少なくとも setjmp/longjmp は C90 の標準ライブラリ関数なので現代の OS であれば利用できるはずだ。

ライブラリはヘッダファイルと実装を収めたソースコードファイル1つずつからなる。 CVS レポジトリには Makefile すら含まれていない。ビルドするにはバックエンドを選択するプリプロセッサマクロを定義するだけで良い:

# CORO_SJLJ は setjmp/longjmp を使った実装
bash-3.2$ clang -DCORO_SJLJ -c -o coro.o coro.c
bash-3.2$ ar crs libcoro.a coro.o

あとは使用するプログラムとリンクするだけ:

# マクロはライブラリのコンパイル時と同じものを与える必要がある
bash-3.2$ clang++ --std=c++11 -DCORO_SJLJ -I. coro_usage.cc -L. -lcoro -static

API

シンプルなライブラリにはシンプルな API しかない。できるのは「一つのコルーチンから別のコルーチンを指定してコンテキスト・スイッチする」ことだけである。

コルーチンを一つ生成して実行するには以下の手順を踏む。ドキュメントはヘッダファイル内のコメントのみだが素晴らしく詳細である。

1. スタックを初期化する

まずコルーチンが使用する専用のスタックを確保する。スタック領域を確保して coro_stack 構造体を初期化する関数 coro_stack_alloc を使用する。 スタックは第二引数に指定した個数のポインタが保持できる大きさになる。要するに指定した数に8倍したバイト数が確保される。通常は考えるのが面倒なので0を指定するとよしなに確保してくれる。 戻り値は確保の成否を返す。

struct coro_stack stack;
if (!coro_stack_alloc(&stack, 0)) {
  perror("coro_stack_alloc");
}

2. コルーチンを作成する

コルーチン自身は coro_context 型で表現される。これを初期化する関数は coro_create である。

第一引数に初期化したいコルーチンへのポインタを指定する。残りの引数はコルーチンとして実行すべき関数、コルーチンに渡す引数へのポインタ、確保したポインタのサイズ、そして確保したスタック領域へのポインタである。 最後の二つは前もって確保した coro_stacksptrssze メンバがそれぞれ対応する。malloc やなんかで勝手に確保したメモリ領域を渡すと落ちるので注意。

coro_context context;
coro_create(
    &context,
    [](void *) { ... },
    nullptr,
    stack.sptr,
    stack.ssze);

C++ ユーザに残念なお知らせ: 関数として lambda は使えるが変数はキャプチャできない。何故かというに、coro_create の第二引数の型 coro_funcvoid (*)(void *) の typedef に過ぎないので lambda から coro_func への型変換 operator void(*)(void *) が必要だからである (ところでこれが合法なメンバ関数名なのはひどすぎると思う)。

coro_func が受け取る void * には coro_create の第三引数が渡される。外部の環境 (というより呼出し元のコール・スタック) をコルーチン内から参照したければここに必要なものを詰めてお土産に持たせることになる。このあたりは C なので仕方がない。

3. コルーチンを実行する

コルーチンの実行を開始するにはちょっと工夫が要る。libcoro にはコンテキスト・スイッチする関数しかないので、スイッチ元となるコルーチンが要るからである。

それで実行開始と終了のために特別な「空の」コルーチンを生成する必要がある。これは coro_context を null ポインタと0で初期化することで得られる:

coro_context empty;
coro_create(&empty, nullptr, nullptr, nullptr, 0);

これで準備ができたので、空のコルーチンから目的のコルーチンへコンテキスト・スイッチする。その際に使うのは coro_transfer である:

coro_transfer(&empty, &context);

以降再び coro_transfer が呼ばれるまで context が表すコルーチンが独占的に実行される。 コルーチンの実行を終了するときは空のコルーチンへ再度コンテキスト・スイッチすれば良い。

4. リソースを開放する

処理が終わったらメモリの片付けをする。コルーチンは coro_destroy で破棄し、対応するスタックは coro_stack_free で開放する:

coro_destroy(context);
coro_stack_free(stack);

coro_destroy(empty);  // 空のコルーチンに対応するスタックはないので coro_stack_free は不要

注意点

コルーチンへの引数が void * なのでどうやっても型チェックは効かない。また渡したオブジェクトの寿命をコルーチンの実行終了まで保たせるのはプログラマの責任である。

外部の環境をコルーチン内から参照できないので、コンテキスト・スイッチしたいときに coro_transfer に渡す自分自身をどうやって指定するかが問題になる。手っ取り早いのは static coro_context contexts[MAX_COROS] でも作ってまとめておく方法である。真面目にやるなら汎用のスレッドローカルストレージに類する機構を作ってそこに入れておくのが良いと思う。 あるいはコルーチンへの引数としてコルーチン自身へのポインタを渡しても良い。この場合スイッチ先のコルーチンか、あるいはそれを決めるディスパッチャのようなものを一緒に渡す必要がある。

サンプル

お決まりの producer/consumer のサンプル・プログラムを書いた。

libcoro の上に直接並行処理プログラムを書くのはさすがに辛いものがあるので、ちょっと高水準なライブラリを書いてそれを使うことにした。C で書くには人生が短すぎるので C++ で書いた。

単純なラウンドロビン・ディスパッチャを実装してコンテキスト・スイッチする先を考えなくても良いようにした。真面目に並行処理するなら優先度つきキューやらコルーチンの実行状態表やら導入してもっとマシなスケジューリングが要るが考えたくない。 コルーチン間の通信には Coro ライクな Channel 機構を作ってそれを利用した。

自前ライブラリの実装が150行。ユーザーコードである main が30行。標準外ライブラリへの依存はない:

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <queue>
#include <stdexcept>
#include "coro.h"
class Coro {
public:
Coro(const Coro&) = delete;
Coro& operator=(const Coro&) = delete;
Coro()
: context_(new coro_context),
stack_(new coro_stack { .sptr = nullptr, .ssze = 0 }) {
coro_create(context_.get(), nullptr, nullptr, stack_->sptr, stack_->ssze);
}
Coro(Coro&& other) = default;
Coro(coro_func coro, void *arg = nullptr, std::size_t stack_size = 0)
: context_(new coro_context), stack_(new coro_stack) {
if (!coro_stack_alloc(stack_.get(), stack_size)) {
char error_str[256];
strerror_r(errno, error_str, sizeof(error_str) / sizeof(error_str[0]));
throw std::runtime_error(error_str);
}
coro_create(context_.get(), coro, arg, stack_->sptr, stack_->ssze);
}
~Coro() {
if (context_) { coro_destroy(context_.get()); }
if (stack_) { coro_stack_free(stack_.get()); }
}
Coro& operator=(Coro&& other) = default;
bool operator==(const Coro& other) const noexcept {
return context_ == other.context_ && stack_ == other.stack_;
}
bool operator!=(const Coro& other) const noexcept {
return !operator==(other);
}
coro_context& Context() { return *context_; }
std::size_t StackSize() const { return stack_->ssze; }
void Transfer(Coro& next) {
coro_transfer(context_.get(), &next.Context());
}
private:
std::unique_ptr<coro_context> context_;
std::unique_ptr<coro_stack> stack_;
};
class Scheduler {
public:
class Terminator {
public:
~Terminator() { Scheduler::GetInstance().Terminate(); }
};
Scheduler(const Scheduler&) = delete;
Scheduler(Scheduler&&) = delete;
Scheduler& operator=(const Scheduler&) = delete;
Scheduler& operator=(Scheduler&&) = delete;
void Cede() {
if (!Running()) { return; }
if (queue_.size() <= 1) { return; }
queue_.push(std::move(queue_.front()));
queue_.pop();
queue_.back().Transfer(queue_.front());
}
static Scheduler& GetInstance() {
static Scheduler instance;
return instance;
}
void Register(
coro_func coro, void *arg = nullptr, std::size_t stack_size = 0) {
queue_.emplace(coro, arg, stack_size);
}
void Run() {
if (Running()) { return; }
if (queue_.empty()) { return; }
running_ = true;
main_.Transfer(queue_.front());
}
bool Running() const { return running_; }
void Terminate() {
if (!Running()) { return; }
Coro terminated = std::move(queue_.front());
queue_.pop();
Coro& next = queue_.empty() ? main_ : queue_.front();
if (next == main_) { running_ = false; }
terminated.Transfer(next);
}
private:
Scheduler() = default;
Coro main_;
std::queue<Coro> queue_;
bool running_ = false;
};
template <typename T>
class Channel {
public:
Channel(std::size_t capacity = 0) : capacity_(capacity) {}
void Close() { closed_ = true; }
bool Closed() const { return closed_; }
bool Empty() const { return queue_.empty(); }
T Get() {
while (!Closed() && Empty()) { Scheduler::GetInstance().Cede(); }
if (Closed() && Empty()) {
throw std::runtime_error("Trying fetch data from closed channel.");
}
T value(std::move(queue_.front()));
queue_.pop();
return value;
}
void Put(T value) {
while (CapacityIsSet() && queue_.size() >= capacity_) {
Scheduler::GetInstance().Cede();
}
queue_.push(value);
}
private:
bool CapacityIsSet() const { return capacity_ > 0; }
bool closed_ = false;
std::size_t capacity_;
std::queue<T> queue_;
};
#include <iostream>
int main() {
auto& sched = Scheduler::GetInstance();
Channel<int> ch(5);
sched.Register(
[](void *arg) {
Scheduler::Terminator guard;
auto ch = reinterpret_cast<Channel<int> *>(arg);
for (int i = 0; i < 10; ++i) {
ch->Put(i);
std::cout << "Producer: " << i << std::endl;
}
ch->Close();
},
reinterpret_cast<void *>(&ch));
sched.Register(
[](void *arg) {
Scheduler::Terminator guard;
auto ch = reinterpret_cast<Channel<int> *>(arg);
while (!(ch->Closed() && ch->Empty())) {
int got = ch->Get();
std::cout << "Consumer: " << got << std::endl;
}
},
reinterpret_cast<void *>(&ch));
sched.Run();
return 0;
}
view raw coro.cc hosted with ❤ by GitHub

コメント

このブログの人気の投稿

BuckleScript が ReScript に改称し独自言語を導入した

Via: BuckleScript Good and Bad News - Psellos OCaml / ReasonML 文法と標準ライブラリを採用した JavaScript トランスパイラである BuckleScript が ReScript に改称した。 公式サイトによると改称の理由は、 Unifying the tools in one coherent platform and core team allows us to build features that wouldn’t be possible in the original BuckleScript + Reason setup. (単一のプラットフォームとコアチームにツールを統合することで従来の BuckleScript + Reason 体制では不可能であった機能開発が可能になる) とのこと。要は Facebook が主導する外部プロジェクトである ReasonML に依存せずに開発を進めていくためにフォークするという話で、Chromium のレンダリングエンジンが Apple の WebKit から Google 主導の Blink に切り替わったのと似た動機である (プログラミング言語の分野でも Object Pascal が Pascal を逸脱して Delphi Language になったとか PLT Scheme (の第一言語) が RnRS とは別路線に舵を切って Racket になったとか、割とよくある話である。) 公式ブログの Q&A によると OCaml / ReasonML 文法のサポートは継続され、既存の BuckleScript プロジェクトは問題なくビルドできるとのこと。ただし現時点で公式ドキュメントは ReScript 文法のみに言及しているなど、サポート水準のティアを分けて ReScript 文法を優遇することで移行を推進していく方針である。 上流である OCaml の更新は取り込み、AST の互換性も維持される。将来 ReScript から言語機能が削除されることは有り得るが、OCaml / ReasonML からは今日の BuckleScript が提供する機能すべてにアクセスできる。 現時点における ReScript の ...

macOS で GUI 版 Emacs を使う設定

macOS であっても端末エミュレータ上で CLI 版 Emacs を使っているプログラマは多いと思うが、端末側に修飾キーを取られたり東アジア文字の文字幅判定が狂ってウィンドウ描画が崩れたりなどしてあまり良いことがない。 それなら GUI 版の Emacs.app を使った方がマウスも使える上に treemacs などはアイコンも表示されてリッチな UI になる。 しかし何事も完璧とはいかないもので、CLI だと問題なかったものが GUI だと面倒になることがある。その最大の原因はシェルの子プロセスではないという点である。つまり macOS の GUI アプリケーションは launchd が起動しその環境変数やワーキングディレクトリを引き継ぐので、ファイルを開こうとしたらホームディレクトリ ( ~/ ) でなくルートディレクトリ ( / ) を見に行くし、ホームディレクトリなり /opt/local なりに好き勝手にインストールしたツールを run-* 関数やら shell やら flycheck やらで実行しようとしてもパスが通っていない。 ワーキングディレクトリに関しては簡単な解決策があり、 default-directory という変数をホームディレクトリに設定すれば良い。ただし起動時にスプラッシュスクリーンを表示する設定の場合、このバッファのワーキングディレクトリは command-line-default-directory で設定されており、デフォルト値が解決される前に適用されてしまうので併せて明示的に初期化する必要がある: (setq default-directory "~/") (setq command-line-default-directory "~/") 次にパスの問題だが、まさにこの問題を解決するために exec-path-from-shell というパッケージがある。これを使うとユーザのシェル設定を推定し、ログインシェルとして起動した場合の環境変数 PATH と MANPATH を取得して Emacs 上で同じ値を setenv する、という処理をやってくれる。MELPA にあるので package-install するだけで使えるようになる。 このパッケージは GUI ...

Perl のサブルーチンシグネチャ早見表

Perl のサブルーチン引数といえば実引数への参照を保持する特殊配列 @_ を手続き的に分解するのが長らくの伝統だった。これはシェルの特殊変数 $@ に由来する意味論で、おそらく JavaScript の arguments 変数にも影響を与えている。 すべての Perl サブルーチンはプロトタイプ宣言がない限りリスト演算子なので、この流儀は一種合理的でもあるのだが、実用的にそれで良いかというとまったくそうではないという問題があった; 結局大多数のサブルーチンは定数個の引数を取るので、それを参照する形式的パラメータが宣言できる方が都合が良いのである。 そういうわけで実験的に導入されたサブルーチンシグネチャ機能により形式的パラメータが宣言できるようになったのは Perl 5.20 からである。その後 Perl 5.28 において出現位置がサブルーチン属性の後に移動したことを除けば Perl 5.34 リリース前夜の今まで基本的に変わっておらず、未だに実験的機能のままである。 おまじない シグネチャは前方互換性を持たない (構文的にプロトタイプと衝突している) 実験的機能なのでデフォルトでは無効になっている。 そのため明示的にプラグマで利用を宣言しなければならない: use feature qw/signatures/; no warnings qw/experimental::signatures/; どの途みんな say 関数のために使うので feature プラグマは問題ないだろう。実験的機能を断りなしに使うと怒られるので、 no warnings で確信犯であることをアピールする必要がある。 これでプラグマのスコープにおいてサブルーチンシグネチャ (と :prototype 属性; 後述) が利用可能になり、 従来のプロトタイプ構文が無効になる。 使い方 対訳を載せておく。シグネチャの方は実行時に引数チェックを行うので厳密には等価でないことに注意: # Old School use feature qw/signatures/ 1 sub f { my ($x) = @_; ... } sub f($x) { ... } 2 sub f { my ($x, undef, $y) = @_...

Perl 7 より先に Perl 5.34 が出るぞという話

Perl 5 の次期バージョンとして一部後方互換でない変更 (主に間接オブジェクト記法の削除とベストプラクティスのデフォルトでの有効化) を含んだメジャーバージョンアップである Perl 7 がアナウンスされたのは昨年の 6 月 のことだったが、その前に Perl 5 の次期周期リリースである Perl 5.34 が 5 月にリリース予定 である。 現在開発版は Perl 5.33.8 がリリースされておりユーザから見える変更は凍結、4 月下旬の 5.33.9 で全コードが凍結され 5 月下旬に 5.34.0 としてリリース予定とのこと。 そういうわけで事前に新機能の予習をしておく。 8進数数値リテラルの新構文 見た瞬間「マジかよ」と口に出た。これまで Perl はプレフィクス 0 がついた数値リテラルを8進数と見做してきたが、プレフィクスに 0o (zero, small o) も使えるようになる。 もちろんこれは2進数リテラルの 0b や 16進数リテラルの 0x との一貫性のためである。リテラルと同じ解釈で文字列を数値に変換する組み込み関数 oct も` 新構文を解するようになる。 昨今無数の言語に取り入れられているリテラル記法ではあるが、この記法の問題は o (small o) と 0 (zero) の区別が難しいことで、より悪いことに大文字も合法である: 0O755 Try / Catch 構文 Perl 5 のリリース以来 30 年ほど待たれた実験的「新機能」である。 Perl 5 における例外処理が特別な構文でなかったのは予約語を増やさない配慮だったはずだが、TryCatch とか Try::Tiny のようなモジュールが氾濫して当初の意図が無意味になったというのもあるかも知れない。 use feature qw/ try / ; no warnings qw/ experimental::try / ; try { failable_operation(); } catch ( $e ) { recover_from_error( $e ); } Raku (former Perl 6) だと CATCH (大文字なことに注意) ブロックが自分の宣言されたスコープ内で投げられた例外を捕らえる...

Project Euler - Problem 35

問題 原文 How many circular primes are there below one million? 日本語訳 100万未満の巡回素数は何個か? 解答 回転させた数値がすべて素数ということは、すべての桁が奇数でなければいけません(ただし2を除く)。 追記 匿名氏にコメントでご指摘頂いたのでコードを一部修正しました。 いずれかの桁に5がある場合も、回転させると必ず5の倍数が現れるので除外できます。 もっと追記 前の修正に間違いが入っているのをご指摘頂いたので修正しました。 5自体は素数なので、巻き添えで除外してはいけません。 #!/usr/bin/env perl use strict; use warnings; use feature qw/say state/; use List::MoreUtils qw/all none/; sub is_prime($) { state %memos; my $n = shift; return 0 if $n < 2; return 1 if $n == 2; return 1 if $n == 3; return $memos{$n} if exists $memos{$n}; $memos{$n} = none { $n % $_ == 0 } 2 .. sqrt $n; } sub rotate($) { my $n = shift; substr($n, 1) . substr($n, 0, 1); } sub rotations($) { my $n = shift; my %seen = ($n => 1); $seen{$n} = 1 until exists $seen{$n = rotate $n}; keys %seen; } sub is_circular_prime($) { state %memos; my $n = shift; return 0 if $n =~ /[024568]/ and $n != 2 and $n != 5; return $memos{$n} if exists $memos{$n}; my ...