bps/bps.go

184 lines
3.9 KiB
Go

// Package bps keeps track of byte rates
package bps
import (
"fmt"
"io/ioutil"
"sync"
"time"
)
const (
minResolution = 3
)
// BPS calculates bandwith rates.
//
// Instantiate a BPS using one of the New* functions then feed bytes either via
// BPS.Add, or writing to it (it implements io.Writer). When the accumulated
// values are needed call BPS.Rate. When the BPS is no loner needed call
// BPS.Close
type BPS struct {
sync.RWMutex
quit chan interface{}
closed chan interface{}
interval time.Duration
dt time.Duration
// curBs bytes read for this dt
curBs int64
// buckets contains an entry for bytes read for each dt of time in interval
buckets []int64
// timeI keys into buckets for the current point in time
timeI int
}
// New Returns a populated and ready to use BPS.
//
// bytes per second are reported for the last time interval (defined by
// interval), and is updated every (interval / 100) seconds.
func New(interval time.Duration) (*BPS, error) {
return NewPrecise(interval, 100)
}
// MustNew behaves like New, but panics on error.
func MustNew(interval time.Duration) *BPS {
b, err := New(interval)
if err != nil {
panic(err)
}
return b
}
// NewPrecise behaves like New, but allows the user to specify the temporal
// resolution.
//
// If you need to query more or less frequently than interval / 100 you'll want
// to use this function.
func NewPrecise(interval time.Duration, resolution uint) (*BPS, error) {
if resolution < minResolution {
return nil, fmt.Errorf("resolution must be larger than %d", minResolution)
}
dtns := interval.Nanoseconds() / int64(resolution)
dt := time.Duration(dtns) * time.Nanosecond
r := &BPS{
interval: interval,
dt: dt,
quit: make(chan interface{}),
closed: make(chan interface{}),
buckets: make([]int64, resolution),
}
go r.run()
return r, nil
}
func (b *BPS) runLoop() {
t := time.NewTicker(b.dt)
for {
select {
case <-t.C:
b.Lock()
b.buckets[b.timeI] = b.curBs
b.curBs = 0
b.timeI = (b.timeI + 1) % int(len(b.buckets))
b.Unlock()
case <-b.quit:
return
}
}
}
// Run is a method of BPS that must be started in a goroutine in order
// for things to be functional.
func (b *BPS) run() {
b.runLoop()
close(b.closed)
}
// Write implements io.Writer so that one can simply write bytes to the struct.
func (b *BPS) Write(p []byte) (int, error) {
n, err := ioutil.Discard.Write(p)
b.Add(int64(n))
return n, err
}
// Add adds i to the current time bucket.
func (b *BPS) Add(i int64) {
b.Lock()
b.curBs += i
b.buckets[b.timeI] = b.curBs
b.Unlock()
}
// Rate returns the current rate (bytes / second).
func (b *BPS) Rate() float64 {
b.Lock()
var total int64 = 0
for _, b := range b.buckets {
total += b
}
b.Unlock()
return float64(total) / b.interval.Seconds()
}
// HumanRate returns a human-friendly (e.g. 23.3MB/s) rate.
func (b *BPS) HumanRate() string {
return human(uint64(b.Rate())) + "/s"
}
// Sparkline returns a human-friendly sprakline of history
func (b *BPS) Sparkline(count int) string {
if count <= 0 || count > len(b.buckets) {
return "bucket count inappropriate"
}
sparks := []string{"▁", "▂", "▃", "▄", "▅", "▆", "▇", "█"}
line := ""
bucketsPer := len(b.buckets) / count
b.Lock()
defer b.Unlock()
var max int64
var cur int64
// find max
for i := 0; i < len(b.buckets); i++ {
cur += b.buckets[i]
if i%bucketsPer == 0 {
if cur > max {
max = cur
}
cur = 0
}
}
if max == 0 {
for i := 0; i < count; i++ {
line += sparks[0]
}
return line
}
cur = 0
for i := 0; i < len(b.buckets); i++ {
val := b.buckets[(b.timeI+i)%len(b.buckets)]
cur += val
if i%bucketsPer == 0 {
approx := int((float64(cur) / float64(max)) * float64(len(sparks)-1))
line += sparks[approx]
cur = 0
}
}
return line
}
// Close cleans up and shuts down a BPS.
func (b *BPS) Close() {
close(b.quit)
<-b.closed
}