@@ -3938,7 +3938,8 @@ func TestChangefeedEnriched(t *testing.T) {
3938
3938
// Create an enriched source provider with no data. The contents of source
3939
3939
// will be tested in another test, we just want to make sure the structure &
3940
3940
// schema is right here.
3941
- esp := newEnrichedSourceProvider (changefeedbase.EncodingOptions {}, enrichedSourceData {})
3941
+ esp , err := newEnrichedSourceProvider (changefeedbase.EncodingOptions {}, enrichedSourceData {})
3942
+ require .NoError (t , err )
3942
3943
source , err := esp .GetJSON (cdcevent.Row {})
3943
3944
require .NoError (t , err )
3944
3945
@@ -4236,7 +4237,7 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
4236
4237
}
4237
4238
4238
4239
for _ , testCase := range cases {
4239
- t . Run ( testCase . name , func (t * testing.T ) {
4240
+ testutils . RunTrueAndFalse ( t , "mvcc_ts" , func (t * testing.T , withMVCCTS bool ) {
4240
4241
clusterName := "clusterName123"
4241
4242
dbVersion := "v999.0.0"
4242
4243
defer build .TestingOverrideVersion (dbVersion )()
@@ -4248,7 +4249,11 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
4248
4249
4249
4250
sqlDB .Exec (t , `CREATE TABLE foo (i INT PRIMARY KEY)` )
4250
4251
sqlDB .Exec (t , `INSERT INTO foo values (0)` )
4251
- testFeed := feed (t , f , fmt .Sprintf (`CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=%s` , testCase .format ))
4252
+ stmt := fmt .Sprintf (`CREATE CHANGEFEED FOR foo WITH envelope=enriched, enriched_properties='source', format=%s` , testCase .format )
4253
+ if withMVCCTS {
4254
+ stmt += ", mvcc_timestamp"
4255
+ }
4256
+ testFeed := feed (t , f , stmt )
4252
4257
defer closeFeed (t , testFeed )
4253
4258
4254
4259
var jobID int64
@@ -4276,35 +4281,50 @@ func TestChangefeedEnrichedSourceWithData(t *testing.T) {
4276
4281
sink = sinkTypeSinklessBuffer .String ()
4277
4282
}
4278
4283
4284
+ const dummyMvccTimestamp = "1234567890.0001"
4285
+ jobIDStr := strconv .FormatInt (jobID , 10 )
4286
+
4279
4287
var assertion string
4280
4288
if testCase .format == "avro" {
4281
- assertion = fmt .Sprintf (
4282
- `{
4283
- "source": {
4284
- "cluster_id": {"string": "%s"},
4285
- "cluster_name": {"string": "%s"},
4286
- "db_version": {"string": "%s"},
4287
- "job_id": {"string": "%d"},
4288
- "node_id": {"string": "%s"},
4289
- "node_name": {"string": "%s"},
4290
- "changefeed_sink": {"string": "%s"},
4291
- "source_node_locality": {"string": "%s"}
4292
- }
4293
- }` ,
4294
- clusterID , clusterName , dbVersion , jobID , nodeID , nodeName , sink , sourceNodeLocality )
4289
+ assertionMap := map [string ]any {
4290
+ "source" : map [string ]any {
4291
+ "cluster_id" : map [string ]any {"string" : clusterID },
4292
+ "cluster_name" : map [string ]any {"string" : clusterName },
4293
+ "db_version" : map [string ]any {"string" : dbVersion },
4294
+ "job_id" : map [string ]any {"string" : jobIDStr },
4295
+ // Note that the field is still present in the avro schema, so it appears here as nil.
4296
+ "mvcc_timestamp" : nil ,
4297
+ "node_id" : map [string ]any {"string" : nodeID },
4298
+ "node_name" : map [string ]any {"string" : nodeName },
4299
+ "changefeed_sink" : map [string ]any {"string" : sink },
4300
+ "source_node_locality" : map [string ]any {"string" : sourceNodeLocality },
4301
+ },
4302
+ }
4303
+ if withMVCCTS {
4304
+ mvccTsMap := actualSource ["source" ].(map [string ]any )["mvcc_timestamp" ].(map [string ]any )
4305
+ assertReasonableMVCCTimestamp (t , mvccTsMap ["string" ].(string ))
4306
+
4307
+ mvccTsMap ["string" ] = dummyMvccTimestamp
4308
+ assertionMap ["source" ].(map [string ]any )["mvcc_timestamp" ] = map [string ]any {"string" : dummyMvccTimestamp }
4309
+ }
4310
+ assertion = toJSON (t , assertionMap )
4295
4311
} else {
4296
- assertion = fmt .Sprintf (
4297
- `{
4298
- "cluster_id": "%s",
4299
- "cluster_name": "%s",
4300
- "db_version": "%s",
4301
- "job_id": "%d",
4302
- "node_id": "%s",
4303
- "node_name": "%s",
4304
- "changefeed_sink": "%s",
4305
- "source_node_locality": "%s"
4306
- }` ,
4307
- clusterID , clusterName , dbVersion , jobID , nodeID , nodeName , sink , sourceNodeLocality )
4312
+ assertionMap := map [string ]any {
4313
+ "cluster_id" : clusterID ,
4314
+ "cluster_name" : clusterName ,
4315
+ "db_version" : dbVersion ,
4316
+ "job_id" : jobIDStr ,
4317
+ "node_id" : nodeID ,
4318
+ "node_name" : nodeName ,
4319
+ "changefeed_sink" : sink ,
4320
+ "source_node_locality" : sourceNodeLocality ,
4321
+ }
4322
+ if withMVCCTS {
4323
+ assertReasonableMVCCTimestamp (t , actualSource ["mvcc_timestamp" ].(string ))
4324
+ actualSource ["mvcc_timestamp" ] = dummyMvccTimestamp
4325
+ assertionMap ["mvcc_timestamp" ] = dummyMvccTimestamp
4326
+ }
4327
+ assertion = toJSON (t , assertionMap )
4308
4328
}
4309
4329
4310
4330
value , err := reformatJSON (actualSource )
@@ -10748,3 +10768,9 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
10748
10768
10749
10769
cdcTest (t , testFn , feedTestForceSink ("kafka" ), withTxnRetries )
10750
10770
}
10771
+
10772
+ func assertReasonableMVCCTimestamp (t * testing.T , ts string ) {
10773
+ epochNanos := parseTimeToHLC (t , ts ).WallTime
10774
+ now := timeutil .Now ()
10775
+ require .GreaterOrEqual (t , epochNanos , now .Add (- 1 * time .Hour ).UnixNano ())
10776
+ }
0 commit comments