【Go】学習メモ2:goroutine(ProducerとConsumer)

goroutineのproducerとconsumerについて少し悩んだのでメモします。
間違い等ありましたらご指摘いただけると幸いです。

1 全体の流れ

ProducerとConsumerはそれぞれgoroutineを内包し、channelを介してタスクの生成と処理を受け渡す流れになります。次のようなイメージです。

f:id:k-bind:20200525151113p:plain
consumerとproducerの流れ(イメージ)
Producer内で記述されたタスクに従い、channelに処理結果が格納され、さらにConsumer内の記述によってchannelから結果を取り出し処理を施します。
func main()とgoroutineであるProducer または Consumerは各々処理が分離されており、何もしないとgoroutineが完了する前にfunc main()が終了してしまう問題があります。
そうならないようfunc main()とgoroutineの連携の為には、sync.WaitGroupを宣言することで、func main()はgoroutineの処理が完了(wg.Done())するまで待機(wg.Wait())します。
繰り返しになりますが、もしこの連携がない場合は双方が分離されたまま処理が進むのでfunc main()が先に終了しgoroutineのプロセスが完了しないままプログラムが終了します。

2 channelのclose

Producer()側でchannelに処理結果を複数送信し、Consumer()側でchannelを繰り返し取り出し処理を施す場合はfor文のRangeが使えます。
しかし、最終的にchannelをclose()で閉じないとchannelはまだ格納できる受信状態にあるため、Rangeによる終わりが定まらず繰り返し処理が正しく機能しません。
次のイメージの通りです。

f:id:k-bind:20200525152721p:plain
channelのクローズについて
従って、Consumer()内のchannelから結果を取り出す処理を完了させるにはchannelのclose()が必要です。

3 サンプルコードによるポイントの説明

具体的にConsumerとProducerのサンプルコードで説明します。
処理のポイントを〇数字でつけてみました。

f:id:k-bind:20200525160208p:plain
consumerとproducerのサンプルコード
func main()によって処理が進められていきます。
①:wg.Add(1)によってループ回数ごとに並列化処理数が追加されproducer()が実行されます。
②:producer()内では記述された処理の結果がchannelに送信されます。まだchannelがクローズされていないので、channelは受信待ちの状態のままです。
③:続いてfunc main()からconsumer()が呼ばれます。consumer内のgoroutineが終了するまで間はwg.wait()によってfunc main()は待機。
そして、wg.Done()によってgoroutineの処理の完了がfunc main()へと伝わり、func main()が処理を再開します。
ここが重要なポイントですが、まだchannelは受信待ちで開いた状態の為、rangeの範囲が定まっていないのでfor文自体は処理が完了していません。
④:ここでclose(ch)によってchannelがクローズされたことで、channelに格納されている処理結果の数が確定し、for文も終了となります。
⑤:func main()に記述したtime.Sleep()がないとConsumer()の最後に記述したfmt.Println()が実行されません。何故なら、func main()とgoroutineであるconsumerは処理が分離されており、func main()が先に終了し、プログラムが完了してしまうため、consumer側のfmt.Println()が実行されないからです。そこで、func main()側でtime.Sleepを実行することでconsumer()のgoroutine後の処理を行う猶予を与えています。イメージは次の通りです。
f:id:k-bind:20200525161343p:plain
sleep追加前
f:id:k-bind:20200525161415p:plain
sleep追加後

4 使ったサンプルコード

最後にこれまで説明したサンプルコードを載せておきます。
今回は以上になります。ありがとうございました。

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(ch chan int, i int) {
	// Something
	ch <- i * 2
}

func consumer(ch chan int, wg *sync.WaitGroup) {
	for i := range ch {
		fmt.Println("process", i*1000)
		wg.Done()
	}
	fmt.Println("consumer:end")
}

func main() {
	var wg sync.WaitGroup
	ch := make(chan int)

	// Producer
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go producer(ch, 1)
	}

	// Consumer
	go consumer(ch, &wg)
	wg.Wait()
	close(ch)
	time.Sleep(2 * time.Second)

}