// Package bandwidth is a simple tool for keeping track of transmitted and // received counts (ostensibly bytes). package bandwidth import ( "errors" "sort" "time" ) // Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate // a Bandwidth then feed values using the AddRx and AddTx channels. When the // accumulated values are needed, simply read from the Rx or Tx chans. When the // Bandwidth is no loner needed close the Quit chan. type Bandwidth struct { // write number of bytes to us AddRx chan int AddTx chan int // read stats from us Rx chan []float64 Tx chan []float64 Quit chan interface{} rxSnap []float64 txSnap []float64 dt time.Duration dts []int curRx int curTx int rxstream []int txstream []int timeI int max int } // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is // a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is // also how often the values used to send to Rx and Tx are updated. func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) { if len(dts) < 1 { return nil, errors.New("must specify at least one interval lenght") } sort.Ints(dts) max := dts[len(dts)-1] r := &Bandwidth{ AddRx: make(chan int, 1024), AddTx: make(chan int, 1024), Rx: make(chan []float64), Tx: make(chan []float64), dt: dt, dts: dts, Quit: make(chan interface{}), rxstream: make([]int, max), txstream: make([]int, max), max: max, } return r, nil } // Run is a method of Bandwidth that must be started in a goroutine in order // for things to be functional. func (bw *Bandwidth) Run() { t := time.NewTicker(bw.dt) outer: for { select { case <-t.C: bw.rxstream[bw.timeI] = bw.curRx bw.txstream[bw.timeI] = bw.curTx bw.rxSnap = bw.averages(bw.rxstream) bw.txSnap = bw.averages(bw.txstream) bw.curTx = 0 bw.curRx = 0 // n.b.: here we march forward through time by going backward in // our slice. bw.timeI = (bw.timeI - 1) % bw.max // lol: because modulo does unexpected things for negative numbers. if bw.timeI < 0 { bw.timeI = bw.timeI + bw.max } // log.Printf("%d %+v", bw.timeI, bw.rxstream) case bw.Rx <- bw.rxSnap: case bw.Tx <- bw.txSnap: case s := <-bw.AddRx: bw.curRx += s case s := <-bw.AddTx: bw.curTx += s case <-bw.Quit: break outer } } close(bw.AddRx) close(bw.AddTx) close(bw.Rx) close(bw.Tx) } func (bw *Bandwidth) averages(state []int) []float64 { r := []float64{} var i int = 0 total := 0 for _, ti := range bw.dts { for ; ; i++ { if i == ti { break } total += state[(bw.timeI+i)%bw.max] } r = append(r, float64(total)/float64(ti)) } return r }