Added docs

This commit is contained in:
Stephen McQuay 2014-03-02 23:03:00 -08:00
parent ae82433a8f
commit adbdc7abe6
2 changed files with 72 additions and 53 deletions

View File

@ -1,3 +1,5 @@
// Package bandwidth is a simple tool for keeping track of transmitted and
// received counts (ostensibly bytes).
package bandwidth package bandwidth
import ( import (
@ -7,6 +9,74 @@ import (
"time" "time"
) )
// Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate
// a Bandwidth then feed values using the AddRx and AddTx channels. When the
// accumulated values are needed, simply read from the Rx or Tx chans. When the
// Bandwidth is no loner needed close the Quit chan.
type Bandwidth struct {
// write number of bytes to us
AddRx chan int
AddTx chan int
// read stats from us
Rx chan []float64
Tx chan []float64
Quit chan interface{}
rxSnap []float64
txSnap []float64
dt time.Duration
rxstream *stream
txstream *stream
}
// NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
// a slice of seconds on which to report (e.g. 1, 10, 60 seconds). dt is how
// often the values used to send to Rx and Tx are updated.
func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) {
if len(seconds) < 1 {
return nil, errors.New("must specify at least one interval lenght")
}
r := &Bandwidth{
AddRx: make(chan int, 1024),
AddTx: make(chan int, 1024),
Rx: make(chan []float64),
Tx: make(chan []float64),
dt: dt,
Quit: make(chan interface{}),
rxstream: newStream(seconds),
txstream: newStream(seconds),
}
return r, nil
}
// Run is a method of Bandwidth that must be started in a goroutine in order
// for things to be functional.
func (bw *Bandwidth) Run() {
t := time.NewTicker(bw.dt)
outer:
for {
select {
case <-t.C:
bw.rxSnap = bw.rxstream.averages()
bw.txSnap = bw.txstream.averages()
case bw.Rx <- bw.rxSnap:
case bw.Tx <- bw.txSnap:
case s := <-bw.AddRx:
bw.rxstream.add(datum{float64(s), time.Now()})
case s := <-bw.AddTx:
bw.txstream.add(datum{float64(s), time.Now()})
case <-bw.Quit:
break outer
}
}
close(bw.AddRx)
close(bw.AddTx)
}
// datum stores both a piece of info, plus the time at which that info was
// collected.
type datum struct { type datum struct {
value float64 value float64
at time.Time at time.Time
@ -23,6 +93,7 @@ func (d datum) String() string {
return fmt.Sprintf("%+v", r) return fmt.Sprintf("%+v", r)
} }
// stream manages a list of datum, and prunes itself on add
type stream struct { type stream struct {
data *list.List data *list.List
extents []time.Duration extents []time.Duration
@ -88,55 +159,3 @@ func (s *stream) averages() []float64 {
return totals return totals
} }
type Bandwidth struct {
AddRx chan int
AddTx chan int
Rx chan []float64
Tx chan []float64
rxSnap []float64
txSnap []float64
dt time.Duration
rxstream *stream
txstream *stream
Quit chan interface{}
}
func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) {
if len(seconds) < 1 {
return nil, errors.New("must specify at least one interval lenght")
}
r := &Bandwidth{
AddRx: make(chan int, 1024),
AddTx: make(chan int, 1024),
Rx: make(chan []float64),
Tx: make(chan []float64),
dt: dt,
Quit: make(chan interface{}),
rxstream: newStream(seconds),
txstream: newStream(seconds),
}
return r, nil
}
func (bw *Bandwidth) run() {
t := time.NewTicker(bw.dt)
outer:
for {
select {
case <-t.C:
bw.rxSnap = bw.rxstream.averages()
bw.txSnap = bw.txstream.averages()
case bw.Rx <- bw.rxSnap:
case bw.Tx <- bw.txSnap:
case s := <-bw.AddRx:
bw.rxstream.add(datum{float64(s), time.Now()})
case s := <-bw.AddTx:
bw.txstream.add(datum{float64(s), time.Now()})
case <-bw.Quit:
break outer
}
}
close(bw.AddRx)
close(bw.AddTx)
}

View File

@ -15,7 +15,7 @@ func general(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
go bw.run() go bw.Run()
bw.AddRx <- 10 bw.AddRx <- 10
bw.AddTx <- 10 bw.AddTx <- 10
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)