Java並行処理プログラミング ―その「基盤」と「最新API」を究める― をぱらぱらめくっていて出てきた、以下二つをCoroで実装してみた。
- 並行処理タスクの実行方法を抽象化した Executor
- タスクの生成とその消費を分離する CompletionService
実装とか例とかぺたぺた貼ってたらすごい長くなっちった。 gistにもあります > http://gist.github.com/191924 。
まとめ (長いので先に結論)
ExecutorやCompletionServiceのようなパターンにそってプログラムをかくと、すっきりと並行処理プログラムを構成できて良い感じ。Coroでも簡単に実装できる。
これまでやられたことに名前を付けただけという話もあるけど、名前をつけて区別して理解しておくと、コード読むときの思考が整理されて良いとおもう。
Executor
Executorとは?
- タスクの実行を抽象化するもの (必ず並行実行される必要はない)
- Executorにタスクの実行を依頼すると実装に応じたアルゴリズムを用いてタスクを実行してくれる
- JavaのExecutorの実装のひとつのThreadPoolExecutorは、タスクの実行を依頼するとそれらをスレッドで並行に実行してくれる
- タスク == Worker 、 Executor == WorkerManager って感じ
使い方
タスクを関数リファレンスで渡すとstartしたときにそれらが並列で実行される。Executorを初期化するときにわたす数値は、Executorが同時に実行できるタスクの最大数を指す。
my $executor = Executor->new(10); # 最大同時に10タスクが動作する for (0..$all_task_nums) { $executor->execute(sub { # 並行に実行するタスク }); } $executor->start; # executor を start
実装
基本的には、関数リファレンスでわたされたタスクをasyncで非同期実行する仕組み。asyncで実行するときにCoro::Semaphoreでつくったセマフォからガードを取得することで、同時に実行するたすくの数を制御する。
> http://gist.github.com/191924 の下の方のExecutor.pm
package Executor; use strict; use warnings; use Class::Accessor qw(antlers); use Coro; use Coro::AnyEvent; use Coro::Channel; use Coro::Semaphore; has lock => (is => 'ro'); sub new { my $class = shift; my $size = shift || 10; my $self = $class->SUPER::new({ lock => Coro::Semaphore->new($size), @_, }); $self; } sub start { schedule } sub execute { my ($self, $code) = @_; my $coro = async { my $guard = $self->lock->guard; $code->(); }; } sub submit { my ($self, $code, $on_done) = @_; my $f = $self->create_future($code, $on_done); $f; } sub create_future { my $self = shift; my $code = shift; my $on_done = shift; my $result_store = Coro::Channel->new; my $result = sub { $result_store->get(); }; $self->execute( sub { my $r = $code->(); $result_store->put($r); $on_done->($result) if $on_done; }); $result; } 1;
CompletionService
CompletionServiceとは
- タスクの生成とその消費を分離する
- Executorでタスクの処理結果を得るにはタスクをひとつひとつポーリングしないといけない
- => タスクの生成ごとに結果を消費する準備がいる
- CompletionServiceはExecutorにqueueを足したようなもので、タスクが完了すると結果がqueueに入る
- => タスクの処理結果を消費するにはそのqueueをみていれば良い
- queueのトップの値をpollしておいて、値が入ったときに取り出すようすれば、タスクが完了するやいなや、その結果を使った次の処理をおこなうことができる
使い方
事前にExecutorを作っておき、CompletionServiceに渡して使う。CompletionServiceオブジェクトのtakeメソッドを呼ぶと、結果queueから値を取り出す。queueが空っぽだと、takeはブロックして結果がはいるまで待つ。
以下の例は、データのダウンロードを担当するCompletionServiceと、それをファイルに保存するCompletionServiceの二つを走らせている。ダウンロード用サービスのタスクが完了すると、ダウンロード用サービスのqueueからタスクの結果がtakeできるようになる。
takeをpollしておけば、ダウンロード用サービスのタスクが終わるやいなや、その結果を利用した次の処理を行うことができる。
CompletionServiceを使うと、ひとつの大きな処理を小さなタスクに切り刻んでそれらを順番に実行させ、かつ各々のタスクは並行に動作するみたいな処理を書くことが出来る。うまくタスクを分けると並列度が向上すると思う(この例だとほとんどIOバウンドなのでそんなに向上してなさそう)。Executorは共通なので、同時に走るタスクの総数は1箇所で制御できる。
use Executor; use CompletionService; my $executor = Executor->new(10); # 全体で動作するWorkerの総数 # ダウンロード用サービス my $download_service = CompletionService->new( executor => $executor, ); # ファイル保存用サービス my $filesave_service = CompletionService->new( executor => $executor, ); for my $url (@urls) { $download_service->submit(sub { # $url のデータをメモリ上にダウンロードする処理 }); } # takeをpollしておき # 終了したタスクから結果をとりだす while (my $r = $download_service->take) { my $result = $r->(); $filesave_service->submit( sub { # $result から得られたデータをファイルに記録する }); } $executor->start;
実装
内部にもつExecutorにタスクを投げる。タスクの終了にフックしてqueueの実装であるCoro::Channelに結果を値をいれる。takeは実際にはCoro::Channel#getを呼んでいる。Coro::Channel#getはCoro::Channel#sizeが0であれば値が取得できるまでブロックする。
package CompletionService; use strict; use warnings; use Class::Accessor qw(antlers); use Coro; use Coro::AnyEvent; use Coro::Channel; use Executor; has queue => (is => 'ro'); has executor => (is => 'ro'); sub new { my $class = shift; my $self = $class->SUPER::new({ queue => Coro::Channel->new, executor => Executor->new, @_, }); $self; } sub start { shift->executor->start } sub take { my ($self) = @_; $self->queue->get; } sub submit { my ($self, $code) = @_; $self->executor->submit($code, sub { my $result = shift; $self->queue->put($result); }); } 1;