// fanout/main.go
package main
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
)
// compute starts one worker goroutine identified by id and returns the
// channel on which that worker reports its results.
//
// The worker receives items from work until work is closed. For every item
// it sleeps for one second (presumably a stand-in for real processing) and
// then sends a message naming the worker and the item. Results are carried
// as error values because that is the element type of the result channel;
// they describe completion, not failure.
//
// The returned channel is unbuffered, so the worker blocks on each send
// until a receiver takes the value. It is closed once work has been drained.
func compute(id int, work <-chan string) <-chan error {
	out := make(chan error)
	go func() {
		// Close on exit so receivers ranging over out terminate.
		defer close(out)
		for w := range work {
			time.Sleep(1 * time.Second)
			out <- fmt.Errorf("worker %d is done with %q", id, w)
		}
	}()
	return out
}
// source reads in line by line and returns a channel that yields each line
// (without its line terminator, as produced by bufio.Scanner's default split
// function).
//
// Reading happens in a separate goroutine. The returned channel is
// unbuffered, so the reader advances only as fast as lines are consumed.
// The channel is closed after the last line has been delivered.
//
// If the scanner stops because of an error other than end of input, the
// goroutine panics with that error. Since nothing here recovers it, that
// terminates the whole program.
func source(in io.Reader) <-chan string {
	out := make(chan string)
	s := bufio.NewScanner(in)
	go func() {
		// Close on exit so receivers ranging over out terminate.
		defer close(out)
		for s.Scan() {
			out <- s.Text()
		}
		// Scan returns false both at EOF and on failure; Err tells them apart.
		if err := s.Err(); err != nil {
			panic(err)
		}
	}()
	return out
}
// merge fans the channels in cs into a single channel.
//
// One forwarding goroutine is started per input channel; each copies values
// to the returned channel until its input is closed. The relative order of
// values coming from different inputs is therefore not defined.
//
// The returned channel is unbuffered and is closed once every input channel
// has been closed and fully drained. With an empty cs it is closed without
// ever carrying a value.
func merge(cs []<-chan error) <-chan error {
	out := make(chan error)

	var wg sync.WaitGroup
	forward := func(c <-chan error) {
		defer wg.Done()
		for e := range c {
			out <- e
		}
	}

	// Register all forwarders before starting any, so Wait cannot observe a
	// zero counter while goroutines are still being launched.
	wg.Add(len(cs))
	for _, c := range cs {
		go forward(c)
	}

	// Close out only after every forwarder has finished sending; closing
	// earlier would make a later send panic.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
2015-11-11 13:26:01 -08:00
func main() {
work := source(os.Stdin)
results := []<-chan error{}
workers := 4
for w := 0; w < workers; w++ {
results = append(results, compute(w, work))
}
out := merge(results)
for e := range out {
2015-11-11 13:26:01 -08:00
fmt.Printf("received: %+v\n", e)
}
}