122 lines
2.8 KiB
Go
122 lines
2.8 KiB
Go
// Package bandwidth is a simple tool for keeping track of transmitted and
// received counts (ostensibly bytes).
package bandwidth
|
|
|
|
import (
|
|
"errors"
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
// Bandwidth keeps track of state for Rx and Tx (byte) counts. Create one with
// NewBandwidth, start Run in a goroutine, then feed values using the AddRx and
// AddTx channels. When the accumulated values are needed, simply read from the
// Rx or Tx chans. When the Bandwidth is no longer needed, close the Quit chan.
//
// Per-tick totals are stored in fixed-length slices that are reused as ring
// buffers, so the amount of history held is bounded by the largest reporting
// interval.
type Bandwidth struct {
	// AddRx and AddTx: write number of bytes to us.
	AddRx, AddTx chan int

	// Rx and Tx: read stats from us. Each value holds one average per
	// reporting interval, in ascending interval order.
	Rx, Tx chan []float64

	// Quit is closed by the owner to make Run shut down.
	Quit chan interface{}

	// rxSnap and txSnap are the averages computed at the most recent tick;
	// they are what Run hands out on Rx and Tx.
	rxSnap, txSnap []float64

	// dt is the tick period; dts are the reporting intervals expressed as
	// multiples of dt, sorted ascending by NewBandwidth.
	dt  time.Duration
	dts []int

	// curRx and curTx accumulate the counts received since the last tick.
	curRx, curTx int

	// rxstream and txstream are ring buffers of per-tick totals, max long.
	rxstream, txstream []int

	// timeI is the ring slot the next tick writes to (it moves backward
	// through the slices); max is the largest entry of dts.
	timeI, max int
}
|
|
|
|
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
|
|
// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
|
|
// also how often the values used to send to Rx and Tx are updated.
|
|
func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
|
|
if len(dts) < 1 {
|
|
return nil, errors.New("must specify at least one interval lenght")
|
|
}
|
|
sort.Ints(dts)
|
|
max := dts[len(dts)-1]
|
|
r := &Bandwidth{
|
|
AddRx: make(chan int, 1024),
|
|
AddTx: make(chan int, 1024),
|
|
Rx: make(chan []float64),
|
|
Tx: make(chan []float64),
|
|
dt: dt,
|
|
dts: dts,
|
|
Quit: make(chan interface{}),
|
|
rxstream: make([]int, max),
|
|
txstream: make([]int, max),
|
|
max: max,
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Run is a method of Bandwidth that must be started in a goroutine in order
|
|
// for things to be functional.
|
|
func (bw *Bandwidth) Run() {
|
|
t := time.NewTicker(bw.dt)
|
|
outer:
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
bw.rxstream[bw.timeI] = bw.curRx
|
|
bw.txstream[bw.timeI] = bw.curTx
|
|
|
|
bw.rxSnap = bw.averages(bw.rxstream)
|
|
bw.txSnap = bw.averages(bw.txstream)
|
|
|
|
bw.curTx = 0
|
|
bw.curRx = 0
|
|
|
|
// n.b.: here we march forward through time by going backward in
|
|
// our slice.
|
|
bw.timeI = (bw.timeI - 1) % bw.max
|
|
// lol: because modulo does unexpected things for negative numbers.
|
|
if bw.timeI < 0 {
|
|
bw.timeI = bw.timeI + bw.max
|
|
}
|
|
// log.Printf("%d %+v", bw.timeI, bw.rxstream)
|
|
case bw.Rx <- bw.rxSnap:
|
|
case bw.Tx <- bw.txSnap:
|
|
case s := <-bw.AddRx:
|
|
bw.curRx += s
|
|
case s := <-bw.AddTx:
|
|
bw.curTx += s
|
|
case <-bw.Quit:
|
|
break outer
|
|
}
|
|
}
|
|
close(bw.AddRx)
|
|
close(bw.AddTx)
|
|
close(bw.Rx)
|
|
close(bw.Tx)
|
|
}
|
|
|
|
func (bw *Bandwidth) averages(state []int) []float64 {
|
|
r := []float64{}
|
|
var i int = 0
|
|
total := 0
|
|
for _, ti := range bw.dts {
|
|
for ; ; i++ {
|
|
if i == ti {
|
|
break
|
|
}
|
|
total += state[(bw.timeI+i)%bw.max]
|
|
}
|
|
r = append(r, float64(total)/float64(ti))
|
|
}
|
|
return r
|
|
}
|