From 0ad73b982edb427aec23c6cf13269b4a471337bf Mon Sep 17 00:00:00 2001 From: stephen mcquay Date: Tue, 29 Sep 2015 23:29:49 -0700 Subject: [PATCH] bandwidth -> bps, and behavior change - bps.BPS only keeps track of rates of a single rate - bps.BPS implements io.Writer - removed chan semantics, interact via .Add or .Write - fleshed out docs --- bandwidth.go | 172 ++++++++++++++++++++++++++-------------------- bandwidth_test.go | 101 +++++++++++++++++---------- readme.md | 22 +++--- 3 files changed, 174 insertions(+), 121 deletions(-) diff --git a/bandwidth.go b/bandwidth.go index d37195f..2dac500 100644 --- a/bandwidth.go +++ b/bandwidth.go @@ -1,120 +1,146 @@ -// Package bandwidth is a simple tool for keeping track of transmitted and -// received counts (ostensibly bytes). -package bandwidth +// Package bps is a simple tool for keeping track of the rate of bytes +// transmitted +package bps import ( "errors" + "io/ioutil" "sort" + "sync" "time" ) -// Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate -// a Bandwidth then feed values using the AddRx and AddTx channels. When the -// accumulated values are needed, simply read from the Rx or Tx chans. When the -// Bandwidth is no loner needed close the Quit chan. -type Bandwidth struct { - // write number of bytes to us - AddRx chan int - AddTx chan int +// BPS keeps track of state for byte counts +// +// Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When +// the accumulated values are needed call BPS.Cur. When the BPS is no loner +// needed call BPS.Close +type BPS struct { + sync.RWMutex - // read stats from us - Rx chan []float64 - Tx chan []float64 + quit chan interface{} + closed chan interface{} - Quit chan interface{} + snapshot []float64 - rxSnap []float64 - txSnap []float64 - dt time.Duration - dts []int + dt time.Duration + dts []int64 - curRx int - curTx int + // curBs bytes read for this dt + curBs int64 - rxstream []int - txstream []int + // timeBuckets contains an entry for bytes read for each dt of time up to + // the longest recoreded time slice. + timeBuckets []int64 + // timeI keys into timeBuckets for the current point in time timeI int - max int + + // max is defined in New to be the maximum number of temporal buckets + // required. + max int64 } -// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is +// New Returns a populated and ready to launch BPS. dts is // a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is // also how often the values used to send to Rx and Tx are updated. -func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { +func New(dts []int, dt time.Duration) (*BPS, error) { if len(dts) < 1 { return nil, errors.New("must specify at least one interval lenght") } sort.Ints(dts) - max := dts[len(dts)-1] - r := &Bandwidth{ - AddRx: make(chan int, 1024), - AddTx: make(chan int, 1024), - Rx: make(chan []float64), - Tx: make(chan []float64), - dt: dt, - dts: dts, - Quit: make(chan interface{}), - rxstream: make([]int, max), - txstream: make([]int, max), - max: max, + convertedDts := []int64{} + for _, dt := range dts { + convertedDts = append(convertedDts, int64(dt)) } + max := convertedDts[len(convertedDts)-1] + r := &BPS{ + dt: dt, + dts: convertedDts, + quit: make(chan interface{}), + closed: make(chan interface{}), + timeBuckets: make([]int64, max), + max: max, + } + go r.run() return r, nil } -// Run is a method of Bandwidth that must be started in a goroutine in order -// for things to be functional. -func (bw *Bandwidth) Run() { - t := time.NewTicker(bw.dt) -outer: +func (b *BPS) runLoop() { + t := time.NewTicker(b.dt) for { select { case <-t.C: - bw.rxstream[bw.timeI] = bw.curRx - bw.txstream[bw.timeI] = bw.curTx + b.Lock() - bw.rxSnap = bw.averages(bw.rxstream) - bw.txSnap = bw.averages(bw.txstream) + b.timeBuckets[b.timeI] = b.curBs - bw.curTx = 0 - bw.curRx = 0 + b.snapshot = b.averages(b.timeBuckets) - // n.b.: here we march forward through time by going backward in - // our slice. - bw.timeI = (bw.timeI - 1) % bw.max - // lol: because modulo does unexpected things for negative numbers. - if bw.timeI < 0 { - bw.timeI = bw.timeI + bw.max + b.curBs = 0 + + // Here we march forward through time by going backward in our + // slice. + b.timeI = (b.timeI - 1) % int(b.max) + // because modulo does unexpected things for negative numbers. + if b.timeI < 0 { + b.timeI = b.timeI + int(b.max) } - // log.Printf("%d %+v", bw.timeI, bw.rxstream) - case bw.Rx <- bw.rxSnap: - case bw.Tx <- bw.txSnap: - case s := <-bw.AddRx: - bw.curRx += s - case s := <-bw.AddTx: - bw.curTx += s - case <-bw.Quit: - break outer + b.Unlock() + case <-b.quit: + return } } - close(bw.AddRx) - close(bw.AddTx) - close(bw.Rx) - close(bw.Tx) } -func (bw *Bandwidth) averages(state []int) []float64 { +// Run is a method of BPS that must be started in a goroutine in order +// for things to be functional. +func (b *BPS) run() { + b.runLoop() + close(b.closed) +} + +func (b *BPS) averages(state []int64) []float64 { r := []float64{} - var i int = 0 - total := 0 - for _, ti := range bw.dts { + var i int64 = 0 + var total int64 = 0 + for _, ti := range b.dts { for ; ; i++ { if i == ti { break } - total += state[(bw.timeI+i)%bw.max] + total += state[(int64(b.timeI)+i)%b.max] } r = append(r, float64(total)/float64(ti)) } return r } + +func (b *BPS) Write(p []byte) (int, error) { + n, err := ioutil.Discard.Write(p) + b.Add(int64(n)) + return n, err +} + +func (b *BPS) Add(i int64) { + b.Lock() + b.curBs += i + b.timeBuckets[b.timeI] = b.curBs + b.snapshot = b.averages(b.timeBuckets) + b.Unlock() +} + +func (b *BPS) Cur() []float64 { + r := make([]float64, len(b.dts)) + b.Lock() + for i := range b.snapshot { + r[i] = b.snapshot[i] + } + b.Unlock() + return r +} + +func (b *BPS) Close() { + close(b.quit) + <-b.closed +} diff --git a/bandwidth_test.go b/bandwidth_test.go index 091d008..9ff90ae 100644 --- a/bandwidth_test.go +++ b/bandwidth_test.go @@ -1,6 +1,8 @@ -package bandwidth +package bps import ( + "bytes" + "io" "log" "testing" "time" @@ -22,90 +24,119 @@ func validate(t *testing.T, actual, expected []float64) { } func TestEmpty(t *testing.T) { - bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) + bw, err := New([]int{1, 10, 60}, 100*time.Second) if err != nil { t.Error(err) } - go bw.Run() - bw.rxstream = []int{1, 10, 60} - validate(t, <-bw.Rx, []float64{}) + bw.timeBuckets = []int64{1, 10, 60} + validate(t, bw.Cur(), []float64{0, 0, 0}) + bw.Close() } func TestEmptySeconds(t *testing.T) { - _, err := NewBandwidth([]int{}, 100*time.Millisecond) + _, err := New([]int{}, 100*time.Second) if err == nil { - t.Error(err) + t.Errorf("got no error wanted one: %v") } } -func TestStreamSize(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) - if len(bw.rxstream) != 5 { - t.Errorf("rxstream slice wrong length: %d, expected %d", len(bw.rxstream), 5) - } - if len(bw.txstream) != 5 { - t.Errorf("txstream slice wrong length: %d, expected %d", len(bw.rxstream), 5) +func TestTimeBucketsSize(t *testing.T) { + bw, _ := New([]int{1, 2, 5}, 1*time.Second) + if len(bw.timeBuckets) != 5 { + t.Errorf("buckets slice wrong length: %d, expected %d", len(bw.timeBuckets), 5) } + bw.Close() } func TestOnes(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) - for i := 0; i < bw.max; i++ { - bw.rxstream[i] = 1.0 + bw, _ := New([]int{1, 2, 5}, 1*time.Second) + var i int64 + for i = 0; i < bw.max; i++ { + bw.timeBuckets[i] = 1.0 } - avgs := bw.averages(bw.rxstream) + avgs := bw.averages(bw.timeBuckets) // try a large range of starting points: for i := 0; i < 10; i++ { bw.timeI = i validate(t, avgs, []float64{1.0, 1.0, 1.0}) } + bw.Close() } func TestManyOnes(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) - for i := 0; i < bw.max; i++ { - bw.rxstream[i] = 1.0 + bw, _ := New([]int{1, 10, 60}, 1*time.Second) + var i int64 + for i = 0; i < bw.max; i++ { + bw.timeBuckets[i] = 1.0 } - avgs := bw.averages(bw.rxstream) + avgs := bw.averages(bw.timeBuckets) // try a large range of starting points: for i := -70; i < 70; i++ { bw.timeI = i validate(t, avgs, []float64{1.0, 1.0, 1.0}) } + bw.Close() } func TestLinear(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) - for i := 0; i < bw.max; i++ { - bw.rxstream[i] = i + bw, _ := New([]int{1, 10, 60}, 1*time.Second) + var i int64 + for i = 0; i < bw.max; i++ { + bw.timeBuckets[i] = i } - avgs := bw.averages(bw.rxstream) + avgs := bw.averages(bw.timeBuckets) validate(t, avgs, []float64{0.0, 4.5, 29.5}) + bw.Close() } func TestInverseLinear(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) - for i := 0; i < bw.max; i++ { - bw.rxstream[i] = bw.max - i + bw, _ := New([]int{1, 10, 60}, 1*time.Second) + var i int64 + for i = 0; i < bw.max; i++ { + bw.timeBuckets[i] = bw.max - i } - avgs := bw.averages(bw.rxstream) + avgs := bw.averages(bw.timeBuckets) validate(t, avgs, []float64{60.0, 55.5, 30.5}) + bw.Close() } func TestSpecific(t *testing.T) { - bw, _ := NewBandwidth([]int{1, 2, 10}, 1*time.Second) - bw.rxstream[8] = 1.0 + bw, _ := New([]int{1, 2, 10}, 1*time.Second) + bw.timeBuckets[8] = 1.0 var avgs []float64 bw.timeI = 8 - avgs = bw.averages(bw.rxstream) + avgs = bw.averages(bw.timeBuckets) validate(t, avgs, []float64{1.0, 0.5, 0.1}) bw.timeI = 7 - avgs = bw.averages(bw.rxstream) + avgs = bw.averages(bw.timeBuckets) validate(t, avgs, []float64{0.0, 0.5, 0.1}) bw.timeI = 9 - avgs = bw.averages(bw.rxstream) + avgs = bw.averages(bw.timeBuckets) validate(t, avgs, []float64{0.0, 0.0, 0.1}) + bw.Close() +} + +func TestWriter(t *testing.T) { + t.Parallel() + bw, _ := New([]int{1, 10, 100}, 1*time.Second) + defer bw.Close() + b := &bytes.Buffer{} + b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo")) + io.Copy(bw, b) + validate(t, bw.Cur(), []float64{50, 5, 0.5}) + time.Sleep(3 * time.Second) + validate(t, bw.Cur(), []float64{0, 5, 0.5}) +} + +func TestAdd(t *testing.T) { + t.Parallel() + bw, _ := New([]int{1, 10, 100}, 1*time.Second) + bw.Add(314) + validate(t, bw.Cur(), []float64{314, 31.4, 3.14}) + time.Sleep(3 * time.Second) + validate(t, bw.Cur(), []float64{0, 31.4, 3.14}) + bw.Close() } diff --git a/readme.md b/readme.md index 7921250..d19609a 100644 --- a/readme.md +++ b/readme.md @@ -1,18 +1,14 @@ -bandwidth -========= +# bps -`bandwidth` is a little package to be used in collecting and reporting on bandwidth numbers. +`bps` is used to keep track of rates It is intended to be used as such: - -```go - // report on 1, 10, 60 second intervals, update every 100 milliseconds - bw, _ := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) - go bw.Run() - bw.AddRx <- 10 - bw.AddTx <- 10 + // report on 1, 10, 60 second intervals, update every second + bw, _ := NewBandwidth([]int{1, 10, 60}, time.Second) + bw.Add(10) + b := &bytes.Buffer{} + b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo")) + io.Copy(bw, b) time.Sleep(1 * time.Second) - log.Printf("%+v", <-bw.Rx) - log.Printf("%+v", <-bw.Tx) -``` + log.Printf("%+v", bw.Cur())