bps/bps.go

139 lines
3.1 KiB
Go

// Package bps keeps track of byte rates.
package bps
import (
"fmt"
"io/ioutil"
"sync"
"time"
)
// minResolution is the smallest number of buckets NewPrecise accepts.
const minResolution = 3
// BPS calculates bandwidth rates.
//
// Instantiate a BPS using one of the New* functions then feed bytes either via
// BPS.Add, or writing to it (it implements io.Writer). When the accumulated
// values are needed call BPS.Rate. When the BPS is no longer needed call
// BPS.Close.
type BPS struct {
// The embedded lock is held by runLoop, Add and Rate while they touch curBs,
// buckets and timeI.
sync.RWMutex
// quit is closed by Close to tell the background loop to return
quit chan interface{}
// closed is closed by run once the background loop has returned
closed chan interface{}
// interval is the total time window covered by buckets
interval time.Duration
// dt is the duration of a single bucket (interval / number of buckets)
dt time.Duration
// curBs bytes read for this dt
curBs int64
// buckets contains an entry for bytes read for each dt of time in interval
buckets []int64
// timeI keys into buckets for the current point in time
timeI int
}
// New returns a populated and ready to use BPS.
//
// Bytes per second are reported over the last time window (defined by
// interval), which is split into 100 buckets; the window therefore advances
// every interval / 100. It is shorthand for NewPrecise(interval, 100) and
// returns whatever NewPrecise returns.
func New(interval time.Duration) (*BPS, error) {
	const defaultResolution = 100
	return NewPrecise(interval, defaultResolution)
}
// NewPrecise behaves like New, but allows the user to specify the temporal
// resolution, i.e. the number of buckets that interval is divided into.
//
// If you need to query more or less frequently than interval / 100 you'll want
// to use this function.
//
// An error is returned when resolution is below minResolution, or when
// interval is non-positive or too short to give every bucket a duration of at
// least one nanosecond. On success a background goroutine is started that
// advances the buckets; call Close to stop it.
func NewPrecise(interval time.Duration, resolution uint) (*BPS, error) {
	if resolution < minResolution {
		return nil, fmt.Errorf("resolution must be at least %d", minResolution)
	}

	// time.NewTicker panics on a non-positive duration, and that panic would
	// fire inside the background goroutine where the caller cannot recover
	// it, so reject such inputs here instead.
	dt := interval / time.Duration(resolution)
	if dt <= 0 {
		return nil, fmt.Errorf("interval %v is too short for resolution %d", interval, resolution)
	}

	r := &BPS{
		interval: interval,
		dt:       dt,
		quit:     make(chan interface{}),
		closed:   make(chan interface{}),
		buckets:  make([]int64, resolution),
	}
	go r.run()
	return r, nil
}
// runLoop advances the bucket ring once every dt until quit is closed.
//
// On each tick, under the lock, it stores the bytes accumulated during the
// elapsed dt into the current bucket, resets the accumulator, and moves timeI
// to the next bucket, wrapping around at the end of the slice.
func (b *BPS) runLoop() {
	t := time.NewTicker(b.dt)
	// Release the ticker's resources once the loop exits.
	defer t.Stop()

	for {
		select {
		case <-t.C:
			b.Lock()
			b.buckets[b.timeI] = b.curBs
			b.curBs = 0
			b.timeI = (b.timeI + 1) % len(b.buckets)
			b.Unlock()
		case <-b.quit:
			return
		}
	}
}
// run drives the bucket loop and, once the loop has returned, closes the
// closed channel so that Close can stop waiting. NewPrecise starts it in its
// own goroutine.
func (b *BPS) run() {
	defer close(b.closed)
	b.runLoop()
}
// Write implements io.Writer so that a BPS can be used wherever bytes are
// written. The data itself is handed to ioutil.Discard; only the byte count it
// reports is recorded via Add. The count and error from the discard writer are
// returned unchanged.
func (b *BPS) Write(p []byte) (int, error) {
	written, err := ioutil.Discard.Write(p)
	b.Add(int64(written))
	return written, err
}
// Add records i additional bytes for the current dt. Under the lock it bumps
// the running count and mirrors the new value into the current bucket.
func (b *BPS) Add(i int64) {
	b.Lock()
	sum := b.curBs + i
	b.curBs = sum
	b.buckets[b.timeI] = sum
	b.Unlock()
}
// Rate returns the current rate (bytes / second).
//
// It sums every bucket and divides the total by the length of interval in
// seconds. Only a read lock is taken because the buckets are not modified
// here.
func (b *BPS) Rate() float64 {
	b.RLock()
	var total int64
	for _, n := range b.buckets {
		total += n
	}
	b.RUnlock()

	return float64(total) / b.interval.Seconds()
}
// HumanRate returns a human-friendly (e.g. 23.3MB/s) rate: the value of Rate,
// truncated to an unsigned integer, formatted by human and suffixed with "/s".
func (b *BPS) HumanRate() string {
	rate := uint64(b.Rate())
	return human(rate) + "/s"
}
// Close cleans up and shuts down a BPS. It signals the background goroutine
// to stop and blocks until that goroutine has exited.
//
// Close may be called more than once; later calls only wait for the shutdown
// that the first call started.
func (b *BPS) Close() {
	// Closing an already-closed channel panics, so check first. The lock
	// makes the check-and-close atomic with respect to other Close calls.
	b.Lock()
	select {
	case <-b.quit:
		// Shutdown was already requested by an earlier call.
	default:
		close(b.quit)
	}
	b.Unlock()

	<-b.closed
}
// Data returns a copy of all the buckets as a slice of int.
//
// The first element is the bucket just before the current index timeI (the
// one stored by the most recent tick); the following elements walk backwards
// through the ring, and the bucket at timeI itself comes last. The result is
// never nil.
func (b *BPS) Data() []int {
	// The background loop and Add mutate buckets and timeI, so take a read
	// lock for a consistent snapshot.
	b.RLock()
	defer b.RUnlock()

	n := len(b.buckets)
	r := make([]int, 0, n)
	for i := n - 1; i >= 0; i-- {
		r = append(r, int(b.buckets[(b.timeI+i)%n]))
	}
	return r
}