Skip to content

Commit 51a8b45

Browse files
committedMar 18, 2025
changefeedccl: ensure that spans planned from cdc queries have end keys
When a cdc query results in a plan that scans a single key, the end key of the span is unset. This causes the feed to fail on startup. We now set span.EndKey = span.Key.Next(). Fixes: cockroachdb#143101 Release note (bug fix): Fixed a bug that caused feeds to fail on startup when scanning a single key.
1 parent cb94136 commit 51a8b45

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed
 

‎pkg/ccl/changefeedccl/cdceval/plan.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package cdceval
77

88
import (
99
"context"
10+
"fmt"
1011
"strings"
1112

1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -132,7 +133,19 @@ func SpansForExpression(
132133
return nil, withErrorHint(err, d.FamilyName, d.HasOtherFamilies)
133134
}
134135

135-
return plan.Spans, nil
136+
fmt.Printf("spans for expression: %s\n", plan.Spans)
137+
138+
spans := plan.Spans
139+
// Make sure any single-key spans are expanded to have end keys.
140+
for i := range spans {
141+
if len(spans[i].EndKey) == 0 {
142+
spans[i].EndKey = spans[i].Key.Clone().Next()
143+
}
144+
}
145+
146+
fmt.Printf("new spans for expression: %s\n", spans)
147+
148+
return spans, nil
136149
}
137150

138151
// withErrorHint wraps error with error hints.

‎pkg/ccl/changefeedccl/changefeed_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -10794,6 +10794,49 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1079410794
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1079510795
}
1079610796

10797+
func TestCDCQuerySelectSingleRow(t *testing.T) {
10798+
defer leaktest.AfterTest(t)()
10799+
defer log.Scope(t).Close(t)
10800+
10801+
errCh := make(chan error, 1)
10802+
knobsFn := func(knobs *base.TestingKnobs) {
10803+
// TODO: this error should probably be terminal too.
10804+
if knobs.DistSQL == nil {
10805+
knobs.DistSQL = &execinfra.TestingKnobs{}
10806+
}
10807+
if knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed == nil {
10808+
knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{}
10809+
}
10810+
cfKnobs := knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
10811+
cfKnobs.HandleDistChangefeedError = func(err error) error {
10812+
errCh <- err
10813+
return err
10814+
}
10815+
}
10816+
10817+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
10818+
db := sqlutils.MakeSQLRunner(s.DB)
10819+
db.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
10820+
10821+
execed := make(chan struct{})
10822+
go func() {
10823+
defer close(execed)
10824+
// initial_scan='only' is not required, but it makes testing this easier.
10825+
db.Exec(t, "CREATE CHANGEFEED WITH initial_scan='only' AS SELECT * FROM foo WHERE key = 1")
10826+
}()
10827+
10828+
select {
10829+
case err := <-errCh:
10830+
t.Fatalf("unexpected error: %v", err)
10831+
case <-time.After(10 * time.Second):
10832+
t.Fatal("timed out")
10833+
case <-execed:
10834+
}
10835+
}
10836+
// This fails with all sinks, but it's much easier to detect failures with sinkless, because the statement itself fails.
10837+
cdcTest(t, testFn, feedTestForceSink("sinkless"), withKnobsFn(knobsFn))
10838+
}
10839+
1079710840
func assertReasonableMVCCTimestamp(t *testing.T, ts string) {
1079810841
epochNanos := parseTimeToHLC(t, ts).WallTime
1079910842
now := timeutil.Now()

0 commit comments

Comments
 (0)