further simplified implementation
This commit is contained in:
parent
9c20a7ff0e
commit
a23ebdb44b
107
bandwidth.go
107
bandwidth.go
@ -3,13 +3,16 @@
|
|||||||
package bps
|
package bps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minResolution = 3
|
||||||
|
)
|
||||||
|
|
||||||
// BPS keeps track of state for byte counts
|
// BPS keeps track of state for byte counts
|
||||||
//
|
//
|
||||||
// Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When
|
// Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When
|
||||||
@ -21,46 +24,40 @@ type BPS struct {
|
|||||||
quit chan interface{}
|
quit chan interface{}
|
||||||
closed chan interface{}
|
closed chan interface{}
|
||||||
|
|
||||||
snapshot []float64
|
interval time.Duration
|
||||||
|
dt time.Duration
|
||||||
dt time.Duration
|
|
||||||
dts []int64
|
|
||||||
|
|
||||||
// curBs bytes read for this dt
|
// curBs bytes read for this dt
|
||||||
curBs int64
|
curBs int64
|
||||||
|
|
||||||
// timeBuckets contains an entry for bytes read for each dt of time up to
|
// buckets contains an entry for bytes read for each dt of time in interval
|
||||||
// the longest recoreded time slice.
|
buckets []int64
|
||||||
timeBuckets []int64
|
|
||||||
|
|
||||||
// timeI keys into timeBuckets for the current point in time
|
// timeI keys into buckets for the current point in time
|
||||||
timeI int
|
timeI int
|
||||||
|
|
||||||
// max is defined in New to be the maximum number of temporal buckets
|
|
||||||
// required.
|
|
||||||
max int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New Returns a populated and ready to launch BPS. dts is
|
// New Returns a populated and ready to use BPS. interval is the amount of time
|
||||||
// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
|
// (for example 60 seconds) over which to track byte flow (bytes/second for the
|
||||||
// also how often the values used to send out in Cur are updated.
|
// last interval), and resolution is used in the following calculation:
|
||||||
func New(dts []int, dt time.Duration) (*BPS, error) {
|
//
|
||||||
if len(dts) < 1 {
|
// dt = interval / resolution (s)
|
||||||
return nil, errors.New("must specify at least one interval lenght")
|
//
|
||||||
|
// where the dt is the temporal resolution of the updates (add or remove
|
||||||
|
// information every dt).
|
||||||
|
func New(interval time.Duration, resolution uint) (*BPS, error) {
|
||||||
|
if resolution < minResolution {
|
||||||
|
return nil, fmt.Errorf("resolution must be larger than %d", minResolution)
|
||||||
}
|
}
|
||||||
sort.Ints(dts)
|
|
||||||
convertedDts := []int64{}
|
dtns := interval.Nanoseconds() / int64(resolution)
|
||||||
for _, dt := range dts {
|
dt := time.Duration(dtns) * time.Nanosecond
|
||||||
convertedDts = append(convertedDts, int64(dt))
|
|
||||||
}
|
|
||||||
max := convertedDts[len(convertedDts)-1]
|
|
||||||
r := &BPS{
|
r := &BPS{
|
||||||
dt: dt,
|
interval: interval,
|
||||||
dts: convertedDts,
|
dt: dt,
|
||||||
quit: make(chan interface{}),
|
quit: make(chan interface{}),
|
||||||
closed: make(chan interface{}),
|
closed: make(chan interface{}),
|
||||||
timeBuckets: make([]int64, max),
|
buckets: make([]int64, resolution),
|
||||||
max: max,
|
|
||||||
}
|
}
|
||||||
go r.run()
|
go r.run()
|
||||||
return r, nil
|
return r, nil
|
||||||
@ -72,20 +69,9 @@ func (b *BPS) runLoop() {
|
|||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
b.Lock()
|
b.Lock()
|
||||||
|
b.buckets[b.timeI] = b.curBs
|
||||||
b.timeBuckets[b.timeI] = b.curBs
|
|
||||||
|
|
||||||
b.snapshot = b.averages(b.timeBuckets)
|
|
||||||
|
|
||||||
b.curBs = 0
|
b.curBs = 0
|
||||||
|
b.timeI = (b.timeI + 1) % int(len(b.buckets))
|
||||||
// Here we march forward through time by going backward in our
|
|
||||||
// slice.
|
|
||||||
b.timeI = (b.timeI - 1) % int(b.max)
|
|
||||||
// because modulo does unexpected things for negative numbers.
|
|
||||||
if b.timeI < 0 {
|
|
||||||
b.timeI = b.timeI + int(b.max)
|
|
||||||
}
|
|
||||||
b.Unlock()
|
b.Unlock()
|
||||||
case <-b.quit:
|
case <-b.quit:
|
||||||
return
|
return
|
||||||
@ -100,22 +86,6 @@ func (b *BPS) run() {
|
|||||||
close(b.closed)
|
close(b.closed)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BPS) averages(state []int64) []float64 {
|
|
||||||
r := []float64{}
|
|
||||||
var i int64 = 0
|
|
||||||
var total int64 = 0
|
|
||||||
for _, ti := range b.dts {
|
|
||||||
for ; ; i++ {
|
|
||||||
if i == ti {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
total += state[(int64(b.timeI)+i)%b.max]
|
|
||||||
}
|
|
||||||
r = append(r, float64(total)/float64(ti))
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write implements io.Writer so that one can simply write bytes to the struct.
|
// Write implements io.Writer so that one can simply write bytes to the struct.
|
||||||
func (b *BPS) Write(p []byte) (int, error) {
|
func (b *BPS) Write(p []byte) (int, error) {
|
||||||
n, err := ioutil.Discard.Write(p)
|
n, err := ioutil.Discard.Write(p)
|
||||||
@ -127,20 +97,19 @@ func (b *BPS) Write(p []byte) (int, error) {
|
|||||||
func (b *BPS) Add(i int64) {
|
func (b *BPS) Add(i int64) {
|
||||||
b.Lock()
|
b.Lock()
|
||||||
b.curBs += i
|
b.curBs += i
|
||||||
b.timeBuckets[b.timeI] = b.curBs
|
b.buckets[b.timeI] = b.curBs
|
||||||
b.snapshot = b.averages(b.timeBuckets)
|
|
||||||
b.Unlock()
|
b.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cur returns a slice containing the currenly tracked rates.
|
// Rate returns the current rate (bytes / second).
|
||||||
func (b *BPS) Cur() []float64 {
|
func (b *BPS) Rate() float64 {
|
||||||
r := make([]float64, len(b.dts))
|
|
||||||
b.Lock()
|
b.Lock()
|
||||||
for i := range b.snapshot {
|
var total int64 = 0
|
||||||
r[i] = b.snapshot[i]
|
for _, b := range b.buckets {
|
||||||
|
total += b
|
||||||
}
|
}
|
||||||
b.Unlock()
|
b.Unlock()
|
||||||
return r
|
return float64(total) / b.interval.Seconds()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close cleans up and shuts down a BPS.
|
// Close cleans up and shuts down a BPS.
|
||||||
|
@ -12,131 +12,67 @@ func init() {
|
|||||||
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validate(t *testing.T, actual, expected []float64) {
|
func TestSimple(t *testing.T) {
|
||||||
if len(actual) != len(expected) {
|
bw, err := New(1*time.Second, 100)
|
||||||
t.Errorf("len is not same: %d expected %d", len(actual), len(expected))
|
|
||||||
}
|
|
||||||
for i, _ := range actual {
|
|
||||||
if actual[i] != expected[i] {
|
|
||||||
t.Errorf("%dth: got %f expected %f", i, actual[i], expected[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEmpty(t *testing.T) {
|
|
||||||
bw, err := New([]int{1, 10, 60}, 100*time.Second)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Fatalf("failure to make reasonable BPS: %v", err)
|
||||||
}
|
}
|
||||||
bw.timeBuckets = []int64{1, 10, 60}
|
|
||||||
validate(t, bw.Cur(), []float64{0, 0, 0})
|
|
||||||
bw.Close()
|
bw.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEmptySeconds(t *testing.T) {
|
func TestBadResolution(t *testing.T) {
|
||||||
_, err := New([]int{}, 100*time.Second)
|
_, err := New(1*time.Hour, 1)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("got no error wanted one: %v")
|
t.Fatal("expected an error, got nil")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTimeBucketsSize(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 2, 5}, 1*time.Second)
|
|
||||||
if len(bw.timeBuckets) != 5 {
|
|
||||||
t.Errorf("buckets slice wrong length: %d, expected %d", len(bw.timeBuckets), 5)
|
|
||||||
}
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOnes(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 2, 5}, 1*time.Second)
|
|
||||||
var i int64
|
|
||||||
for i = 0; i < bw.max; i++ {
|
|
||||||
bw.timeBuckets[i] = 1.0
|
|
||||||
}
|
|
||||||
avgs := bw.averages(bw.timeBuckets)
|
|
||||||
// try a large range of starting points:
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
bw.timeI = i
|
|
||||||
validate(t, avgs, []float64{1.0, 1.0, 1.0})
|
|
||||||
}
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyOnes(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 10, 60}, 1*time.Second)
|
|
||||||
var i int64
|
|
||||||
for i = 0; i < bw.max; i++ {
|
|
||||||
bw.timeBuckets[i] = 1.0
|
|
||||||
}
|
|
||||||
avgs := bw.averages(bw.timeBuckets)
|
|
||||||
// try a large range of starting points:
|
|
||||||
for i := -70; i < 70; i++ {
|
|
||||||
bw.timeI = i
|
|
||||||
validate(t, avgs, []float64{1.0, 1.0, 1.0})
|
|
||||||
}
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLinear(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 10, 60}, 1*time.Second)
|
|
||||||
var i int64
|
|
||||||
for i = 0; i < bw.max; i++ {
|
|
||||||
bw.timeBuckets[i] = i
|
|
||||||
}
|
|
||||||
avgs := bw.averages(bw.timeBuckets)
|
|
||||||
validate(t, avgs, []float64{0.0, 4.5, 29.5})
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInverseLinear(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 10, 60}, 1*time.Second)
|
|
||||||
var i int64
|
|
||||||
for i = 0; i < bw.max; i++ {
|
|
||||||
bw.timeBuckets[i] = bw.max - i
|
|
||||||
}
|
|
||||||
avgs := bw.averages(bw.timeBuckets)
|
|
||||||
validate(t, avgs, []float64{60.0, 55.5, 30.5})
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSpecific(t *testing.T) {
|
|
||||||
bw, _ := New([]int{1, 2, 10}, 1*time.Second)
|
|
||||||
bw.timeBuckets[8] = 1.0
|
|
||||||
var avgs []float64
|
|
||||||
|
|
||||||
bw.timeI = 8
|
|
||||||
avgs = bw.averages(bw.timeBuckets)
|
|
||||||
validate(t, avgs, []float64{1.0, 0.5, 0.1})
|
|
||||||
|
|
||||||
bw.timeI = 7
|
|
||||||
avgs = bw.averages(bw.timeBuckets)
|
|
||||||
validate(t, avgs, []float64{0.0, 0.5, 0.1})
|
|
||||||
|
|
||||||
bw.timeI = 9
|
|
||||||
avgs = bw.averages(bw.timeBuckets)
|
|
||||||
validate(t, avgs, []float64{0.0, 0.0, 0.1})
|
|
||||||
bw.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWriter(t *testing.T) {
|
func TestWriter(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
bw, _ := New([]int{1, 10, 100}, 1*time.Second)
|
bw, err := New(1*time.Second, 100)
|
||||||
defer bw.Close()
|
if err != nil {
|
||||||
|
t.Fatalf("failure to make reasonable BPS: %v", err)
|
||||||
|
}
|
||||||
b := &bytes.Buffer{}
|
b := &bytes.Buffer{}
|
||||||
b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo"))
|
b.Write([]byte("helloooooooooooooooooooooooooooooooooooooooooooooo"))
|
||||||
io.Copy(bw, b)
|
for i := 0; i < 90; i++ {
|
||||||
validate(t, bw.Cur(), []float64{50, 5, 0.5})
|
io.Copy(bw, b)
|
||||||
time.Sleep(3 * time.Second)
|
last := bw.Rate()
|
||||||
validate(t, bw.Cur(), []float64{0, 5, 0.5})
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
if last > bw.Rate() {
|
||||||
|
t.Errorf("rate should be increasing, it isn't: last: %f > current: %f", last, bw.Rate())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if bw.Rate() > 0 {
|
||||||
|
t.Errorf("got high rate: got, want 0.0000", bw.Rate())
|
||||||
|
}
|
||||||
|
time.Sleep(9 * time.Millisecond)
|
||||||
|
}
|
||||||
|
bw.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAdd(t *testing.T) {
|
func TestAdd(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
bw, _ := New([]int{1, 10, 100}, 1*time.Second)
|
bw, err := New(1*time.Second, 100)
|
||||||
bw.Add(314)
|
if err != nil {
|
||||||
validate(t, bw.Cur(), []float64{314, 31.4, 3.14})
|
t.Fatalf("failure to make reasonable BPS: %v", err)
|
||||||
time.Sleep(3 * time.Second)
|
}
|
||||||
validate(t, bw.Cur(), []float64{0, 31.4, 3.14})
|
for i := 0; i < 90; i++ {
|
||||||
|
bw.Add(1024)
|
||||||
|
last := bw.Rate()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
if last > bw.Rate() {
|
||||||
|
t.Errorf("rate should be increasing, it isn't: last: %f > current: %f", last, bw.Rate())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if bw.Rate() > 0 {
|
||||||
|
t.Errorf("got high rate: got, want 0.0000", bw.Rate())
|
||||||
|
}
|
||||||
|
time.Sleep(9 * time.Millisecond)
|
||||||
|
}
|
||||||
bw.Close()
|
bw.Close()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user