Go言語でCPU数に応じて並列処理数を制限する
負荷のかかる処理を制限なしに並列化しても意味ない.処理の並列数を予測可能な場合は,当たりをつけて最適化するのもよいが,不明確な場合は,CPU数による制限が単純な1つの解になる.
TL;DR
CPU数に応じたバッファ長のChannelを使ってセマフォを実装する.
実例
gox
はGo言語製のツールを並列コンパイルするツール.コンパイルの処理は重いため,デフォルトで並列処理数をCPU数で制限している.
簡単な例
例えば,以下のような単純な並列処理を考える.heavy()
(重い処理)を並列で実行する.
package main
import (
"fmt"
"sync"
"time"
)
func heavy(i int) {
fmt.Println(i)
time.Sleep(5 * time.Second)
}
func main() {
var wg sync.WaitGroup
for i := 0; i <= 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
heavy(i)
}(i)
}
wg.Wait()
}
この並列処理の同時実行数をCPU数で制限する.
まず,利用可能なCPUのコア数は,runtimeパッケージのNumCPU()
で取得できる.
func NumCPU() int
次に,CPU数をバッファ長としたChannelを作成する.
cpus := runtime.NumCPU()
semaphore := make(chan int, cpus)
後は,heavy()
をChannelへの送受信で囲む.これで,CPU数だけバッファが溜まると,Channelへの送信がブロックされ,新しい並列処理の開始もブロックされる.
最終的な実装は以下のようになる.
package main
import (
"fmt"
"sync"
"time"
"runtime"
)
func heavy(i int) {
fmt.Println(i)
time.Sleep(5 * time.Second)
}
func main() {
var wg sync.WaitGroup
cpus := runtime.NumCPU()
semaphore := make(chan int, cpus)
for i := 0; i <= 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
semaphore <- 1
heavy(i)
<-semaphore
}(i)
}
wg.Wait()
}
マルチコアで実行する
最後にちょっとCPU関連で別の話題.現状,goroutineのスケジューラはマルチコアを最適に使うようになっていないらしい(“Why does using GOMAXPROCS > 1 sometimes make my program slower?").そのため,デフォルトの設定では使用するCPUのコア数は1になっている.
これを変更するには,runtimeパッケージのGOMAXPROCS()
を使う,もしくは環境変数 (GOMAXPROCS
) を設定する.
func GOMAXPROCS(n int) int
利用可能なCPUを全て使って処理を実行するには,以下のようにする.
cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
将来的には,スケジューラがいい感じにしてくれるっぽい.