diff --git a/pkg/crosscluster/logical/lww_row_processor.go b/pkg/crosscluster/logical/lww_row_processor.go index d9818d3ee050..468fa3bcf065 100644 --- a/pkg/crosscluster/logical/lww_row_processor.go +++ b/pkg/crosscluster/logical/lww_row_processor.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -80,6 +81,15 @@ type querier interface { RequiresParsedBeforeRow(catid.DescID) bool } +// isLwwLoser returns true if the error is a ConditionFailedError with an +// OriginTimestampOlderThan set. +func isLwwLoser(err error) bool { + if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) { + return condErr.OriginTimestampOlderThan.IsSet() + } + return false +} + type queryBuilder struct { // stmts are parsed SQL statements. They should have the same number // of inputs. @@ -592,6 +602,9 @@ func (lww *lwwQuerier) InsertRow( sess.QualityOfService = nil } if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil { + if isLwwLoser(err) { + return batchStats{}, nil + } // If the optimistic insert failed with unique violation, we have to // fall back to the pessimistic path. If we got a different error, // then we bail completely. @@ -615,7 +628,11 @@ func (lww *lwwQuerier) InsertRow( sess.QualityOfService = nil } sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp - if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...); err != nil { + _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...) + if isLwwLoser(err) { + return batchStats{}, nil + } + if err != nil { log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error()) return batchStats{}, err } diff --git a/pkg/crosscluster/logical/lww_row_processor_test.go b/pkg/crosscluster/logical/lww_row_processor_test.go index e9d6af36053f..7de87aac296f 100644 --- a/pkg/crosscluster/logical/lww_row_processor_test.go +++ b/pkg/crosscluster/logical/lww_row_processor_test.go @@ -459,9 +459,6 @@ func TestLWWConflictResolution(t *testing.T) { runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) }) t.Run("cross-cluster-local-delete", func(t *testing.T) { - if !useKVProc { - skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor") - } tableNameDst, rp, encoder := setup(t, useKVProc) runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...) @@ -507,9 +504,6 @@ func TestLWWConflictResolution(t *testing.T) { runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) }) t.Run("remote-delete-after-local-delete", func(t *testing.T) { - if !useKVProc { - skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor") - } tableNameDst, rp, encoder := setup(t, useKVProc) runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...) diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go index a7e856303a32..db584232c547 100644 --- a/pkg/sql/row/errors.go +++ b/pkg/sql/row/errors.go @@ -41,6 +41,11 @@ func ConvertBatchError(ctx context.Context, tableDesc catalog.TableDescriptor, b ) case *kvpb.ConditionFailedError: + if !v.OriginTimestampOlderThan.IsEmpty() { + // NOTE: we return the go error here because this error should never be + // communicated to pgwire. It's exposed for the LDR writer. + return origPErr.GoError() + } if origPErr.Index == nil { break }