Skip to content

Commit b90af6a

Browse files
committedMar 19, 2025
changefeedccl: ensure that spans planned from cdc queries have end keys
When a cdc query results in a plan that scans a single key, the end key of the span is unset. This causes the feed to fail on startup. We now set span.EndKey = span.Key.Next(). Fixes: cockroachdb#143101 Release note (bug fix): Fixed a bug that caused feeds to fail on startup when scanning a single key.
1 parent cf99d13 commit b90af6a

File tree

2 files changed

+61
-1
lines changed

2 files changed

+61
-1
lines changed
 

‎pkg/ccl/changefeedccl/cdceval/plan.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,15 @@ func SpansForExpression(
132132
return nil, withErrorHint(err, d.FamilyName, d.HasOtherFamilies)
133133
}
134134

135-
return plan.Spans, nil
135+
// Make sure any single-key spans are expanded to have end keys.
136+
spans := plan.Spans
137+
for i := range spans {
138+
if len(spans[i].EndKey) == 0 {
139+
spans[i].EndKey = spans[i].Key.Clone().Next()
140+
}
141+
}
142+
143+
return spans, nil
136144
}
137145

138146
// withErrorHint wraps error with error hints.

‎pkg/ccl/changefeedccl/changefeed_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -9829,3 +9829,55 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
98299829

98309830
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
98319831
}
9832+
9833+
func TestCDCQuerySelectSingleRow(t *testing.T) {
9834+
defer leaktest.AfterTest(t)()
9835+
defer log.Scope(t).Close(t)
9836+
9837+
errCh := make(chan error, 1)
9838+
knobsFn := func(knobs *base.TestingKnobs) {
9839+
if knobs.DistSQL == nil {
9840+
knobs.DistSQL = &execinfra.TestingKnobs{}
9841+
}
9842+
if knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed == nil {
9843+
knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{}
9844+
}
9845+
cfKnobs := knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
9846+
cfKnobs.HandleDistChangefeedError = func(err error) error {
9847+
errCh <- err
9848+
return err
9849+
}
9850+
}
9851+
9852+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
9853+
db := sqlutils.MakeSQLRunner(s.DB)
9854+
db.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
9855+
db.Exec(t, `INSERT INTO foo VALUES (1), (2), (3);`)
9856+
9857+
// initial_scan='only' is not required, but it makes testing this easier.
9858+
foo := feed(t, f, `CREATE CHANGEFEED WITH initial_scan='only' AS SELECT * FROM foo WHERE key = 1`)
9859+
defer closeFeed(t, foo)
9860+
9861+
done := make(chan struct{})
9862+
go func() {
9863+
defer close(done)
9864+
assertPayloads(t, foo, []string{`foo: [1]->{"key": 1}`})
9865+
}()
9866+
9867+
select {
9868+
case err := <-errCh:
9869+
// Ignore any error after the above assertion completed, because
9870+
// it's likely just due to feed shutdown.
9871+
select {
9872+
case <-done:
9873+
default:
9874+
t.Fatalf("unexpected error: %v", err)
9875+
}
9876+
case <-time.After(30 * time.Second):
9877+
t.Fatal("timed out")
9878+
case <-done:
9879+
return
9880+
}
9881+
}
9882+
cdcTest(t, testFn, withKnobsFn(knobsFn))
9883+
}

0 commit comments

Comments
 (0)
Please sign in to comment.