Skip to content

Commit 434bfc7

Browse files
committed
crosscluster: use new origin header values
This change aims to correct the SQL writer's LWW semantics, enabling the retirement of the KV writer. The PR updates the implementation of put, cput, delete, and range delete to return an error if a replicated write with an origin timestamp would overwrite a value with a more recent origin or MVCC timestamp. As part of this update, tie-handling logic has been removed since the current KV writer does not use it, and a simple "bool wins ties" hint is insufficient for proper LWW implementation. No explicit version checks are included. In a mixed-version environment, a SQL writer running 25.1 and writing to a 25.2 KV server will encounter "condition failed" errors and retry the write until it is upgraded or the LDR PTS ages out. This is acceptable, as LDR remains in preview and the SQL writer is neither the default nor the recommended writer. Release note: none Epic: CRDB-48647
1 parent f0a4f11 commit 434bfc7

19 files changed

+700
-120
lines changed

pkg/crosscluster/logical/lww_row_processor.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1616
"github.com/cockroachdb/cockroach/pkg/keys"
1717
"github.com/cockroachdb/cockroach/pkg/kv"
18+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1819
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1920
"github.com/cockroachdb/cockroach/pkg/roachpb"
2021
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -80,6 +81,15 @@ type querier interface {
8081
RequiresParsedBeforeRow(catid.DescID) bool
8182
}
8283

84+
// isLwwLoser returns true if the error is a ConditionFailedError with an
85+
// OriginTimestampOlderThan set.
86+
func isLwwLoser(err error) bool {
87+
if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) {
88+
return condErr.OriginTimestampOlderThan.IsSet()
89+
}
90+
return false
91+
}
92+
8393
type queryBuilder struct {
8494
// stmts are parsed SQL statements. They should have the same number
8595
// of inputs.
@@ -592,6 +602,9 @@ func (lww *lwwQuerier) InsertRow(
592602
sess.QualityOfService = nil
593603
}
594604
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
605+
if isLwwLoser(err) {
606+
return batchStats{}, nil
607+
}
595608
// If the optimistic insert failed with unique violation, we have to
596609
// fall back to the pessimistic path. If we got a different error,
597610
// then we bail completely.
@@ -615,7 +628,11 @@ func (lww *lwwQuerier) InsertRow(
615628
sess.QualityOfService = nil
616629
}
617630
sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
618-
if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
631+
_, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
632+
if isLwwLoser(err) {
633+
return batchStats{}, nil
634+
}
635+
if err != nil {
619636
log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error())
620637
return batchStats{}, err
621638
}

pkg/crosscluster/logical/lww_row_processor_test.go

-6
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,6 @@ func TestLWWConflictResolution(t *testing.T) {
459459
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
460460
})
461461
t.Run("cross-cluster-local-delete", func(t *testing.T) {
462-
if !useKVProc {
463-
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
464-
}
465462
tableNameDst, rp, encoder := setup(t, useKVProc)
466463

467464
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)
@@ -507,9 +504,6 @@ func TestLWWConflictResolution(t *testing.T) {
507504
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
508505
})
509506
t.Run("remote-delete-after-local-delete", func(t *testing.T) {
510-
if !useKVProc {
511-
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
512-
}
513507
tableNameDst, rp, encoder := setup(t, useKVProc)
514508

515509
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)

pkg/kv/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ go_test(
5151
"admission_test.go",
5252
"client_test.go",
5353
"db_test.go",
54+
"kv_client_lww_test.go",
5455
"main_test.go",
5556
"range_lookup_test.go",
5657
"txn_external_test.go",

pkg/kv/batch.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -578,9 +578,7 @@ func (b *Batch) CPutAllowingIfNotExists(key, value interface{}, expValue []byte)
578578
// This is used by logical data replication and other uses of this API
579579
// are discouraged since the semantics are subject to change as
580580
// required by that feature.
581-
func (b *Batch) CPutWithOriginTimestamp(
582-
key, value interface{}, expValue []byte, ts hlc.Timestamp, shouldWinTie bool,
583-
) {
581+
func (b *Batch) CPutWithOriginTimestamp(key, value interface{}, expValue []byte, ts hlc.Timestamp) {
584582
k, err := marshalKey(key)
585583
if err != nil {
586584
b.initResult(0, 1, notRaw, err)
@@ -594,7 +592,6 @@ func (b *Batch) CPutWithOriginTimestamp(
594592
}
595593
r := kvpb.NewConditionalPut(k, v, expValue, false)
596594
r.(*kvpb.ConditionalPutRequest).OriginTimestamp = ts
597-
r.(*kvpb.ConditionalPutRequest).ShouldWinOriginTimestampTie = shouldWinTie
598595
b.appendReqs(r)
599596
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
600597
b.initResult(1, 1, notRaw, nil)

0 commit comments

Comments
 (0)