From 38435497be09948495cc58a37203ea051e04ae71 Mon Sep 17 00:00:00 2001
From: Miles Frankel <miles.frankel@cockroachlabs.com>
Date: Tue, 18 Mar 2025 18:46:03 -0400
Subject: [PATCH] changefeedccl: ensure that spans planned from cdc queries
 have end keys

When a cdc query results in a plan that scans a
single key, the end key of the span is unset. This
causes the feed to fail on startup. We now set
span.EndKey = span.Key.Next().

Fixes: #143101

Release note (bug fix): Fixed a bug that caused feeds to fail on startup when
scanning a single key.
---
 pkg/ccl/changefeedccl/cdceval/plan.go    | 10 ++++-
 pkg/ccl/changefeedccl/changefeed_test.go | 52 ++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/pkg/ccl/changefeedccl/cdceval/plan.go b/pkg/ccl/changefeedccl/cdceval/plan.go
index 9c1f37dfb2ec..d589575cfb18 100644
--- a/pkg/ccl/changefeedccl/cdceval/plan.go
+++ b/pkg/ccl/changefeedccl/cdceval/plan.go
@@ -132,7 +132,15 @@ func SpansForExpression(
 		return nil, withErrorHint(err, d.FamilyName, d.HasOtherFamilies)
 	}
 
-	return plan.Spans, nil
+	// Make sure any single-key spans are expanded to have end keys.
+	spans := plan.Spans
+	for i := range spans {
+		if len(spans[i].EndKey) == 0 {
+			spans[i].EndKey = spans[i].Key.Clone().Next()
+		}
+	}
+
+	return spans, nil
 }
 
 // withErrorHint wraps error with error hints.
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 30ce0fd2e19a..92458caf3768 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -10012,3 +10012,55 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
 
 	cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
 }
+
+func TestCDCQuerySelectSingleRow(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	errCh := make(chan error, 1)
+	knobsFn := func(knobs *base.TestingKnobs) {
+		if knobs.DistSQL == nil {
+			knobs.DistSQL = &execinfra.TestingKnobs{}
+		}
+		if knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed == nil {
+			knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{}
+		}
+		cfKnobs := knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
+		cfKnobs.HandleDistChangefeedError = func(err error) error {
+			errCh <- err
+			return err
+		}
+	}
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		db := sqlutils.MakeSQLRunner(s.DB)
+		db.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY);`)
+		db.Exec(t, `INSERT INTO foo VALUES (1), (2), (3);`)
+
+		// initial_scan='only' is not required, but it makes testing this easier.
+		foo := feed(t, f, `CREATE CHANGEFEED WITH initial_scan='only' AS SELECT * FROM foo WHERE key = 1`)
+		defer closeFeed(t, foo)
+
+		done := make(chan struct{})
+		go func() {
+			defer close(done)
+			assertPayloads(t, foo, []string{`foo: [1]->{"key": 1}`})
+		}()
+
+		select {
+		case err := <-errCh:
+			// Ignore any error after the above assertion completed, because
+			// it's likely just due to feed shutdown.
+			select {
+			case <-done:
+			default:
+				t.Fatalf("unexpected error: %v", err)
+			}
+		case <-time.After(30 * time.Second):
+			t.Fatal("timed out")
+		case <-done:
+			return
+		}
+	}
+	cdcTest(t, testFn, withKnobsFn(knobsFn))
+}