Skip to content

Commit 9336d4f

Browse files
committed
kv: use origin timestamp to validate lww semantics
The PR updates the implementation of put, cput, delete, and range delete to return an error if a replicated write with an origin timestamp would overwrite a value with a more recent origin or MVCC timestamp. No explicit version checks are included. In a mixed-version environment, a SQL writer running 25.1 and writing to a 25.2 KV server will encounter "condition failed" errors if an insert would overwrite a tombstone and retry the write until it is upgraded or the LDR PTS ages out. This is acceptable, as LDR remains in preview and the SQL writer is neither the default nor the recommended writer. Release note: none Epic: CRDB-48647
1 parent 8c96475 commit 9336d4f

10 files changed

+676
-79
lines changed

pkg/kv/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ go_test(
5151
"admission_test.go",
5252
"client_test.go",
5353
"db_test.go",
54+
"kv_client_lww_test.go",
5455
"main_test.go",
5556
"range_lookup_test.go",
5657
"txn_external_test.go",

pkg/kv/kv_client_lww_test.go

+355
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package kv_test
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/kv"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
15+
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
17+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
18+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
19+
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/errors"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
// unwrapConditionFailedError unwraps an error into a kvpb.ConditionFailedError.
25+
// It fails the test if there is no error or if the error cannot be unwrapped
26+
// into the expected type.
27+
func unwrapConditionFailedError(t *testing.T, err error) *kvpb.ConditionFailedError {
28+
t.Helper()
29+
require.Error(t, err, "expected an error")
30+
31+
var conditionFailedErr *kvpb.ConditionFailedError
32+
require.True(t, errors.As(err, &conditionFailedErr),
33+
"expected error to unwrap to *kvpb.ConditionFailedError, got: %v", err)
34+
35+
return conditionFailedErr
36+
}
37+
38+
func TestDeleteWithOriginTimestamp(t *testing.T) {
39+
defer leaktest.AfterTest(t)()
40+
defer log.Scope(t).Close(t)
41+
42+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
43+
defer s.Stopper().Stop(context.Background())
44+
45+
ctx := context.Background()
46+
key := roachpb.Key("lww-test-key")
47+
value := []byte("value")
48+
49+
// Put a value with the current timestamp.
50+
require.NoError(t, db.Put(ctx, key, value))
51+
52+
// Get the current timestamp to use as a reference.
53+
gr, err := db.Get(ctx, key)
54+
require.NoError(t, err)
55+
currentTS := gr.Value.Timestamp
56+
57+
// Helper function to attempt a delete with a specific origin timestamp
58+
attemptDelete := func(ts hlc.Timestamp) error {
59+
b := &kv.Batch{}
60+
b.Del(key)
61+
b.Header.WriteOptions = &kvpb.WriteOptions{
62+
OriginTimestamp: ts,
63+
}
64+
return db.Run(ctx, b)
65+
}
66+
67+
t.Run("OlderTimestamp", func(t *testing.T) {
68+
// Delete with an origin timestamp older than the mvcc timestamp should fail
69+
err := attemptDelete(currentTS.Prev())
70+
condErr := unwrapConditionFailedError(t, err)
71+
require.False(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be set")
72+
require.Equal(t, currentTS, condErr.OriginTimestampOlderThan, "expected OriginTimestampOlderThan to match current timestamp")
73+
74+
// Verify the original value still exists
75+
gr, err := db.Get(ctx, key)
76+
require.NoError(t, err)
77+
require.NotNil(t, gr.Value, "expected value to exist")
78+
require.Equal(t, value, gr.ValueBytes(), "value should match what was put")
79+
})
80+
81+
t.Run("NewerTimestamp", func(t *testing.T) {
82+
// Delete with an origin timestamp newer than the mvcc timestamp should succeed
83+
err := attemptDelete(currentTS.Next())
84+
require.NoError(t, err)
85+
86+
// Verify the value was deleted
87+
gr, err := db.Get(ctx, key)
88+
require.NoError(t, err)
89+
require.Nil(t, gr.Value, "expected value to be deleted")
90+
})
91+
}
92+
93+
func TestDelRangeWithOriginTimestamp(t *testing.T) {
94+
defer leaktest.AfterTest(t)()
95+
defer log.Scope(t).Close(t)
96+
97+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
98+
defer s.Stopper().Stop(context.Background())
99+
100+
ctx := context.Background()
101+
102+
// Setup test data once for all subtests
103+
keys := []roachpb.Key{
104+
roachpb.Key("lww-delrange-a"),
105+
roachpb.Key("lww-delrange-b"),
106+
roachpb.Key("lww-delrange-c"),
107+
}
108+
startKey := keys[0]
109+
endKey := keys[len(keys)-1].Next()
110+
value := []byte("value")
111+
112+
// Put values for all keys
113+
for _, key := range keys {
114+
require.NoError(t, db.Put(ctx, key, value))
115+
}
116+
117+
// Get the current timestamp to use as a reference
118+
gr, err := db.Get(ctx, keys[2])
119+
require.NoError(t, err)
120+
currentTS := gr.Value.Timestamp
121+
122+
t.Run("OlderTimestamp", func(t *testing.T) {
123+
// DelRange with an origin timestamp older than the mvcc timestamp should fail
124+
b := &kv.Batch{}
125+
b.DelRange(startKey, endKey, false)
126+
b.Header.WriteOptions = &kvpb.WriteOptions{
127+
OriginTimestamp: currentTS.Prev(),
128+
}
129+
err := db.Run(ctx, b)
130+
condErr := unwrapConditionFailedError(t, err)
131+
require.False(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be set")
132+
require.Equal(t, currentTS, condErr.OriginTimestampOlderThan, "expected OriginTimestampOlderThan to match current timestamp")
133+
134+
// Verify the original values still exist
135+
for _, key := range keys {
136+
gr, err := db.Get(ctx, key)
137+
require.NoError(t, err)
138+
require.NotNil(t, gr.Value, "expected value to exist at %s", key)
139+
require.Equal(t, value, gr.ValueBytes(), "value should match what was put")
140+
}
141+
})
142+
143+
t.Run("NewerTimestamp", func(t *testing.T) {
144+
// Delete the range with the newer timestamp
145+
b := &kv.Batch{}
146+
b.DelRange(startKey, endKey, false)
147+
b.Header.WriteOptions = &kvpb.WriteOptions{
148+
OriginTimestamp: currentTS.Next(),
149+
}
150+
err := db.Run(ctx, b)
151+
require.NoError(t, err, "unexpected error when deleting range with newer origin timestamp")
152+
153+
// Verify all values were deleted
154+
for _, key := range keys {
155+
gr, err := db.Get(ctx, key)
156+
require.NoError(t, err)
157+
require.Nil(t, gr.Value, "expected value to be deleted at %s", key)
158+
}
159+
})
160+
}
161+
162+
func TestPutWithOriginTimestamp(t *testing.T) {
163+
defer leaktest.AfterTest(t)()
164+
defer log.Scope(t).Close(t)
165+
166+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
167+
defer s.Stopper().Stop(context.Background())
168+
169+
ctx := context.Background()
170+
key := roachpb.Key("lww-put-test-key")
171+
value1 := []byte("value1")
172+
value2 := []byte("value2")
173+
174+
// Put a value with the current timestamp.
175+
require.NoError(t, db.Put(ctx, key, value1))
176+
177+
// Get the current timestamp to use as a reference.
178+
gr, err := db.Get(ctx, key)
179+
require.NoError(t, err)
180+
currentTS := gr.Value.Timestamp
181+
182+
// Helper function to attempt a put with a specific origin timestamp
183+
attemptPut := func(ts hlc.Timestamp, value []byte) error {
184+
b := &kv.Batch{}
185+
b.Put(key, value)
186+
b.Header.WriteOptions = &kvpb.WriteOptions{
187+
OriginTimestamp: ts,
188+
}
189+
return db.Run(ctx, b)
190+
}
191+
192+
t.Run("OlderTimestamp", func(t *testing.T) {
193+
// Put with an origin timestamp older than the mvcc timestamp should fail
194+
olderTS := currentTS.Prev()
195+
err := attemptPut(olderTS, value2)
196+
condErr := unwrapConditionFailedError(t, err)
197+
require.False(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be set")
198+
require.Equal(t, currentTS, condErr.OriginTimestampOlderThan, "expected OriginTimestampOlderThan to match current timestamp")
199+
200+
// Verify the original value still exists
201+
gr, err := db.Get(ctx, key)
202+
require.NoError(t, err)
203+
require.NotNil(t, gr.Value, "expected value to exist")
204+
require.Equal(t, value1, gr.ValueBytes(), "value should match expected")
205+
})
206+
207+
t.Run("NewerTimestamp", func(t *testing.T) {
208+
// Put with an origin timestamp newer than the mvcc timestamp should succeed
209+
newerTS := currentTS.Next()
210+
err := attemptPut(newerTS, value2)
211+
require.NoError(t, err, "unexpected error when putting with newer origin timestamp")
212+
213+
// Verify the value was updated
214+
gr, err := db.Get(ctx, key)
215+
require.NoError(t, err)
216+
require.NotNil(t, gr.Value, "expected value to exist")
217+
require.Equal(t, value2, gr.ValueBytes(), "value should match expected")
218+
219+
// Get the new timestamp
220+
gr, err = db.Get(ctx, key)
221+
require.NoError(t, err)
222+
updatedTS := gr.Value.Timestamp
223+
224+
// Try to put with an even newer timestamp
225+
newestValue := []byte("newest-value")
226+
newestTS := updatedTS.Next()
227+
err = attemptPut(newestTS, newestValue)
228+
require.NoError(t, err, "unexpected error when putting with newest origin timestamp")
229+
230+
// Verify the newest value was written
231+
gr, err = db.Get(ctx, key)
232+
require.NoError(t, err)
233+
require.NotNil(t, gr.Value, "expected value to exist")
234+
require.Equal(t, newestValue, gr.ValueBytes(), "value should match new value")
235+
})
236+
}
237+
238+
func TestCPutWithOriginTimestamp(t *testing.T) {
239+
defer leaktest.AfterTest(t)()
240+
defer log.Scope(t).Close(t)
241+
242+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
243+
defer s.Stopper().Stop(context.Background())
244+
245+
ctx := context.Background()
246+
key := roachpb.Key("lww-cput-test-key")
247+
initialValue := []byte("initial")
248+
newValue := []byte("new-value")
249+
wrongExpectedValue := []byte("wrong-expected")
250+
251+
// Put a value with the current timestamp.
252+
require.NoError(t, db.Put(ctx, key, initialValue))
253+
254+
// Get the current timestamp to use as a reference.
255+
gr, err := db.Get(ctx, key)
256+
require.NoError(t, err)
257+
currentTS := gr.Value.Timestamp
258+
259+
// Helper function to attempt a conditional put with a specific origin timestamp
260+
attemptCPut := func(ts hlc.Timestamp, expectedValue, newValue []byte) error {
261+
b := &kv.Batch{}
262+
b.CPut(key, newValue, expectedValue)
263+
b.Header.WriteOptions = &kvpb.WriteOptions{
264+
OriginTimestamp: ts,
265+
}
266+
return db.Run(ctx, b)
267+
}
268+
269+
t.Run("ConditionFailsAndLWWFails", func(t *testing.T) {
270+
// Both the condition fails (wrong expected value) and the origin timestamp is older
271+
olderTS := currentTS.Prev()
272+
err := attemptCPut(olderTS, wrongExpectedValue, newValue)
273+
condErr := unwrapConditionFailedError(t, err)
274+
275+
// The error should indicate the origin timestamp is older
276+
require.False(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be set")
277+
require.Equal(t, currentTS, condErr.OriginTimestampOlderThan, "expected OriginTimestampOlderThan to match current timestamp")
278+
279+
// Verify the original value still exists
280+
gr, err := db.Get(ctx, key)
281+
require.NoError(t, err)
282+
require.NotNil(t, gr.Value, "expected value to exist")
283+
require.Equal(t, initialValue, gr.ValueBytes(), "value should match initial value")
284+
})
285+
286+
t.Run("ConditionPassesButLWWFails", func(t *testing.T) {
287+
// The condition passes (correct expected value) but the origin timestamp is older
288+
olderTS := currentTS.Prev()
289+
err := attemptCPut(olderTS, initialValue, newValue)
290+
condErr := unwrapConditionFailedError(t, err)
291+
292+
// The error should indicate the origin timestamp is older
293+
require.False(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be set")
294+
require.Equal(t, currentTS, condErr.OriginTimestampOlderThan, "expected OriginTimestampOlderThan to match current timestamp")
295+
296+
// Verify the original value still exists
297+
gr, err := db.Get(ctx, key)
298+
require.NoError(t, err)
299+
require.NotNil(t, gr.Value, "expected value to exist")
300+
require.Equal(t, initialValue, gr.ValueBytes(), "value should match initial value")
301+
})
302+
303+
t.Run("ConditionFailsButLWWPasses", func(t *testing.T) {
304+
// The condition fails (wrong expected value) but the origin timestamp is newer
305+
newerTS := currentTS.Next()
306+
err := attemptCPut(newerTS, wrongExpectedValue, newValue)
307+
condErr := unwrapConditionFailedError(t, err)
308+
309+
// The error should indicate the condition failed but not that the timestamp is older
310+
require.True(t, condErr.OriginTimestampOlderThan.IsEmpty(), "expected OriginTimestampOlderThan to be empty")
311+
require.NotNil(t, condErr.ActualValue, "expected ActualValue to be set")
312+
313+
// Verify the original value still exists
314+
gr, err := db.Get(ctx, key)
315+
require.NoError(t, err)
316+
require.NotNil(t, gr.Value, "expected value to exist")
317+
require.Equal(t, initialValue, gr.ValueBytes(), "value should match initial value")
318+
})
319+
320+
t.Run("BothConditionAndLWWPass", func(t *testing.T) {
321+
// Create a new key for this test to avoid interference from previous test cases
322+
successKey := roachpb.Key("lww-cput-success-key")
323+
324+
// Put an initial value
325+
require.NoError(t, db.Put(ctx, successKey, initialValue))
326+
327+
// Get the current value and timestamp
328+
gr, err := db.Get(ctx, successKey)
329+
require.NoError(t, err)
330+
require.NotNil(t, gr.Value)
331+
currentValue := gr.Value.TagAndDataBytes()
332+
currentTS := gr.Value.Timestamp
333+
334+
// Both the condition passes and the origin timestamp is newer
335+
newerTS := currentTS.Next()
336+
337+
// Use a helper function to perform the CPut with the current value
338+
cputErr := func() error {
339+
b := &kv.Batch{}
340+
b.CPut(successKey, newValue, currentValue)
341+
b.Header.WriteOptions = &kvpb.WriteOptions{
342+
OriginTimestamp: newerTS,
343+
}
344+
return db.Run(ctx, b)
345+
}()
346+
347+
require.NoError(t, cputErr, "unexpected error when both condition and timestamp are valid")
348+
349+
// Verify the value was updated
350+
gr, err = db.Get(ctx, successKey)
351+
require.NoError(t, err)
352+
require.NotNil(t, gr.Value, "expected value to exist")
353+
require.Equal(t, newValue, gr.ValueBytes(), "value should match new value")
354+
})
355+
}

0 commit comments

Comments
 (0)