Skip to content

Commit 8a0c59b

Browse files
TimeSeries insertion filters for close samples (redis#3003)
* TimeSeries insertion filters for close samples * fix * fix * fix * fix * fix --------- Co-authored-by: Vladyslav Vildanov <[email protected]>
1 parent 7539858 commit 8a0c59b

File tree

2 files changed

+186
-19
lines changed

2 files changed

+186
-19
lines changed

timeseries_commands.go

+42-14
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,32 @@ type TimeseriesCmdable interface {
4040
}
4141

4242
type TSOptions struct {
43-
Retention int
44-
ChunkSize int
45-
Encoding string
46-
DuplicatePolicy string
47-
Labels map[string]string
43+
Retention int
44+
ChunkSize int
45+
Encoding string
46+
DuplicatePolicy string
47+
Labels map[string]string
48+
IgnoreMaxTimeDiff int64
49+
IgnoreMaxValDiff float64
4850
}
4951
type TSIncrDecrOptions struct {
50-
Timestamp int64
51-
Retention int
52-
ChunkSize int
53-
Uncompressed bool
54-
Labels map[string]string
52+
Timestamp int64
53+
Retention int
54+
ChunkSize int
55+
Uncompressed bool
56+
DuplicatePolicy string
57+
Labels map[string]string
58+
IgnoreMaxTimeDiff int64
59+
IgnoreMaxValDiff float64
5560
}
5661

5762
type TSAlterOptions struct {
58-
Retention int
59-
ChunkSize int
60-
DuplicatePolicy string
61-
Labels map[string]string
63+
Retention int
64+
ChunkSize int
65+
DuplicatePolicy string
66+
Labels map[string]string
67+
IgnoreMaxTimeDiff int64
68+
IgnoreMaxValDiff float64
6269
}
6370

6471
type TSCreateRuleOptions struct {
@@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf
223230
args = append(args, label, value)
224231
}
225232
}
233+
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
234+
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
235+
}
226236
}
227237
cmd := NewIntCmd(ctx, args...)
228238
_ = c(ctx, cmd)
@@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp
264274
args = append(args, label, value)
265275
}
266276
}
277+
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
278+
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
279+
}
267280
}
268281
cmd := NewStatusCmd(ctx, args...)
269282
_ = c(ctx, cmd)
@@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption
292305
args = append(args, label, value)
293306
}
294307
}
308+
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
309+
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
310+
}
295311
}
296312
cmd := NewStatusCmd(ctx, args...)
297313
_ = c(ctx, cmd)
@@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo
351367
if options.Uncompressed {
352368
args = append(args, "UNCOMPRESSED")
353369
}
370+
if options.DuplicatePolicy != "" {
371+
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
372+
}
354373
if options.Labels != nil {
355374
args = append(args, "LABELS")
356375
for label, value := range options.Labels {
357376
args = append(args, label, value)
358377
}
359378
}
379+
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
380+
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
381+
}
360382
}
361383
cmd := NewIntCmd(ctx, args...)
362384
_ = c(ctx, cmd)
@@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo
391413
if options.Uncompressed {
392414
args = append(args, "UNCOMPRESSED")
393415
}
416+
if options.DuplicatePolicy != "" {
417+
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
418+
}
394419
if options.Labels != nil {
395420
args = append(args, "LABELS")
396421
for label, value := range options.Labels {
397422
args = append(args, label, value)
398423
}
399424
}
425+
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
426+
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
427+
}
400428
}
401429
cmd := NewIntCmd(ctx, args...)
402430
_ = c(ctx, cmd)

timeseries_commands_test.go

+144-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
2323
Expect(client.Close()).NotTo(HaveOccurred())
2424
})
2525

26-
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() {
26+
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
2727
result, err := client.TSCreate(ctx, "1").Result()
2828
Expect(err).NotTo(HaveOccurred())
2929
Expect(result).To(BeEquivalentTo("OK"))
@@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
6262
resultInfo, err = client.TSInfo(ctx, keyName).Result()
6363
Expect(err).NotTo(HaveOccurred())
6464
Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup))
65-
6665
}
66+
// Test insertion filters
67+
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0}
68+
result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result()
69+
Expect(err).NotTo(HaveOccurred())
70+
Expect(result).To(BeEquivalentTo("OK"))
71+
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
72+
Expect(err).NotTo(HaveOccurred())
73+
Expect(resultAdd).To(BeEquivalentTo(1000))
74+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
75+
Expect(err).NotTo(HaveOccurred())
76+
Expect(resultAdd).To(BeEquivalentTo(1010))
77+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
78+
Expect(err).NotTo(HaveOccurred())
79+
Expect(resultAdd).To(BeEquivalentTo(1010))
80+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result()
81+
Expect(err).NotTo(HaveOccurred())
82+
Expect(resultAdd).To(BeEquivalentTo(1020))
83+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result()
84+
Expect(err).NotTo(HaveOccurred())
85+
Expect(resultAdd).To(BeEquivalentTo(1021))
86+
87+
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result()
88+
Expect(err).NotTo(HaveOccurred())
89+
Expect(len(rangePoints)).To(BeEquivalentTo(4))
90+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
91+
{Timestamp: 1000, Value: 1.0},
92+
{Timestamp: 1010, Value: 11.0},
93+
{Timestamp: 1020, Value: 11.5},
94+
{Timestamp: 1021, Value: 22.0}}))
95+
// Test insertion filters with other duplicate policy
96+
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0}
97+
result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result()
98+
Expect(err).NotTo(HaveOccurred())
99+
Expect(result).To(BeEquivalentTo("OK"))
100+
resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
101+
Expect(err).NotTo(HaveOccurred())
102+
Expect(resultAdd1).To(BeEquivalentTo(1000))
103+
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
104+
Expect(err).NotTo(HaveOccurred())
105+
Expect(resultAdd1).To(BeEquivalentTo(1010))
106+
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
107+
Expect(err).NotTo(HaveOccurred())
108+
Expect(resultAdd1).To(BeEquivalentTo(1013))
109+
110+
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
111+
Expect(err).NotTo(HaveOccurred())
112+
Expect(len(rangePoints)).To(BeEquivalentTo(3))
113+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
114+
{Timestamp: 1000, Value: 1.0},
115+
{Timestamp: 1010, Value: 11.0},
116+
{Timestamp: 1013, Value: 10.0}}))
67117
})
68-
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() {
118+
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() {
69119
result, err := client.TSAdd(ctx, "1", 1, 1).Result()
70120
Expect(err).NotTo(HaveOccurred())
71121
Expect(result).To(BeEquivalentTo(1))
@@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
138188
resultGet, err = client.TSGet(ctx, "tsami-1").Result()
139189
Expect(err).NotTo(HaveOccurred())
140190
Expect(resultGet.Value).To(BeEquivalentTo(5))
191+
// Insertion filters
192+
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
193+
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result()
194+
Expect(err).NotTo(HaveOccurred())
195+
Expect(result).To(BeEquivalentTo(1000))
196+
197+
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result()
198+
Expect(err).NotTo(HaveOccurred())
199+
Expect(result).To(BeEquivalentTo(1000))
200+
201+
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
202+
Expect(err).NotTo(HaveOccurred())
203+
Expect(len(rangePoints)).To(BeEquivalentTo(1))
204+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
141205
})
142206

143-
It("should TSAlter", Label("timeseries", "tsalter"), func() {
207+
It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() {
144208
result, err := client.TSCreate(ctx, "1").Result()
145209
Expect(err).NotTo(HaveOccurred())
146210
Expect(result).To(BeEquivalentTo("OK"))
@@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
179243
resultInfo, err = client.TSInfo(ctx, "1").Result()
180244
Expect(err).NotTo(HaveOccurred())
181245
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min"))
246+
// Test insertion filters
247+
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
248+
Expect(err).NotTo(HaveOccurred())
249+
Expect(resultAdd).To(BeEquivalentTo(1000))
250+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
251+
Expect(err).NotTo(HaveOccurred())
252+
Expect(resultAdd).To(BeEquivalentTo(1010))
253+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
254+
Expect(err).NotTo(HaveOccurred())
255+
Expect(resultAdd).To(BeEquivalentTo(1013))
256+
257+
alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
258+
resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result()
259+
Expect(err).NotTo(HaveOccurred())
260+
Expect(resultAlter).To(BeEquivalentTo("OK"))
261+
262+
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result()
263+
Expect(err).NotTo(HaveOccurred())
264+
Expect(resultAdd).To(BeEquivalentTo(1013))
265+
266+
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
267+
Expect(err).NotTo(HaveOccurred())
268+
Expect(len(rangePoints)).To(BeEquivalentTo(3))
269+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
270+
{Timestamp: 1000, Value: 1.0},
271+
{Timestamp: 1010, Value: 11.0},
272+
{Timestamp: 1013, Value: 10.0}}))
182273
})
183274

184275
It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() {
@@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
216307
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
217308
})
218309

219-
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() {
310+
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
220311
for i := 0; i < 100; i++ {
221312
_, err := client.TSIncrBy(ctx, "1", 1).Result()
222313
Expect(err).NotTo(HaveOccurred())
@@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
277368
resultInfo, err = client.TSInfo(ctx, "4").Result()
278369
Expect(err).NotTo(HaveOccurred())
279370
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))
371+
372+
// Test insertion filters INCRBY
373+
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
374+
res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result()
375+
Expect(err).NotTo(HaveOccurred())
376+
Expect(res).To(BeEquivalentTo(1000))
377+
378+
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
379+
Expect(err).NotTo(HaveOccurred())
380+
Expect(res).To(BeEquivalentTo(1000))
381+
382+
rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
383+
Expect(err).NotTo(HaveOccurred())
384+
Expect(len(rangePoints)).To(BeEquivalentTo(1))
385+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
386+
387+
res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
388+
Expect(err).NotTo(HaveOccurred())
389+
Expect(res).To(BeEquivalentTo(1000))
390+
391+
rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
392+
Expect(err).NotTo(HaveOccurred())
393+
Expect(len(rangePoints)).To(BeEquivalentTo(1))
394+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}}))
395+
396+
// Test insertion filters DECRBY
397+
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
398+
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result()
399+
Expect(err).NotTo(HaveOccurred())
400+
Expect(res).To(BeEquivalentTo(1000))
401+
402+
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
403+
Expect(err).NotTo(HaveOccurred())
404+
Expect(res).To(BeEquivalentTo(1000))
405+
406+
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
407+
Expect(err).NotTo(HaveOccurred())
408+
Expect(len(rangePoints)).To(BeEquivalentTo(1))
409+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}}))
410+
411+
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
412+
Expect(err).NotTo(HaveOccurred())
413+
Expect(res).To(BeEquivalentTo(1000))
414+
415+
rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
416+
Expect(err).NotTo(HaveOccurred())
417+
Expect(len(rangePoints)).To(BeEquivalentTo(1))
418+
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}}))
280419
})
281420

282421
It("should TSGet", Label("timeseries", "tsget"), func() {

0 commit comments

Comments
 (0)