読者です 読者をやめる 読者になる 読者になる

goroutineで複数の値を並列に生成して全部生成できたら結果を返す

表題のような処理はたいへん良くあって、似たようなものをみなさん書かれていると思う。自分のコードでも何度か出てきたので、ParallelGenerateという関数に抽象化しつつまとめてみたというメモ的エントリーです。

ParallelGenerate

ParallelGenerateは値の生成手段の列を受け取って、値を並列に生成し、結果の列を受け取るためのチャンネルを返す、汎用的な関数。

import (
	"sync"
)

type Generator interface {
	Generate() (interface{}, error)
}

func ParallelGenerate(generators []interface{}) chan []interface{} {
	generated := make(chan interface{})
	finish := make(chan bool)
	results := make(chan []interface{})

	go func() {
		generatedValues := make([]interface{}, 0)

		for {
			select {
			case value := <-generated:
				generatedValues = append(generatedValues, value)
			case <-finish:
				results <- generatedValues
				return
			}
		}
	}()

	go func() {
		var wg sync.WaitGroup
		for _, g := range generators {
			wg.Add(1)
			go func(g interface{}) {
				value, err := g.(Generator).Generate()
				if err != nil {
					log.Printf("Generation failed")
				}
				generated <- value
				wg.Done()
			}(g)
		}
		wg.Wait()
		finish <- true
	}()

	return results
}
  • Generator interfaceは値を生成するGenerateメソッドを要求する
  • ParallelGenerate関数はGeneratorの列を受け取り、値の生成がすべて完了したときに、生成された値の列を受信するチャンネルを返す
  • 値の生成は並列に行われる
  • 真にparallelかはGOMAXPROCS環境変数に依存するが、GOMAXPROCS=1でもgoroutineがgoのランタイムによってスケジューリングされるのでconcurrentに動作する

Generator interfaceの実装の例

HttpContentGeneratorは指定されたURLをHTTP GETして得られた結果を生成するGenerator。

import (
	"io/ioutil"
	"net/http"
)

type HttpContentGenerator struct {
	url string
}

func (g *HttpContentGenerator) Generate() (interface{}, error) {
	resp, err := http.Get(g.url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	generators := []interface{}{
		&HttpContentGenerator{"http://www.hatena.ne.jp"},
		&HttpContentGenerator{"http://hatenablog.com"},
	}
	results := ParallelGenerate(generators)
	contents := <-results
	log.Println( contents )
}

上記のコードを実行すると、"http://www.hatena.ne.jp" と "http://hatenablog.com" へのHTTP GETが並列に実行される。両方のコンテンツがそろうとcontents変数に結果の値が代入される。

ParallelGenerateの解説

ParallelGenerateの実装としては、まず以下の様なものが考えられる。

func ParallelGenerate(generators []interface{}) []interface{} {
	generatedValues := make([]interface{}, 0)
	var wg sync.WaitGroup
	for _, g := range generators {
		wg.Add(1)
		go func(g interface{}) {
			value, err := g.(Generator).Generate()
			if err != nil {
				log.Printf("Generation failed")
			}
			generatedValues = append(generatedValues, value)
			wg.Done()
		}(g)
	}
	wg.Wait()

	return generatedValues
}
  • goroutine で生成タスクをそれぞれ同時に開始する
  • sync.WaitGroupを使って生成タスクがすべて完了するのを待つ
  • 各生成タスクは値が生成できたらgeneratedValuesに値を追加する

このように実装すると、generatedValues へのアクセスが各生成goroutine間で同期されておらず、generatedValuesに関してデータ競合が発生する。

そこで、generatedValuesの操作を、channelを通じて、別のgoroutineに依頼するようにする。

func ParallelGenerate(generators []interface{}) []interface{} {
	generated := make(chan interface{})

	go func() { // このgoroutineだけかgeneratedValuesにアクセスする
		generatedValues := make([]interface{}, 0)

		for {
			select {
			case value := <-generated:
				generatedValues = append(generatedValues, value)
		}
	}()

	go func() {
		var wg sync.WaitGroup
		for _, g := range generators {
			wg.Add(1)
			go func(g interface{}) {
				value, err := g.(Generator).Generate()
				if err != nil {
					log.Printf("Generation failed")
				}
				generated <- value // 結果はチャンネルに送る
				wg.Done()
			}(g)
		}
		wg.Wait()
	}()

	return ...
}

generatedValuesの対する操作を1つのgroutineがチャンネルを通じて担当することで、generatedValuesはgoroutine間で同期されるようになった。しかし、生成タスクによって生成された値が、すべてgeneratedValuesに追加されたのかが判定できず、生成された値の列を返すことができない。

その部分を考えたのが、ParallelGenerate の最終形となる。wg.Wait()ですべての生成タスクが完了したことを、generatedValuesを操作しているgoroutineにfinishチャンネル経由で通知する。finishチャンネルからの通知を受け取ると、完成したgeneratedValuesをresultsチャンネルに送信する。

ParallelGenerateは生成された結果の列を受け取ることのできるresultsチャンネルを返す。結果が必要なコードが必要に応じてresultsチャンネルから値を受信する。これはJavaなどのFutureのような振る舞い。

性能

goroutineを利用するとタスクが並列(もしくは並行)に動作することになっているが、実際に性能は上がっているのかはわからない。例に上げたHttpContentGeneratorとParallelGeneratorについて確認してみる。

準備

http://localhost:5000?sleep=5 のようにすると5秒まってレスポンスを返却してくれるような実験用のWebアプリケーションを準備する。

Perlを使うのであれば、以下の様な PSGIアプリケーションを準備して、

use strict;
use warnings;
use Plack::Request;
use Time::HiRes qw(sleep);

sub {
    my ($env) = @_;
    my $req = Plack::Request->new($env);

    my $sleep = $req->parameters->{sleep} || 0;
    sleep $sleep if $sleep;

    return [ 200, [], ['ok'] ];
}

以下のように、同時に複数のリクエストを処理できるように設定したHTTPサーバで動作させる。

plackup -s Starlet --max-workers=30
計測

https://gist.github.com/hakobe/8099184 のようなコードを用いて計測した。計測対象の処理は以下のとおり。

それぞれの処理を100回実行して処理の実行完了に要した時間の平均を算出した。

結果
処理内容 処理の実行完了に要した時間(ms)
処理1 - serial/sleep(1) 10030.407
処理2 - serial/sleep(0) 20.681
処理3 - parallel/sleep(1) 1012.447
処理4 - parallel/sleep(0) 14.724
考察
  • 処理1は1秒かかるリクエストを逐次的に10回実行しているため、10秒ほど処理時間がかかっている一方、処理3は1秒ほどしか処理時間がかかっておらず、確かに同時に10個のリクエストが処理されているような速度がでている。
  • すぐに結果が返却されるリクエストを10回実行した場合でも、逐次的に実行される処理2と、並列に実行される処理4の処理時間は同程度である。HTTPリクエスト程度の処理では並列化によるオーバーヘッドは無視できるようだ。

まとめ

  • goroutineで複数の値を並列に生成して全部生成できたら返すという処理はよくやられてる
  • 自分なりにParallelGenerateという関数に抽象化してみた
  • 本当に性能がよくなるのか確かめてみたら、良くなってた

実はデータ競合あるとか、チャンネル通信多くて効率わるいとかあったら教えてもらいたい。