bandwidth/bandwidth.go

143 lines
2.6 KiB
Go
Raw Normal View History

2014-03-02 22:47:28 -08:00
package bandwidth
2014-03-02 22:44:09 -08:00
import (
"container/list"
"errors"
"fmt"
"time"
)
// datum is one timestamped sample stored in a stream.
//
// Field order matters: datum values are built with positional literals.
type datum struct {
	value float64   // magnitude of the sample
	at    time.Time // instant the sample was recorded
}
// String renders the sample as "{value:<float> at:<timestamp>}".
//
// The value is formatted with the "%04f" verb and the timestamp with
// time.Time's own String method.
func (d datum) String() string {
	return fmt.Sprintf("{value:%04f at:%s}", d.value, d.at.String())
}
// stream is a time-ordered series of samples plus the look-back windows
// over which averages are computed.
type stream struct {
	// data holds datum values; add appends at the back, so the newest
	// sample is always the last element.
	data *list.List

	// extents are the look-back windows, stored as negative durations so
	// that adding one to a timestamp yields the start of that window.
	extents []time.Duration

	// max is the retention horizon (also a negative duration): add evicts
	// samples older than the newest timestamp plus max.
	max time.Duration
}
2014-03-02 22:47:28 -08:00
func newStream(seconds []int) *stream {
2014-03-02 22:44:09 -08:00
extents := []time.Duration{}
for _, s := range seconds {
extents = append(extents, time.Duration(-s)*time.Second)
}
max := extents[len(extents)-1]
return &stream{
data: list.New(),
extents: extents,
max: max,
}
}
// add appends v as the newest sample and evicts every sample that is
// older than v's timestamp plus the retention horizon s.max.
//
// Eviction scans from the oldest sample and stops at the first one still
// inside the horizon, which relies on samples being appended in
// chronological order.
func (s *stream) add(v datum) {
	s.data.PushBack(v)
	cutoff := v.at.Add(s.max)
	for e := s.data.Front(); e != nil; {
		if !e.Value.(datum).at.Before(cutoff) {
			break
		}
		// Remove clears the element's links, so the successor must be
		// captured first; otherwise only one sample is evicted per call.
		following := e.Next()
		s.data.Remove(e)
		e = following
	}
}
// averages returns one rate per configured window, in the order of
// s.extents. With no samples it returns an empty, non-nil slice.
//
// Windows are measured back from the newest sample's timestamp. Walking
// from newest to oldest, a sample is added to the running sum only while
// the sample preceding it still lies inside the current window; the
// oldest sample in the list is therefore never counted. The sum and the
// cursor carry over from one window to the next, so the windows are
// expected to run from shortest to longest. Each rate is the running sum
// divided by the window length in whole seconds.
func (s *stream) averages() []float64 {
	rates := []float64{}
	cursor := s.data.Back()
	if cursor == nil {
		return rates
	}

	latest := cursor.Value.(datum).at
	sum := 0.0
	for _, window := range s.extents {
		earliest := latest.Add(window)
		for cursor != nil {
			older := cursor.Prev()
			if older == nil || older.Value.(datum).at.Before(earliest) {
				break
			}
			sum += cursor.Value.(datum).value
			cursor = older
		}
		// window is negative; negate to get its length in seconds.
		length := -int(window / time.Second)
		rates = append(rates, sum/float64(length))
	}
	return rates
}
// Bandwidth tracks received and transmitted sample streams and publishes
// their windowed averages over channels. All channel traffic is handled
// by the run loop.
type Bandwidth struct {
	// AddRx and AddTx accept new samples; run timestamps each value on
	// receipt and records it in the matching stream. run closes both
	// channels when it exits.
	AddRx, AddTx chan int

	// Rx and Tx deliver the most recent snapshot of averages for the
	// receive and transmit streams respectively.
	Rx, Tx chan []float64

	// rxSnap and txSnap cache the averages computed on the last tick.
	rxSnap, txSnap []float64

	// dt is the period of the ticker that refreshes the snapshots.
	dt time.Duration

	rxstream, txstream *stream

	// Quit stops the run loop when any value is received on it.
	Quit chan interface{}
}
func NewBandwidth(seconds []int, dt time.Duration) (*Bandwidth, error) {
if len(seconds) < 1 {
return nil, errors.New("must specify at least one interval lenght")
}
r := &Bandwidth{
AddRx: make(chan int, 1024),
AddTx: make(chan int, 1024),
Rx: make(chan []float64),
Tx: make(chan []float64),
dt: dt,
Quit: make(chan interface{}),
2014-03-02 22:47:28 -08:00
rxstream: newStream(seconds),
txstream: newStream(seconds),
2014-03-02 22:44:09 -08:00
}
return r, nil
}
// run is the event loop that owns all of bw's mutable state. It blocks
// until a value is received on bw.Quit.
//
// On every tick of a bw.dt ticker it recomputes both snapshots from the
// streams. Between ticks it hands the current snapshots to readers of Rx
// and Tx, and records values arriving on AddRx and AddTx, stamped with
// the time of receipt.
//
// On quit it closes AddRx and AddTx and stops the ticker.
//
// NOTE(review): AddRx/AddTx are closed from the receiving side, so a
// producer that sends after quit will panic; confirm producers are
// stopped before Quit is signalled.
func (bw *Bandwidth) run() {
	ticker := time.NewTicker(bw.dt)
	// Release the ticker's resources when the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			bw.rxSnap = bw.rxstream.averages()
			bw.txSnap = bw.txstream.averages()
		case bw.Rx <- bw.rxSnap:
		case bw.Tx <- bw.txSnap:
		case n := <-bw.AddRx:
			bw.rxstream.add(datum{float64(n), time.Now()})
		case n := <-bw.AddTx:
			bw.txstream.add(datum{float64(n), time.Now()})
		case <-bw.Quit:
			close(bw.AddRx)
			close(bw.AddTx)
			return
		}
	}
}