bps/bandwidth.go

122 lines
2.8 KiB
Go

// Package bandwidth is a simple tool for keeping track of transmitted and
// received counts (ostensibly bytes).
package bandwidth
import (
"errors"
"sort"
"time"
)
// Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate
// a Bandwidth then feed values using the AddRx and AddTx channels. When the
// accumulated values are needed, simply read from the Rx or Tx chans. When the
// Bandwidth is no loner needed close the Quit chan.
// At this time it uses container/list and leaks memory like a sieve.
type Bandwidth struct {
// write number of bytes to us
AddRx chan int
AddTx chan int
// read stats from us
Rx chan []float64
Tx chan []float64
Quit chan interface{}
rxSnap []float64
txSnap []float64
dt time.Duration
dts []int
curRx int
curTx int
rxstream []int
txstream []int
timeI int
max int
}
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
// a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
// also how often the values used to send to Rx and Tx are updated.
func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
if len(dts) < 1 {
return nil, errors.New("must specify at least one interval lenght")
}
sort.Ints(dts)
max := dts[len(dts)-1]
r := &Bandwidth{
AddRx: make(chan int, 1024),
AddTx: make(chan int, 1024),
Rx: make(chan []float64),
Tx: make(chan []float64),
dt: dt,
dts: dts,
Quit: make(chan interface{}),
rxstream: make([]int, max),
txstream: make([]int, max),
max: max,
}
return r, nil
}
// Run is a method of Bandwidth that must be started in a goroutine in order
// for things to be functional.
func (bw *Bandwidth) Run() {
t := time.NewTicker(bw.dt)
outer:
for {
select {
case <-t.C:
bw.rxstream[bw.timeI] = bw.curRx
bw.txstream[bw.timeI] = bw.curTx
bw.rxSnap = bw.averages(bw.rxstream)
bw.txSnap = bw.averages(bw.txstream)
bw.curTx = 0
bw.curRx = 0
// n.b.: here we march forward through time by going backward in
// our slice.
bw.timeI = (bw.timeI - 1) % bw.max
// lol: because modulo does unexpected things for negative numbers.
if bw.timeI < 0 {
bw.timeI = bw.timeI + bw.max
}
// log.Printf("%d %+v", bw.timeI, bw.rxstream)
case bw.Rx <- bw.rxSnap:
case bw.Tx <- bw.txSnap:
case s := <-bw.AddRx:
bw.curRx += s
case s := <-bw.AddTx:
bw.curTx += s
case <-bw.Quit:
break outer
}
}
close(bw.AddRx)
close(bw.AddTx)
close(bw.Rx)
close(bw.Tx)
}
func (bw *Bandwidth) averages(state []int) []float64 {
r := []float64{}
var i int = 0
total := 0
for _, ti := range bw.dts {
for ; ; i++ {
if i == ti {
break
}
total += state[(bw.timeI+i)%bw.max]
}
r = append(r, float64(total)/float64(ti))
}
return r
}