From ecace73aed770035a0227920930155f0fd9a4aad Mon Sep 17 00:00:00 2001 From: Stephen McQuay Date: Sun, 2 Mar 2014 22:44:09 -0800 Subject: [PATCH] added files from hackerbots --- bandwidth.go | 142 ++++++++++++++++++++++++++++++++++++++++++++++ bandwidth_test.go | 118 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+) create mode 100644 bandwidth.go create mode 100644 bandwidth_test.go diff --git a/bandwidth.go b/bandwidth.go new file mode 100644 index 0000000..df5ca1f --- /dev/null +++ b/bandwidth.go @@ -0,0 +1,142 @@ +package main + +import ( + "container/list" + "errors" + "fmt" + "time" +) + +type datum struct { + value float64 + at time.Time +} + +func (d datum) String() string { + r := struct { + value string + at string + }{ + value: fmt.Sprintf("%04f", d.value), + at: fmt.Sprintf("%s", d.at), + } + return fmt.Sprintf("%+v", r) +} + +type stream struct { + data *list.List + extents []time.Duration + max time.Duration +} + +func NewStream(seconds []int) *stream { + extents := []time.Duration{} + for _, s := range seconds { + extents = append(extents, time.Duration(-s)*time.Second) + + } + max := extents[len(extents)-1] + return &stream{ + data: list.New(), + extents: extents, + max: max, + } +} + +func (s *stream) add(v datum) { + s.data.PushBack(v) + newest := s.data.Back().Value.(datum) + cutoff := newest.at.Add(s.max) + for e := s.data.Front(); e != nil; e = e.Next() { + cur := e.Value.(datum).at + if cur.Before(cutoff) { + s.data.Remove(e) + } else { + break + } + } +} + +func (s *stream) averages() []float64 { + var limit time.Time + + total := 0.0 + + totals := []float64{} + if s.data.Back() == nil { + return totals + } + + newest := s.data.Back().Value.(datum) + e := s.data.Back() + + for _, extent := range s.extents { + limit = newest.at.Add(extent) + for ; e != nil; e = e.Prev() { + if e.Prev() == nil { + break + } + next := e.Prev().Value.(datum) + if next.at.Before(limit) { + break + } + cur := e.Value.(datum) + total += cur.value + } + totals = append(totals, total/float64(-int(extent/time.Second))) + } + + return totals +} + +type Bandwidth struct { + AddRx chan int + AddTx chan int + Rx chan []float64 + Tx chan []float64 + rxSnap []float64 + txSnap []float64 + dt time.Duration + rxstream *stream + txstream *stream + Quit chan interface{} +} + +func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) { + if len(seconds) < 1 { + return nil, errors.New("must specify at least one interval lenght") + } + r := &Bandwidth{ + AddRx: make(chan int, 1024), + AddTx: make(chan int, 1024), + Rx: make(chan []float64), + Tx: make(chan []float64), + dt: dt, + Quit: make(chan interface{}), + rxstream: NewStream(seconds), + txstream: NewStream(seconds), + } + return r, nil +} + +func (bw *Bandwidth) run() { + t := time.NewTicker(bw.dt) +outer: + for { + select { + case <-t.C: + bw.rxSnap = bw.rxstream.averages() + bw.txSnap = bw.txstream.averages() + case bw.Rx <- bw.rxSnap: + case bw.Tx <- bw.txSnap: + case s := <-bw.AddRx: + bw.rxstream.add(datum{float64(s), time.Now()}) + case s := <-bw.AddTx: + bw.txstream.add(datum{float64(s), time.Now()}) + case <-bw.Quit: + break outer + } + } + close(bw.AddRx) + close(bw.AddTx) +} diff --git a/bandwidth_test.go b/bandwidth_test.go new file mode 100644 index 0000000..3754110 --- /dev/null +++ b/bandwidth_test.go @@ -0,0 +1,118 @@ +package main + +import ( + "log" + "testing" + "time" +) + +func init() { + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) +} + +func general(t *testing.T) { + bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + if err != nil { + t.Error(err) + } + go bw.run() + bw.AddRx <- 10 + bw.AddTx <- 10 + time.Sleep(1 * time.Second) + log.Printf("%+v", <-bw.Rx) + log.Printf("%+v", <-bw.Tx) + for i := 0; i < 10; i++ { + bw.AddRx <- (10 * i) + bw.AddRx <- (10 * i) + time.Sleep(500 * time.Millisecond) + log.Printf("%+v", <-bw.Rx) + log.Printf("%+v", <-bw.Tx) + } + log.Printf("%+v", <-bw.Rx) + log.Printf("%+v", <-bw.Tx) + time.Sleep(10 * time.Second) + log.Printf("%+v", <-bw.Rx) + log.Printf("%+v", <-bw.Tx) + close(bw.Quit) + time.Sleep(100 * time.Millisecond) + log.Printf("%+v", <-bw.Rx) + log.Printf("%+v", <-bw.Tx) +} + +func validate(t *testing.T, actual, expected []float64) { + if len(actual) != len(expected) { + t.Errorf("len is not same: %d expected %d", len(actual), len(expected)) + } + for i, _ := range actual { + if actual[i] != expected[i] { + t.Errorf("%dth: got %f expected %f", i, actual[i], expected[i]) + } + } +} + +func TestOncePerSecond(t *testing.T) { + bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + if err != nil { + t.Error(err) + } + bw.rxstream = NewStream([]int{1, 10, 60}) + var i int64 = 0 + for ; i < 1000; i++ { + d := datum{ + value: 1.0, + at: time.Unix(1234567890+i, 0), + } + bw.rxstream.add(d) + } + validate(t, bw.rxstream.averages(), []float64{1.0, 1.0, 1.0}) +} + +func TestOneOverManySeconds(t *testing.T) { + bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + if err != nil { + t.Error(err) + } + bw.rxstream = NewStream([]int{1, 10, 60}) + var i int64 = 0 + for ; i < 1000; i++ { + d := datum{ + value: 1.0, + at: time.Unix(1234567890+i*2, 0), + } + bw.rxstream.add(d) + } + validate(t, bw.rxstream.averages(), []float64{0.0, 0.5, 0.5}) +} + +func TestManyPerSecond(t *testing.T) { + bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + if err != nil { + t.Error(err) + } + bw.rxstream = NewStream([]int{1, 10, 60}) + var i int64 = 0 + for ; i < 10000; i++ { + d := datum{ + value: 1.0, + at: time.Unix(1234567890, i*10000000), + } + bw.rxstream.add(d) + } + validate(t, bw.rxstream.averages(), []float64{100.0, 100.0, 100.0}) +} + +func TestEmpty(t *testing.T) { + bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + if err != nil { + t.Error(err) + } + bw.rxstream = NewStream([]int{1, 10, 60}) + validate(t, bw.rxstream.averages(), []float64{}) +} + +func TestEmptySeconds(t *testing.T) { + _, err := NewBandwidth([]int{}, 100*time.Millisecond) + if err == nil { + t.Error(err) + } +}