【Go】学習メモ2:goroutine(ProducerとConsumer)
goroutineのproducerとconsumerについて少し悩んだのでメモします。
間違い等ありましたらご指摘いただけると幸いです。
1 全体の流れ
ProducerとConsumerはそれぞれgoroutineを内包し、channelを介してタスクの生成と処理を受け渡す流れになります。次のようなイメージです。Producer内で記述されたタスクに従い、channelに処理結果が格納され、さらにConsumer内の記述によってchannelから結果を取り出し処理を施します。
func main()とgoroutineであるProducer または Consumerは各々処理が分離されており、何もしないとgoroutineが完了する前にfunc main()が終了してしまう問題があります。
そうならないようfunc main()とgoroutineの連携の為には、sync.WaitGroupを宣言することで、func main()はgoroutineの処理が完了(wg.Done())するまで待機(wg.Wait())します。
繰り返しになりますが、もしこの連携がない場合は双方が分離されたまま処理が進むのでfunc main()が先に終了しgoroutineのプロセスが完了しないままプログラムが終了します。
2 channelのclose
Producer()側でchannelに処理結果を複数送信し、Consumer()側でchannelを繰り返し取り出し処理を施す場合はfor文のRangeが使えます。
しかし、最終的にchannelをclose()で閉じないとchannelはまだ格納できる受信状態にあるため、Rangeによる終わりが定まらず繰り返し処理が正しく機能しません。
次のイメージの通りです。従って、Consumer()内のchannelから結果を取り出す処理を完了させるにはchannelのclose()が必要です。
3 サンプルコードによるポイントの説明
具体的にConsumerとProducerのサンプルコードで説明します。
処理のポイントを〇数字でつけてみました。func main()によって処理が進められていきます。
①:wg.Add(1)によってループ回数ごとに並列化処理数が追加されproducer()が実行されます。
②:producer()内では記述された処理の結果がchannelに送信されます。まだchannelがクローズされていないので、channelは受信待ちの状態のままです。
③:続いてfunc main()からconsumer()が呼ばれます。consumer内のgoroutineが終了するまで間はwg.wait()によってfunc main()は待機。
そして、wg.Done()によってgoroutineの処理の完了がfunc main()へと伝わり、func main()が処理を再開します。
ここが重要なポイントですが、まだchannelは受信待ちで開いた状態の為、rangeの範囲が定まっていないのでfor文自体は処理が完了していません。
④:ここでclose(ch)によってchannelがクローズされたことで、channelに格納されている処理結果の数が確定し、for文も終了となります。
⑤:func main()に記述したtime.Sleep()がないとConsumer()の最後に記述したfmt.Println()が実行されません。何故なら、func main()とgoroutineであるconsumerは処理が分離されており、func main()が先に終了し、プログラムが完了してしまうため、consumer側のfmt.Println()が実行されないからです。そこで、func main()側でtime.Sleepを実行することでconsumer()のgoroutine後の処理を行う猶予を与えています。イメージは次の通りです。
4 使ったサンプルコード
最後にこれまで説明したサンプルコードを載せておきます。
今回は以上になります。ありがとうございました。
package main import ( "fmt" "sync" "time" ) func producer(ch chan int, i int) { // Something ch <- i * 2 } func consumer(ch chan int, wg *sync.WaitGroup) { for i := range ch { fmt.Println("process", i*1000) wg.Done() } fmt.Println("consumer:end") } func main() { var wg sync.WaitGroup ch := make(chan int) // Producer for i := 0; i < 10; i++ { wg.Add(1) go producer(ch, 1) } // Consumer go consumer(ch, &wg) wg.Wait() close(ch) time.Sleep(2 * time.Second) }