From 129663bc89e611cecd18ce03d5dc3dc8d5e857d1 Mon Sep 17 00:00:00 2001 From: stephen mcquay Date: Wed, 11 Nov 2015 15:56:00 -0800 Subject: [PATCH] simplified according to pipelines article https://blog.golang.org/pipelines --- main.go | 92 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/main.go b/main.go index a29c4e4..142dc0a 100644 --- a/main.go +++ b/main.go @@ -3,59 +3,75 @@ package main import ( "bufio" "fmt" + "io" "os" "sync" "time" ) -func expensive(worker int, task string) error { - time.Sleep(250 * time.Millisecond) - return fmt.Errorf("worker %d is done with %q", worker, task) -} - -func worker(id int, work chan string, results chan error, wg *sync.WaitGroup) { - defer wg.Done() - for { - i, ok := <-work - if !ok { - return +func compute(id int, work <-chan string) <-chan error { + out := make(chan error) + go func() { + for w := range work { + time.Sleep(1 * time.Second) + out <- fmt.Errorf("worker %d is done with %q", id, w) } - results <- expensive(id, i) - } + close(out) + }() + return out } -func main() { - work := make(chan string) - results := make(chan error) - - numWorkers := 4 - wwg := &sync.WaitGroup{} - wg := &sync.WaitGroup{} - for w := 0; w < numWorkers; w++ { - wwg.Add(1) - go worker(w, work, results, wwg) - } - - s := bufio.NewScanner(os.Stdin) - go func(wg *sync.WaitGroup) { +func source(in io.Reader) <-chan string { + out := make(chan string) + s := bufio.NewScanner(in) + go func() { for s.Scan() { - wg.Add(1) - work <- s.Text() + out <- s.Text() } if err := s.Err(); err != nil { panic(err) } - close(work) - }(wg) - - go func() { - wwg.Wait() - wg.Wait() - close(results) + close(out) }() + return out +} - for e := range results { - fmt.Printf("received: %+v\n", e) +func merge(cs []<-chan error) <-chan error { + out := make(chan error) + + var wg sync.WaitGroup + + output := func(c <-chan error) { + for n := range c { + out <- n + } wg.Done() } + + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + go func() { + wg.Wait() + close(out) + }() + return out +} + +func main() { + work := source(os.Stdin) + results := []<-chan error{} + + workers := 4 + for w := 0; w < workers; w++ { + results = append(results, compute(w, work)) + } + + out := merge(results) + + for e := range out { + fmt.Printf("received: %+v\n", e) + } }