diff --git a/main.go b/main.go index 7682e49..0e2ca1d 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,10 @@ package main import ( + "container/heap" "fmt" "math/rand" "sort" - "sync" "time" ) @@ -34,23 +34,64 @@ func source(c int) <-chan int { } func merge(cs ...<-chan int) <-chan int { - wg := sync.WaitGroup{} out := make(chan int) - output := func(c <-chan int) { - for n := range c { - out <- n - } - wg.Done() - } - - wg.Add(len(cs)) - for _, c := range cs { - go output(c) - } go func() { - wg.Wait() + h := &IntHeap{} + heap.Init(h) + + // prime the pumps + for i, src := range cs { + head, ok := <-src + if !ok { + continue + } + it := item{ + val: head, + src: i, + } + heap.Push(h, it) + } + + for h.Len() > 0 { + top := heap.Pop(h).(item) + out <- top.val + if head, ok := <-cs[top.src]; ok { + it := item{ + val: head, + src: top.src, + } + heap.Push(h, it) + } + } + close(out) }() return out } + +// An IntHeap is a min-heap of ints. +type IntHeap []item + +func (h IntHeap) Len() int { return len(h) } +func (h IntHeap) Less(i, j int) bool { return h[i].val < h[j].val } +func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *IntHeap) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(item)) +} + +func (h *IntHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type item struct { + val int + src int +}