From f161a60e378928700248900ba131202ed65b2396 Mon Sep 17 00:00:00 2001 From: Stephen McQuay Date: Sat, 8 Mar 2014 18:07:43 -0800 Subject: [PATCH] Fixed the memory leaks using circular buffer. --- bandwidth.go | 39 ++++++++++++++++++------- bandwidth_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 99 insertions(+), 13 deletions(-) diff --git a/bandwidth.go b/bandwidth.go index 0020802..d37195f 100644 --- a/bandwidth.go +++ b/bandwidth.go @@ -4,7 +4,6 @@ package bandwidth import ( "errors" - "log" "sort" "time" ) @@ -27,6 +26,7 @@ type Bandwidth struct { rxSnap []float64 txSnap []float64 dt time.Duration + dts []int curRx int curTx int @@ -39,8 +39,8 @@ type Bandwidth struct { } // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is -// a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how -// often the values used to send to Rx and Tx are updated. +// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is +// also how often the values used to send to Rx and Tx are updated. func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { if len(dts) < 1 { return nil, errors.New("must specify at least one interval lenght") @@ -53,6 +53,7 @@ func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { Rx: make(chan []float64), Tx: make(chan []float64), dt: dt, + dts: dts, Quit: make(chan interface{}), rxstream: make([]int, max), txstream: make([]int, max), @@ -71,18 +72,27 @@ outer: case <-t.C: bw.rxstream[bw.timeI] = bw.curRx bw.txstream[bw.timeI] = bw.curTx + bw.rxSnap = bw.averages(bw.rxstream) bw.txSnap = bw.averages(bw.txstream) + bw.curTx = 0 bw.curRx = 0 - bw.timeI += 1 - // testing hg branching + + // n.b.: here we march forward through time by going backward in + // our slice. + bw.timeI = (bw.timeI - 1) % bw.max + // lol: because modulo does unexpected things for negative numbers. + if bw.timeI < 0 { + bw.timeI = bw.timeI + bw.max + } + // log.Printf("%d %+v", bw.timeI, bw.rxstream) case bw.Rx <- bw.rxSnap: case bw.Tx <- bw.txSnap: case s := <-bw.AddRx: - bw.curTx += s - case s := <-bw.AddTx: bw.curRx += s + case s := <-bw.AddTx: + bw.curTx += s case <-bw.Quit: break outer } @@ -94,8 +104,17 @@ outer: } func (bw *Bandwidth) averages(state []int) []float64 { - for i := 0; i < bw.max; i++ { - log.Println(bw.timeI + i) + r := []float64{} + var i int = 0 + total := 0 + for _, ti := range bw.dts { + for ; ; i++ { + if i == ti { + break + } + total += state[(bw.timeI+i)%bw.max] + } + r = append(r, float64(total)/float64(ti)) } - return nil + return r } diff --git a/bandwidth_test.go b/bandwidth_test.go index 01ff408..091d008 100644 --- a/bandwidth_test.go +++ b/bandwidth_test.go @@ -38,7 +38,74 @@ func TestEmptySeconds(t *testing.T) { } } -func TestA(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 10, 30}, 1*time.Second) - log.Printf("%+v", bw) +func TestStreamSize(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) + if len(bw.rxstream) != 5 { + t.Errorf("rxstream slice wrong length: %d, expected %d", len(bw.rxstream), 5) + } + if len(bw.txstream) != 5 { + t.Errorf("txstream slice wrong length: %d, expected %d", len(bw.rxstream), 5) + } +} + +func TestOnes(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) + for i := 0; i < bw.max; i++ { + bw.rxstream[i] = 1.0 + } + avgs := bw.averages(bw.rxstream) + // try a large range of starting points: + for i := 0; i < 10; i++ { + bw.timeI = i + validate(t, avgs, []float64{1.0, 1.0, 1.0}) + } +} + +func TestManyOnes(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) + for i := 0; i < bw.max; i++ { + bw.rxstream[i] = 1.0 + } + avgs := bw.averages(bw.rxstream) + // try a large range of starting points: + for i := -70; i < 70; i++ { + bw.timeI = i + validate(t, avgs, []float64{1.0, 1.0, 1.0}) + } +} + +func TestLinear(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) + for i := 0; i < bw.max; i++ { + bw.rxstream[i] = i + } + avgs := bw.averages(bw.rxstream) + validate(t, avgs, []float64{0.0, 4.5, 29.5}) +} + +func TestInverseLinear(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) + for i := 0; i < bw.max; i++ { + bw.rxstream[i] = bw.max - i + } + avgs := bw.averages(bw.rxstream) + validate(t, avgs, []float64{60.0, 55.5, 30.5}) +} + +func TestSpecific(t *testing.T) { + bw, _ := NewBandwidth([]int{1, 2, 10}, 1*time.Second) + bw.rxstream[8] = 1.0 + var avgs []float64 + + bw.timeI = 8 + avgs = bw.averages(bw.rxstream) + validate(t, avgs, []float64{1.0, 0.5, 0.1}) + + bw.timeI = 7 + avgs = bw.averages(bw.rxstream) + validate(t, avgs, []float64{0.0, 0.5, 0.1}) + + bw.timeI = 9 + avgs = bw.averages(bw.rxstream) + validate(t, avgs, []float64{0.0, 0.0, 0.1}) }