Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit b1cd829

Browse files
authored
Reuse Chunk Iterator (#642)
* Reset method for chunkenc.Iterator Signed-off-by: Ganesh Vernekar <[email protected]> * Reset method only for XORIterator Signed-off-by: Ganesh Vernekar <[email protected]> * Use Reset(...) in querier.go Signed-off-by: Ganesh Vernekar <[email protected]> * Reuse deletedIterator Signed-off-by: Ganesh Vernekar <[email protected]> * Another way of reusing chunk iterators Signed-off-by: Ganesh Vernekar <[email protected]> * Unexport xorIterator Signed-off-by: Ganesh Vernekar <[email protected]> * Fix memSeries.iterator(...) Signed-off-by: Ganesh Vernekar <[email protected]> * Add some comments Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 8dfa537 commit b1cd829

11 files changed

+90
-45
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
## Master / unreleased
22

3+
- [FEATURE] `chunkenc.Chunk.Iterator` method now takes a `chunkenc.Iterator` interface as an argument for reuse.
4+
35
## 0.9.1
46

57
- [CHANGE] LiveReader metrics are now injected rather than global.
@@ -19,6 +21,7 @@
1921
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
2022
- [ENHANCEMENT] Optimize queries using regexp for set lookups.
2123

24+
2225
## 0.8.0
2326

2427
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.

chunkenc/chunk.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ type Chunk interface {
4444
Bytes() []byte
4545
Encoding() Encoding
4646
Appender() (Appender, error)
47-
Iterator() Iterator
47+
// The iterator passed as argument is for re-use.
48+
// Depending on implementation, the iterator can
49+
// be re-used or a new iterator can be allocated.
50+
Iterator(Iterator) Iterator
4851
NumSamples() int
4952
}
5053

chunkenc/chunk_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func testChunk(c Chunk) error {
7777
// fmt.Println("appended", len(c.Bytes()), c.Bytes())
7878
}
7979

80-
it := c.Iterator()
80+
it := c.Iterator(nil)
8181
var res []pair
8282
for it.Next() {
8383
ts, v := it.At()
@@ -133,9 +133,10 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
133133

134134
res := make([]float64, 0, 1024)
135135

136+
var it Iterator
136137
for i := 0; i < len(chunks); i++ {
137138
c := chunks[i]
138-
it := c.Iterator()
139+
it := c.Iterator(it)
139140

140141
for it.Next() {
141142
_, v := it.At()

chunkenc/xor.go

+25-4
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (c *XORChunk) NumSamples() int {
7777

7878
// Appender implements the Chunk interface.
7979
func (c *XORChunk) Appender() (Appender, error) {
80-
it := c.iterator()
80+
it := c.iterator(nil)
8181

8282
// To get an appender we must know the state it would have if we had
8383
// appended all existing data from scratch.
@@ -102,19 +102,25 @@ func (c *XORChunk) Appender() (Appender, error) {
102102
return a, nil
103103
}
104104

105-
func (c *XORChunk) iterator() *xorIterator {
105+
func (c *XORChunk) iterator(it Iterator) *xorIterator {
106106
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
107107
// When using striped locks to guard access to chunks, probably yes.
108108
// Could only copy data if the chunk is not completed yet.
109+
if xorIter, ok := it.(*xorIterator); ok {
110+
xorIter.Reset(c.b.bytes())
111+
return xorIter
112+
}
109113
return &xorIterator{
114+
// The first 2 bytes contain chunk headers.
115+
// We skip that for actual samples.
110116
br: newBReader(c.b.bytes()[2:]),
111117
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
112118
}
113119
}
114120

115121
// Iterator implements the Chunk interface.
116-
func (c *XORChunk) Iterator() Iterator {
117-
return c.iterator()
122+
func (c *XORChunk) Iterator(it Iterator) Iterator {
123+
return c.iterator(it)
118124
}
119125

120126
type xorAppender struct {
@@ -243,6 +249,21 @@ func (it *xorIterator) Err() error {
243249
return it.err
244250
}
245251

252+
func (it *xorIterator) Reset(b []byte) {
253+
// The first 2 bytes contain chunk headers.
254+
// We skip that for actual samples.
255+
it.br = newBReader(b[2:])
256+
it.numTotal = binary.BigEndian.Uint16(b)
257+
258+
it.numRead = 0
259+
it.t = 0
260+
it.val = 0
261+
it.leading = 0
262+
it.trailing = 0
263+
it.tDelta = 0
264+
it.err = nil
265+
}
266+
246267
func (it *xorIterator) Next() bool {
247268
if it.err != nil || it.numRead == it.numTotal {
248269
return false

chunks/chunks.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
245245
if err != nil {
246246
return nil, err
247247
}
248-
ait := a.Iterator()
249-
bit := b.Iterator()
248+
ait := a.Iterator(nil)
249+
bit := b.Iterator(nil)
250250
aok, bok := ait.Next(), bit.Next()
251251
for aok && bok {
252252
at, av := ait.At()

compact.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
736736
return errors.Wrap(err, "add symbols")
737737
}
738738

739+
delIter := &deletedIterator{}
739740
for set.Next() {
740741
select {
741742
case <-c.ctx.Done():
@@ -788,17 +789,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
788789
return err
789790
}
790791

791-
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
792+
delIter.it = chk.Chunk.Iterator(delIter.it)
793+
delIter.intervals = dranges
792794

793795
var (
794796
t int64
795797
v float64
796798
)
797-
for it.Next() {
798-
t, v = it.At()
799+
for delIter.Next() {
800+
t, v = delIter.At()
799801
app.Append(t, v)
800802
}
801-
if err := it.Err(); err != nil {
803+
if err := delIter.Err(); err != nil {
802804
return errors.Wrap(err, "iterate chunk while re-encoding")
803805
}
804806

head.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -1185,9 +1185,9 @@ type safeChunk struct {
11851185
cid int
11861186
}
11871187

1188-
func (c *safeChunk) Iterator() chunkenc.Iterator {
1188+
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
11891189
c.s.Lock()
1190-
it := c.s.iterator(c.cid)
1190+
it := c.s.iterator(c.cid, reuseIter)
11911191
c.s.Unlock()
11921192
return it
11931193
}
@@ -1739,7 +1739,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
17391739
return start + (max-start)/a
17401740
}
17411741

1742-
func (s *memSeries) iterator(id int) chunkenc.Iterator {
1742+
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
17431743
c := s.chunk(id)
17441744
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
17451745
// which got then garbage collected before it got accessed.
@@ -1749,17 +1749,23 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator {
17491749
}
17501750

17511751
if id-s.firstChunkID < len(s.chunks)-1 {
1752-
return c.chunk.Iterator()
1752+
return c.chunk.Iterator(it)
17531753
}
17541754
// Serve the last 4 samples for the last chunk from the sample buffer
17551755
// as their compressed bytes may be mutated by added samples.
1756-
it := &memSafeIterator{
1757-
Iterator: c.chunk.Iterator(),
1756+
if msIter, ok := it.(*memSafeIterator); ok {
1757+
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
1758+
msIter.i = -1
1759+
msIter.total = c.chunk.NumSamples()
1760+
msIter.buf = s.sampleBuf
1761+
return msIter
1762+
}
1763+
return &memSafeIterator{
1764+
Iterator: c.chunk.Iterator(it),
17581765
i: -1,
17591766
total: c.chunk.NumSamples(),
17601767
buf: s.sampleBuf,
17611768
}
1762-
return it
17631769
}
17641770

17651771
func (s *memSeries) head() *memChunk {

head_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ func TestHead_ReadWAL(t *testing.T) {
159159
testutil.Ok(t, c.Err())
160160
return x
161161
}
162-
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
163-
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
164-
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
162+
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil)))
163+
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil)))
164+
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil)))
165165
})
166166
}
167167
}
@@ -313,11 +313,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
313313

314314
// Validate that the series' sample buffer is applied correctly to the last chunk
315315
// after truncation.
316-
it1 := s.iterator(s.chunkID(len(s.chunks) - 1))
316+
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil)
317317
_, ok := it1.(*memSafeIterator)
318318
testutil.Assert(t, ok == true, "")
319319

320-
it2 := s.iterator(s.chunkID(len(s.chunks) - 2))
320+
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil)
321321
_, ok = it2.(*memSafeIterator)
322322
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
323323
}
@@ -451,10 +451,11 @@ func TestHeadDeleteSimple(t *testing.T) {
451451

452452
chunkr, err := h.Chunks()
453453
testutil.Ok(t, err)
454+
var ii chunkenc.Iterator
454455
for _, meta := range chkMetas {
455456
chk, err := chunkr.Chunk(meta.Ref)
456457
testutil.Ok(t, err)
457-
ii := chk.Iterator()
458+
ii = chk.Iterator(ii)
458459
for ii.Next() {
459460
t, v := ii.At()
460461
actSamples = append(actSamples, sample{t: t, v: v})

mocks_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package tsdb
1515

1616
import (
17+
"github.com/prometheus/tsdb/chunkenc"
1718
"github.com/prometheus/tsdb/chunks"
1819
"github.com/prometheus/tsdb/index"
1920
"github.com/prometheus/tsdb/labels"
@@ -40,10 +41,11 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk
4041
i = len(m.series) - 1
4142
}
4243

44+
var iter chunkenc.Iterator
4345
for _, chk := range chunks {
4446
samples := make([]sample, 0, chk.Chunk.NumSamples())
4547

46-
iter := chk.Chunk.Iterator()
48+
iter = chk.Chunk.Iterator(iter)
4749
for iter.Next() {
4850
s := sample{}
4951
s.t, s.v = iter.At()

querier.go

+23-17
Original file line numberDiff line numberDiff line change
@@ -1060,30 +1060,42 @@ func (it *verticalMergeSeriesIterator) Err() error {
10601060
type chunkSeriesIterator struct {
10611061
chunks []chunks.Meta
10621062

1063-
i int
1064-
cur chunkenc.Iterator
1063+
i int
1064+
cur chunkenc.Iterator
1065+
bufDelIter *deletedIterator
10651066

10661067
maxt, mint int64
10671068

10681069
intervals Intervals
10691070
}
10701071

10711072
func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
1072-
it := cs[0].Chunk.Iterator()
1073-
1074-
if len(dranges) > 0 {
1075-
it = &deletedIterator{it: it, intervals: dranges}
1076-
}
1077-
return &chunkSeriesIterator{
1073+
csi := &chunkSeriesIterator{
10781074
chunks: cs,
10791075
i: 0,
1080-
cur: it,
10811076

10821077
mint: mint,
10831078
maxt: maxt,
10841079

10851080
intervals: dranges,
10861081
}
1082+
csi.resetCurIterator()
1083+
1084+
return csi
1085+
}
1086+
1087+
func (it *chunkSeriesIterator) resetCurIterator() {
1088+
if len(it.intervals) == 0 {
1089+
it.cur = it.chunks[it.i].Chunk.Iterator(it.cur)
1090+
return
1091+
}
1092+
if it.bufDelIter == nil {
1093+
it.bufDelIter = &deletedIterator{
1094+
intervals: it.intervals,
1095+
}
1096+
}
1097+
it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it)
1098+
it.cur = it.bufDelIter
10871099
}
10881100

10891101
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
@@ -1102,10 +1114,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
11021114
}
11031115
}
11041116

1105-
it.cur = it.chunks[it.i].Chunk.Iterator()
1106-
if len(it.intervals) > 0 {
1107-
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
1108-
}
1117+
it.resetCurIterator()
11091118

11101119
for it.cur.Next() {
11111120
t0, _ := it.cur.At()
@@ -1145,10 +1154,7 @@ func (it *chunkSeriesIterator) Next() bool {
11451154
}
11461155

11471156
it.i++
1148-
it.cur = it.chunks[it.i].Chunk.Iterator()
1149-
if len(it.intervals) > 0 {
1150-
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
1151-
}
1157+
it.resetCurIterator()
11521158

11531159
return it.Next()
11541160
}

querier_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1270,7 +1270,7 @@ func TestDeletedIterator(t *testing.T) {
12701270

12711271
for _, c := range cases {
12721272
i := int64(-1)
1273-
it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]}
1273+
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
12741274
ranges := c.r[:]
12751275
for it.Next() {
12761276
i++

0 commit comments

Comments
 (0)