bandwidth -> bps, and behavior change

- bps.BPS only keeps track of rates of a single rate
- bps.BPS implements io.Writer
- removed chan semantics, interact via .Add or .Write
- fleshed out docs
This commit is contained in:
Stephen McQuay 2015-09-29 23:29:49 -07:00
parent 9afe8cd2ae
commit 0ad73b982e
3 changed files with 174 additions and 121 deletions

View File

@ -1,120 +1,146 @@
// Package bandwidth is a simple tool for keeping track of transmitted and // Package bps is a simple tool for keeping track of the rate of bytes
// received counts (ostensibly bytes). // transmitted
package bandwidth package bps
import ( import (
"errors" "errors"
"io/ioutil"
"sort" "sort"
"sync"
"time" "time"
) )
// Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate // BPS keeps track of state for byte counts
// a Bandwidth then feed values using the AddRx and AddTx channels. When the //
// accumulated values are needed, simply read from the Rx or Tx chans. When the // Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When
// Bandwidth is no loner needed close the Quit chan. // the accumulated values are needed call BPS.Cur. When the BPS is no loner
type Bandwidth struct { // needed call BPS.Close
// write number of bytes to us type BPS struct {
AddRx chan int sync.RWMutex
AddTx chan int
// read stats from us quit chan interface{}
Rx chan []float64 closed chan interface{}
Tx chan []float64
Quit chan interface{} snapshot []float64
rxSnap []float64 dt time.Duration
txSnap []float64 dts []int64
dt time.Duration
dts []int
curRx int // curBs bytes read for this dt
curTx int curBs int64
rxstream []int // timeBuckets contains an entry for bytes read for each dt of time up to
txstream []int // the longest recoreded time slice.
timeBuckets []int64
// timeI keys into timeBuckets for the current point in time
timeI int timeI int
max int
// max is defined in New to be the maximum number of temporal buckets
// required.
max int64
} }
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is // New Returns a populated and ready to launch BPS. dts is
// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is // a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
// also how often the values used to send to Rx and Tx are updated. // also how often the values used to send to Rx and Tx are updated.
func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { func New(dts []int, dt time.Duration) (*BPS, error) {
if len(dts) < 1 { if len(dts) < 1 {
return nil, errors.New("must specify at least one interval lenght") return nil, errors.New("must specify at least one interval lenght")
} }
sort.Ints(dts) sort.Ints(dts)
max := dts[len(dts)-1] convertedDts := []int64{}
r := &Bandwidth{ for _, dt := range dts {
AddRx: make(chan int, 1024), convertedDts = append(convertedDts, int64(dt))
AddTx: make(chan int, 1024),
Rx: make(chan []float64),
Tx: make(chan []float64),
dt: dt,
dts: dts,
Quit: make(chan interface{}),
rxstream: make([]int, max),
txstream: make([]int, max),
max: max,
} }
max := convertedDts[len(convertedDts)-1]
r := &BPS{
dt: dt,
dts: convertedDts,
quit: make(chan interface{}),
closed: make(chan interface{}),
timeBuckets: make([]int64, max),
max: max,
}
go r.run()
return r, nil return r, nil
} }
// Run is a method of Bandwidth that must be started in a goroutine in order func (b *BPS) runLoop() {
// for things to be functional. t := time.NewTicker(b.dt)
func (bw *Bandwidth) Run() {
t := time.NewTicker(bw.dt)
outer:
for { for {
select { select {
case <-t.C: case <-t.C:
bw.rxstream[bw.timeI] = bw.curRx b.Lock()
bw.txstream[bw.timeI] = bw.curTx
bw.rxSnap = bw.averages(bw.rxstream) b.timeBuckets[b.timeI] = b.curBs
bw.txSnap = bw.averages(bw.txstream)
bw.curTx = 0 b.snapshot = b.averages(b.timeBuckets)
bw.curRx = 0
// n.b.: here we march forward through time by going backward in b.curBs = 0
// our slice.
bw.timeI = (bw.timeI - 1) % bw.max // Here we march forward through time by going backward in our
// lol: because modulo does unexpected things for negative numbers. // slice.
if bw.timeI < 0 { b.timeI = (b.timeI - 1) % int(b.max)
bw.timeI = bw.timeI + bw.max // because modulo does unexpected things for negative numbers.
if b.timeI < 0 {
b.timeI = b.timeI + int(b.max)
} }
// log.Printf("%d %+v", bw.timeI, bw.rxstream) b.Unlock()
case bw.Rx <- bw.rxSnap: case <-b.quit:
case bw.Tx <- bw.txSnap: return
case s := <-bw.AddRx:
bw.curRx += s
case s := <-bw.AddTx:
bw.curTx += s
case <-bw.Quit:
break outer
} }
} }
close(bw.AddRx)
close(bw.AddTx)
close(bw.Rx)
close(bw.Tx)
} }
func (bw *Bandwidth) averages(state []int) []float64 { // Run is a method of BPS that must be started in a goroutine in order
// for things to be functional.
func (b *BPS) run() {
b.runLoop()
close(b.closed)
}
func (b *BPS) averages(state []int64) []float64 {
r := []float64{} r := []float64{}
var i int = 0 var i int64 = 0
total := 0 var total int64 = 0
for _, ti := range bw.dts { for _, ti := range b.dts {
for ; ; i++ { for ; ; i++ {
if i == ti { if i == ti {
break break
} }
total += state[(bw.timeI+i)%bw.max] total += state[(int64(b.timeI)+i)%b.max]
} }
r = append(r, float64(total)/float64(ti)) r = append(r, float64(total)/float64(ti))
} }
return r return r
} }
func (b *BPS) Write(p []byte) (int, error) {
n, err := ioutil.Discard.Write(p)
b.Add(int64(n))
return n, err
}
func (b *BPS) Add(i int64) {
b.Lock()
b.curBs += i
b.timeBuckets[b.timeI] = b.curBs
b.snapshot = b.averages(b.timeBuckets)
b.Unlock()
}
func (b *BPS) Cur() []float64 {
r := make([]float64, len(b.dts))
b.Lock()
for i := range b.snapshot {
r[i] = b.snapshot[i]
}
b.Unlock()
return r
}
func (b *BPS) Close() {
close(b.quit)
<-b.closed
}

View File

@ -1,6 +1,8 @@
package bandwidth package bps
import ( import (
"bytes"
"io"
"log" "log"
"testing" "testing"
"time" "time"
@ -22,90 +24,119 @@ func validate(t *testing.T, actual, expected []float64) {
} }
func TestEmpty(t *testing.T) { func TestEmpty(t *testing.T) {
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) bw, err := New([]int{1, 10, 60}, 100*time.Second)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
go bw.Run() bw.timeBuckets = []int64{1, 10, 60}
bw.rxstream = []int{1, 10, 60} validate(t, bw.Cur(), []float64{0, 0, 0})
validate(t, <-bw.Rx, []float64{}) bw.Close()
} }
func TestEmptySeconds(t *testing.T) { func TestEmptySeconds(t *testing.T) {
_, err := NewBandwidth([]int{}, 100*time.Millisecond) _, err := New([]int{}, 100*time.Second)
if err == nil { if err == nil {
t.Error(err) t.Errorf("got no error wanted one: %v")
} }
} }
func TestStreamSize(t *testing.T) { func TestTimeBucketsSize(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) bw, _ := New([]int{1, 2, 5}, 1*time.Second)
if len(bw.rxstream) != 5 { if len(bw.timeBuckets) != 5 {
t.Errorf("rxstream slice wrong length: %d, expected %d", len(bw.rxstream), 5) t.Errorf("buckets slice wrong length: %d, expected %d", len(bw.timeBuckets), 5)
}
if len(bw.txstream) != 5 {
t.Errorf("txstream slice wrong length: %d, expected %d", len(bw.rxstream), 5)
} }
bw.Close()
} }
func TestOnes(t *testing.T) { func TestOnes(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 2, 5}, 1*time.Second) bw, _ := New([]int{1, 2, 5}, 1*time.Second)
for i := 0; i < bw.max; i++ { var i int64
bw.rxstream[i] = 1.0 for i = 0; i < bw.max; i++ {
bw.timeBuckets[i] = 1.0
} }
avgs := bw.averages(bw.rxstream) avgs := bw.averages(bw.timeBuckets)
// try a large range of starting points: // try a large range of starting points:
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
bw.timeI = i bw.timeI = i
validate(t, avgs, []float64{1.0, 1.0, 1.0}) validate(t, avgs, []float64{1.0, 1.0, 1.0})
} }
bw.Close()
} }
func TestManyOnes(t *testing.T) { func TestManyOnes(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) bw, _ := New([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ { var i int64
bw.rxstream[i] = 1.0 for i = 0; i < bw.max; i++ {
bw.timeBuckets[i] = 1.0
} }
avgs := bw.averages(bw.rxstream) avgs := bw.averages(bw.timeBuckets)
// try a large range of starting points: // try a large range of starting points:
for i := -70; i < 70; i++ { for i := -70; i < 70; i++ {
bw.timeI = i bw.timeI = i
validate(t, avgs, []float64{1.0, 1.0, 1.0}) validate(t, avgs, []float64{1.0, 1.0, 1.0})
} }
bw.Close()
} }
func TestLinear(t *testing.T) { func TestLinear(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) bw, _ := New([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ { var i int64
bw.rxstream[i] = i for i = 0; i < bw.max; i++ {
bw.timeBuckets[i] = i
} }
avgs := bw.averages(bw.rxstream) avgs := bw.averages(bw.timeBuckets)
validate(t, avgs, []float64{0.0, 4.5, 29.5}) validate(t, avgs, []float64{0.0, 4.5, 29.5})
bw.Close()
} }
func TestInverseLinear(t *testing.T) { func TestInverseLinear(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 10, 60}, 1*time.Second) bw, _ := New([]int{1, 10, 60}, 1*time.Second)
for i := 0; i < bw.max; i++ { var i int64
bw.rxstream[i] = bw.max - i for i = 0; i < bw.max; i++ {
bw.timeBuckets[i] = bw.max - i
} }
avgs := bw.averages(bw.rxstream) avgs := bw.averages(bw.timeBuckets)
validate(t, avgs, []float64{60.0, 55.5, 30.5}) validate(t, avgs, []float64{60.0, 55.5, 30.5})
bw.Close()
} }
func TestSpecific(t *testing.T) { func TestSpecific(t *testing.T) {
bw, _ := NewBandwidth([]int{1, 2, 10}, 1*time.Second) bw, _ := New([]int{1, 2, 10}, 1*time.Second)
bw.rxstream[8] = 1.0 bw.timeBuckets[8] = 1.0
var avgs []float64 var avgs []float64
bw.timeI = 8 bw.timeI = 8
avgs = bw.averages(bw.rxstream) avgs = bw.averages(bw.timeBuckets)
validate(t, avgs, []float64{1.0, 0.5, 0.1}) validate(t, avgs, []float64{1.0, 0.5, 0.1})
bw.timeI = 7 bw.timeI = 7
avgs = bw.averages(bw.rxstream) avgs = bw.averages(bw.timeBuckets)
validate(t, avgs, []float64{0.0, 0.5, 0.1}) validate(t, avgs, []float64{0.0, 0.5, 0.1})
bw.timeI = 9 bw.timeI = 9
avgs = bw.averages(bw.rxstream) avgs = bw.averages(bw.timeBuckets)
validate(t, avgs, []float64{0.0, 0.0, 0.1}) validate(t, avgs, []float64{0.0, 0.0, 0.1})
bw.Close()
}
func TestWriter(t *testing.T) {
t.Parallel()
bw, _ := New([]int{1, 10, 100}, 1*time.Second)
defer bw.Close()
b := &bytes.Buffer{}
b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo"))
io.Copy(bw, b)
validate(t, bw.Cur(), []float64{50, 5, 0.5})
time.Sleep(3 * time.Second)
validate(t, bw.Cur(), []float64{0, 5, 0.5})
}
func TestAdd(t *testing.T) {
t.Parallel()
bw, _ := New([]int{1, 10, 100}, 1*time.Second)
bw.Add(314)
validate(t, bw.Cur(), []float64{314, 31.4, 3.14})
time.Sleep(3 * time.Second)
validate(t, bw.Cur(), []float64{0, 31.4, 3.14})
bw.Close()
} }

View File

@ -1,18 +1,14 @@
bandwidth # bps
=========
`bandwidth` is a little package to be used in collecting and reporting on bandwidth numbers. `bps` is used to keep track of rates
It is intended to be used as such: It is intended to be used as such:
// report on 1, 10, 60 second intervals, update every second
```go bw, _ := NewBandwidth([]int{1, 10, 60}, time.Second)
// report on 1, 10, 60 second intervals, update every 100 milliseconds bw.Add(10)
bw, _ := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond) b := &bytes.Buffer{}
go bw.Run() b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo"))
bw.AddRx <- 10 io.Copy(bw, b)
bw.AddTx <- 10
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Printf("%+v", <-bw.Rx) log.Printf("%+v", bw.Cur())
log.Printf("%+v", <-bw.Tx)
```