Started down a different path; using a circular buffer instead.
After talks on #go-nuts, it seems as though I should look into using something like the following: https://github.com/cespare/goproc/tree/master/nwstat And this makes sense. I have trimmed out the old tests and rearranged some code here.
This commit is contained in:
parent
c33a5561bb
commit
bb8ce3c1ea
124
bandwidth.go
124
bandwidth.go
@ -3,9 +3,9 @@
|
|||||||
package bandwidth
|
package bandwidth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"log"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -27,17 +27,26 @@ type Bandwidth struct {
|
|||||||
rxSnap []float64
|
rxSnap []float64
|
||||||
txSnap []float64
|
txSnap []float64
|
||||||
dt time.Duration
|
dt time.Duration
|
||||||
rxstream *stream
|
|
||||||
txstream *stream
|
curRx int
|
||||||
|
curTx int
|
||||||
|
|
||||||
|
rxstream []int
|
||||||
|
txstream []int
|
||||||
|
|
||||||
|
timeI int
|
||||||
|
max int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
|
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
|
||||||
// a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how
|
// a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how
|
||||||
// often the values used to send to Rx and Tx are updated.
|
// often the values used to send to Rx and Tx are updated.
|
||||||
func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) {
|
func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
|
||||||
if len(seconds) < 1 {
|
if len(dts) < 1 {
|
||||||
return nil, errors.New("must specify at least one interval lenght")
|
return nil, errors.New("must specify at least one interval lenght")
|
||||||
}
|
}
|
||||||
|
sort.Ints(dts)
|
||||||
|
max := dts[len(dts)-1]
|
||||||
r := &Bandwidth{
|
r := &Bandwidth{
|
||||||
AddRx: make(chan int, 1024),
|
AddRx: make(chan int, 1024),
|
||||||
AddTx: make(chan int, 1024),
|
AddTx: make(chan int, 1024),
|
||||||
@ -45,8 +54,9 @@ func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) {
|
|||||||
Tx: make(chan []float64),
|
Tx: make(chan []float64),
|
||||||
dt: dt,
|
dt: dt,
|
||||||
Quit: make(chan interface{}),
|
Quit: make(chan interface{}),
|
||||||
rxstream: newStream(seconds),
|
rxstream: make([]int, max),
|
||||||
txstream: newStream(seconds),
|
txstream: make([]int, max),
|
||||||
|
max: max,
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
@ -59,14 +69,17 @@ outer:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
bw.rxSnap = bw.rxstream.averages()
|
bw.rxSnap = bw.averages(bw.rxstream)
|
||||||
bw.txSnap = bw.txstream.averages()
|
bw.txSnap = bw.averages(bw.txstream)
|
||||||
|
bw.curTx = 0
|
||||||
|
bw.curRx = 0
|
||||||
|
bw.timeI += 1
|
||||||
case bw.Rx <- bw.rxSnap:
|
case bw.Rx <- bw.rxSnap:
|
||||||
case bw.Tx <- bw.txSnap:
|
case bw.Tx <- bw.txSnap:
|
||||||
case s := <-bw.AddRx:
|
case s := <-bw.AddRx:
|
||||||
bw.rxstream.add(datum{float64(s), time.Now()})
|
bw.curTx += s
|
||||||
case s := <-bw.AddTx:
|
case s := <-bw.AddTx:
|
||||||
bw.txstream.add(datum{float64(s), time.Now()})
|
bw.curRx += s
|
||||||
case <-bw.Quit:
|
case <-bw.Quit:
|
||||||
break outer
|
break outer
|
||||||
}
|
}
|
||||||
@ -77,86 +90,9 @@ outer:
|
|||||||
close(bw.Tx)
|
close(bw.Tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// datum stores both a piece of info, plus the time at which that info was
|
func (bw *Bandwidth) averages(state []int) []float64 {
|
||||||
// collected.
|
for i := 0; i < bw.max; i++ {
|
||||||
type datum struct {
|
log.Println(bw.timeI + i)
|
||||||
value float64
|
}
|
||||||
at time.Time
|
return nil
|
||||||
}
|
|
||||||
|
|
||||||
func (d datum) String() string {
|
|
||||||
r := struct {
|
|
||||||
value string
|
|
||||||
at string
|
|
||||||
}{
|
|
||||||
value: fmt.Sprintf("%04f", d.value),
|
|
||||||
at: fmt.Sprintf("%s", d.at),
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%+v", r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// stream manages a list of datum, and prunes itself on add
|
|
||||||
type stream struct {
|
|
||||||
data *list.List
|
|
||||||
extents []time.Duration
|
|
||||||
max time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func newStream(seconds []int) *stream {
|
|
||||||
extents := []time.Duration{}
|
|
||||||
for _, s := range seconds {
|
|
||||||
extents = append(extents, time.Duration(-s)*time.Second)
|
|
||||||
|
|
||||||
}
|
|
||||||
max := extents[len(extents)-1]
|
|
||||||
return &stream{
|
|
||||||
data: list.New(),
|
|
||||||
extents: extents,
|
|
||||||
max: max,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stream) add(v datum) {
|
|
||||||
s.data.PushBack(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stream) averages() []float64 {
|
|
||||||
var limit time.Time
|
|
||||||
total := 0.0
|
|
||||||
totals := []float64{}
|
|
||||||
|
|
||||||
if s.data.Back() == nil {
|
|
||||||
return totals
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
cutoff := now.Add(s.max)
|
|
||||||
for e := s.data.Front(); e != nil; e = e.Next() {
|
|
||||||
cur := e.Value.(datum).at
|
|
||||||
if cur.Before(cutoff) {
|
|
||||||
s.data.Remove(e)
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
e := s.data.Back()
|
|
||||||
|
|
||||||
for _, extent := range s.extents {
|
|
||||||
limit = now.Add(extent)
|
|
||||||
for ; e != nil; e = e.Prev() {
|
|
||||||
if e.Prev() == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
next := e.Prev().Value.(datum)
|
|
||||||
if next.at.Before(limit) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
cur := e.Value.(datum)
|
|
||||||
total += cur.value
|
|
||||||
}
|
|
||||||
totals = append(totals, total/float64(-int(extent/time.Second)))
|
|
||||||
}
|
|
||||||
|
|
||||||
return totals
|
|
||||||
}
|
}
|
||||||
|
@ -10,35 +10,6 @@ func init() {
|
|||||||
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func general(t *testing.T) {
|
|
||||||
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
go bw.Run()
|
|
||||||
bw.AddRx <- 10
|
|
||||||
bw.AddTx <- 10
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
log.Printf("%+v", <-bw.Rx)
|
|
||||||
log.Printf("%+v", <-bw.Tx)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
bw.AddRx <- (10 * i)
|
|
||||||
bw.AddRx <- (10 * i)
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
log.Printf("%+v", <-bw.Rx)
|
|
||||||
log.Printf("%+v", <-bw.Tx)
|
|
||||||
}
|
|
||||||
log.Printf("%+v", <-bw.Rx)
|
|
||||||
log.Printf("%+v", <-bw.Tx)
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
log.Printf("%+v", <-bw.Rx)
|
|
||||||
log.Printf("%+v", <-bw.Tx)
|
|
||||||
close(bw.Quit)
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
log.Printf("%+v", <-bw.Rx)
|
|
||||||
log.Printf("%+v", <-bw.Tx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func validate(t *testing.T, actual, expected []float64) {
|
func validate(t *testing.T, actual, expected []float64) {
|
||||||
if len(actual) != len(expected) {
|
if len(actual) != len(expected) {
|
||||||
t.Errorf("len is not same: %d expected %d", len(actual), len(expected))
|
t.Errorf("len is not same: %d expected %d", len(actual), len(expected))
|
||||||
@ -50,64 +21,14 @@ func validate(t *testing.T, actual, expected []float64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestOncePerSecond(t *testing.T) {
|
|
||||||
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
bw.rxstream = newStream([]int{1, 10, 60})
|
|
||||||
var i int64 = 0
|
|
||||||
for ; i < 1000; i++ {
|
|
||||||
d := datum{
|
|
||||||
value: 1.0,
|
|
||||||
at: time.Unix(1234567890+i, 0),
|
|
||||||
}
|
|
||||||
bw.rxstream.add(d)
|
|
||||||
}
|
|
||||||
validate(t, bw.rxstream.averages(), []float64{1.0, 1.0, 1.0})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestOneOverManySeconds(t *testing.T) {
|
|
||||||
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
bw.rxstream = newStream([]int{1, 10, 60})
|
|
||||||
var i int64 = 0
|
|
||||||
for ; i < 1000; i++ {
|
|
||||||
d := datum{
|
|
||||||
value: 1.0,
|
|
||||||
at: time.Unix(1234567890+i*2, 0),
|
|
||||||
}
|
|
||||||
bw.rxstream.add(d)
|
|
||||||
}
|
|
||||||
validate(t, bw.rxstream.averages(), []float64{0.0, 0.5, 0.5})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestManyPerSecond(t *testing.T) {
|
|
||||||
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
bw.rxstream = newStream([]int{1, 10, 60})
|
|
||||||
var i int64 = 0
|
|
||||||
for ; i < 10000; i++ {
|
|
||||||
d := datum{
|
|
||||||
value: 1.0,
|
|
||||||
at: time.Unix(1234567890, i*10000000),
|
|
||||||
}
|
|
||||||
bw.rxstream.add(d)
|
|
||||||
}
|
|
||||||
validate(t, bw.rxstream.averages(), []float64{100.0, 100.0, 100.0})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEmpty(t *testing.T) {
|
func TestEmpty(t *testing.T) {
|
||||||
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
bw, err := NewBandwidth([]int{1, 10, 60}, 100*time.Millisecond)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
bw.rxstream = newStream([]int{1, 10, 60})
|
go bw.Run()
|
||||||
validate(t, bw.rxstream.averages(), []float64{})
|
bw.rxstream = []int{1, 10, 60}
|
||||||
|
validate(t, <-bw.Rx, []float64{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEmptySeconds(t *testing.T) {
|
func TestEmptySeconds(t *testing.T) {
|
||||||
@ -116,3 +37,8 @@ func TestEmptySeconds(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestA(t *testing.T) {
|
||||||
|
bw, _ := NewBandwidth([]int{1, 10, 30}, 1*time.Second)
|
||||||
|
log.Printf("%+v", bw)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user