// Package bps keeps track of byte rates package bps import ( "fmt" "io/ioutil" "sync" "time" ) const ( minResolution = 3 ) // BPS calculates bandwith rates. // // Instantiate a BPS using one of the New* functions then feed bytes either via // BPS.Add, or writing to it (it implements io.Writer). When the accumulated // values are needed call BPS.Rate. When the BPS is no loner needed call // BPS.Close type BPS struct { sync.RWMutex quit chan interface{} closed chan interface{} interval time.Duration dt time.Duration // curBs bytes read for this dt curBs int64 // buckets contains an entry for bytes read for each dt of time in interval buckets []int64 // timeI keys into buckets for the current point in time timeI int } // New Returns a populated and ready to use BPS. // // bytes per second are reported for the last time interval (defined by // interval), and is updated every (interval / 100) seconds. func New(interval time.Duration) (*BPS, error) { return NewPrecise(interval, 100) } // MustNew behaves like New, but panics on error. func MustNew(interval time.Duration) *BPS { b, err := New(interval) if err != nil { panic(err) } return b } // NewPrecise behaves like New, but allows the user to specify the temporal // resolution. // // If you need to query more or less frequently than interval / 100 you'll want // to use this function. func NewPrecise(interval time.Duration, resolution uint) (*BPS, error) { if resolution < minResolution { return nil, fmt.Errorf("resolution must be larger than %d", minResolution) } dtns := interval.Nanoseconds() / int64(resolution) dt := time.Duration(dtns) * time.Nanosecond r := &BPS{ interval: interval, dt: dt, quit: make(chan interface{}), closed: make(chan interface{}), buckets: make([]int64, resolution), } go r.run() return r, nil } func (b *BPS) runLoop() { t := time.NewTicker(b.dt) for { select { case <-t.C: b.Lock() b.buckets[b.timeI] = b.curBs b.curBs = 0 b.timeI = (b.timeI + 1) % int(len(b.buckets)) b.Unlock() case <-b.quit: return } } } // Run is a method of BPS that must be started in a goroutine in order // for things to be functional. func (b *BPS) run() { b.runLoop() close(b.closed) } // Write implements io.Writer so that one can simply write bytes to the struct. func (b *BPS) Write(p []byte) (int, error) { n, err := ioutil.Discard.Write(p) b.Add(int64(n)) return n, err } // Add adds i to the current time bucket. func (b *BPS) Add(i int64) { b.Lock() b.curBs += i b.buckets[b.timeI] = b.curBs b.Unlock() } // Rate returns the current rate (bytes / second). func (b *BPS) Rate() float64 { b.Lock() var total int64 = 0 for _, b := range b.buckets { total += b } b.Unlock() return float64(total) / b.interval.Seconds() } // HumanRate returns a human-friendly (e.g. 23.3MB/s) rate. func (b *BPS) HumanRate() string { return human(uint64(b.Rate())) + "/s" } // Close cleans up and shuts down a BPS. func (b *BPS) Close() { close(b.quit) <-b.closed }