Skip to content

Commit f0a4f11

Browse files
craig[bot]shubhamdhamaasg0451arulajmani
committedMar 13, 2025·
141735: sql: enable tenant testing for zone tests r=rafiss a=shubhamdhama `` sql: enable tenant testing for `zone_test.go` `` Updating `TestValidSetShowZones` to use `createTestServerParamsAllowTenants` instead of `createTestServerParams` required some investigation. It turned out simpler than expected. Zone settings for meta ranges aren't supported for secondary tenants, so this change simply exclude them from setup and validating. Informs: #140446 Epic: CRDB-48357 Release note: None `` sql: enable tenant testing for `zone_config_test` `` Since `GetSpanConfigForKey` is only available in system tenant, it is not used when running under secondary tenants. 142684: changefeedccl: optimize enriched source provider r=andyyang890 a=asg0451 Optimize enriched source provider by avoiding unnecessary allocations and json construction. Fixes: #141798 Release note: None 142836: kvserver: deflake TestLeasePreferencesDuringOutage r=kvoli a=arulajmani There was a race here, when heartbeating node liveness epochs, where another node could increment our epoch. This patch retries in such situations. Fixes #142795 Release note: None Co-authored-by: Shubham Dhama <[email protected]> Co-authored-by: Miles Frankel <[email protected]> Co-authored-by: Arul Ajmani <[email protected]>
4 parents 42b4638 + 344c262 + 5015b28 + 4e87632 commit f0a4f11

21 files changed

+465
-192
lines changed
 

‎pkg/ccl/changefeedccl/avro/avro.go

+20-10
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,20 @@ type DataRecord struct {
166166
native map[string]interface{}
167167
}
168168

169+
type functionalNativeFromRowFn func(row cdcevent.Row, dest map[string]any)
170+
169171
type FunctionalRecord struct {
170-
// Use the Record just for schema info
172+
// Use the Record just for schema info.
171173
Record
172174

173-
nativeFromRowFn func(row cdcevent.Row) (map[string]any, error)
175+
// nativeFromRowFn populates the native map from a row.
176+
nativeFromRowFn functionalNativeFromRowFn
177+
// Reuse this map to avoid repeated map allocation when encoding.
178+
native map[string]any
174179
}
175180

176181
func NewFunctionalRecord(
177-
name, namespace string,
178-
fields []*SchemaField,
179-
nativeFromRowFn func(row cdcevent.Row) (map[string]any, error),
182+
name, namespace string, fields []*SchemaField, nativeFromRowFn functionalNativeFromRowFn,
180183
) (*FunctionalRecord, error) {
181184
schema := &FunctionalRecord{
182185
Record: Record{
@@ -186,6 +189,7 @@ func NewFunctionalRecord(
186189
Fields: fields,
187190
},
188191
nativeFromRowFn: nativeFromRowFn,
192+
native: make(map[string]any),
189193
}
190194

191195
schemaJSON, err := json.Marshal(schema)
@@ -199,6 +203,16 @@ func NewFunctionalRecord(
199203
return schema, nil
200204
}
201205

206+
// nativeFromRow populates the native map from a row and returns it. The
207+
// returned map will be reused in subsequent calls.
208+
func (r *FunctionalRecord) nativeFromRow(row cdcevent.Row) map[string]any {
209+
if r.native == nil {
210+
r.native = make(map[string]any, len(r.Fields))
211+
}
212+
r.nativeFromRowFn(row, r.native)
213+
return r.native
214+
}
215+
202216
// Metadata is the `EnvelopeRecord` metadata.
203217
type Metadata map[string]interface{}
204218

@@ -1158,11 +1172,7 @@ func (r *EnvelopeRecord) BinaryFromRow(
11581172
}
11591173

11601174
if r.Opts.SourceField {
1161-
sourceNative, err := r.Source.nativeFromRowFn(recordRow)
1162-
if err != nil {
1163-
return nil, err
1164-
}
1165-
native[`source`] = goavro.Union(unionKey(&r.Source.Record), sourceNative)
1175+
native[`source`] = goavro.Union(unionKey(&r.Source.Record), r.Source.nativeFromRow(recordRow))
11661176
}
11671177
if r.Opts.TsField {
11681178
native[`ts_ns`] = nil

‎pkg/ccl/changefeedccl/changefeed_processors.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1258,7 +1258,10 @@ func newChangeFrontierProcessor(
12581258

12591259
// This changeFrontier's encoder will only be used for resolved events which
12601260
// never have a source field, so we pass an empty enriched source provider.
1261-
sourceProvider := newEnrichedSourceProvider(encodingOpts, enrichedSourceData{})
1261+
sourceProvider, err := newEnrichedSourceProvider(encodingOpts, enrichedSourceData{})
1262+
if err != nil {
1263+
return nil, err
1264+
}
12621265
if cf.encoder, err = getEncoder(
12631266
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
12641267
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,

‎pkg/ccl/changefeedccl/changefeed_stmt.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,10 @@ func createChangefeedJobRecord(
604604
if err != nil {
605605
return nil, err
606606
}
607-
sourceProvider := newEnrichedSourceProvider(encodingOpts, enrichedSourceData{})
607+
sourceProvider, err := newEnrichedSourceProvider(encodingOpts, enrichedSourceData{})
608+
if err != nil {
609+
return nil, err
610+
}
608611
if _, err := getEncoder(ctx, encodingOpts, AllTargets(details), details.Select != "",
609612
makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil, sourceProvider); err != nil {
610613
return nil, err

‎pkg/ccl/changefeedccl/changefeed_test.go

+55-29
Original file line numberDiff line numberDiff line change
@@ -3938,7 +3938,8 @@ func TestChangefeedEnriched(t *testing.T) {
39383938
// Create an enriched source provider with no data. The contents of source
39393939
// will be tested in another test, we just want to make sure the structure &
39403940
// schema is right here.
3941-
esp := newEnrichedSourceProvider(changefeedbase.EncodingOptions{}, enrichedSourceData{})
3941+
esp, err := newEnrichedSourceProvider(changefeedbase.EncodingOptions{}, enrichedSourceData{})
3942+
require.NoError(t, err)
39423943
source, err := esp.GetJSON(cdcevent.Row{})
39433944
require.NoError(t, err)
39443945

@@ -4236,7 +4237,7 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
42364237
}
42374238

42384239
for _, testCase := range cases {
4239-
t.Run(testCase.name, func(t *testing.T) {
4240+
testutils.RunTrueAndFalse(t, "mvcc_ts", func(t *testing.T, withMVCCTS bool) {
42404241
clusterName := "clusterName123"
42414242
dbVersion := "v999.0.0"
42424243
defer build.TestingOverrideVersion(dbVersion)()
@@ -4248,7 +4249,11 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
42484249

42494250
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY)`)
42504251
sqlDB.Exec(t, `INSERT INTO foo values (0)`)
4251-
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=%s`, testCase.format))
4252+
stmt := fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=%s`, testCase.format)
4253+
if withMVCCTS {
4254+
stmt += ", mvcc_timestamp"
4255+
}
4256+
testFeed := feed(t, f, stmt)
42524257
defer closeFeed(t, testFeed)
42534258

42544259
var jobID int64
@@ -4276,35 +4281,50 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
42764281
sink = sinkTypeSinklessBuffer.String()
42774282
}
42784283

4284+
const dummyMvccTimestamp = "1234567890.0001"
4285+
jobIDStr := strconv.FormatInt(jobID, 10)
4286+
42794287
var assertion string
42804288
if testCase.format == "avro" {
4281-
assertion = fmt.Sprintf(
4282-
`{
4283-
"source": {
4284-
"cluster_id": {"string": "%s"},
4285-
"cluster_name": {"string": "%s"},
4286-
"db_version": {"string": "%s"},
4287-
"job_id": {"string": "%d"},
4288-
"node_id": {"string": "%s"},
4289-
"node_name": {"string": "%s"},
4290-
"changefeed_sink": {"string": "%s"},
4291-
"source_node_locality": {"string": "%s"}
4292-
}
4293-
}`,
4294-
clusterID, clusterName, dbVersion, jobID, nodeID, nodeName, sink, sourceNodeLocality)
4289+
assertionMap := map[string]any{
4290+
"source": map[string]any{
4291+
"cluster_id": map[string]any{"string": clusterID},
4292+
"cluster_name": map[string]any{"string": clusterName},
4293+
"db_version": map[string]any{"string": dbVersion},
4294+
"job_id": map[string]any{"string": jobIDStr},
4295+
// Note that the field is still present in the avro schema, so it appears here as nil.
4296+
"mvcc_timestamp": nil,
4297+
"node_id": map[string]any{"string": nodeID},
4298+
"node_name": map[string]any{"string": nodeName},
4299+
"changefeed_sink": map[string]any{"string": sink},
4300+
"source_node_locality": map[string]any{"string": sourceNodeLocality},
4301+
},
4302+
}
4303+
if withMVCCTS {
4304+
mvccTsMap := actualSource["source"].(map[string]any)["mvcc_timestamp"].(map[string]any)
4305+
assertReasonableMVCCTimestamp(t, mvccTsMap["string"].(string))
4306+
4307+
mvccTsMap["string"] = dummyMvccTimestamp
4308+
assertionMap["source"].(map[string]any)["mvcc_timestamp"] = map[string]any{"string": dummyMvccTimestamp}
4309+
}
4310+
assertion = toJSON(t, assertionMap)
42954311
} else {
4296-
assertion = fmt.Sprintf(
4297-
`{
4298-
"cluster_id": "%s",
4299-
"cluster_name": "%s",
4300-
"db_version": "%s",
4301-
"job_id": "%d",
4302-
"node_id": "%s",
4303-
"node_name": "%s",
4304-
"changefeed_sink": "%s",
4305-
"source_node_locality": "%s"
4306-
}`,
4307-
clusterID, clusterName, dbVersion, jobID, nodeID, nodeName, sink, sourceNodeLocality)
4312+
assertionMap := map[string]any{
4313+
"cluster_id": clusterID,
4314+
"cluster_name": clusterName,
4315+
"db_version": dbVersion,
4316+
"job_id": jobIDStr,
4317+
"node_id": nodeID,
4318+
"node_name": nodeName,
4319+
"changefeed_sink": sink,
4320+
"source_node_locality": sourceNodeLocality,
4321+
}
4322+
if withMVCCTS {
4323+
assertReasonableMVCCTimestamp(t, actualSource["mvcc_timestamp"].(string))
4324+
actualSource["mvcc_timestamp"] = dummyMvccTimestamp
4325+
assertionMap["mvcc_timestamp"] = dummyMvccTimestamp
4326+
}
4327+
assertion = toJSON(t, assertionMap)
43084328
}
43094329

43104330
value, err := reformatJSON(actualSource)
@@ -10748,3 +10768,9 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1074810768

1074910769
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1075010770
}
10771+
10772+
func assertReasonableMVCCTimestamp(t *testing.T, ts string) {
10773+
epochNanos := parseTimeToHLC(t, ts).WallTime
10774+
now := timeutil.Now()
10775+
require.GreaterOrEqual(t, epochNanos, now.Add(-1*time.Hour).UnixNano())
10776+
}

‎pkg/ccl/changefeedccl/changefeedbase/options.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -943,21 +943,26 @@ func (e EncodingOptions) Validate() error {
943943
return errors.Errorf(`%s is only usable with %s=%s/%s`, OptHeadersJSONColumnName, OptFormat, OptFormatJSON, OptFormatAvro)
944944
}
945945

946-
if e.Envelope != OptEnvelopeWrapped && e.Format != OptFormatJSON && e.Format != OptFormatParquet {
946+
// TODO(#140110): refactor this logic.
947+
if (e.Envelope != OptEnvelopeWrapped && e.Envelope != OptEnvelopeEnriched) && e.Format != OptFormatJSON && e.Format != OptFormatParquet {
947948
requiresWrap := []struct {
948949
k string
949950
b bool
950951
}{
951952
{OptKeyInValue, e.KeyInValue},
953+
// NOTE: topic_in_value is allowed for envelope=enriched, but has no
954+
// effect. This is because the enriched envelope already has much of
955+
// the information contained in the topic (ie table name), but this
956+
// option is required for the webhook sink so we must permit it.
952957
{OptTopicInValue, e.TopicInValue},
953958
{OptUpdatedTimestamps, e.UpdatedTimestamps},
954959
{OptMVCCTimestamps, e.MVCCTimestamps},
955960
{OptDiff, e.Diff},
956961
}
957962
for _, v := range requiresWrap {
958963
if v.b {
959-
return errors.Errorf(`%s is only usable with %s=%s`,
960-
v.k, OptEnvelope, OptEnvelopeWrapped)
964+
return errors.Errorf(`%[1]s is only usable with %[2]s=%[3]s or %[2]s=%[4]s`,
965+
v.k, OptEnvelope, OptEnvelopeWrapped, OptEnvelopeEnriched)
961966
}
962967
}
963968
}

‎pkg/ccl/changefeedccl/encoder_avro.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,7 @@ func (e *confluentAvroEncoder) EncodeValue(
254254
recordDataSchema = currentSchema
255255
case changefeedbase.OptEnvelopeEnriched:
256256
afterDataSchema = currentSchema
257-
opts = avro.EnvelopeOpts{AfterField: true, BeforeField: e.beforeField, MVCCTimestampField: e.mvccTimestampField, UpdatedField: e.updatedField,
258-
OpField: true, TsField: true, SourceField: e.sourceField}
257+
opts = avro.EnvelopeOpts{AfterField: true, BeforeField: e.beforeField, OpField: true, TsField: true, SourceField: e.sourceField}
259258
if e.sourceField {
260259
if sourceDataSchema, err = e.enrichedSourceProvider.GetAvro(updatedRow, e.schemaPrefix); err != nil {
261260
return nil, err

‎pkg/ccl/changefeedccl/encoder_json.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,10 @@ func EncodeAsJSONChangefeedWithFlags(
784784
}
785785
// This encoder is not used in the context of a real changefeed so make an empty
786786
// source provider.
787-
sourceProvider := newEnrichedSourceProvider(opts, enrichedSourceData{})
787+
sourceProvider, err := newEnrichedSourceProvider(opts, enrichedSourceData{})
788+
if err != nil {
789+
return nil, err
790+
}
788791

789792
// If this function ends up needing to be optimized, cache or pool these.
790793
// Nontrivial to do as an encoder generally isn't safe to call on different

‎pkg/ccl/changefeedccl/encoder_json_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestJSONEncoderJSONNullAsObject(t *testing.T) {
156156
// NOTE: This is no longer required in go 1.22+, but bazel still requires it. See https://github.com/bazelbuild/rules_go/issues/3924
157157
c := c
158158
t.Run(c.name, func(t *testing.T) {
159-
e, err := getEncoder(ctx, opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(opts))
159+
e, err := getEncoder(ctx, opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(t, opts))
160160
require.NoError(t, err)
161161

162162
row := cdcevent.TestingMakeEventRow(tableDesc, 0, c.row, false)
@@ -201,7 +201,7 @@ func TestJSONEncoderJSONNullAsObjectEdgeCases(t *testing.T) {
201201
rowenc.EncDatum{Datum: tree.NewDJSON(json.NullJSONValue)},
202202
}
203203
e, err := getEncoder(ctx, opts, targets, false, nil, nil,
204-
getTestingEnrichedSourceProvider(opts))
204+
getTestingEnrichedSourceProvider(t, opts))
205205
require.NoError(t, err)
206206

207207
row := cdcevent.TestingMakeEventRow(tableDesc, 0, eRow, false)
@@ -225,7 +225,7 @@ func TestJSONEncoderJSONNullAsObjectEdgeCases(t *testing.T) {
225225
rowenc.EncDatum{Datum: tree.DNull},
226226
}
227227
e, err := getEncoder(ctx, opts, twoJSONsTargets, false, nil, nil,
228-
getTestingEnrichedSourceProvider(opts))
228+
getTestingEnrichedSourceProvider(t, opts))
229229
require.NoError(t, err)
230230

231231
row := cdcevent.TestingMakeEventRow(twoJSONsTableDesc, 0, eRow, false)
@@ -243,7 +243,7 @@ func TestJSONEncoderJSONNullAsObjectEdgeCases(t *testing.T) {
243243
rowenc.EncDatum{Datum: tree.NewDJSON(json.NullJSONValue)},
244244
}
245245
e, err := getEncoder(ctx, opts, twoJSONsTargets, false, nil, nil,
246-
getTestingEnrichedSourceProvider(opts))
246+
getTestingEnrichedSourceProvider(t, opts))
247247
require.NoError(t, err)
248248

249249
row := cdcevent.TestingMakeEventRow(twoJSONsTableDesc, 0, eRow, false)
@@ -265,7 +265,7 @@ func TestJSONEncoderJSONNullAsObjectEdgeCases(t *testing.T) {
265265
rowenc.EncDatum{Datum: tree.DNull},
266266
}
267267
e, err := getEncoder(ctx, disabledOpts, twoJSONsTargets, false, nil, nil,
268-
getTestingEnrichedSourceProvider(disabledOpts))
268+
getTestingEnrichedSourceProvider(t, disabledOpts))
269269
require.NoError(t, err)
270270

271271
row := cdcevent.TestingMakeEventRow(twoJSONsTableDesc, 0, eRow, false)
@@ -287,7 +287,7 @@ func TestJSONEncoderJSONNullAsObjectEdgeCases(t *testing.T) {
287287
rowenc.EncDatum{Datum: tree.NewDJSON(obj)},
288288
}
289289
e, err := getEncoder(ctx, opts, twoJSONsTargets, false, nil, nil,
290-
getTestingEnrichedSourceProvider(opts))
290+
getTestingEnrichedSourceProvider(t, opts))
291291
require.NoError(t, err)
292292

293293
row := cdcevent.TestingMakeEventRow(twoJSONsTableDesc, 0, eRow, false)

‎pkg/ccl/changefeedccl/encoder_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,13 @@ func TestEncoders(t *testing.T) {
142142
resolved: `{"resolved":{"string":"1.0000000002"}}`,
143143
},
144144
`format=avro,envelope=key_only,updated`: {
145-
err: `updated is only usable with envelope=wrapped`,
145+
err: `updated is only usable with envelope=wrapped or envelope=enriched`,
146146
},
147147
`format=avro,envelope=key_only,diff`: {
148-
err: `diff is only usable with envelope=wrapped`,
148+
err: `diff is only usable with envelope=wrapped or envelope=enriched`,
149149
},
150150
`format=avro,envelope=key_only,updated,diff`: {
151-
err: `updated is only usable with envelope=wrapped`,
151+
err: `updated is only usable with envelope=wrapped or envelope=enriched`,
152152
},
153153
`format=avro,envelope=row`: {
154154
err: `envelope=row is not supported with format=avro`,
@@ -241,7 +241,7 @@ func TestEncoders(t *testing.T) {
241241
return
242242
}
243243
require.NoError(t, o.Validate())
244-
e, err := getEncoder(context.Background(), o, targets, false, nil, nil, getTestingEnrichedSourceProvider(o))
244+
e, err := getEncoder(context.Background(), o, targets, false, nil, nil, getTestingEnrichedSourceProvider(t, o))
245245
require.NoError(t, err)
246246

247247
rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
@@ -387,7 +387,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
387387
StatementTimeName: changefeedbase.StatementTimeName(tableDesc.GetName()),
388388
})
389389

390-
e, err := getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(opts))
390+
e, err := getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(t, opts))
391391
require.NoError(t, err)
392392

393393
rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false)
@@ -419,7 +419,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
419419
defer noCertReg.Close()
420420
opts.SchemaRegistryURI = noCertReg.URL()
421421

422-
enc, err := getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(opts))
422+
enc, err := getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(t, opts))
423423
require.NoError(t, err)
424424
_, err = enc.EncodeKey(context.Background(), rowInsert)
425425
require.Regexp(t, "x509", err)
@@ -432,7 +432,7 @@ func TestAvroEncoderWithTLS(t *testing.T) {
432432
defer wrongCertReg.Close()
433433
opts.SchemaRegistryURI = wrongCertReg.URL()
434434

435-
enc, err = getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(opts))
435+
enc, err = getEncoder(context.Background(), opts, targets, false, nil, nil, getTestingEnrichedSourceProvider(t, opts))
436436
require.NoError(t, err)
437437
_, err = enc.EncodeKey(context.Background(), rowInsert)
438438
require.Regexp(t, `contacting confluent schema registry.*: x509`, err)
@@ -974,7 +974,7 @@ func BenchmarkEncoders(b *testing.B) {
974974
b.StopTimer()
975975

976976
encoder, err := getEncoder(context.Background(), opts, targets, false, nil, nil,
977-
getTestingEnrichedSourceProvider(opts))
977+
getTestingEnrichedSourceProvider(b, opts))
978978
if err != nil {
979979
b.Fatal(err)
980980
}
@@ -1248,7 +1248,7 @@ func TestJsonRountrip(t *testing.T) {
12481248

12491249
// TODO(#139660): test this with other envelopes.
12501250
opts := jsonEncoderOptions{EncodingOptions: changefeedbase.EncodingOptions{Envelope: changefeedbase.OptEnvelopeBare}}
1251-
encoder, err := makeJSONEncoder(context.Background(), opts, getTestingEnrichedSourceProvider(opts.EncodingOptions), makeChangefeedTargets(test.name))
1251+
encoder, err := makeJSONEncoder(context.Background(), opts, getTestingEnrichedSourceProvider(t, opts.EncodingOptions), makeChangefeedTargets(test.name))
12521252
require.NoError(t, err)
12531253

12541254
// Encode the value to a string and parse it. Assert that the parsed json matches the

0 commit comments

Comments
 (0)
Please sign in to comment.