Skip to content

Commit 9478f89

Browse files
wk989898ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#12038
Signed-off-by: ti-chi-bot <[email protected]>
1 parent bdaa449 commit 9478f89

File tree

11 files changed

+5912
-3
lines changed

11 files changed

+5912
-3
lines changed

cdc/entry/mounter_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1221,13 +1221,13 @@ func TestNewDMRowChange(t *testing.T) {
12211221
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
12221222
cols := []*model.Column{
12231223
{
1224-
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
1224+
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1,
12251225
},
12261226
{
1227-
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
1227+
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1,
12281228
},
12291229
{
1230-
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
1230+
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2,
12311231
},
12321232
}
12331233
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)

cdc/model/codec/codec.go

+228
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package codec
15+
16+
import (
17+
"encoding/binary"
18+
19+
timodel "github.com/pingcap/tidb/pkg/meta/model"
20+
"github.com/pingcap/tiflow/cdc/model"
21+
codecv1 "github.com/pingcap/tiflow/cdc/model/codec/v1"
22+
"github.com/tinylib/msgp/msgp"
23+
)
24+
25+
const (
26+
v1HeaderLength int = 4
27+
versionPrefixLength int = 2
28+
versionFieldLength int = 2
29+
30+
latestVersion uint16 = 2
31+
)
32+
33+
// NOTE: why we need this?
34+
//
35+
// Before this logic is introduced, redo log is encoded into byte slice without a version field.
36+
// This makes it hard to extend in the future.
37+
// However, in the old format (i.e. v1 format), the first 5 bytes are always same, which can be
38+
// confirmed in v1/codec_gen.go. So we reuse those bytes, and add a version field in them.
39+
var (
40+
versionPrefix = [versionPrefixLength]byte{0xff, 0xff}
41+
)
42+
43+
func postUnmarshal(r *model.RedoLog) {
44+
workaroundColumn := func(c *model.Column, redoC *model.RedoColumn) {
45+
c.Flag = model.ColumnFlagType(redoC.Flag)
46+
if redoC.ValueIsEmptyBytes {
47+
c.Value = []byte{}
48+
} else {
49+
c.Value = redoC.Value
50+
}
51+
}
52+
53+
if r.RedoRow.Row != nil {
54+
row := r.RedoRow.Row
55+
for i, c := range row.Columns {
56+
if c != nil {
57+
workaroundColumn(c, &r.RedoRow.Columns[i])
58+
}
59+
}
60+
for i, c := range row.PreColumns {
61+
if c != nil {
62+
workaroundColumn(c, &r.RedoRow.PreColumns[i])
63+
}
64+
}
65+
r.RedoRow.Columns = nil
66+
r.RedoRow.PreColumns = nil
67+
}
68+
if r.RedoDDL.DDL != nil {
69+
r.RedoDDL.DDL.Type = timodel.ActionType(r.RedoDDL.Type)
70+
r.RedoDDL.DDL.TableInfo = &model.TableInfo{
71+
TableName: r.RedoDDL.TableName,
72+
}
73+
}
74+
}
75+
76+
func preMarshal(r *model.RedoLog) {
77+
// Workaround empty byte slice for msgp#247
78+
workaroundColumn := func(redoC *model.RedoColumn) {
79+
switch v := redoC.Value.(type) {
80+
case []byte:
81+
if len(v) == 0 {
82+
redoC.ValueIsEmptyBytes = true
83+
}
84+
}
85+
}
86+
87+
if r.RedoRow.Row != nil {
88+
row := r.RedoRow.Row
89+
r.RedoRow.Columns = make([]model.RedoColumn, 0, len(row.Columns))
90+
r.RedoRow.PreColumns = make([]model.RedoColumn, 0, len(row.PreColumns))
91+
for _, c := range row.Columns {
92+
redoC := model.RedoColumn{}
93+
if c != nil {
94+
redoC.Value = c.Value
95+
redoC.Flag = uint64(c.Flag)
96+
workaroundColumn(&redoC)
97+
}
98+
r.RedoRow.Columns = append(r.RedoRow.Columns, redoC)
99+
}
100+
for _, c := range row.PreColumns {
101+
redoC := model.RedoColumn{}
102+
if c != nil {
103+
redoC.Value = c.Value
104+
redoC.Flag = uint64(c.Flag)
105+
workaroundColumn(&redoC)
106+
}
107+
r.RedoRow.PreColumns = append(r.RedoRow.PreColumns, redoC)
108+
}
109+
}
110+
if r.RedoDDL.DDL != nil {
111+
r.RedoDDL.Type = byte(r.RedoDDL.DDL.Type)
112+
if r.RedoDDL.DDL.TableInfo != nil {
113+
r.RedoDDL.TableName = r.RedoDDL.DDL.TableInfo.TableName
114+
}
115+
}
116+
}
117+
118+
// UnmarshalRedoLog unmarshals a RedoLog from the given byte slice.
119+
func UnmarshalRedoLog(bts []byte) (r *model.RedoLog, o []byte, err error) {
120+
if len(bts) < versionPrefixLength {
121+
err = msgp.ErrShortBytes
122+
return
123+
}
124+
125+
shouldBeV1 := false
126+
for i := 0; i < versionPrefixLength; i++ {
127+
if bts[i] != versionPrefix[i] {
128+
shouldBeV1 = true
129+
break
130+
}
131+
}
132+
if shouldBeV1 {
133+
var rv1 *codecv1.RedoLog = new(codecv1.RedoLog)
134+
if o, err = rv1.UnmarshalMsg(bts); err != nil {
135+
return
136+
}
137+
codecv1.PostUnmarshal(rv1)
138+
r = redoLogFromV1(rv1)
139+
} else {
140+
bts = bts[versionPrefixLength:]
141+
version, bts := decodeVersion(bts)
142+
if version == latestVersion {
143+
r = new(model.RedoLog)
144+
if o, err = r.UnmarshalMsg(bts); err != nil {
145+
return
146+
}
147+
postUnmarshal(r)
148+
} else {
149+
panic("unsupported codec version")
150+
}
151+
}
152+
return
153+
}
154+
155+
// MarshalRedoLog marshals a RedoLog into bytes.
156+
func MarshalRedoLog(r *model.RedoLog, b []byte) (o []byte, err error) {
157+
preMarshal(r)
158+
b = append(b, versionPrefix[:]...)
159+
b = binary.BigEndian.AppendUint16(b, latestVersion)
160+
o, err = r.MarshalMsg(b)
161+
return
162+
}
163+
164+
// MarshalDDLAsRedoLog converts a DDLEvent into RedoLog, and then marshals it.
165+
func MarshalDDLAsRedoLog(d *model.DDLEvent, b []byte) (o []byte, err error) {
166+
log := &model.RedoLog{
167+
RedoDDL: model.RedoDDLEvent{DDL: d},
168+
Type: model.RedoLogTypeDDL,
169+
}
170+
return MarshalRedoLog(log, b)
171+
}
172+
173+
func decodeVersion(bts []byte) (uint16, []byte) {
174+
version := binary.BigEndian.Uint16(bts[0:versionFieldLength])
175+
return version, bts[versionFieldLength:]
176+
}
177+
178+
func redoLogFromV1(rv1 *codecv1.RedoLog) (r *model.RedoLog) {
179+
r = &model.RedoLog{Type: (model.RedoLogType)(rv1.Type)}
180+
if rv1.RedoRow != nil && rv1.RedoRow.Row != nil {
181+
r.RedoRow.Row = &model.RowChangedEventInRedoLog{
182+
StartTs: rv1.RedoRow.Row.StartTs,
183+
CommitTs: rv1.RedoRow.Row.CommitTs,
184+
Table: tableNameFromV1(rv1.RedoRow.Row.Table),
185+
Columns: make([]*model.Column, 0, len(rv1.RedoRow.Row.Columns)),
186+
PreColumns: make([]*model.Column, 0, len(rv1.RedoRow.Row.PreColumns)),
187+
IndexColumns: rv1.RedoRow.Row.IndexColumns,
188+
}
189+
for _, c := range rv1.RedoRow.Row.Columns {
190+
r.RedoRow.Row.Columns = append(r.RedoRow.Row.Columns, columnFromV1(c))
191+
}
192+
for _, c := range rv1.RedoRow.Row.PreColumns {
193+
r.RedoRow.Row.PreColumns = append(r.RedoRow.Row.PreColumns, columnFromV1(c))
194+
}
195+
}
196+
if rv1.RedoDDL != nil && rv1.RedoDDL.DDL != nil {
197+
r.RedoDDL.DDL = &model.DDLEvent{
198+
StartTs: rv1.RedoDDL.DDL.StartTs,
199+
CommitTs: rv1.RedoDDL.DDL.CommitTs,
200+
Query: rv1.RedoDDL.DDL.Query,
201+
TableInfo: rv1.RedoDDL.DDL.TableInfo,
202+
PreTableInfo: rv1.RedoDDL.DDL.PreTableInfo,
203+
Type: rv1.RedoDDL.DDL.Type,
204+
}
205+
r.RedoDDL.DDL.Done.Store(rv1.RedoDDL.DDL.Done)
206+
}
207+
return
208+
}
209+
210+
func tableNameFromV1(t *codecv1.TableName) *model.TableName {
211+
return &model.TableName{
212+
Schema: t.Schema,
213+
Table: t.Table,
214+
TableID: t.TableID,
215+
IsPartition: t.IsPartition,
216+
}
217+
}
218+
219+
func columnFromV1(c *codecv1.Column) *model.Column {
220+
return &model.Column{
221+
Name: c.Name,
222+
Type: c.Type,
223+
Charset: c.Charset,
224+
Flag: c.Flag,
225+
Value: c.Value,
226+
ApproximateBytes: c.ApproximateBytes,
227+
}
228+
}

cdc/model/schema_storage.go

+78
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,21 @@ package model
1616
import (
1717
"fmt"
1818

19+
<<<<<<< HEAD
1920
"github.com/pingcap/tidb/parser/model"
2021
"github.com/pingcap/tidb/parser/mysql"
2122
"github.com/pingcap/tidb/parser/types"
2223
"github.com/pingcap/tidb/table/tables"
2324
"github.com/pingcap/tidb/util/rowcodec"
25+
=======
26+
"github.com/pingcap/log"
27+
"github.com/pingcap/tidb/pkg/meta/model"
28+
"github.com/pingcap/tidb/pkg/parser/mysql"
29+
"github.com/pingcap/tidb/pkg/parser/types"
30+
"github.com/pingcap/tidb/pkg/table/tables"
31+
"github.com/pingcap/tidb/pkg/util/rowcodec"
32+
"go.uber.org/zap"
33+
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))
2434
)
2535

2636
const (
@@ -319,3 +329,71 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool {
319329
func (ti *TableInfo) Clone() *TableInfo {
320330
return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.Version, ti.TableInfo.Clone())
321331
}
332+
<<<<<<< HEAD
333+
=======
334+
335+
// GetIndex return the corresponding index by the given name.
336+
func (ti *TableInfo) GetIndex(name string) *model.IndexInfo {
337+
for _, index := range ti.Indices {
338+
if index != nil && index.Name.O == name {
339+
return index
340+
}
341+
}
342+
return nil
343+
}
344+
345+
// IndexByName returns the index columns and offsets of the corresponding index by name
346+
func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
347+
index := ti.GetIndex(name)
348+
if index == nil {
349+
return nil, nil, false
350+
}
351+
names := make([]string, 0, len(index.Columns))
352+
offset := make([]int, 0, len(index.Columns))
353+
for _, col := range index.Columns {
354+
names = append(names, col.Name.O)
355+
offset = append(offset, col.Offset)
356+
}
357+
return names, offset, true
358+
}
359+
360+
// OffsetsByNames returns the column offsets of the corresponding columns by names
361+
// If any column does not exist, return false
362+
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
363+
// todo: optimize it
364+
columnOffsets := make(map[string]int, len(ti.Columns))
365+
for _, col := range ti.Columns {
366+
if col != nil {
367+
columnOffsets[col.Name.O] = col.Offset
368+
}
369+
}
370+
371+
result := make([]int, 0, len(names))
372+
for _, col := range names {
373+
offset, ok := columnOffsets[col]
374+
if !ok {
375+
return nil, false
376+
}
377+
result = append(result, offset)
378+
}
379+
380+
return result, true
381+
}
382+
383+
// GetPrimaryKeyColumnNames returns the primary key column names
384+
func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
385+
var result []string
386+
if ti.PKIsHandle {
387+
result = append(result, ti.GetPkColInfo().Name.O)
388+
return result
389+
}
390+
391+
indexInfo := ti.GetPrimaryKey()
392+
if indexInfo != nil {
393+
for _, col := range indexInfo.Columns {
394+
result = append(result, col.Name.O)
395+
}
396+
}
397+
return result
398+
}
399+
>>>>>>> 600286c56d (sink(ticdc): fix incorrect `default` field (#12038))

0 commit comments

Comments
 (0)