Skip to content

Commit 50c5686

Browse files
committed
crosscluster: add new insert/update/delete replication statements
These statements take advantage of the KV level LWW validation added by PR #143100 and do not explicitly validate the origin timestamp. Update and Delete specify the whole row in the where clause, which is intended to enable future SQL optimizations that generate a CPUT to replace the row instead Release note: none Epic: CRDB-48647
1 parent 8c96475 commit 50c5686

File tree

4 files changed

+369
-1
lines changed

4 files changed

+369
-1
lines changed

pkg/crosscluster/logical/BUILD.bazel

+8-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"offline_initial_scan_processor.go",
1616
"purgatory.go",
1717
"range_stats.go",
18+
"replication_statements.go",
1819
"udf_row_processor.go",
1920
],
2021
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/logical",
@@ -76,6 +77,7 @@ go_library(
7677
"//pkg/sql/sem/catid",
7778
"//pkg/sql/sem/eval",
7879
"//pkg/sql/sem/tree",
80+
"//pkg/sql/sem/tree/treecmp",
7981
"//pkg/sql/sessiondata",
8082
"//pkg/sql/sessiondatapb",
8183
"//pkg/sql/stats",
@@ -117,9 +119,12 @@ go_test(
117119
"main_test.go",
118120
"purgatory_test.go",
119121
"range_stats_test.go",
122+
"replication_statements_test.go",
120123
"udf_row_processor_test.go",
121124
],
122-
data = ["//c-deps:libgeos"],
125+
data = glob(["testdata/**"]) + [
126+
"//c-deps:libgeos",
127+
],
123128
embed = [":logical"],
124129
exec_properties = {"test.Pool": "large"},
125130
deps = [
@@ -163,6 +168,7 @@ go_test(
163168
"//pkg/sql/sqltestutils",
164169
"//pkg/sql/stats",
165170
"//pkg/testutils",
171+
"//pkg/testutils/datapathutils",
166172
"//pkg/testutils/jobutils",
167173
"//pkg/testutils/serverutils",
168174
"//pkg/testutils/skip",
@@ -182,6 +188,7 @@ go_test(
182188
"//pkg/util/timeutil",
183189
"//pkg/util/uuid",
184190
"@com_github_cockroachdb_cockroach_go_v2//crdb",
191+
"@com_github_cockroachdb_datadriven//:datadriven",
185192
"@com_github_cockroachdb_errors//:errors",
186193
"@com_github_cockroachdb_redact//:redact",
187194
"@com_github_lib_pq//:pq",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
12+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
13+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
14+
)
15+
16+
// getPhysicalColumns returns the list of columns that are part of the table's
17+
// primary key and value.
18+
func getPhysicalColumns(table catalog.TableDescriptor) []catalog.Column {
19+
columns := table.AllColumns()
20+
result := make([]catalog.Column, 0, len(columns))
21+
for _, col := range columns {
22+
if !col.IsComputed() && !col.IsVirtual() && !col.IsSystemColumn() {
23+
result = append(result, col)
24+
}
25+
}
26+
return result
27+
}
28+
29+
// newInsertStatement returns a statement that can be used to insert a row into
30+
// the table.
31+
//
32+
// The statement will have `n` parameters, where `n` is the number of columns
33+
// in the table. Parameters are ordered by column ID.
34+
func newInsertStatement(table catalog.TableDescriptor) (tree.Statement, error) {
35+
columns := getPhysicalColumns(table)
36+
37+
columnNames := make(tree.NameList, len(columns))
38+
for i, col := range columns {
39+
columnNames[i] = tree.Name(col.GetName())
40+
}
41+
42+
parameters := make(tree.Exprs, len(columnNames))
43+
for i := range columnNames {
44+
var err error
45+
parameters[i], err = tree.NewPlaceholder(fmt.Sprintf("%d", i+1))
46+
if err != nil {
47+
return nil, err
48+
}
49+
}
50+
51+
parameterValues := &tree.ValuesClause{
52+
Rows: []tree.Exprs{
53+
parameters,
54+
},
55+
}
56+
57+
rows := &tree.Select{
58+
Select: parameterValues,
59+
}
60+
61+
insert := &tree.Insert{
62+
Table: &tree.TableRef{
63+
TableID: int64(table.GetID()),
64+
As: tree.AliasClause{Alias: "replication_target"},
65+
},
66+
Rows: rows,
67+
Columns: columnNames,
68+
Returning: tree.AbsentReturningClause,
69+
}
70+
71+
return insert, nil
72+
}
73+
74+
// newUpdateStatement returns a statement that can be used to update a row in
75+
// the table. If a table has `n` columns, the statement will have `2n`
76+
// parameters, where the first `n` parameters are the previous values of the row
77+
// and the last `n` parameters are the new values of the row.
78+
//
79+
// Parameters are ordered by column ID.
80+
func newUpdateStatement(table catalog.TableDescriptor) (tree.Statement, error) {
81+
columns := getPhysicalColumns(table)
82+
83+
// Create WHERE clause placeholders for every column in the table
84+
var whereClause tree.Expr
85+
for i, col := range columns {
86+
placeholder, err := tree.NewPlaceholder(fmt.Sprintf("%d", i+1))
87+
if err != nil {
88+
return nil, err
89+
}
90+
colExpr := &tree.ComparisonExpr{
91+
Operator: treecmp.MakeComparisonOperator(treecmp.EQ),
92+
Left: &tree.ColumnItem{ColumnName: tree.Name(col.GetName())},
93+
Right: placeholder,
94+
}
95+
96+
if whereClause == nil {
97+
whereClause = colExpr
98+
} else {
99+
whereClause = &tree.AndExpr{
100+
Left: whereClause,
101+
Right: colExpr,
102+
}
103+
}
104+
}
105+
106+
exprs := make(tree.UpdateExprs, len(columns))
107+
for i, col := range columns {
108+
nameNode := tree.Name(col.GetName())
109+
names := tree.NameList{nameNode}
110+
111+
// Create a placeholder for the new value (num_columns+i+1) since we
112+
// placholders to the where and placholders are 1 indexed.
113+
placeholder, err := tree.NewPlaceholder(fmt.Sprintf("%d", len(columns)+i+1))
114+
if err != nil {
115+
return nil, err
116+
}
117+
118+
exprs[i] = &tree.UpdateExpr{
119+
Names: names,
120+
Expr: &tree.CastExpr{
121+
Expr: placeholder,
122+
Type: col.GetType(),
123+
SyntaxMode: tree.CastPrepend,
124+
},
125+
}
126+
}
127+
128+
// Create the final update statement
129+
update := &tree.Update{
130+
Table: &tree.TableRef{
131+
TableID: int64(table.GetID()),
132+
As: tree.AliasClause{Alias: "replication_target"},
133+
},
134+
Exprs: exprs,
135+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
136+
Returning: tree.AbsentReturningClause,
137+
}
138+
139+
return update, nil
140+
}
141+
142+
// newDeleteStatement returns a statement that can be used to delete a row from
143+
// the table. The statement will have `n` parameters, where `n` is the number of
144+
// columns in the table. Parameters are used in the WHERE clause to precisely
145+
// identify the row to delete.
146+
//
147+
// Parameters are ordered by column ID.
148+
func newDeleteStatement(table catalog.TableDescriptor) (tree.Statement, error) {
149+
columns := getPhysicalColumns(table)
150+
151+
// Create WHERE clause placeholders for every column in the table
152+
var whereClause tree.Expr
153+
for i, col := range columns {
154+
placeholder, err := tree.NewPlaceholder(fmt.Sprintf("%d", i+1))
155+
if err != nil {
156+
return nil, err
157+
}
158+
colExpr := &tree.ComparisonExpr{
159+
Operator: treecmp.MakeComparisonOperator(treecmp.EQ),
160+
Left: &tree.ColumnItem{ColumnName: tree.Name(col.GetName())},
161+
Right: placeholder,
162+
}
163+
164+
if whereClause == nil {
165+
whereClause = colExpr
166+
} else {
167+
whereClause = &tree.AndExpr{
168+
Left: whereClause,
169+
Right: colExpr,
170+
}
171+
}
172+
}
173+
174+
// Create the final delete statement
175+
delete := &tree.Delete{
176+
Table: &tree.TableRef{
177+
TableID: int64(table.GetID()),
178+
As: tree.AliasClause{Alias: "replication_target"},
179+
},
180+
Where: &tree.Where{Type: tree.AstWhere, Expr: whereClause},
181+
Returning: tree.AbsentReturningClause,
182+
}
183+
184+
return delete, nil
185+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package logical
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"math/rand"
12+
"testing"
13+
14+
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
18+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
19+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
20+
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/cockroachdb/datadriven"
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestReplicationStatements(t *testing.T) {
26+
defer leaktest.AfterTest(t)()
27+
defer log.Scope(t).Close(t)
28+
29+
ctx := context.Background()
30+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
31+
defer s.Stopper().Stop(ctx)
32+
33+
getTableDesc := func(tableName string) catalog.TableDescriptor {
34+
return desctestutils.TestingGetTableDescriptor(
35+
s.DB(),
36+
s.Codec(),
37+
"defaultdb",
38+
"public",
39+
tableName,
40+
)
41+
}
42+
43+
datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
44+
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
45+
switch d.Cmd {
46+
case "exec":
47+
_, err := sqlDB.Exec(d.Input)
48+
if err != nil {
49+
return err.Error()
50+
}
51+
return "ok"
52+
case "show-insert":
53+
var tableName string
54+
d.ScanArgs(t, "table", &tableName)
55+
56+
desc := getTableDesc(tableName)
57+
58+
insertStmt, err := newInsertStatement(desc)
59+
require.NoError(t, err)
60+
61+
// Test preparing the statement to ensure it is valid SQL.
62+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), insertStmt.String()))
63+
require.NoError(t, err)
64+
65+
return fmt.Sprintf("%s", insertStmt.String())
66+
case "show-update":
67+
var tableName string
68+
d.ScanArgs(t, "table", &tableName)
69+
70+
desc := getTableDesc(tableName)
71+
72+
updateStmt, err := newUpdateStatement(desc)
73+
require.NoError(t, err)
74+
75+
// Test preparing the statement to ensure it is valid SQL.
76+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), updateStmt.String()))
77+
require.NoError(t, err)
78+
79+
return fmt.Sprintf("%s", updateStmt.String())
80+
case "show-delete":
81+
var tableName string
82+
d.ScanArgs(t, "table", &tableName)
83+
84+
desc := getTableDesc(tableName)
85+
86+
deleteStmt, err := newDeleteStatement(desc)
87+
require.NoError(t, err)
88+
89+
// Test preparing the statement to ensure it is valid SQL.
90+
_, err = sqlDB.Exec(fmt.Sprintf("PREPARE stmt_%d AS %s", rand.Int(), deleteStmt.String()))
91+
require.NoError(t, err)
92+
93+
return fmt.Sprintf("%s", deleteStmt.String())
94+
default:
95+
return "unknown command: " + d.Cmd
96+
}
97+
})
98+
})
99+
}

0 commit comments

Comments
 (0)