Go言語でCPU数に応じて並列処理数を制限する

負荷のかかる処理を制限なしに並列化しても意味ない.処理の並列数を予測可能な場合は,当たりをつけて最適化するのもよいが,不明確な場合は,CPU数による制限が単純な1つの解になる.

TL;DR

CPU数に応じたバッファ長のChannelを使ってセマフォを実装する.

実例

goxはGo言語製のツールを並列コンパイルするツール.コンパイルの処理は重いため,デフォルトで並列処理数をCPU数で制限している.

簡単な例

例えば,以下のような単純な並列処理を考える.heavy()(重い処理)を並列で実行する.

package main

import (
        "fmt"
        "sync"
        "time"
)

func heavy(i int) {
        fmt.Println(i)
        time.Sleep(5 * time.Second)
}

func main() {
        var wg sync.WaitGroup    
        for i := 0; i <= 100; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                heavy(i)
            }(i)
        }
        wg.Wait()
}

この並列処理の同時実行数をCPU数で制限する.

まず,利用可能なCPUのコア数は,runtimeパッケージのNumCPU()で取得できる.

func NumCPU() int

次に,CPU数をバッファ長としたChannelを作成する.

cpus := runtime.NumCPU()
semaphore := make(chan int, cpus)

後は,heavy()をChannelへの送受信で囲む.これで,CPU数だけバッファが溜まると,Channelへの送信がブロックされ,新しい並列処理の開始もブロックされる.

最終的な実装は以下のようになる.

package main

import (
        "fmt"
        "sync"
        "time"
        "runtime"
)

func heavy(i int) {
        fmt.Println(i)
        time.Sleep(5 * time.Second)
}

func main() {
        var wg sync.WaitGroup
        cpus := runtime.NumCPU()
        semaphore := make(chan int, cpus)
        for i := 0; i <= 100; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                semaphore <- 1
                heavy(i)
                <-semaphore
            }(i)
        }
        wg.Wait()
}

マルチコアで実行する

最後にちょっとCPU関連で別の話題.現状,goroutineのスケジューラはマルチコアを最適に使うようになっていないらしい(“Why does using GOMAXPROCS > 1 sometimes make my program slower?”).そのため,デフォルトの設定では使用するCPUのコア数は1になっている.

これを変更するには,runtimeパッケージのGOMAXPROCS()を使う,もしくは環境変数 (GOMAXPROCS) を設定する.

func GOMAXPROCS(n int) int

利用可能なCPUを全て使って処理を実行するには,以下のようにする.

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)

将来的には,スケジューラがいい感じにしてくれるっぽい.

参考