...
1 package utils
2
3 import (
4 "sync"
5 "time"
6 )
7
8 type MiniTimeSeries struct {
9 sync.Mutex
10 max_keep time.Duration
11 values map[int64]float64
12 }
13
14
15 func NewMiniTimeSeries(keep time.Duration) *MiniTimeSeries {
16 res := &MiniTimeSeries{max_keep: keep, values: make(map[int64]float64)}
17 return res
18 }
19
20
21 func (mt *MiniTimeSeries) GC() {
22 mt.Lock()
23 defer mt.Unlock()
24 cutoff := time.Now().Add(0 - mt.max_keep).Unix()
25 var deletes []int64
26 for i, _ := range mt.values {
27 if i <= cutoff {
28 deletes = append(deletes, i)
29 }
30 }
31 for _, d := range deletes {
32 delete(mt.values, d)
33 }
34
35 }
36
37
38 func (mt *MiniTimeSeries) Difference() float64 {
39 mt.GC()
40 _, fl := mt.EarliestValue()
41 _, fh := mt.LatestValue()
42 return fh - fl
43 }
44
45 func (mt *MiniTimeSeries) EarliestValue() (time.Time, float64) {
46 mt.Lock()
47 defer mt.Unlock()
48 cur_ts := int64(0)
49 cur_val := 0.0
50 for ts, val := range mt.values {
51 if cur_ts == 0 || cur_ts >= ts {
52 cur_ts = ts
53 cur_val = val
54 }
55 }
56 t := time.Unix(cur_ts, 0)
57 return t, cur_val
58 }
59 func (mt *MiniTimeSeries) LatestValue() (time.Time, float64) {
60 mt.Lock()
61 defer mt.Unlock()
62 cur_ts := int64(0)
63 cur_val := 0.0
64 for ts, val := range mt.values {
65 if cur_ts == 0 || cur_ts <= ts {
66 cur_ts = ts
67 cur_val = val
68 }
69 }
70 t := time.Unix(cur_ts, 0)
71 return t, cur_val
72 }
73
74
75 func (mt *MiniTimeSeries) Add(value float64) {
76 mt.Lock()
77 defer mt.Unlock()
78 now := time.Now().Unix()
79 mt.values[now] = value
80
81 }
82
83
84 func (mt *MiniTimeSeries) AddWithTimestamp(ts time.Time, value float64) {
85 mt.Lock()
86 defer mt.Unlock()
87 now := ts.Unix()
88 mt.values[now] = value
89
90 }
91
92
93 func (mt *MiniTimeSeries) All() map[int64]float64 {
94 mt.Lock()
95 defer mt.Unlock()
96 res := make(map[int64]float64)
97 for k, v := range mt.values {
98 res[k] = v
99 }
100 return res
101 }
102
View as plain text