Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crosscluster: handle lww condition failures #142368

Merged
merged 1 commit into from
Mar 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion pkg/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -80,6 +81,15 @@ type querier interface {
RequiresParsedBeforeRow(catid.DescID) bool
}

// isLwwLoser returns true if the error is a ConditionFailedError with an
// OriginTimestampOlderThan set.
func isLwwLoser(err error) bool {
if condErr := (*kvpb.ConditionFailedError)(nil); errors.As(err, &condErr) {
return condErr.OriginTimestampOlderThan.IsSet()
}
return false
}

type queryBuilder struct {
// stmts are parsed SQL statements. They should have the same number
// of inputs.
Expand Down Expand Up @@ -592,6 +602,9 @@ func (lww *lwwQuerier) InsertRow(
sess.QualityOfService = nil
}
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
if isLwwLoser(err) {
return batchStats{}, nil
}
// If the optimistic insert failed with unique violation, we have to
// fall back to the pessimistic path. If we got a different error,
// then we bail completely.
Expand All @@ -615,7 +628,11 @@ func (lww *lwwQuerier) InsertRow(
sess.QualityOfService = nil
}
sess.OriginTimestampForLogicalDataReplication = row.MvccTimestamp
if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...); err != nil {
_, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, sess, stmt, datums...)
if isLwwLoser(err) {
return batchStats{}, nil
}
if err != nil {
log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error())
return batchStats{}, err
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/crosscluster/logical/lww_row_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,6 @@ func TestLWWConflictResolution(t *testing.T) {
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
t.Run("cross-cluster-local-delete", func(t *testing.T) {
if !useKVProc {
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
}
Comment on lines -462 to -464
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

tableNameDst, rp, encoder := setup(t, useKVProc)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)
Expand Down Expand Up @@ -507,9 +504,6 @@ func TestLWWConflictResolution(t *testing.T) {
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
t.Run("remote-delete-after-local-delete", func(t *testing.T) {
if !useKVProc {
skip.IgnoreLint(t, "local delete ordering is not handled correctly by the SQL processor")
}
tableNameDst, rp, encoder := setup(t, useKVProc)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/row/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func ConvertBatchError(ctx context.Context, tableDesc catalog.TableDescriptor, b
)

case *kvpb.ConditionFailedError:
if !v.OriginTimestampOlderThan.IsEmpty() {
// NOTE: we return the go error here because this error should never be
// communicated to pgwire. It's exposed for the LDR writer.
return origPErr.GoError()
}
if origPErr.Index == nil {
break
}
Expand Down