// Package bandwidth is a simple tool for keeping track of transmitted and // received counts (ostensibly bytes). package bandwidth import ( "container/list" "errors" "fmt" "time" ) // Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate // a Bandwidth then feed values using the AddRx and AddTx channels. When the // accumulated values are needed, simply read from the Rx or Tx chans. When the // Bandwidth is no loner needed close the Quit chan. type Bandwidth struct { // write number of bytes to us AddRx chan int AddTx chan int // read stats from us Rx chan []float64 Tx chan []float64 Quit chan interface{} rxSnap []float64 txSnap []float64 dt time.Duration rxstream *stream txstream *stream } // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is // a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how // often the values used to send to Rx and Tx are updated. func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) { if len(seconds) < 1 { return nil, errors.New("must specify at least one interval lenght") } r := &Bandwidth{ AddRx: make(chan int, 1024), AddTx: make(chan int, 1024), Rx: make(chan []float64), Tx: make(chan []float64), dt: dt, Quit: make(chan interface{}), rxstream: newStream(seconds), txstream: newStream(seconds), } return r, nil } // Run is a method of Bandwidth that must be started in a goroutine in order // for things to be functional. func (bw *Bandwidth) Run() { t := time.NewTicker(bw.dt) outer: for { select { case <-t.C: bw.rxSnap = bw.rxstream.averages() bw.txSnap = bw.txstream.averages() case bw.Rx <- bw.rxSnap: case bw.Tx <- bw.txSnap: case s := <-bw.AddRx: bw.rxstream.add(datum{float64(s), time.Now()}) case s := <-bw.AddTx: bw.txstream.add(datum{float64(s), time.Now()}) case <-bw.Quit: break outer } } close(bw.AddRx) close(bw.AddTx) close(bw.Rx) close(bw.Tx) } // datum stores both a piece of info, plus the time at which that info was // collected. type datum struct { value float64 at time.Time } func (d datum) String() string { r := struct { value string at string }{ value: fmt.Sprintf("%04f", d.value), at: fmt.Sprintf("%s", d.at), } return fmt.Sprintf("%+v", r) } // stream manages a list of datum, and prunes itself on add type stream struct { data *list.List extents []time.Duration max time.Duration } func newStream(seconds []int) *stream { extents := []time.Duration{} for _, s := range seconds { extents = append(extents, time.Duration(-s)*time.Second) } max := extents[len(extents)-1] return &stream{ data: list.New(), extents: extents, max: max, } } func (s *stream) add(v datum) { s.data.PushBack(v) } func (s *stream) averages() []float64 { var limit time.Time total := 0.0 totals := []float64{} if s.data.Back() == nil { return totals } now := time.Now() cutoff := now.Add(s.max) for e := s.data.Front(); e != nil; e = e.Next() { cur := e.Value.(datum).at if cur.Before(cutoff) { s.data.Remove(e) } else { break } } e := s.data.Back() for _, extent := range s.extents { limit = now.Add(extent) for ; e != nil; e = e.Prev() { if e.Prev() == nil { break } next := e.Prev().Value.(datum) if next.at.Before(limit) { break } cur := e.Value.(datum) total += cur.value } totals = append(totals, total/float64(-int(extent/time.Second))) } return totals }