From bb8ce3c1eaab8ce2193eba6728207e53ae977de2 Mon Sep 17 00:00:00 2001 From: Stephen McQuay Date: Sat, 8 Mar 2014 00:45:55 -0800 Subject: [PATCH] Started down a different path; using a circular buffer instead. After talks on #go-nuts, it seems as though I should look into using something like the following: https://github.com/cespare/goproc/tree/master/nwstat And this makes sense. I have trimmed out the old tests and rearranged some code here. --- bandwidth.go | 128 ++++++++++++---------------------------------- bandwidth_test.go | 90 +++----------------------------- 2 files changed, 40 insertions(+), 178 deletions(-) diff --git a/bandwidth.go b/bandwidth.go index c0792f1..ab8bcc1 100644 --- a/bandwidth.go +++ b/bandwidth.go @@ -3,9 +3,9 @@ package bandwidth import ( - "container/list" "errors" - "fmt" + "log" + "sort" "time" ) @@ -24,20 +24,29 @@ type Bandwidth struct { Quit chan interface{} - rxSnap []float64 - txSnap []float64 - dt time.Duration - rxstream *stream - txstream *stream + rxSnap []float64 + txSnap []float64 + dt time.Duration + + curRx int + curTx int + + rxstream []int + txstream []int + + timeI int + max int } // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is // a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how // often the values used to send to Rx and Tx are updated. -func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) { - if len(seconds) < 1 { +func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { + if len(dts) < 1 { return nil, errors.New("must specify at least one interval lenght") } + sort.Ints(dts) + max := dts[len(dts)-1] r := &Bandwidth{ AddRx: make(chan int, 1024), AddTx: make(chan int, 1024), @@ -45,8 +54,9 @@ func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) { Tx: make(chan []float64), dt: dt, Quit: make(chan interface{}), - rxstream: newStream(seconds), - txstream: newStream(seconds), + rxstream: make([]int, max), + txstream: make([]int, max), + max: max, } return r, nil } @@ -59,14 +69,17 @@ outer: for { select { case <-t.C: - bw.rxSnap = bw.rxstream.averages() - bw.txSnap = bw.txstream.averages() + bw.rxSnap = bw.averages(bw.rxstream) + bw.txSnap = bw.averages(bw.txstream) + bw.curTx = 0 + bw.curRx = 0 + bw.timeI += 1 case bw.Rx <- bw.rxSnap: case bw.Tx <- bw.txSnap: case s := <-bw.AddRx: - bw.rxstream.add(datum{float64(s), time.Now()}) + bw.curTx += s case s := <-bw.AddTx: - bw.txstream.add(datum{float64(s), time.Now()}) + bw.curRx += s case <-bw.Quit: break outer } @@ -77,86 +90,9 @@ outer: close(bw.Tx) } -// datum stores both a piece of info, plus the time at which that info was -// collected. -type datum struct { - value float64 - at time.Time -} - -func (d datum) String() string { - r := struct { - value string - at string - }{ - value: fmt.Sprintf("%04f", d.value), - at: fmt.Sprintf("%s", d.at), +func (bw *Bandwidth) averages(state []int) []float64 { + for i := 0; i < bw.max; i++ { + log.Println(bw.timeI + i) } - return fmt.Sprintf("%+v", r) -} - -// stream manages a list of datum, and prunes itself on add -type stream struct { - data *list.List - extents []time.Duration - max time.Duration -} - -func newStream(seconds []int) *stream { - extents := []time.Duration{} - for _, s := range seconds { - extents = append(extents, time.Duration(-s)*time.Second) - - } - max := extents[len(extents)-1] - return &stream{ - data: list.New(), - extents: extents, - max: max, - } -} - -func (s *stream) add(v datum) { - s.data.PushBack(v) -} - -func (s *stream) averages() []float64 { - var limit time.Time - total := 0.0 - totals := []float64{} - - if s.data.Back() == nil { - return totals - } - - now := time.Now() - cutoff := now.Add(s.max) - for e := s.data.Front(); e != nil; e = e.Next() { - cur := e.Value.(datum).at - if cur.Before(cutoff) { - s.data.Remove(e) - } else { - break - } - } - - e := s.data.Back() - - for _, extent := range s.extents { - limit = now.Add(extent) - for ; e != nil; e = e.Prev() { - if e.Prev() == nil { - break - } - next := e.Prev().Value.(datum) - if next.at.Before(limit) { - break - } - cur := e.Value.(datum) - total += cur.value - } - totals = append(totals, total/float64(-int(extent/time.Second))) - } - - return totals + return nil } diff --git a/bandwidth_test.go b/bandwidth_test.go index 344b4f2..01ff408 100644 --- a/bandwidth_test.go +++ b/bandwidth_test.go @@ -10,35 +10,6 @@ func init() { log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) } -func general(t *testing.T) { - bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) - if err != nil { - t.Error(err) - } - go bw.Run() - bw.AddRx <- 10 - bw.AddTx <- 10 - time.Sleep(1 * time.Second) - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) - for i := 0; i < 10; i++ { - bw.AddRx <- (10 * i) - bw.AddRx <- (10 * i) - time.Sleep(500 * time.Millisecond) - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) - } - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) - time.Sleep(10 * time.Second) - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) - close(bw.Quit) - time.Sleep(100 * time.Millisecond) - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) -} - func validate(t *testing.T, actual, expected []float64) { if len(actual) != len(expected) { t.Errorf("len is not same: %d expected %d", len(actual), len(expected)) @@ -50,64 +21,14 @@ func validate(t *testing.T, actual, expected []float64) { } } -func TestOncePerSecond(t *testing.T) { - bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) - if err != nil { - t.Error(err) - } - bw.rxstream = newStream([]int{1, 10, 60}) - var i int64 = 0 - for ; i < 1000; i++ { - d := datum{ - value: 1.0, - at: time.Unix(1234567890+i, 0), - } - bw.rxstream.add(d) - } - validate(t, bw.rxstream.averages(), []float64{1.0, 1.0, 1.0}) -} - -func TestOneOverManySeconds(t *testing.T) { - bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) - if err != nil { - t.Error(err) - } - bw.rxstream = newStream([]int{1, 10, 60}) - var i int64 = 0 - for ; i < 1000; i++ { - d := datum{ - value: 1.0, - at: time.Unix(1234567890+i*2, 0), - } - bw.rxstream.add(d) - } - validate(t, bw.rxstream.averages(), []float64{0.0, 0.5, 0.5}) -} - -func TestManyPerSecond(t *testing.T) { - bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) - if err != nil { - t.Error(err) - } - bw.rxstream = newStream([]int{1, 10, 60}) - var i int64 = 0 - for ; i < 10000; i++ { - d := datum{ - value: 1.0, - at: time.Unix(1234567890, i*10000000), - } - bw.rxstream.add(d) - } - validate(t, bw.rxstream.averages(), []float64{100.0, 100.0, 100.0}) -} - func TestEmpty(t *testing.T) { bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) if err != nil { t.Error(err) } - bw.rxstream = newStream([]int{1, 10, 60}) - validate(t, bw.rxstream.averages(), []float64{}) + go bw.Run() + bw.rxstream = []int{1, 10, 60} + validate(t, <-bw.Rx, []float64{}) } func TestEmptySeconds(t *testing.T) { @@ -116,3 +37,8 @@ func TestEmptySeconds(t *testing.T) { t.Error(err) } } + +func TestA(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 10, 30}, 1*time.Second) + log.Printf("%+v", bw) +}