Fixed the memory leaks using circular buffer.

This commit is contained in:
Stephen McQuay 2014-03-08 18:07:43 -08:00
parent 4f912b29ba
commit f161a60e37
2 changed files with 99 additions and 13 deletions

View File

@ -4,7 +4,6 @@ package bandwidth
import ( import (
"errors" "errors"
"log"
"sort" "sort"
"time" "time"
) )
@ -27,6 +26,7 @@ type Bandwidth struct {
rxSnap []float64 rxSnap []float64
txSnap []float64 txSnap []float64
dt time.Duration dt time.Duration
dts []int
curRx int curRx int
curTx int curTx int
@ -39,8 +39,8 @@ type Bandwidth struct {
} }
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
// a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how // a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
// often the values used to send to Rx and Tx are updated. // also how often the values used to send to Rx and Tx are updated.
func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
if len(dts) < 1 { if len(dts) < 1 {
return nil, errors.New("must specify at least one interval lenght") return nil, errors.New("must specify at least one interval lenght")
@ -53,6 +53,7 @@ func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
Rx: make(chan []float64), Rx: make(chan []float64),
Tx: make(chan []float64), Tx: make(chan []float64),
dt: dt, dt: dt,
dts: dts,
Quit: make(chan interface{}), Quit: make(chan interface{}),
rxstream: make([]int, max), rxstream: make([]int, max),
txstream: make([]int, max), txstream: make([]int, max),
@ -71,18 +72,27 @@ outer:
case <-t.C: case <-t.C:
bw.rxstream[bw.timeI] = bw.curRx bw.rxstream[bw.timeI] = bw.curRx
bw.txstream[bw.timeI] = bw.curTx bw.txstream[bw.timeI] = bw.curTx
bw.rxSnap = bw.averages(bw.rxstream) bw.rxSnap = bw.averages(bw.rxstream)
bw.txSnap = bw.averages(bw.txstream) bw.txSnap = bw.averages(bw.txstream)
bw.curTx = 0 bw.curTx = 0
bw.curRx = 0 bw.curRx = 0
bw.timeI += 1
// testing hg branching // n.b.: here we march forward through time by going backward in
// our slice.
bw.timeI = (bw.timeI - 1) % bw.max
// lol: because modulo does unexpected things for negative numbers.
if bw.timeI < 0 {
bw.timeI = bw.timeI + bw.max
}
// log.Printf("%d %+v", bw.timeI, bw.rxstream)
case bw.Rx <- bw.rxSnap: case bw.Rx <- bw.rxSnap:
case bw.Tx <- bw.txSnap: case bw.Tx <- bw.txSnap:
case s := <-bw.AddRx: case s := <-bw.AddRx:
bw.curTx += s
case s := <-bw.AddTx:
bw.curRx += s bw.curRx += s
case s := <-bw.AddTx:
bw.curTx += s
case <-bw.Quit: case <-bw.Quit:
break outer break outer
} }
@ -94,8 +104,17 @@ outer:
} }
func (bw *Bandwidth) averages(state []int) []float64 { func (bw *Bandwidth) averages(state []int) []float64 {
for i := 0; i < bw.max; i++ { r := []float64{}
log.Println(bw.timeI + i) var i int = 0
total := 0
for _, ti := range bw.dts {
for ; ; i++ {
if i == ti {
break
}
total += state[(bw.timeI+i)%bw.max]
}
r = append(r, float64(total)/float64(ti))
} }
return nil return r
} }

View File

@ -38,7 +38,74 @@ func TestEmptySeconds(t *testing.T) {
} }
} }
func TestA(t *testing.T) { func TestStreamSize(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 30}, 1*time.Second) bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second)
log.Printf("%+v", bw) if len(bw.rxstream) != 5 {
t.Errorf("rxstream slice wrong length: %d, expected %d", len(bw.rxstream), 5)
}
if len(bw.txstream) != 5 {
t.Errorf("txstream slice wrong length: %d, expected %d", len(bw.rxstream), 5)
}
}
func TestOnes(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second)
for i := 0; i < bw.max; i++ {
bw.rxstream[i] = 1.0
}
avgs := bw.averages(bw.rxstream)
// try a large range of starting points:
for i := 0; i < 10; i++ {
bw.timeI = i
validate(t, avgs, []float64{1.0, 1.0, 1.0})
}
}
func TestManyOnes(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ {
bw.rxstream[i] = 1.0
}
avgs := bw.averages(bw.rxstream)
// try a large range of starting points:
for i := -70; i < 70; i++ {
bw.timeI = i
validate(t, avgs, []float64{1.0, 1.0, 1.0})
}
}
func TestLinear(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ {
bw.rxstream[i] = i
}
avgs := bw.averages(bw.rxstream)
validate(t, avgs, []float64{0.0, 4.5, 29.5})
}
func TestInverseLinear(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ {
bw.rxstream[i] = bw.max - i
}
avgs := bw.averages(bw.rxstream)
validate(t, avgs, []float64{60.0, 55.5, 30.5})
}
func TestSpecific(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 2, 10}, 1*time.Second)
bw.rxstream[8] = 1.0
var avgs []float64
bw.timeI = 8
avgs = bw.averages(bw.rxstream)
validate(t, avgs, []float64{1.0, 0.5, 0.1})
bw.timeI = 7
avgs = bw.averages(bw.rxstream)
validate(t, avgs, []float64{0.0, 0.5, 0.1})
bw.timeI = 9
avgs = bw.averages(bw.rxstream)
validate(t, avgs, []float64{0.0, 0.0, 0.1})
} }