From a23ebdb44bcf4d5da605370c904c6656ce875790 Mon Sep 17 00:00:00 2001 From: stephen mcquay Date: Wed, 30 Sep 2015 22:57:14 -0700 Subject: [PATCH] further simplified implementation --- bandwidth.go | 107 ++++++++++++-------------------- bandwidth_test.go | 154 ++++++++++++++-------------------------------- 2 files changed, 83 insertions(+), 178 deletions(-) diff --git a/bandwidth.go b/bandwidth.go index 745e6bf..a3b9e3c 100644 --- a/bandwidth.go +++ b/bandwidth.go @@ -3,13 +3,16 @@ package bps import ( - "errors" + "fmt" "io/ioutil" - "sort" "sync" "time" ) +const ( + minResolution = 3 +) + // BPS keeps track of state for byte counts // // Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When @@ -21,46 +24,40 @@ type BPS struct { quit chan interface{} closed chan interface{} - snapshot []float64 - - dt time.Duration - dts []int64 + interval time.Duration + dt time.Duration // curBs bytes read for this dt curBs int64 - // timeBuckets contains an entry for bytes read for each dt of time up to - // the longest recoreded time slice. - timeBuckets []int64 + // buckets contains an entry for bytes read for each dt of time in interval + buckets []int64 - // timeI keys into timeBuckets for the current point in time + // timeI keys into buckets for the current point in time timeI int - - // max is defined in New to be the maximum number of temporal buckets - // required. - max int64 } -// New Returns a populated and ready to launch BPS. dts is -// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is -// also how often the values used to send out in Cur are updated. -func New(dts []int, dt time.Duration) (*BPS, error) { - if len(dts) < 1 { - return nil, errors.New("must specify at least one interval lenght") +// New Returns a populated and ready to use BPS. interval is the amount of time +// (for example 60 seconds) over which to track byte flow (bytes/second for the +// last interval), and resolution is used in the following calculation: +// +// dt = interval / resolution (s) +// +// where the dt is the temporal resolution of the updates (add or remove +// information every dt). +func New(interval time.Duration, resolution uint) (*BPS, error) { + if resolution < minResolution { + return nil, fmt.Errorf("resolution must be larger than %d", minResolution) } - sort.Ints(dts) - convertedDts := []int64{} - for _, dt := range dts { - convertedDts = append(convertedDts, int64(dt)) - } - max := convertedDts[len(convertedDts)-1] + + dtns := interval.Nanoseconds() / int64(resolution) + dt := time.Duration(dtns) * time.Nanosecond r := &BPS{ - dt: dt, - dts: convertedDts, - quit: make(chan interface{}), - closed: make(chan interface{}), - timeBuckets: make([]int64, max), - max: max, + interval: interval, + dt: dt, + quit: make(chan interface{}), + closed: make(chan interface{}), + buckets: make([]int64, resolution), } go r.run() return r, nil @@ -72,20 +69,9 @@ func (b *BPS) runLoop() { select { case <-t.C: b.Lock() - - b.timeBuckets[b.timeI] = b.curBs - - b.snapshot = b.averages(b.timeBuckets) - + b.buckets[b.timeI] = b.curBs b.curBs = 0 - - // Here we march forward through time by going backward in our - // slice. - b.timeI = (b.timeI - 1) % int(b.max) - // because modulo does unexpected things for negative numbers. - if b.timeI < 0 { - b.timeI = b.timeI + int(b.max) - } + b.timeI = (b.timeI + 1) % int(len(b.buckets)) b.Unlock() case <-b.quit: return @@ -100,22 +86,6 @@ func (b *BPS) run() { close(b.closed) } -func (b *BPS) averages(state []int64) []float64 { - r := []float64{} - var i int64 = 0 - var total int64 = 0 - for _, ti := range b.dts { - for ; ; i++ { - if i == ti { - break - } - total += state[(int64(b.timeI)+i)%b.max] - } - r = append(r, float64(total)/float64(ti)) - } - return r -} - // Write implements io.Writer so that one can simply write bytes to the struct. func (b *BPS) Write(p []byte) (int, error) { n, err := ioutil.Discard.Write(p) @@ -127,20 +97,19 @@ func (b *BPS) Write(p []byte) (int, error) { func (b *BPS) Add(i int64) { b.Lock() b.curBs += i - b.timeBuckets[b.timeI] = b.curBs - b.snapshot = b.averages(b.timeBuckets) + b.buckets[b.timeI] = b.curBs b.Unlock() } -// Cur returns a slice containing the currenly tracked rates. -func (b *BPS) Cur() []float64 { - r := make([]float64, len(b.dts)) +// Rate returns the current rate (bytes / second). +func (b *BPS) Rate() float64 { b.Lock() - for i := range b.snapshot { - r[i] = b.snapshot[i] + var total int64 = 0 + for _, b := range b.buckets { + total += b } b.Unlock() - return r + return float64(total) / b.interval.Seconds() } // Close cleans up and shuts down a BPS. diff --git a/bandwidth_test.go b/bandwidth_test.go index 9ff90ae..0ba189b 100644 --- a/bandwidth_test.go +++ b/bandwidth_test.go @@ -12,131 +12,67 @@ func init() { log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) } -func validate(t *testing.T, actual, expected []float64) { - if len(actual) != len(expected) { - t.Errorf("len is not same: %d expected %d", len(actual), len(expected)) - } - for i, _ := range actual { - if actual[i] != expected[i] { - t.Errorf("%dth: got %f expected %f", i, actual[i], expected[i]) - } - } -} - -func TestEmpty(t *testing.T) { - bw, err := New([]int{1, 10, 60}, 100*time.Second) +func TestSimple(t *testing.T) { + bw, err := New(1*time.Second, 100) if err != nil { - t.Error(err) + t.Fatalf("failure to make reasonable BPS: %v", err) } - bw.timeBuckets = []int64{1, 10, 60} - validate(t, bw.Cur(), []float64{0, 0, 0}) bw.Close() } -func TestEmptySeconds(t *testing.T) { - _, err := New([]int{}, 100*time.Second) +func TestBadResolution(t *testing.T) { + _, err := New(1*time.Hour, 1) if err == nil { - t.Errorf("got no error wanted one: %v") + t.Fatal("expected an error, got nil") } } -func TestTimeBucketsSize(t *testing.T) { - bw, _ := New([]int{1, 2, 5}, 1*time.Second) - if len(bw.timeBuckets) != 5 { - t.Errorf("buckets slice wrong length: %d, expected %d", len(bw.timeBuckets), 5) - } - bw.Close() -} - -func TestOnes(t *testing.T) { - bw, _ := New([]int{1, 2, 5}, 1*time.Second) - var i int64 - for i = 0; i < bw.max; i++ { - bw.timeBuckets[i] = 1.0 - } - avgs := bw.averages(bw.timeBuckets) - // try a large range of starting points: - for i := 0; i < 10; i++ { - bw.timeI = i - validate(t, avgs, []float64{1.0, 1.0, 1.0}) - } - bw.Close() -} - -func TestManyOnes(t *testing.T) { - bw, _ := New([]int{1, 10, 60}, 1*time.Second) - var i int64 - for i = 0; i < bw.max; i++ { - bw.timeBuckets[i] = 1.0 - } - avgs := bw.averages(bw.timeBuckets) - // try a large range of starting points: - for i := -70; i < 70; i++ { - bw.timeI = i - validate(t, avgs, []float64{1.0, 1.0, 1.0}) - } - bw.Close() -} - -func TestLinear(t *testing.T) { - bw, _ := New([]int{1, 10, 60}, 1*time.Second) - var i int64 - for i = 0; i < bw.max; i++ { - bw.timeBuckets[i] = i - } - avgs := bw.averages(bw.timeBuckets) - validate(t, avgs, []float64{0.0, 4.5, 29.5}) - bw.Close() -} - -func TestInverseLinear(t *testing.T) { - bw, _ := New([]int{1, 10, 60}, 1*time.Second) - var i int64 - for i = 0; i < bw.max; i++ { - bw.timeBuckets[i] = bw.max - i - } - avgs := bw.averages(bw.timeBuckets) - validate(t, avgs, []float64{60.0, 55.5, 30.5}) - bw.Close() -} - -func TestSpecific(t *testing.T) { - bw, _ := New([]int{1, 2, 10}, 1*time.Second) - bw.timeBuckets[8] = 1.0 - var avgs []float64 - - bw.timeI = 8 - avgs = bw.averages(bw.timeBuckets) - validate(t, avgs, []float64{1.0, 0.5, 0.1}) - - bw.timeI = 7 - avgs = bw.averages(bw.timeBuckets) - validate(t, avgs, []float64{0.0, 0.5, 0.1}) - - bw.timeI = 9 - avgs = bw.averages(bw.timeBuckets) - validate(t, avgs, []float64{0.0, 0.0, 0.1}) - bw.Close() -} - func TestWriter(t *testing.T) { t.Parallel() - bw, _ := New([]int{1, 10, 100}, 1*time.Second) - defer bw.Close() + bw, err := New(1*time.Second, 100) + if err != nil { + t.Fatalf("failure to make reasonable BPS: %v", err) + } b := &bytes.Buffer{} b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo")) - io.Copy(bw, b) - validate(t, bw.Cur(), []float64{50, 5, 0.5}) - time.Sleep(3 * time.Second) - validate(t, bw.Cur(), []float64{0, 5, 0.5}) + for i := 0; i < 90; i++ { + io.Copy(bw, b) + last := bw.Rate() + time.Sleep(10 * time.Millisecond) + if last > bw.Rate() { + t.Errorf("rate should be increasing, it isn't: last: %f > current: %f", last, bw.Rate()) + } + } + time.Sleep(2 * time.Second) + for i := 0; i < 100; i++ { + if bw.Rate() > 0 { + t.Errorf("got high rate: got, want 0.0000", bw.Rate()) + } + time.Sleep(9 * time.Millisecond) + } + bw.Close() } func TestAdd(t *testing.T) { t.Parallel() - bw, _ := New([]int{1, 10, 100}, 1*time.Second) - bw.Add(314) - validate(t, bw.Cur(), []float64{314, 31.4, 3.14}) - time.Sleep(3 * time.Second) - validate(t, bw.Cur(), []float64{0, 31.4, 3.14}) + bw, err := New(1*time.Second, 100) + if err != nil { + t.Fatalf("failure to make reasonable BPS: %v", err) + } + for i := 0; i < 90; i++ { + bw.Add(1024) + last := bw.Rate() + time.Sleep(10 * time.Millisecond) + if last > bw.Rate() { + t.Errorf("rate should be increasing, it isn't: last: %f > current: %f", last, bw.Rate()) + } + } + time.Sleep(2 * time.Second) + for i := 0; i < 100; i++ { + if bw.Rate() > 0 { + t.Errorf("got high rate: got, want 0.0000", bw.Rate()) + } + time.Sleep(9 * time.Millisecond) + } bw.Close() }