bandwidth is a little package to be used in collecting and reporting on bandwidth numbers.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
2.7KB

  1. // Package bandwidth is a simple tool for keeping track of transmitted and
  2. // received counts (ostensibly bytes).
  3. package bandwidth
  4. import (
  5. "errors"
  6. "sort"
  7. "time"
  8. )
  9. // Bandwidth keeps track of state for Rx and Tx (byte) counts. Instantiate
  10. // a Bandwidth then feed values using the AddRx and AddTx channels. When the
  11. // accumulated values are needed, simply read from the Rx or Tx chans. When the
  12. // Bandwidth is no loner needed close the Quit chan.
  13. type Bandwidth struct {
  14. // write number of bytes to us
  15. AddRx chan int
  16. AddTx chan int
  17. // read stats from us
  18. Rx chan []float64
  19. Tx chan []float64
  20. Quit chan interface{}
  21. rxSnap []float64
  22. txSnap []float64
  23. dt time.Duration
  24. dts []int
  25. curRx int
  26. curTx int
  27. rxstream []int
  28. txstream []int
  29. timeI int
  30. max int
  31. }
  32. // NewBandwidth Returns a populated and ready to launch Bandwidth. seconds is
  33. // a slice of multiples of dt on which to report (e.g. 1x, 10x, 60x dt). dt is
  34. // also how often the values used to send to Rx and Tx are updated.
  35. func NewBandwidth(dts []int, dt time.Duration) (*Bandwidth, error) {
  36. if len(dts) < 1 {
  37. return nil, errors.New("must specify at least one interval lenght")
  38. }
  39. sort.Ints(dts)
  40. max := dts[len(dts)-1]
  41. r := &Bandwidth{
  42. AddRx: make(chan int, 1024),
  43. AddTx: make(chan int, 1024),
  44. Rx: make(chan []float64),
  45. Tx: make(chan []float64),
  46. dt: dt,
  47. dts: dts,
  48. Quit: make(chan interface{}),
  49. rxstream: make([]int, max),
  50. txstream: make([]int, max),
  51. max: max,
  52. }
  53. return r, nil
  54. }
  55. // Run is a method of Bandwidth that must be started in a goroutine in order
  56. // for things to be functional.
  57. func (bw *Bandwidth) Run() {
  58. t := time.NewTicker(bw.dt)
  59. outer:
  60. for {
  61. select {
  62. case <-t.C:
  63. bw.rxstream[bw.timeI] = bw.curRx
  64. bw.txstream[bw.timeI] = bw.curTx
  65. bw.rxSnap = bw.averages(bw.rxstream)
  66. bw.txSnap = bw.averages(bw.txstream)
  67. bw.curTx = 0
  68. bw.curRx = 0
  69. // n.b.: here we march forward through time by going backward in
  70. // our slice.
  71. bw.timeI = (bw.timeI - 1) % bw.max
  72. // lol: because modulo does unexpected things for negative numbers.
  73. if bw.timeI < 0 {
  74. bw.timeI = bw.timeI + bw.max
  75. }
  76. // log.Printf("%d %+v", bw.timeI, bw.rxstream)
  77. case bw.Rx <- bw.rxSnap:
  78. case bw.Tx <- bw.txSnap:
  79. case s := <-bw.AddRx:
  80. bw.curRx += s
  81. case s := <-bw.AddTx:
  82. bw.curTx += s
  83. case <-bw.Quit:
  84. break outer
  85. }
  86. }
  87. close(bw.AddRx)
  88. close(bw.AddTx)
  89. close(bw.Rx)
  90. close(bw.Tx)
  91. }
  92. func (bw *Bandwidth) averages(state []int) []float64 {
  93. r := []float64{}
  94. var i int = 0
  95. total := 0
  96. for _, ti := range bw.dts {
  97. for ; ; i++ {
  98. if i == ti {
  99. break
  100. }
  101. total += state[(bw.timeI+i)%bw.max]
  102. }
  103. r = append(r, float64(total)/float64(ti))
  104. }
  105. return r
  106. }