forked from chenjiandongx/mandodb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
segment.go
131 lines (101 loc) · 2.32 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package mandodb
import (
"os"
"sync"
"github.com/chenjiandongx/mandodb/pkg/sortedlist"
)
// SegmentType names the storage kind of a Segment, as reported by
// Segment.Type.
type SegmentType string

// Known segment kinds. Both constants are declared with the SegmentType type
// so that neither silently degrades to an untyped string constant.
const (
	// DiskSegmentType is the kind reported as "DISK".
	DiskSegmentType SegmentType = "DISK"
	// MemorySegmentType is the kind reported as "MEMORY".
	MemorySegmentType SegmentType = "MEMORY"
)
// Segment is the interface implemented by every storage segment managed by
// segmentList.
// NOTE(review): the implementations are not visible here; per-method notes
// marked "presumably" are inferred from names and signatures and should be
// confirmed against the concrete segment types.
type Segment interface {
// InsertRows presumably writes the given rows into the segment.
InsertRows(row []*Row)
// QueryRange presumably returns the metrics matching lms between start and
// end; inclusivity of the bounds is not visible here — TODO confirm.
QueryRange(lms LabelMatcherSet, start, end int64) ([]MetricRet, error)
// QuerySeries presumably returns the label sets of the series matching lms.
QuerySeries(lms LabelMatcherSet) ([]LabelSet, error)
// QueryLabelValues presumably returns the known values of the given label.
QueryLabelValues(label string) []string
// MinTs is the lower timestamp bound of the segment. segmentList uses it as
// the sort key and, together with MaxTs, to select segments for a query.
MinTs() int64
// MaxTs is the upper timestamp bound of the segment.
MaxTs() int64
// Frozen presumably reports whether the segment no longer accepts writes.
Frozen() bool
// Close is called by segmentList before Cleanup when a segment is removed or
// replaced.
Close() error
// Cleanup is called by segmentList after a successful Close when a segment is
// removed or replaced; presumably it releases the segment's resources.
Cleanup() error
// Type returns the segment's storage kind.
Type() SegmentType
// Load returns a Segment; presumably the loaded, ready-to-query form of the
// receiver — TODO confirm.
Load() Segment
}
// Desc is a JSON-serialisable summary made of two counters and a timestamp
// range.
// NOTE(review): presumably the persisted description of a segment — confirm
// against the code that reads and writes it.
type Desc struct {
// SeriesCount is encoded as "seriesCount"; presumably the number of series.
SeriesCount int64 `json:"seriesCount"`
// DataPointsCount is encoded as "dataPointsCount"; presumably the number of
// data points.
DataPointsCount int64 `json:"dataPointsCount"`
// MaxTs is encoded as "maxTs"; presumably the largest timestamp covered.
MaxTs int64 `json:"maxTs"`
// MinTs is encoded as "minTs"; presumably the smallest timestamp covered.
MinTs int64 `json:"minTs"`
}
// segmentList holds one head segment plus a sorted collection of further
// segments keyed by their minimum timestamp.
type segmentList struct {
// mut is held by Get, Add, Remove and Replace for their whole duration.
mut sync.Mutex
// head is initialised by newSegmentList to a memory segment and is appended
// last by Get when it matches the query range.
head Segment
// lst stores the non-head segments, keyed by Segment.MinTs.
lst sortedlist.List
}
// newSegmentList builds a segmentList whose head is the segment returned by
// newMemorySegment and whose sorted list is the one returned by
// sortedlist.NewTree.
func newSegmentList() *segmentList {
	sl := new(segmentList)
	sl.head = newMemorySegment()
	sl.lst = sortedlist.NewTree()
	return sl
}
// Get returns every segment that Choose selects for the query range given by
// start and end. Segments from the sorted list come first, in iteration
// order; the head segment, if selected, is always appended last. The result
// is never nil. The list mutex is held for the whole call.
func (sl *segmentList) Get(start, end int64) []Segment {
	sl.mut.Lock()
	defer sl.mut.Unlock()

	matched := []Segment{}
	for it := sl.lst.All(); it.Next(); {
		if candidate := it.Value().(Segment); sl.Choose(candidate, start, end) {
			matched = append(matched, candidate)
		}
	}

	// The head is always the newest segment, so it goes last.
	if sl.Choose(sl.head, start, end) {
		matched = append(matched, sl.head)
	}
	return matched
}
// Choose reports whether seg may contain data for the query range
// [start, end], i.e. whether the segment's span [seg.MinTs(), seg.MaxTs()]
// overlaps that range.
//
// Both ends are treated as inclusive. Comparing with strict inequalities
// would skip a segment whose first or last timestamp falls exactly on a query
// boundary (for example a segment whose span equals the query range), which
// silently drops data. Selecting a segment that turns out to hold nothing in
// range is harmless, because the segment's own query does the fine-grained
// filtering.
func (sl *segmentList) Choose(seg Segment, start, end int64) bool {
	// Two closed intervals overlap iff each one starts no later than the
	// other one ends.
	return seg.MinTs() <= end && seg.MaxTs() >= start
}
// Add stores segment in the sorted list under its minimum timestamp while
// holding the list mutex.
func (sl *segmentList) Add(segment Segment) {
	sl.mut.Lock()
	defer sl.mut.Unlock()

	key := segment.MinTs()
	sl.lst.Add(key, segment)
}
// Remove closes segment, cleans it up, and then deletes the entry keyed by
// its minimum timestamp from the sorted list. If Close or Cleanup fails the
// error is returned and the list is left unchanged; Cleanup is not attempted
// after a failed Close. The list mutex is held for the whole call.
func (sl *segmentList) Remove(segment Segment) error {
	sl.mut.Lock()
	defer sl.mut.Unlock()

	err := segment.Close()
	if err == nil {
		err = segment.Cleanup()
	}
	if err != nil {
		return err
	}

	sl.lst.Remove(segment.MinTs())
	return nil
}
// Replace closes and cleans up pre, then stores nxt in the sorted list under
// pre's minimum timestamp. The first error from Close or Cleanup is returned
// and nxt is not stored in that case. The list mutex is held for the whole
// call.
func (sl *segmentList) Replace(pre, nxt Segment) error {
	sl.mut.Lock()
	defer sl.mut.Unlock()

	// Release the old segment step by step, stopping at the first failure.
	for _, release := range []func() error{pre.Close, pre.Cleanup} {
		if err := release(); err != nil {
			return err
		}
	}

	// nxt takes over the key of the segment it replaces.
	sl.lst.Add(pre.MinTs(), nxt)
	return nil
}
// metricName is the label key "__name__".
// NOTE(review): presumably the reserved label under which a series' metric
// name is stored — confirm against its uses elsewhere in the package.
const metricName = "__name__"
// isFileExist reports whether path exists according to os.Stat. Only a
// "does not exist" error yields false; any other outcome, including other
// Stat errors such as permission failures, is reported as existing.
func isFileExist(path string) bool {
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return false
	}
	return true
}