sync.ErrGroupで複数のgoroutineを制御する

Golangの並行処理は強力である一方で同期処理を慎重に実装する必要がある.“Go 言語における並行処理の構築部材”にまとめられているようにGolangは様々な方法でそれを実装することができる.実現したいタスクに合わせてこれらを適切に選択する必要がある.

この同期処理の機構として新たにgolang.org/x/sync/errgroupというパッケージが登場した.実際に自分のツールで使ってみて便利だったので簡単に紹介する.

使いどころ

時間のかかる1つのタスクを複数のサブタスクとして並行実行しそれらが全て終了するのを待ち合わせる処理(Latch)を書きたい場合にerrgroupは使える.その中でも「1つでもサブタスクでエラーが発生した場合に他のサブタスクを全てを終了しエラーを返したい」(複数のサブタスクが全て正常に終了して初めて1つの処理として完結する)場合が主な使いどころである.

実例

ここでは例として複数のworkerサブタスクをgoroutineで並行実行しそれらすべての終了を待ち合わせるという処理を考える.最初に今までのやりかたとしてsync.WaitGroupを使った実装を,次にerrgroupを使った実装を紹介する.

sync.WaitGroup

goroutineの待機処理としてよく使われるのがsync.WaitGroupである.その名前の通り指定した数の処理(goroutine)の実行の待ち合わせに利用する.例えば以下のように書くことができる.

var wg sync.WaitGroup
errCh := make(chan error, 1)
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        worker(i)
    }(i)
}

wg.Wait()

新たなgoroutineを生成する度にAddでWaitGroupをインクリメントし処理が終了したときにDoneを呼ぶ.そして全てのworkerの処理が終了するまでWaitで処理をブロックする.これはchannelを使っても実装できるがsync.WaitGroupを使ったほうが読みやすいことも多い.

ではworkerでのエラーを処理をしたい場合にはどうするのが良いだろうか? sliceでエラーをため終了後にそれを取り出す,errorのchannelを作り外部でそれを受け取るといったパターンが考えられる.何にせよ別途自分で処理を実装する必要がある.

sync.ErrGroup

errgroupパッケージを使う以下のように書くことができる.

eg := errgroup.Group{}
for i := 0; i < 10; i++ {
    i := i
    eg.Go(func() error {
        return worker(i)
    })
}

if err := eg.Wait(); err != nil {
    log.Fatal(err)
}

errgroupではGoメソッドを使いサブタスクを実行する.ここに与えられた処理は新たなgoroutineで実行される.Waitsync.WaitGroupと同様にGoメソッドで実行された全てのサブタスクが終了するまで処理をブロックする.そして(もしあれば)Goメソッド内で最初に返されたnon-nilのerrorを返す.

errgroupが強力なのはcontextパッケージを使い,1つのサブタスクでエラーが発生したときに他の全てのサブタスクをキャンセルできるところである.例えば以下のように書くことができる.

eg, ctx := errgroup.WithContext(context.TODO())
for i := 0; i < 10; i++ {
    i := i
    eg.Go(func() error {
        return workerContext(ctx, i)
    })
}

if err := eg.Wait(); err != nil {
    log.Println(err)
}

違いは新たなGroupをWithContextで生成し,かつ同時に新たなContextも生成している部分である.またworkerworkerContextとしContextを渡せるようにしている.これにより1つのサブタスクでエラーが発生した場合に生成したContextをキャンセルすることができる.つまり(workerContextをちゃんと実装すれば)適切なリソース解放を行い処理を終了させることができる.

まとめ

これだけでなくGoDocのExampleにも挙げられているようにpipeline処理にも使うことができる.これらの処理はGolangではよく実装するパターンでありもしかしたら標準に仲間入りするかもしれない.

とりあえずサブタスクを全て実行してしまいたい,発生したエラーは全て取り出したい,といった場合は別のパターンを実装するのが良い.

参考