Go Concurrency Patterns: Pipelines and cancellation - The Go Blog を読んでいて、なかなか理解するまで苦しんだので復習がてら自分でもコードを書いて確かめてみた。
お題
複数の入力データそれぞれに対して重い処理を行い、結果として返ってくる値をまとめて取得する。
途中でエラーが発生したら直ちに処理を中止して終了する。
コード
いちばん簡単な例
エラーを考慮しない場合。
package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"
)

// init switches the logger to microsecond timestamps and seeds the random
// source from the current time.
func init() {
	log.SetFlags(log.Lmicroseconds)
	rand.Seed(time.Now().UnixNano())
}

// doSomething simulates a heavy task: it sleeps for a random number of
// milliseconds below 1000 and returns "<id>-<milliseconds>", zero-padded to
// two and three digits respectively.
func doSomething(id int) string {
	ms := rand.Intn(1000)
	time.Sleep(time.Duration(ms) * time.Millisecond) // something heavy
	return fmt.Sprintf("%02d-%03d", id, ms)
}

// getAllData runs doSomething for ids 0 through 99 one after another, logging
// each value, and returns the values in id order.
func getAllData() (results []string) {
	for id := 0; id < 100; id++ {
		v := doSomething(id)
		log.Println("got", v)
		results = append(results, v)
	}
	return results
}

func main() {
	log.Println("Finished.", getAllData())
}
こんなかんじ。100ループで毎回数百ミリ秒かかる処理(本当はCPUぶん回すような処理だったり)をして、その結果をひとつずつ繋げていって結果が格納されたsliceを返す。
$ go run example.go 22:57:33.931461 got 00-955 22:57:34.706984 got 01-774 22:57:35.204441 got 02-497 ... 22:58:23.020169 got 97-116 22:58:23.528240 got 98-507 22:58:24.178353 got 99-649 22:58:24.178596 Finished. [00-955 01-774 02-497 ...
当然ながら順番に1個ずつ処理しているのでとても時間がかかる。
エラー処理を加える
doSomething の中で、あるいはその前に繰り返し処理の内部でエラーが起こりうるとする。適当に100分の1くらいの確率で起こすことにして、それぞれの関数が error を返すよう変更。
var (
	errUnfortunate1 = errors.New("unfortunate error 1")
	errUnfortunate2 = errors.New("unfortunate error 2")
)

// doSomething simulates a heavy task that can fail. It sleeps for a random
// number of milliseconds below 1000, then returns errUnfortunate1 with a
// 1-in-100 chance; otherwise it returns "<id>-<milliseconds>".
func doSomething(id int) (string, error) {
	wait := rand.Intn(1000)
	time.Sleep(time.Millisecond * time.Duration(wait)) // something heavy
	if rand.Intn(100) == 0 {
		return "", errUnfortunate1
	}
	return fmt.Sprintf("%02d-%03d", id, wait), nil
}

// getAllData calls doSomething for ids 0 through 99 sequentially. Before each
// call it fails with errUnfortunate2 with a 1-in-100 chance. The first error
// from either source stops the loop and is returned with nil results.
func getAllData() (results []string, err error) {
	for i := 0; i < 100; i++ {
		if rand.Intn(100) == 0 {
			return nil, errUnfortunate2
		}
		value, err := doSomething(i)
		if err != nil {
			return nil, err
		}
		log.Println("got", value)
		results = append(results, value)
	}
	return results, nil
}

func main() {
	data, err := getAllData()
	if err != nil {
		log.Println("Failed!", err)
		return
	}
	// The original logged the undefined identifier "date" here, which does
	// not compile; the collected values live in data.
	log.Println("Finished.", data)
}
関数の返り値からエラーチェックして、何かあればすぐに getAllData を抜けて main 内で出力して終了するようになっている。
並行化その1 channel化
処理を並行で行うための準備として、goroutineとchannelを使った形に変えておく。
まずは入力を送ってくれるchannelを作って返す関数を作り、それを range で読み取るようにしてみる。エラー処理を無視すると
func getDataChannel() <-chan string { c := make(chan string) go func() { for i := 0; i < 100; i++ { value, _ := doSomething(i) log.Println("got", value) c <- value } close(c) }() return c } func getAllData() (results []string, err error) { c := getDataChannel() for value := range c { results = append(results, value) } return results, nil }
こんなかんじ。
並行化その2 重い処理を並行に
引き続きエラー処理を無視したままだけど、doSomething 部分をgoroutineに。
単純に即時関数で囲んで並行化するだけだと処理が終わる前に c を閉じてしまったりmainまで終了してしまったりするので、sync.WaitGroup を使って全部おわるまで待つ。
import ( ... "sync" ) func getDataChannel() <-chan string { c := make(chan string) go func() { var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(id int) { value, _ := doSomething(id) log.Println("got", value) c <- value wg.Done() }(i) } wg.Wait() close(c) }() return c }
早く終わったものから順にどんどんデータが送られて、全部おわるまで待ってから c がcloseされる。
エラーが何もなければこれで良いのだけど。。
並行化その3 エラー処理1
まずは doSomething で返ってくるerrUnfortunate1を捕捉。
これはgoroutine内で起こり得るので関数の返り値としては使いづらい。ので、返ってきたvalueとともにstructに含めてchannelに送るようにする
type result struct { value string err error } func getDataChannel() <-chan result { c := make(chan result) go func() { var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(id int) { value, err := doSomething(id) log.Println("got", value, err) c <- result{value: value, err: err} wg.Done() }(i) } wg.Wait() close(c) }() return c } func getAllData() (results []string, err error) { c := getDataChannel() for r := range c { if r.err != nil { return nil, r.err } results = append(results, r.value) } return results, nil }
受け取る側の range ループ内で result.err をチェックして、エラーを検出したらそこで終了。
これだけではまだまだ問題あるのだけど、とりあえずここではこれで捕捉できたことにする。
並行化その4 エラー処理2
次に、繰り返し処理の内部で起こり得るerrUnfortunate2を捕捉する。
goroutineでは返り値をとれないので、ループする部分を func() error {}() な即時関数で囲むことで取得する。得たerrorを送る手段としてerror用のchannelを用意し、resultを送るchannelと一緒に返して使ってもらうようにする
func getDataChannel() (<-chan result, <-chan error) { c := make(chan result) errc := make(chan error) go func() { var wg sync.WaitGroup err := func() error { for i := 0; i < 100; i++ { if rand.Intn(100) == 0 { return errUnfortunate2 } wg.Add(1) go func(id int) { value, err := doSomething(id) log.Println("got", value, err) c <- result{value: value, err: err} wg.Done() }(i) } return nil }() wg.Wait() close(c) errc <- err }() return c, errc } func getAllData() (results []string, err error) { c, errc := getDataChannel() for r := range c { results = append(results, r.value) if r.err != nil { return nil, r.err } } err = <-errc if err != nil { return } return results, nil }
errorが起きようと起きまいと即時関数が終了した後に c はcloseされるので range c ループも終了する。その後に errc から即時関数の返り値として得たerrorを取得してチェックすることができる。
これまた問題があるけど一応捕捉はできた。
並行化その5 中断されたことを知らせる
ここまでだと、errUnfortunate1が起きたときには c をcloseすることもなく走ってる処理も続くし、errUnfortunate2のときにも走ってるのを待ってからcloseすることになってしまったり、まだ正しく中断できているとは言えない。
並行化して走っている処理たちに中断されたことを知らせるために、もう一つchannelを用意してそれを使って判定するようにする。
func getDataChannel(done <-chan struct{}) (<-chan result, <-chan error) { c := make(chan result) errc := make(chan error) go func() { var wg sync.WaitGroup err := func(walkFunc func(int) error) (err error) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) if rand.Intn(100) == 0 { return errUnfortunate2 } err = walkFunc(i) if err != nil { return } } return nil }(func(id int) error { wg.Add(1) go func() { value, err := doSomething(id) log.Println("got", value, err) select { case c <- result{value: value, err: err}: log.Println("sent.") case <-done: log.Println("not sent.") } wg.Done() }() select { case <-done: return errors.New("canceled") default: return nil } }) wg.Wait() close(c) errc <- err }() return c, errc } func getAllData() (results []string, err error) { done := make(chan struct{}) defer close(done) c, errc := getDataChannel(done) for r := range c { results = append(results, r.value) if r.err != nil { return nil, r.err } } err = <-errc if err != nil { return } return results, nil }
getAllData 側で用意した done channelは、deferによって関数を抜けるときにcloseされる。これを getDataChannel に渡しておいて、あちらでは select を使って処理を分岐させることができる。doneが閉じていればそちら側が実行されるので doSomething から値が返ってきても c には送信されないし、ループを実行する walkFunc は"canceled"なエラーを受け取りループを中断するようになる。
並行化その6 完成形？
中断したときに sync.WaitGroup で全部終わるまでWaitするのは、それはもう必要ないのでgoroutineにする(deferでも良いかも？)。で、rangeで c が閉じるまで待っていては結局 errc からすぐには受け取れないのでこちらもselectを使う。
あと、errc は送る前に受け取る側が終了してしまっていると書き込みがブロックされる可能性があるので、バッファリングしておく必要がある。ので make の第2引数で1以上を指定しておく。
func getDataChannel(done <-chan struct{}) (<-chan result, <-chan error) { c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := func(walkFunc func(int) error) (err error) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) if rand.Intn(100) == 0 { return errUnfortunate2 } err = walkFunc(i) if err != nil { return } } return nil }(func(id int) error { wg.Add(1) go func() { log.Println("start", id) value, err := doSomething(id) log.Println("got", value, err) select { case c <- result{value: value, err: err}: case <-done: } wg.Done() }() select { case <-done: return errors.New("canceled") default: return nil } }) go func() { wg.Wait() close(c) }() errc <- err }() return c, errc } func getAllData() (results []string, err error) { done := make(chan struct{}) defer close(done) c, errc := getDataChannel(done) Loop: for { select { case r, ok := <-c: if !ok { break Loop } results = append(results, r.value) if r.err != nil { return nil, r.err } case err = <-errc: if err != nil { return } } } return results, nil }
ããã§ã並è¡ã㤠ã¨ã©ã¼æã«ã¯å³åº§ã«å¦çãä¸æããã¦ä½è¨ãªãã¼ã¿éåä¿¡ãªã©ããªãå¾å§æ«ãã§ããããã«ãªã£ãã
goroutineの起動数を制限
とはいえ上記の方法だと入力受け取るたびにどんどんgoroutineを起動することになりメモリ使用量などがマズいことになり得る。
ので、並行に走らせる数を制限する、別のパターンを用意する。
まず、処理の結果を送るchannelを返していた getDataChannel を、"入力"を送るchannelを返すだけのものに変更する。
func getInputChannel(done <-chan struct{}) (<-chan int, <-chan error) { ids := make(chan int) errc := make(chan error, 1) go func() { defer close(ids) err := func(walkFunc func(int) error) (err error) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) if rand.Intn(100) == 0 { return errUnfortunate2 } err = walkFunc(i) if err != nil { return } } return nil }(func(id int) error { select { case <-done: return errors.New("canceled") case ids <- id: } return nil }) errc <- err }() return ids, errc }
こんなかんじ。doneが閉じてない限りは入力データとなるidを送りつづける。
で、この入力channelを受け取って出力に結果を流すworker的なものを別に作る。
func worker(ids <-chan int, c chan<- result, done <-chan struct{}) { for id := range ids { value, err := doSomething(id) log.Println("got", value, err) select { case c <- result{value: value, err: err}: case <-done: return } } }
単純に入力が流れてくる限り doSomething な処理をして、doneが閉じていない限りは c にresultを送りつづける、役割しかもたなくていい。
んで、あとはこれをgoroutineで起動させて受け取るだけ。ただ、終了するのを待ってから c をcloseしてやる必要はある。
func getAllData() (results []string, err error) { done := make(chan struct{}) defer close(done) ids, errc := getInputChannel(done) var wg sync.WaitGroup c := make(chan result) wg.Add(1) go func() { worker(ids, c, done) wg.Done() }() go func() { wg.Wait() close(c) }() Loop: for { select { case r, ok := <-c: if !ok { break Loop } results = append(results, r.value) if r.err != nil { return nil, r.err } case err = <-errc: if err != nil { return } } } return results, nil }
この形で呼び出されるworkerは任意の数のgoroutineで並行起動しても、それぞれが入力を受け取り出力を送る、という役目をこなすだけなので上手く動作してくれる。
var wg sync.WaitGroup c := make(chan result) for i := 0; i < 10; i++ { wg.Add(1) go func() { worker(ids, c, done) wg.Done() }() } go func() { wg.Wait() close(c) }()
最終形
というわけで最終的に出来上がったのが以下。ちゃんと納得できるかたちで http://blog.golang.org/pipelines/bounded.go と同じような形にできたので大丈夫だと思う。
package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// init switches the logger to microsecond timestamps and seeds the random
// source from the current time.
func init() {
	log.SetFlags(log.Lmicroseconds)
	rand.Seed(time.Now().UnixNano())
}

var (
	errUnfortunate1 = errors.New("unfortunate error 1")
	errUnfortunate2 = errors.New("unfortunate error 2")
)

// result carries one doSomething outcome: the value and the error it returned.
type result struct {
	value string
	err   error
}

// doSomething simulates a heavy task that can fail. It sleeps for a random
// number of milliseconds below 1000, then returns errUnfortunate1 with a
// 1-in-100 chance; otherwise it returns "<id>-<milliseconds>".
func doSomething(id int) (string, error) {
	wait := rand.Intn(1000)
	time.Sleep(time.Millisecond * time.Duration(wait)) // something heavy
	if rand.Intn(100) == 0 {
		return "", errUnfortunate1
	}
	return fmt.Sprintf("%02d-%03d", id, wait), nil
}

// getInputChannel produces the input ids 0 through 99 on the returned ids
// channel, one every 50ms, and reports how production ended on errc. The error
// is nil when all ids were handed over, errUnfortunate2 with a 1-in-100 chance
// per iteration, or a "canceled" error once done is closed. errc is buffered
// so the report never blocks; ids is closed after the report has been sent.
func getInputChannel(done <-chan struct{}) (<-chan int, <-chan error) {
	ids := make(chan int)
	errc := make(chan error, 1)
	go func() {
		defer close(ids)
		err := func(walkFunc func(int) error) (err error) {
			for i := 0; i < 100; i++ {
				time.Sleep(time.Millisecond * 50)
				if rand.Intn(100) == 0 {
					return errUnfortunate2
				}
				err = walkFunc(i)
				if err != nil {
					return
				}
			}
			return nil
		}(func(id int) error {
			// Hand the id to a worker, or give up once done is closed.
			select {
			case <-done:
				return errors.New("canceled")
			case ids <- id:
			}
			return nil
		})
		errc <- err
	}()
	return ids, errc
}

// worker consumes ids until the ids channel is closed. For each id it runs
// doSomething, logs the outcome and sends it on c. It returns early, dropping
// the pending result, if done is closed before the send goes through.
func worker(ids <-chan int, c chan<- result, done <-chan struct{}) {
	for id := range ids {
		value, err := doSomething(id)
		log.Println("got", value, err)
		select {
		case c <- result{value: value, err: err}:
			log.Println("sent")
		case <-done:
			log.Println("not sent")
			return
		}
	}
}

// getAllData feeds the ids from getInputChannel to ten concurrent workers and
// collects the values they produce.
//
// It returns nil results and a non-nil error as soon as either a result
// carries an error or the input producer reports one on errc. done is closed
// on return so that the producer and the workers stop.
//
// The select picks pseudo-randomly among ready cases, so the closure of c can
// be observed before the value on errc has been read. To avoid dropping the
// producer's error in that case, errc is read after the loop if it has not
// been consumed yet. That read cannot block forever: c is closed only after
// all workers have exited, which while this function is running requires ids
// to be closed, and getInputChannel sends on errc before it closes ids.
func getAllData() (results []string, err error) {
	done := make(chan struct{})
	defer close(done)
	ids, errc := getInputChannel(done)

	var wg sync.WaitGroup
	c := make(chan result)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			worker(ids, c, done)
			wg.Done()
		}()
	}
	// Close c once every worker is finished so the loop below can terminate.
	go func() {
		wg.Wait()
		close(c)
	}()

Loop:
	for {
		select {
		case r, ok := <-c:
			if !ok {
				break Loop
			}
			if r.err != nil {
				return nil, r.err
			}
			results = append(results, r.value)
		case e := <-errc:
			if e != nil {
				return nil, e
			}
			// errc delivers only once; a nil channel is never ready, which
			// disables this case for the remaining iterations.
			errc = nil
		}
	}
	if errc != nil {
		if e := <-errc; e != nil {
			return nil, e
		}
	}
	return results, nil
}

func main() {
	defer func() {
		// Check that an abnormally large number of goroutines has not been
		// left running.
		time.Sleep(time.Millisecond * 2000)
		log.Println(runtime.NumGoroutine())
	}()
	data, err := getAllData()
	if err != nil {
		log.Println("Failed!", err)
		return
	}
	log.Println("Finished.", data)
}
まとめ
ãªããªãå¦çã®æµããè¤éãªãããããã¦ããªãã§ãããªæ¸ãæ¹ããã®ãããããã¨ã©ã¼ã«ãªã£ããã©ããªãã®ãã¨ãæ©ãã ãã©ãæ¸ããªããèªãã§ããã¡ã«ããããããã¼ãã ããããããã®ããã確ãã«ããããããã¨æã£ããããããå½¢ã«ãªãããã¼ãã£ã¦ç´å¾ã§ããã
とはいえスラスラとこういうのを書ける気はまだしないけど…。