Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5eaf411

Browse files
committedMar 13, 2025·
row, sql: implement mutations on vector indexes
This patch plumbs the output of the vector search operators into rowenc for encoding into vector indexes. The output of these vector search operators is included in the row values for mutation operators after partial index values and are plumbed into a new vector index update helper, which tracks the column values until they're needed by rowenc. While we're here, we homogonize how pkg/sql/{delete,insert,update,upsert}.go consume row values, hopefully improving legibility. Also fixed are a couple of bugs in the vector search and row encoding. Epic: CRDB-42943 Release note: None
1 parent 0244ff5 commit 5eaf411

29 files changed

+444
-131
lines changed
 

‎pkg/crosscluster/logical/lww_kv_processor.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -476,13 +476,15 @@ func (p *kvTableWriter) insertRow(ctx context.Context, b *kv.Batch, after cdceve
476476

477477
var ph row.PartialIndexUpdateHelper
478478
// TODO(dt): support partial indexes.
479+
var vh row.VectorIndexUpdateHelper
480+
// TODO(mw5h, drewk): support vector indexes.
479481
oth := &row.OriginTimestampCPutHelper{
480482
OriginTimestamp: after.MvccTimestamp,
481483
// TODO(ssd): We should choose this based by comparing the cluster IDs of the source
482484
// and destination clusters.
483485
// ShouldWinTie: true,
484486
}
485-
return p.ri.InsertRow(ctx, &row.KVBatchAdapter{Batch: b}, p.newVals, ph, oth, row.CPutOp, false /* traceKV */)
487+
return p.ri.InsertRow(ctx, &row.KVBatchAdapter{Batch: b}, p.newVals, ph, vh, oth, row.CPutOp, false /* traceKV */)
486488
}
487489

488490
func (p *kvTableWriter) updateRow(
@@ -497,13 +499,15 @@ func (p *kvTableWriter) updateRow(
497499

498500
var ph row.PartialIndexUpdateHelper
499501
// TODO(dt): support partial indexes.
502+
var vh row.VectorIndexUpdateHelper
503+
// TODO(mw5h, drewk): support vector indexes.
500504
oth := &row.OriginTimestampCPutHelper{
501505
OriginTimestamp: after.MvccTimestamp,
502506
// TODO(ssd): We should choose this based by comparing the cluster IDs of the source
503507
// and destination clusters.
504508
// ShouldWinTie: true,
505509
}
506-
_, err := p.ru.UpdateRow(ctx, b, p.oldVals, p.newVals, ph, oth, false)
510+
_, err := p.ru.UpdateRow(ctx, b, p.oldVals, p.newVals, ph, vh, oth, false)
507511
return err
508512
}
509513

@@ -516,6 +520,8 @@ func (p *kvTableWriter) deleteRow(
516520

517521
var ph row.PartialIndexUpdateHelper
518522
// TODO(dt): support partial indexes.
523+
var vh row.VectorIndexUpdateHelper
524+
// TODO(mw5h, drewk): support vector indexes.
519525
oth := &row.OriginTimestampCPutHelper{
520526
PreviousWasDeleted: before.IsDeleted(),
521527
OriginTimestamp: after.MvccTimestamp,
@@ -524,7 +530,7 @@ func (p *kvTableWriter) deleteRow(
524530
// ShouldWinTie: true,
525531
}
526532

527-
return p.rd.DeleteRow(ctx, b, p.oldVals, ph, oth, false)
533+
return p.rd.DeleteRow(ctx, b, p.oldVals, ph, vh, oth, false)
528534
}
529535

530536
func (p *kvTableWriter) fillOld(vals cdcevent.Row) error {

‎pkg/sql/backfill/backfill.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,13 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
398398
}
399399
copy(oldValues, fetchedValues)
400400

401-
// No existing secondary indexes will be updated by adding or dropping a
402-
// column. It is safe to use an empty PartialIndexUpdateHelper in this
403-
// case.
401+
// No existing secondary indexes will be updated by adding or dropping a column.
402+
// It is safe to use an empty PartialIndexUpdateHelper and
403+
// VectorIndexUpdateHelper in this case.
404404
var pm row.PartialIndexUpdateHelper
405+
var vh row.VectorIndexUpdateHelper
405406
if _, err := ru.UpdateRow(
406-
ctx, b, oldValues, updateValues, pm, nil, traceKV,
407+
ctx, b, oldValues, updateValues, pm, vh, nil, traceKV,
407408
); err != nil {
408409
return roachpb.Key{}, err
409410
}

‎pkg/sql/colenc/encode.go

+5
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,11 @@ func (b *BatchEncoder) encodeSecondaryIndex(ctx context.Context, ind catalog.Ind
432432
// Since the inverted indexes generate multiple keys per row just handle them
433433
// separately.
434434
return b.encodeInvertedSecondaryIndex(ctx, ind, kys, b.extraKeys)
435+
} else if ind.GetType() == idxtype.VECTOR {
436+
// TODO(mw5h, drewk): Implement encoding for vector indexes. This is complicated
437+
// by vector indexes needing output from the mutation search operator, which will
438+
// need to be plumbed through to the encoder.
439+
return errors.AssertionFailedf("vector indexes not supported")
435440
} else {
436441
keyAndSuffixCols := b.rh.TableDesc.IndexFetchSpecKeyAndSuffixColumns(ind)
437442
keyCols := keyAndSuffixCols[:ind.NumKeyColumns()]

‎pkg/sql/colenc/encode_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -606,8 +606,9 @@ func buildRowKVs(
606606
}
607607
p := &capturePutter{}
608608
var pm row.PartialIndexUpdateHelper
609+
var vh row.VectorIndexUpdateHelper
609610
for _, d := range datums {
610-
if err := inserter.InsertRow(context.Background(), p, d, pm, nil, row.CPutOp, true /* traceKV */); err != nil {
611+
if err := inserter.InsertRow(context.Background(), p, d, pm, vh, nil, row.CPutOp, true /* traceKV */); err != nil {
611612
return kvs{}, err
612613
}
613614
}

‎pkg/sql/copy_from.go

+4
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ func (c *copyMachine) canSupportVectorized(table catalog.TableDescriptor) bool {
418418
if c.p.SessionData().VectorizeMode == sessiondatapb.VectorizeOff {
419419
return false
420420
}
421+
// Columnar vector index encoding is not yet supported.
422+
if len(table.VectorIndexes()) > 0 {
423+
return false
424+
}
421425
// Vectorized COPY doesn't support foreign key checks, no reason it couldn't
422426
// but it doesn't work right now because we don't have the ability to
423427
// hold the results in a bufferNode. We wouldn't want to enable it

‎pkg/sql/create_table.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,12 @@ func (n *createTableNode) startExec(params runParams) error {
612612
// Populate the buffer.
613613
copy(rowBuffer, n.input.Values())
614614

615-
// CREATE TABLE AS does not copy indexes from the input table.
616-
// An empty row.PartialIndexUpdateHelper is used here because
617-
// there are no indexes, partial or otherwise, to update.
615+
// CREATE TABLE AS does not copy indexes from the input table. Empty
616+
// partial and vector index helpers are used here because there are no
617+
// indexes, partial, vector, or otherwise, to update.
618618
var pm row.PartialIndexUpdateHelper
619-
if err := ti.row(params.ctx, rowBuffer, pm, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
619+
var vh row.VectorIndexUpdateHelper
620+
if err := ti.row(params.ctx, rowBuffer, pm, vh, params.extendedEvalCtx.Tracing.KVTracingEnabled()); err != nil {
620621
return err
621622
}
622623
}

‎pkg/sql/delete.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -146,29 +146,33 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
146146
// processSourceRow processes one row from the source for deletion and, if
147147
// result rows are needed, saves it in the result row container
148148
func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums) error {
149+
// Remove extra columns for partial index predicate values and AFTER triggers.
150+
deleteVals := sourceVals[:len(d.run.td.rd.FetchCols)+d.run.numPassthrough]
151+
sourceVals = sourceVals[len(deleteVals):]
152+
149153
// Create a set of partial index IDs to not delete from. Indexes should not
150154
// be deleted from when they are partial indexes and the row does not
151155
// satisfy the predicate and therefore do not exist in the partial index.
152156
// This set is passed as a argument to tableDeleter.row below.
153157
var pm row.PartialIndexUpdateHelper
154-
deleteCols := len(d.run.td.rd.FetchCols) + d.run.numPassthrough
155158
if n := len(d.run.td.tableDesc().PartialIndexes()); n > 0 {
156-
partialIndexDelVals := sourceVals[deleteCols : deleteCols+n]
157-
158-
err := pm.Init(nil /* partialIndexPutVals */, partialIndexDelVals, d.run.td.tableDesc())
159+
err := pm.Init(nil /* partialIndexPutVals */, sourceVals[:n], d.run.td.tableDesc())
159160
if err != nil {
160161
return err
161162
}
163+
sourceVals = sourceVals[n:]
162164
}
163165

164-
if len(sourceVals) > deleteCols {
165-
// Remove extra columns for partial index predicate values and AFTER
166-
// triggers.
167-
sourceVals = sourceVals[:deleteCols]
166+
// Keep track of the vector index partitions to update. This information is
167+
// passed to tableInserter.row below.
168+
var vh row.VectorIndexUpdateHelper
169+
if n := len(d.run.td.tableDesc().VectorIndexes()); n > 0 {
170+
vh.InitForDel(sourceVals[:n], d.run.td.tableDesc())
171+
sourceVals = sourceVals[n:]
168172
}
169173

170174
// Queue the deletion in the KV batch.
171-
if err := d.run.td.row(params.ctx, sourceVals, pm, d.run.traceKV); err != nil {
175+
if err := d.run.td.row(params.ctx, deleteVals, pm, vh, d.run.traceKV); err != nil {
172176
return err
173177
}
174178

@@ -188,7 +192,7 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
188192
if retIdx >= largestRetIdx {
189193
largestRetIdx = retIdx
190194
}
191-
resultValues[retIdx] = sourceVals[i]
195+
resultValues[retIdx] = deleteVals[i]
192196
}
193197
}
194198

@@ -198,7 +202,7 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
198202
if d.run.numPassthrough > 0 {
199203
passthroughBegin := len(d.run.td.rd.FetchCols)
200204
passthroughEnd := passthroughBegin + d.run.numPassthrough
201-
passthroughValues := sourceVals[passthroughBegin:passthroughEnd]
205+
passthroughValues := deleteVals[passthroughBegin:passthroughEnd]
202206

203207
for i := 0; i < d.run.numPassthrough; i++ {
204208
largestRetIdx++

‎pkg/sql/distsql_physical_planner.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4622,7 +4622,8 @@ func (dsp *DistSQLPlanner) planVectorMutationSearch(
46224622
suffixKeyColumnOrdinals[i] = uint32(p.PlanToStreamColMap[col])
46234623
}
46244624
keyAndSuffixCols := planInfo.table.IndexFetchSpecKeyAndSuffixColumns(planInfo.index)
4625-
prefixKeyCols := keyAndSuffixCols[planInfo.index.NumKeyColumns()-1:]
4625+
// Prefix cols do not include the vector column, which is the last key column.
4626+
prefixKeyCols := keyAndSuffixCols[:planInfo.index.NumKeyColumns()-1]
46264627
suffixKeyCols := keyAndSuffixCols[planInfo.index.NumKeyColumns():]
46274628
spec := &execinfrapb.VectorMutationSearchSpec{
46284629
PrefixKeyColumnOrdinals: prefixKeyColumnOrdinals,

‎pkg/sql/insert.go

+24-16
Original file line numberDiff line numberDiff line change
@@ -202,31 +202,39 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
202202
return err
203203
}
204204

205+
rowVals = rowVals[len(insertVals):]
206+
207+
// Verify the CHECK constraint results, if any.
208+
if n := r.checkOrds.Len(); n > 0 {
209+
if err := checkMutationInput(
210+
params.ctx, params.p.EvalContext(), &params.p.semaCtx, params.p.SessionData(),
211+
r.ti.tableDesc(), r.checkOrds, rowVals[:n],
212+
); err != nil {
213+
return err
214+
}
215+
rowVals = rowVals[n:]
216+
}
217+
205218
// Create a set of partial index IDs to not write to. Indexes should not be
206219
// written to when they are partial indexes and the row does not satisfy the
207220
// predicate. This set is passed as a parameter to tableInserter.row below.
208221
var pm row.PartialIndexUpdateHelper
209222
if n := len(r.ti.tableDesc().PartialIndexes()); n > 0 {
210-
offset := len(r.insertCols) + r.checkOrds.Len()
211-
partialIndexPutVals := rowVals[offset : offset+n]
212-
213-
err := pm.Init(partialIndexPutVals, nil /* partialIndexDelVals */, r.ti.tableDesc())
223+
err := pm.Init(rowVals[:n], nil /* partialIndexDelVals */, r.ti.tableDesc())
214224
if err != nil {
215225
return err
216226
}
227+
rowVals = rowVals[n:]
217228
}
218229

219-
// Verify the CHECK constraint results, if any.
220-
if n := r.checkOrds.Len(); n > 0 {
221-
// CHECK constraint results are after the insert columns.
222-
offset := len(r.insertCols)
223-
checkVals := rowVals[offset : offset+n]
224-
if err := checkMutationInput(
225-
params.ctx, params.p.EvalContext(), &params.p.semaCtx, params.p.SessionData(),
226-
r.ti.tableDesc(), r.checkOrds, checkVals,
227-
); err != nil {
228-
return err
229-
}
230+
// Keep track of the vector index partitions to update, as well as the
231+
// quantized vectors. This information is passed to tableInserter.row below.
232+
// Input is one partition key per vector index followed by one quantized vector
233+
// per index.
234+
var vh row.VectorIndexUpdateHelper
235+
if n := len(r.ti.tableDesc().VectorIndexes()); n > 0 {
236+
vh.InitForPut(rowVals[:n], rowVals[n:n*2], r.ti.tableDesc())
237+
rowVals = rowVals[n*2:]
230238
}
231239

232240
// Error out the insert if the enforce_home_region session setting is on and
@@ -236,7 +244,7 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
236244
}
237245

238246
// Queue the insert in the KV batch.
239-
if err := r.ti.row(params.ctx, insertVals, pm, r.traceKV); err != nil {
247+
if err := r.ti.row(params.ctx, insertVals, pm, vh, r.traceKV); err != nil {
240248
return err
241249
}
242250

‎pkg/sql/logictest/logic.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -319,16 +319,17 @@ import (
319319
// asynchronously, subsequent queries that depend on the state of
320320
// the query should be run with the "retry" option to ensure
321321
// deterministic test results.
322-
// - kvtrace: runs the query and compares against the results of the
323-
// kv operations trace of the query. kvtrace optionally accepts
324-
// arguments of the form kvtrace(op,op,...). Op is one of
325-
// the accepted k/v arguments such as 'CPut', 'Scan' etc. It
326-
// also accepts arguments of the form 'prefix=...'. For example,
327-
// if kvtrace(CPut,Del,prefix=/Table/54,prefix=/Table/55), the
328-
// results will be filtered to contain messages starting with
329-
// CPut /Table/54, CPut /Table/55, Del /Table/54, Del /Table/55.
330-
// Tenant IDs do not need to be included in prefixes and will be
331-
// removed from results. Cannot be combined with noticetrace.
322+
// - kvtrace: runs the query and compares against the results of the kv
323+
// operations trace of the query. kvtrace optionally accepts arguments of the
324+
// form kvtrace(op,op,...). Op is one of the accepted k/v arguments such as
325+
// 'CPut', 'Scan' etc. It also accepts arguments of the form 'prefix=...'. For
326+
// example, if kvtrace(CPut,Del,prefix=/Table/54,prefix=/Table/55), the results
327+
// will be filtered to contain messages starting with CPut /Table/54, CPut
328+
// /Table/55, Del /Table/54, Del /Table/55. The 'redactbytes' will redact the
329+
// contents of /BYTES/ values to prevent test flakiness in the presence of
330+
// nondeterminism and/or processor architecture differences. Tenant IDs do not
331+
// need to be included in prefixes and will be removed from results. Cannot be
332+
// combined with noticetrace.
332333
// - noticetrace: runs the query and compares only the notices that
333334
// appear. Cannot be combined with kvtrace.
334335
// - nodeidx=N: runs the query on node N of the cluster.
@@ -944,6 +945,10 @@ type logicQuery struct {
944945
// the particular operation types to filter on, such as CPut or Del.
945946
kvOpTypes []string
946947
keyPrefixFilters []string
948+
// kvtraceRedactBytes can only be used when kvtrace is true. When active, BYTES
949+
// values in keys are expunged from output (to prevent test failures where the
950+
// BYTES value is nondeterministic or architecture dependent).
951+
kvtraceRedactBytes bool
947952

948953
// nodeIdx determines which node on the cluster to execute a query on for the given query.
949954
nodeIdx int
@@ -2835,6 +2840,8 @@ func (t *logicTest) processSubtest(
28352840
matched = "/Tenant/%" + matched
28362841
}
28372842
query.keyPrefixFilters = append(query.keyPrefixFilters, matched)
2843+
} else if c == "redactbytes" {
2844+
query.kvtraceRedactBytes = true
28382845
} else if isAllowedKVOp(c) {
28392846
query.kvOpTypes = append(query.kvOpTypes, c)
28402847
} else {
@@ -2872,6 +2879,7 @@ func (t *logicTest) processSubtest(
28722879
query.kvtrace = true
28732880
query.kvOpTypes = nil
28742881
query.keyPrefixFilters = nil
2882+
query.kvtraceRedactBytes = false
28752883

28762884
case "noticetrace":
28772885
query.noticetrace = true
@@ -3017,7 +3025,13 @@ func (t *logicTest) processSubtest(
30173025

30183026
projection := `message`
30193027
if len(t.tenantApps) != 0 || t.cluster.StartedDefaultTestTenant() {
3020-
projection = `regexp_replace(message, '/Tenant/\d+', '')`
3028+
projection = `regexp_replace(message, '/Tenant/\d+', '', 'g')`
3029+
}
3030+
if query.kvtraceRedactBytes {
3031+
projection = fmt.Sprintf(
3032+
`regexp_replace(%s, '/BYTES/0x[abcdef\d]+', '/BYTES/:redacted:', 'g')`,
3033+
projection,
3034+
)
30213035
}
30223036
queryPrefix := fmt.Sprintf(`SELECT %s FROM [SHOW KV TRACE FOR SESSION] `, projection)
30233037
buildQuery := func(ops []string, keyFilters []string) string {

0 commit comments

Comments
 (0)