bps/bandwidth.go
2015-09-30 23:15:53 -07:00

125 lines
2.7 KiB
Go

// Package bps is a simple tool for keeping track of the rate of bytes
// transmitted
package bps
import (
"fmt"
"io/ioutil"
"sync"
"time"
)
const (
minResolution = 3
)
// BPS keeps track of state for byte counts
//
// Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When
// the accumulated values are needed call BPS.Cur. When the BPS is no loner
// needed call BPS.Close
type BPS struct {
sync.RWMutex
quit chan interface{}
closed chan interface{}
interval time.Duration
dt time.Duration
// curBs bytes read for this dt
curBs int64
// buckets contains an entry for bytes read for each dt of time in interval
buckets []int64
// timeI keys into buckets for the current point in time
timeI int
}
// New Returns a populated and ready to use BPS. interval is the amount of time
// (for example 60 seconds) over which to track byte flow (bytes/second for the
// last interval), and resolution is used in the following calculation:
//
// dt = interval / resolution (s)
//
// where the dt is the temporal resolution of the updates (add or remove
// information every dt).
func New(interval time.Duration, resolution uint) (*BPS, error) {
if resolution < minResolution {
return nil, fmt.Errorf("resolution must be larger than %d", minResolution)
}
dtns := interval.Nanoseconds() / int64(resolution)
dt := time.Duration(dtns) * time.Nanosecond
r := &BPS{
interval: interval,
dt: dt,
quit: make(chan interface{}),
closed: make(chan interface{}),
buckets: make([]int64, resolution),
}
go r.run()
return r, nil
}
func (b *BPS) runLoop() {
t := time.NewTicker(b.dt)
for {
select {
case <-t.C:
b.Lock()
b.buckets[b.timeI] = b.curBs
b.curBs = 0
b.timeI = (b.timeI + 1) % int(len(b.buckets))
b.Unlock()
case <-b.quit:
return
}
}
}
// Run is a method of BPS that must be started in a goroutine in order
// for things to be functional.
func (b *BPS) run() {
b.runLoop()
close(b.closed)
}
// Write implements io.Writer so that one can simply write bytes to the struct.
func (b *BPS) Write(p []byte) (int, error) {
n, err := ioutil.Discard.Write(p)
b.Add(int64(n))
return n, err
}
// Add adds i to the current time bucket.
func (b *BPS) Add(i int64) {
b.Lock()
b.curBs += i
b.buckets[b.timeI] = b.curBs
b.Unlock()
}
// Rate returns the current rate (bytes / second).
func (b *BPS) Rate() float64 {
b.Lock()
var total int64 = 0
for _, b := range b.buckets {
total += b
}
b.Unlock()
return float64(total) / b.interval.Seconds()
}
// HumanRate returns a human-friendly (e.g. 23.3MB/s) rate.
func (b *BPS) HumanRate() string {
return Bytes(uint64(b.Rate())) + "/s"
}
// Close cleans up and shuts down a BPS.
func (b *BPS) Close() {
close(b.quit)
<-b.closed
}