From 886f79c6fc37b520c9545c09211af82cdabc061f Mon Sep 17 00:00:00 2001 From: Miles Frankel Date: Tue, 18 Mar 2025 18:46:03 -0400 Subject: [PATCH] changefeedccl: ensure that spans planned from cdc queries have end keys When a cdc query results in a plan that scans a single key, the end key of the span is unset. This causes the feed to fail on startup. We now set span.EndKey = span.Key.Next(). Fixes: #143101 Release note (bug fix): Fixed a bug that caused feeds to fail on startup when scanning a single key. --- pkg/ccl/changefeedccl/cdceval/plan.go | 10 ++++- pkg/ccl/changefeedccl/changefeed_test.go | 52 ++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/cdceval/plan.go b/pkg/ccl/changefeedccl/cdceval/plan.go index 9c1f37dfb2ec..d589575cfb18 100644 --- a/pkg/ccl/changefeedccl/cdceval/plan.go +++ b/pkg/ccl/changefeedccl/cdceval/plan.go @@ -132,7 +132,15 @@ func SpansForExpression( return nil, withErrorHint(err, d.FamilyName, d.HasOtherFamilies) } - return plan.Spans, nil + // Make sure any single-key spans are expanded to have end keys. + spans := plan.Spans + for i := range spans { + if len(spans[i].EndKey) == 0 { + spans[i].EndKey = spans[i].Key.Clone().Next() + } + } + + return spans, nil } // withErrorHint wraps error with error hints. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 695e702f1b1d..16a166d9ac80 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -9702,3 +9702,55 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) { cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries) } + +func TestCDCQuerySelectSingleRow(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + errCh := make(chan error, 1) + knobsFn := func(knobs *base.TestingKnobs) { + if knobs.DistSQL == nil { + knobs.DistSQL = &execinfra.TestingKnobs{} + } + if knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed == nil { + knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{} + } + cfKnobs := knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs) + cfKnobs.HandleDistChangefeedError = func(err error) error { + errCh <- err + return err + } + } + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + db := sqlutils.MakeSQLRunner(s.DB) + db.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`) + db.Exec(t, `INSERT INTO foo VALUES (1), (2), (3);`) + + // initial_scan='only' is not required, but it makes testing this easier. + foo := feed(t, f, `CREATE CHANGEFEED WITH initial_scan='only' AS SELECT * FROM foo WHERE key = 1`) + defer closeFeed(t, foo) + + done := make(chan struct{}) + go func() { + defer close(done) + assertPayloads(t, foo, []string{`foo: [1]->{"key": 1}`}) + }() + + select { + case err := <-errCh: + // Ignore any error after the above assertion completed, because + // it's likely just due to feed shutdown. + select { + case <-done: + default: + t.Fatalf("unexpected error: %v", err) + } + case <-time.After(30 * time.Second): + t.Fatal("timed out") + case <-done: + return + } + } + cdcTest(t, testFn, withKnobsFn(knobsFn)) +}