// Package bps is a simple tool for keeping track of the rate of bytes // transmitted package bps import ( "fmt" "io/ioutil" "sync" "time" ) const ( minResolution = 3 ) // BPS keeps track of state for byte counts // // Instantiate a BPS then feed bytes either via BPS.Add, or writing to it. When // the accumulated values are needed call BPS.Cur. When the BPS is no loner // needed call BPS.Close type BPS struct { sync.RWMutex quit chan interface{} closed chan interface{} interval time.Duration dt time.Duration // curBs bytes read for this dt curBs int64 // buckets contains an entry for bytes read for each dt of time in interval buckets []int64 // timeI keys into buckets for the current point in time timeI int } // New Returns a populated and ready to use BPS. interval is the amount of time // (for example 60 seconds) over which to track byte flow (bytes/second for the // last interval), and resolution is used in the following calculation: // // dt = interval / resolution (s) // // where the dt is the temporal resolution of the updates (add or remove // information every dt). func New(interval time.Duration, resolution uint) (*BPS, error) { if resolution < minResolution { return nil, fmt.Errorf("resolution must be larger than %d", minResolution) } dtns := interval.Nanoseconds() / int64(resolution) dt := time.Duration(dtns) * time.Nanosecond r := &BPS{ interval: interval, dt: dt, quit: make(chan interface{}), closed: make(chan interface{}), buckets: make([]int64, resolution), } go r.run() return r, nil } func (b *BPS) runLoop() { t := time.NewTicker(b.dt) for { select { case <-t.C: b.Lock() b.buckets[b.timeI] = b.curBs b.curBs = 0 b.timeI = (b.timeI + 1) % int(len(b.buckets)) b.Unlock() case <-b.quit: return } } } // Run is a method of BPS that must be started in a goroutine in order // for things to be functional. func (b *BPS) run() { b.runLoop() close(b.closed) } // Write implements io.Writer so that one can simply write bytes to the struct. func (b *BPS) Write(p []byte) (int, error) { n, err := ioutil.Discard.Write(p) b.Add(int64(n)) return n, err } // Add adds i to the current time bucket. func (b *BPS) Add(i int64) { b.Lock() b.curBs += i b.buckets[b.timeI] = b.curBs b.Unlock() } // Rate returns the current rate (bytes / second). func (b *BPS) Rate() float64 { b.Lock() var total int64 = 0 for _, b := range b.buckets { total += b } b.Unlock() return float64(total) / b.interval.Seconds() } // HumanRate returns a human-friendly (e.g. 23.3MB/s) rate. func (b *BPS) HumanRate() string { return Bytes(uint64(b.Rate())) + "/s" } // Close cleans up and shuts down a BPS. func (b *BPS) Close() { close(b.quit) <-b.closed }