simplified according to pipelines article

https://blog.golang.org/pipelines
This commit is contained in:
Stephen McQuay 2015-11-11 15:56:00 -08:00
parent 74ecf49cd4
commit 129663bc89
1 changed files with 54 additions and 38 deletions

92
main.go
View File

@ -3,59 +3,75 @@ package main
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"os" "os"
"sync" "sync"
"time" "time"
) )
func expensive(worker int, task string) error { func compute(id int, work <-chan string) <-chan error {
time.Sleep(250 * time.Millisecond) out := make(chan error)
return fmt.Errorf("worker %d is done with %q", worker, task) go func() {
} for w := range work {
time.Sleep(1 * time.Second)
func worker(id int, work chan string, results chan error, wg *sync.WaitGroup) { out <- fmt.Errorf("worker %d is done with %q", id, w)
defer wg.Done()
for {
i, ok := <-work
if !ok {
return
} }
results <- expensive(id, i) close(out)
} }()
return out
} }
func main() { func source(in io.Reader) <-chan string {
work := make(chan string) out := make(chan string)
results := make(chan error) s := bufio.NewScanner(in)
go func() {
numWorkers := 4
wwg := &sync.WaitGroup{}
wg := &sync.WaitGroup{}
for w := 0; w < numWorkers; w++ {
wwg.Add(1)
go worker(w, work, results, wwg)
}
s := bufio.NewScanner(os.Stdin)
go func(wg *sync.WaitGroup) {
for s.Scan() { for s.Scan() {
wg.Add(1) out <- s.Text()
work <- s.Text()
} }
if err := s.Err(); err != nil { if err := s.Err(); err != nil {
panic(err) panic(err)
} }
close(work) close(out)
}(wg)
go func() {
wwg.Wait()
wg.Wait()
close(results)
}() }()
return out
}
for e := range results { func merge(cs []<-chan error) <-chan error {
fmt.Printf("received: %+v\n", e) out := make(chan error)
var wg sync.WaitGroup
output := func(c <-chan error) {
for n := range c {
out <- n
}
wg.Done() wg.Done()
} }
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
work := source(os.Stdin)
results := []<-chan error{}
workers := 4
for w := 0; w < workers; w++ {
results = append(results, compute(w, work))
}
out := merge(results)
for e := range out {
fmt.Printf("received: %+v\n", e)
}
} }