Skip to content

release-25.1: sql: improve observability of index merge timestamp #143059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1917,6 +1917,15 @@ func ValidateForwardIndexes(
idx.GetID())
indexName = idx.GetName()
}
if !idx.IsUnique() {
// For non-unique indexes, the table row count must match the index
// key count. Unlike unique indexes, we don't filter out any rows,
// so every row must have a corresponding entry in the index. A
// mismatch indicates an assertion failure.
return errors.AssertionFailedf(
"validation of non-unique index %s failed: expected %d rows, found %d",
idx.GetName(), errors.Safe(expectedCount), errors.Safe(idxLen))
}
// TODO(vivek): find the offending row and include it in the error.
return pgerror.WithConstraintName(pgerror.Newf(pgcode.UniqueViolation,
"duplicate key value violates unique constraint %q",
Expand Down Expand Up @@ -2977,8 +2986,15 @@ func indexTruncateInTxn(
// part of a restore, then timestamp will be too old and the job will fail. On
// the next resume, a mergeTimestamp newer than the GC time will be picked and
// the job can continue.
func getMergeTimestamp(clock *hlc.Clock) hlc.Timestamp {
return clock.Now()
// getMergeTimestamp returns the timestamp below which every key in the
// temporary index must be merged into the new index. The clock reading is
// pushed forward by the maximum allowed offset so that commit timestamps
// chosen on nodes with skewed clocks are still covered. This may cause the
// merge to see rows already present in the destination index, which is
// harmless: re-merging an existing key is a no-op.
func getMergeTimestamp(ctx context.Context, clock *hlc.Clock) hlc.Timestamp {
	mergeTS := clock.Now().AddDuration(clock.MaxOffset())
	log.Infof(ctx, "merging all keys in temporary index before time %v", mergeTS)
	return mergeTS
}

func (sc *SchemaChanger) distIndexMerge(
Expand All @@ -2989,8 +3005,7 @@ func (sc *SchemaChanger) distIndexMerge(
fractionScaler *multiStageFractionScaler,
) error {

mergeTimestamp := getMergeTimestamp(sc.clock)
log.Infof(ctx, "merging all keys in temporary index before time %v", mergeTimestamp)
mergeTimestamp := getMergeTimestamp(ctx, sc.clock)

// Gather the initial resume spans for the merge process.
progress, err := extractMergeProgress(sc.job, tableDesc, addedIndexes, temporaryIndexes)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func (m *Manager) WaitForOneVersion(
return nil, err
}
if detail.count == 0 {
log.Infof(ctx, "all leases have expired at %v: desc=%v", now, descs)
break
}
if detail.count != lastCount {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (im *IndexBackfillerMergePlanner) MergeIndexes(
)
return tracker.SetMergeProgress(ctx, progress)
}
mergeTimeStamp := getMergeTimestamp(im.execCfg.Clock)
mergeTimeStamp := getMergeTimestamp(ctx, im.execCfg.Clock)
protectedTimestampCleaner := im.execCfg.ProtectedTimestampManager.TryToProtectBeforeGC(ctx, job, descriptor, mergeTimeStamp)
defer func() {
cleanupError := protectedTimestampCleaner(ctx)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ go_test(
"//pkg/util/timeutil",
"@com_github_golang_mock//gomock",
"@com_github_stretchr_testify//require",
"//pkg/sql/schemachanger/scplan",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
],
)

Expand Down
87 changes: 87 additions & 0 deletions pkg/sql/schemachanger/scexec/exec_backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ package scexec_test

import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -19,8 +22,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps/sctestdeps"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -332,3 +338,84 @@ func TestExecBackfiller(t *testing.T) {
}

}

// TestMergeTimestampSkew will ensure we do not miss rows during the merge phase
// if the clocks are skewed.
// TestMergeTimestampSkew will ensure we do not miss rows during the merge phase
// if the clocks are skewed. It inserts rows from a second node whose manual
// clock is pushed ahead at each schema-change stage, then verifies the new
// index contains every row the primary index does.
func TestMergeTimestampSkew(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	// hookEnabled gates ingestFn so the BeforeStage hook only fires during the
	// CREATE INDEX under test, not during cluster setup or teardown.
	var hookEnabled atomic.Bool
	var ingestFn func() error
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
					// Called before each declarative schema changer stage;
					// delegates to ingestFn once the test enables the hook.
					BeforeStage: func(p scplan.Plan, stageIdx int) error {
						if !hookEnabled.Load() {
							return nil
						}
						return ingestFn()
					},
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)
	db1 := tc.ApplicationLayer(0).SQLConn(t, serverutils.DBName("defaultdb"))
	r1 := sqlutils.MakeSQLRunner(db1)

	// Add a second node so that we can use a separate clock for it.
	manualClock2 := hlc.NewHybridManualClock()
	tc.AddAndStartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				WallClock: manualClock2,
			},
		},
	})
	db2 := tc.ApplicationLayer(1).SQLConn(t, serverutils.DBName("defaultdb"))
	r2 := sqlutils.MakeSQLRunner(db2)

	t.Run("create_non_unique_index", func(t *testing.T) {
		// Keep replicas off node 2 so the schema change runs against node 1,
		// while the inserts below arrive through node 2's skewed clock.
		r1.ExecMultiple(t,
			`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1, constraints = '[-node2]'`,
			"CREATE TABLE t_idx(n int)",
			"INSERT INTO t_idx(n) SELECT * FROM generate_series(1, 100)",
		)
		additionalInserts := 0

		// Each stage we will insert a new row from a different node. That node will
		// use a skewed clock.
		ingestFn = func() error {
			// Push node 2's clock further ahead of node 1's before each insert.
			manualClock2.Increment(10000000)
			keyVal := 1000 + additionalInserts
			additionalInserts++
			r2.Exec(t, fmt.Sprintf("INSERT INTO t_idx(n) VALUES (%d)", keyVal))
			return nil
		}
		hookEnabled.Store(true)
		defer hookEnabled.Store(false)

		// Run the index creation in a separate goroutine; the BeforeStage hook
		// fires on the schema-change path while this statement executes.
		grp := ctxgroup.WithContext(ctx)
		grp.GoCtx(func(ctx context.Context) error {
			r1.Exec(t, "CREATE INDEX i1 ON t_idx (n)")
			hookEnabled.Store(false)
			return nil
		})
		require.NoError(t, grp.Wait())

		// Compare row counts between the new index and the primary key: they
		// must match if the merge saw every row despite the clock skew.
		for _, q := range []string{
			`SELECT count(1) FROM t_idx@i1`,
			`SELECT count(1) FROM t_idx@t_idx_pkey`,
		} {
			var rowCount int
			res := r1.QueryRow(t, q)
			res.Scan(&rowCount)
			expectedRowCount := 100 + additionalInserts
			require.Equal(t, expectedRowCount, rowCount, "post create index row count mismatch in query %q", q)
		}
	})
}
6 changes: 2 additions & 4 deletions pkg/sql/schemachanger/scexec/exec_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"github.com/cockroachdb/errors"
)

func executeValidateUniqueIndex(
ctx context.Context, deps Dependencies, op *scop.ValidateIndex,
) error {
func executeValidateIndex(ctx context.Context, deps Dependencies, op *scop.ValidateIndex) error {
descs, err := deps.Catalog().MustReadImmutableDescriptors(ctx, op.TableID)
if err != nil {
return err
Expand Down Expand Up @@ -112,7 +110,7 @@ func executeValidationOps(ctx context.Context, deps Dependencies, ops []scop.Op)
func executeValidationOp(ctx context.Context, deps Dependencies, op scop.Op) (err error) {
switch op := op.(type) {
case *scop.ValidateIndex:
if err = executeValidateUniqueIndex(ctx, deps, op); err != nil {
if err = executeValidateIndex(ctx, deps, op); err != nil {
if !scerrors.HasSchemaChangerUserError(err) {
return errors.Wrapf(err, "%T: %v", op, op)
}
Expand Down