Skip to content

Commit 600286c

Browse files
authored
sink(ticdc): fix incorrect default field (#12038)
close #12037
1 parent 4b47982 commit 600286c

File tree

11 files changed

+20
-58
lines changed

11 files changed

+20
-58
lines changed

cdc/entry/mounter_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1629,13 +1629,13 @@ func TestNewDMRowChange(t *testing.T) {
16291629
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
16301630
cols := []*model.Column{
16311631
{
1632-
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
1632+
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1,
16331633
},
16341634
{
1635-
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
1635+
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1,
16361636
},
16371637
{
1638-
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
1638+
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2,
16391639
},
16401640
}
16411641
recoveredTI := model.BuildTiDBTableInfo(cdcTableInfo.TableName.Table, cols, cdcTableInfo.IndexColumnsOffset)

cdc/model/codec/codec.go

-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ func columnFromV1(c *codecv1.Column) *model.Column {
223223
Charset: c.Charset,
224224
Flag: c.Flag,
225225
Value: c.Value,
226-
Default: c.Default,
227226
ApproximateBytes: c.ApproximateBytes,
228227
}
229228
}

cdc/model/schema_storage.go

-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/pingcap/tidb/pkg/parser/mysql"
2222
"github.com/pingcap/tidb/pkg/parser/types"
2323
"github.com/pingcap/tidb/pkg/table/tables"
24-
datumTypes "github.com/pingcap/tidb/pkg/types"
2524
"github.com/pingcap/tidb/pkg/util/rowcodec"
2625
"go.uber.org/zap"
2726
)
@@ -497,13 +496,3 @@ func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
497496
}
498497
return result
499498
}
500-
501-
// GetColumnDefaultValue returns the default definition of a column.
502-
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
503-
defaultValue := col.GetDefaultValue()
504-
if defaultValue == nil {
505-
defaultValue = col.GetOriginDefaultValue()
506-
}
507-
defaultDatum := datumTypes.NewDatum(defaultValue)
508-
return defaultDatum.GetValue()
509-
}

cdc/model/sink.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,6 @@ func columnData2Column(col *ColumnData, tableInfo *TableInfo) *Column {
480480
Collation: colInfo.GetCollate(),
481481
Flag: *tableInfo.ColumnsFlag[colID],
482482
Value: col.Value,
483-
Default: GetColumnDefaultValue(colInfo),
484483
}
485484
}
486485

@@ -652,7 +651,6 @@ type Column struct {
652651
Collation string `msg:"collation"`
653652
Flag ColumnFlagType `msg:"-"`
654653
Value interface{} `msg:"-"`
655-
Default interface{} `msg:"-"`
656654

657655
// ApproximateBytes is approximate bytes consumed by the column.
658656
ApproximateBytes int `msg:"-"`
@@ -1360,7 +1358,7 @@ func (x ColumnDataX) GetFlag() ColumnFlagType {
13601358

13611359
// GetDefaultValue return default value.
13621360
func (x ColumnDataX) GetDefaultValue() interface{} {
1363-
return GetColumnDefaultValue(x.info)
1361+
return x.info.GetDefaultValue()
13641362
}
13651363

13661364
// GetColumnInfo returns column info.
@@ -1388,7 +1386,6 @@ func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) {
13881386
info.Columns[i].SetType(column.Type)
13891387
info.Columns[i].SetCharset(column.Charset)
13901388
info.Columns[i].SetCollate(column.Collation)
1391-
info.Columns[i].DefaultValue = column.Default
13921389

13931390
info.ColumnsFlag[columnID] = new(ColumnFlagType)
13941391
*info.ColumnsFlag[columnID] = column.Flag

pkg/sink/cloudstorage/table_definition.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo, outputColumnID bool
7777
if mysql.HasNotNullFlag(col.GetFlag()) {
7878
t.Nullable = "false"
7979
}
80-
t.Default = model.GetColumnDefaultValue(col)
80+
t.Default = col.GetDefaultValue()
8181

8282
switch col.GetType() {
8383
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:

pkg/sink/codec/debezium/codec.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,10 @@ func (c *dbzCodec) writeDebeziumFieldValue(
505505
col model.ColumnDataX,
506506
ft *types.FieldType,
507507
) error {
508-
value := getValue(col)
508+
value := col.Value
509+
if value == nil {
510+
value = col.GetDefaultValue()
511+
}
509512
if value == nil {
510513
writer.WriteNullField(col.GetName())
511514
return nil

pkg/sink/codec/debezium/helper.go

+7-20
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@ func (v *visiter) Leave(n ast.Node) (node ast.Node, ok bool) {
4343
switch col := n.(type) {
4444
case *ast.ColumnDef:
4545
c := v.columnsMap[col.Name.Name]
46-
if col.Options != nil {
47-
parseOptions(col.Options, c)
48-
}
4946
if col.Tp != nil {
5047
parseType(c, col)
5148
}
@@ -67,16 +64,13 @@ func extractValue(expr ast.ExprNode) any {
6764
func parseType(c *timodel.ColumnInfo, col *ast.ColumnDef) {
6865
ft := col.Tp
6966
switch ft.GetType() {
70-
case mysql.TypeDatetime, mysql.TypeDuration, mysql.TypeTimestamp:
71-
c.SetDecimal(ft.GetDecimal())
72-
if c.OriginDefaultValue != nil {
73-
c.SetDefaultValue(c.OriginDefaultValue)
74-
}
75-
case mysql.TypeYear:
76-
c.SetFlen(ft.GetFlen())
77-
if c.OriginDefaultValue != nil {
78-
c.SetDefaultValue(c.OriginDefaultValue)
67+
case mysql.TypeDatetime, mysql.TypeDuration, mysql.TypeTimestamp, mysql.TypeYear:
68+
if ft.GetType() == mysql.TypeYear {
69+
c.SetFlen(ft.GetFlen())
70+
} else {
71+
c.SetDecimal(ft.GetDecimal())
7972
}
73+
parseOptions(col.Options, c)
8074
default:
8175
}
8276
}
@@ -89,7 +83,7 @@ func parseOptions(options []*ast.ColumnOption, c *timodel.ColumnInfo) {
8983
if defaultValue == nil {
9084
continue
9185
}
92-
if err := c.SetOriginDefaultValue(defaultValue); err != nil {
86+
if err := c.SetDefaultValue(defaultValue); err != nil {
9387
log.Error("failed to set default value")
9488
}
9589
}
@@ -243,13 +237,6 @@ func getBitFromUint64(n int, v uint64) []byte {
243237
return buf[:numBytes]
244238
}
245239

246-
func getValue(col model.ColumnDataX) any {
247-
if col.Value == nil {
248-
return col.GetDefaultValue()
249-
}
250-
return col.Value
251-
}
252-
253240
func getDBTableName(e *model.DDLEvent) (string, string) {
254241
if e.TableInfo == nil {
255242
return "", ""

pkg/sink/codec/debezium/helper_test.go

-13
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
pmodel "github.com/pingcap/tidb/pkg/parser/model"
2121
"github.com/pingcap/tidb/pkg/parser/mysql"
2222
"github.com/pingcap/tidb/pkg/types"
23-
"github.com/pingcap/tiflow/cdc/model"
2423
"github.com/stretchr/testify/require"
2524
)
2625

@@ -59,18 +58,6 @@ func TestGetColumns(t *testing.T) {
5958
require.Equal(t, columnInfos[4].Comment, "")
6059
}
6160

62-
func TestGetValue(t *testing.T) {
63-
column := &model.Column{
64-
Default: 1,
65-
}
66-
data := model.Column2ColumnDataXForTest(column)
67-
v := getValue(data)
68-
require.Equal(t, v, int64(1))
69-
data.Value = 2
70-
v = getValue(data)
71-
require.Equal(t, v, 2)
72-
}
73-
7461
func TestGetSchemaTopicName(t *testing.T) {
7562
namespace := "default"
7663
schema := "1A.B"

pkg/sink/codec/simple/avro.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
109109
"nullable": !mysql.HasNotNullFlag(col.GetFlag()),
110110
"default": nil,
111111
}
112-
defaultValue := model.GetColumnDefaultValue(col)
112+
defaultValue := col.GetDefaultValue()
113113
if defaultValue != nil {
114114
// according to TiDB source code, the default value is converted to string if not nil.
115115
column["default"] = map[string]interface{}{

pkg/sink/codec/simple/encoder_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1487,15 +1487,15 @@ func TestEncodeLargeEventsNormal(t *testing.T) {
14871487

14881488
obtainedDefaultValues := make(map[string]interface{}, len(obtainedDDL.TableInfo.Columns))
14891489
for _, col := range obtainedDDL.TableInfo.Columns {
1490-
obtainedDefaultValues[col.Name.O] = model.GetColumnDefaultValue(col)
1490+
obtainedDefaultValues[col.Name.O] = col.GetDefaultValue()
14911491
switch col.GetType() {
14921492
case mysql.TypeFloat, mysql.TypeDouble:
14931493
require.Equal(t, 0, col.GetDecimal())
14941494
default:
14951495
}
14961496
}
14971497
for _, col := range ddlEvent.TableInfo.Columns {
1498-
expected := model.GetColumnDefaultValue(col)
1498+
expected := col.GetDefaultValue()
14991499
obtained := obtainedDefaultValues[col.Name.O]
15001500
require.Equal(t, expected, obtained)
15011501
}

pkg/sink/codec/simple/message.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func newColumnSchema(col *timodel.ColumnInfo) *columnSchema {
145145
tp.Decimal = col.GetDecimal()
146146
}
147147

148-
defaultValue := model.GetColumnDefaultValue(col)
148+
defaultValue := col.GetDefaultValue()
149149
if defaultValue != nil && col.GetType() == mysql.TypeBit {
150150
defaultValue = common.MustBinaryLiteralToInt([]byte(defaultValue.(string)))
151151
}

0 commit comments

Comments
 (0)