
Commit 07b1421

craig[bot], angles-n-daemons, and andyyang890 committed
142996: structlogging: restructure hot range logger for testability r=angles-n-daemons a=angles-n-daemons

This change does a few things to improve the testability of the hot ranges logger. These include:

__Logger__:
* The introduction of a shouldLog function, which determines whether the system should log or not.
* The breakout of the logging action into its own function.

__Tests__:
* The addition of a setup and teardown utility for the hot range logger.
* The breakout of the default case and a timed case.
* Tests for hot ranges which exist in the system at startup.

Fixes: #142995
Epic: CRDB-43150

143467: changefeedccl: update TestChangefeedLaggingSpanCheckpointing r=aerfrei,asg0451 a=andyyang890

This patch updates `TestChangefeedLaggingSpanCheckpointing` to ensure we checkpoint spans at two or more different timestamps. It also fixes an issue where the `OnRangeFeedStart` kvfeed testing knob was not being invoked in production code.

Informs #137692
Fixes #143377

Release note: None

Co-authored-by: Brian Dillmann <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
3 parents 1c93e8a + d13ad97 + 1668971 commit 07b1421

File tree

6 files changed: +161, -148 lines

pkg/ccl/changefeedccl/changefeed_test.go

+57-27
@@ -2354,8 +2354,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {
 	}
 }
 
-// Test checkpointing when the highwater does not move due to some issues with
-// specific spans lagging behind
+// TestChangefeedLaggingSpanCheckpointing tests checkpointing when the highwater
+// does not advance due to specific spans lagging behind.
 func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -2369,7 +2369,7 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
 		DistSQL.(*execinfra.TestingKnobs).
 		Changefeed.(*TestingKnobs)
 
-	// Initialize table with multiple ranges.
+	// Initialize table with 20 ranges.
 	sqlDB.Exec(t, `
 CREATE TABLE foo (key INT PRIMARY KEY);
 INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000);
@@ -2381,28 +2381,39 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
 	changefeedbase.SpanCheckpointInterval.Override(
 		context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
 	changefeedbase.SpanCheckpointMaxBytes.Override(
-		context.Background(), &s.ClusterSettings().SV, 100<<20)
+		context.Background(), &s.ClusterSettings().SV, 100<<20 /* 100 MiB */)
 	changefeedbase.SpanCheckpointLagThreshold.Override(
 		context.Background(), &s.ClusterSettings().SV, 10*time.Millisecond)
 
-	// We'll start changefeed with the cursor.
+	// We'll start the changefeed with the cursor set to the current time (not insert time).
+	// NB: The changefeed created in this test doesn't actually send any message events.
 	var tsStr string
 	sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp() from foo`).Scan(&tsStr)
 	cursor := parseTimeToHLC(t, tsStr)
+	t.Logf("cursor: %v", cursor)
 
 	// Rangefeed will skip some of the checkpoints to simulate lagging spans.
 	var laggingSpans roachpb.SpanGroup
-	numLagging := 0
+	nonLaggingSpans := make(map[string]int)
+	var numLagging, numNonLagging int
 	knobs.FeedKnobs.ShouldSkipCheckpoint = func(checkpoint *kvpb.RangeFeedCheckpoint) bool {
-		// Skip spans that were skipped before; otherwise skip some spans.
-		seenBefore := laggingSpans.Encloses(checkpoint.Span)
-		if seenBefore || (numLagging < 5 && rnd.Int()%3 == 0) {
-			if !seenBefore {
-				laggingSpans.Add(checkpoint.Span)
-				numLagging++
-			}
+		// Skip spans that we already picked to be lagging.
+		if laggingSpans.Encloses(checkpoint.Span) {
+			return true /* skip */
+		}
+		// Skip additional updates for some non-lagging spans so that we can
+		// have more than one timestamp in the checkpoint.
+		if i, ok := nonLaggingSpans[checkpoint.Span.String()]; ok {
+			return i%3 == 0
+		}
+		// Ensure we have a few spans that are lagging at the cursor.
+		if numLagging == 0 || (numLagging < 5 && rnd.Int()%3 == 0) {
+			laggingSpans.Add(checkpoint.Span)
+			numLagging++
 			return true /* skip */
 		}
+		nonLaggingSpans[checkpoint.Span.String()] = numNonLagging
+		numNonLagging++
 		return false
 	}
 
@@ -2419,64 +2430,83 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
 		return job.Progress()
 	}
 
-	// Should eventually checkpoint all spans around the lagging span
+	// We should eventually checkpoint some spans that are ahead of the highwater.
+	// We'll wait until we have two unique timestamps.
 	testutils.SucceedsSoon(t, func() error {
 		progress := loadProgress()
-		if loadCheckpoint(t, progress) != nil {
+		cp := maps.Collect(loadCheckpoint(t, progress).All())
+		if len(cp) >= 2 {
 			return nil
 		}
-		return errors.New("waiting for checkpoint")
+		return errors.New("waiting for checkpoint with two different timestamps")
	})
 
 	sqlDB.Exec(t, "PAUSE JOB $1", jobID)
 	waitForJobState(sqlDB, t, jobID, jobs.StatePaused)
 
 	// We expect highwater to be 0 (because we skipped some spans) or exactly cursor
-	// (this is mostly due to racy updates sent from aggregators to the frontier.
+	// (this is mostly due to racy updates sent from aggregators to the frontier).
 	// However, the checkpoint timestamp should be at least at the cursor.
 	progress := loadProgress()
-	require.True(t, progress.GetHighWater().IsEmpty() || *progress.GetHighWater() == cursor,
-		"expected empty highwater or %s, found %s", cursor, progress.GetHighWater())
+	require.True(t, progress.GetHighWater().IsEmpty() || progress.GetHighWater().Equal(cursor),
+		"expected empty highwater or %s, found %s", cursor, progress.GetHighWater())
 	spanLevelCheckpoint := loadCheckpoint(t, progress)
 	require.NotNil(t, spanLevelCheckpoint)
-	minCheckpointTS := spanLevelCheckpoint.MinTimestamp()
-	require.True(t, cursor.LessEq(minCheckpointTS))
+	require.True(t, cursor.LessEq(spanLevelCheckpoint.MinTimestamp()))
+
+	// Construct a reverse index from spans to timestamps.
+	spanTimestamps := make(map[string]hlc.Timestamp)
+	for ts, spans := range spanLevelCheckpoint.All() {
+		for _, s := range spans {
+			spanTimestamps[s.String()] = ts
+		}
+	}
 
+	var rangefeedStarted bool
 	var incorrectCheckpointErr error
 	knobs.FeedKnobs.OnRangeFeedStart = func(spans []kvcoord.SpanTimePair) {
+		rangefeedStarted = true
+
 		setErr := func(stp kvcoord.SpanTimePair, expectedTS hlc.Timestamp) {
 			incorrectCheckpointErr = errors.Newf(
 				"rangefeed for span %s expected to start @%s, started @%s instead",
 				stp.Span, expectedTS, stp.StartAfter)
 		}
 
+		// Verify that the start time for each span is correct.
 		for _, sp := range spans {
-			if laggingSpans.Encloses(sp.Span) {
-				if !sp.StartAfter.Equal(cursor) {
-					setErr(sp, cursor)
+			if checkpointTS := spanTimestamps[sp.Span.String()]; checkpointTS.IsSet() {
+				// Any span in the checkpoint should be resumed at its checkpoint timestamp.
+				if !sp.StartAfter.Equal(checkpointTS) {
+					setErr(sp, checkpointTS)
 				}
 			} else {
-				if !sp.StartAfter.Equal(minCheckpointTS) {
-					setErr(sp, minCheckpointTS)
+				// Any spans not in the checkpoint should be at the cursor.
+				if !sp.StartAfter.Equal(cursor) {
+					setErr(sp, cursor)
 				}
 			}
 		}
 	}
+	knobs.FeedKnobs.ShouldSkipCheckpoint = nil
 
 	sqlDB.Exec(t, "RESUME JOB $1", jobID)
 	waitForJobState(sqlDB, t, jobID, jobs.StateRunning)
 
 	// Wait until highwater advances past cursor.
 	testutils.SucceedsSoon(t, func() error {
 		progress := loadProgress()
-		if hw := progress.GetHighWater(); hw != nil && cursor.LessEq(*hw) {
+		if hw := progress.GetHighWater(); hw != nil && cursor.Less(*hw) {
 			return nil
 		}
 		return errors.New("waiting for checkpoint advance")
	})
 
 	sqlDB.Exec(t, "PAUSE JOB $1", jobID)
 	waitForJobState(sqlDB, t, jobID, jobs.StatePaused)
+	// Verify the rangefeed started. This guards against the testing knob
+	// not being called, which was happening in earlier versions of the code.
+	require.True(t, rangefeedStarted)
 	// Verify we didn't see incorrect timestamps when resuming.
 	require.NoError(t, incorrectCheckpointErr)
 }
pkg/ccl/changefeedccl/helpers_test.go

+2-1
@@ -11,6 +11,7 @@ import (
 	gosql "database/sql"
 	gojson "encoding/json"
 	"fmt"
+	"maps"
 	"math"
 	"math/rand"
 	"net/url"
@@ -868,7 +869,7 @@ func loadCheckpoint(t *testing.T, progress jobspb.Progress) *jobspb.TimestampSpa
 	if spanLevelCheckpoint.IsEmpty() {
 		return nil
 	}
-	t.Logf("found checkpoint: %#v", spanLevelCheckpoint)
+	t.Logf("found checkpoint: %v", maps.Collect(spanLevelCheckpoint.All()))
 	return spanLevelCheckpoint
 }
 
pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

+6-1
@@ -99,12 +99,17 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
 	if cfg.RangeObserver != nil {
 		rfOpts = append(rfOpts, kvcoord.WithRangeObserver(cfg.RangeObserver))
 	}
-	rfOpts = append(rfOpts, kvcoord.WithConsumerID(cfg.ConsumerID))
+	if cfg.ConsumerID != 0 {
+		rfOpts = append(rfOpts, kvcoord.WithConsumerID(cfg.ConsumerID))
+	}
 	if len(cfg.Knobs.RangefeedOptions) != 0 {
 		rfOpts = append(rfOpts, cfg.Knobs.RangefeedOptions...)
 	}
 
 	g.GoCtx(func(ctx context.Context) error {
+		if cfg.Knobs.OnRangeFeedStart != nil {
+			cfg.Knobs.OnRangeFeedStart(cfg.Spans)
+		}
 		return p(ctx, cfg.Spans, eventCh, rfOpts...)
 	})
 	return g.Wait()
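
The production-side change above is the usual testing-knob shape: the knob is a nil-able callback that production code invokes only when a test has installed it, here right before the rangefeed goroutine starts, which is what lets the updated changefeed test observe the resume timestamps. A minimal, self-contained sketch of that pattern follows; the `TestingKnobs` and `feed` names are hypothetical stand-ins, not the kvfeed types.

```go
package main

import "fmt"

// TestingKnobs is a hypothetical knobs struct; the real field patched above
// is cfg.Knobs.OnRangeFeedStart on the kvfeed rangefeed config.
type TestingKnobs struct {
	OnStart func(spans []string)
}

type feed struct {
	knobs TestingKnobs
	spans []string
}

// run calls the knob only if a test installed one, then does the real work.
// The nil guard keeps production behavior unchanged when no knob is set.
func (f *feed) run() {
	if f.knobs.OnStart != nil {
		f.knobs.OnStart(f.spans)
	}
	fmt.Println("starting feed over", f.spans)
}

func main() {
	var observed []string
	f := &feed{
		spans: []string{"/Table/108/1/{1-1000}"},
		// A test installs the knob to see exactly which spans the feed
		// starts with, and at which point in the startup sequence.
		knobs: TestingKnobs{OnStart: func(spans []string) { observed = spans }},
	}
	f.run()
	fmt.Println("knob observed spans:", observed)
}
```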

pkg/server/structlogging/BUILD.bazel

-5
@@ -32,25 +32,20 @@ go_test(
     deps = [
         ":structlogging",
         "//pkg/base",
-        "//pkg/keys",
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/allocator/plan",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
-        "//pkg/sql/catalog/desctestutils",
         "//pkg/testutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",
-        "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/log/eventpb",
         "//pkg/util/log/logpb",
         "//pkg/util/randutil",
         "//pkg/util/syncutil",
-        "@com_github_cockroachdb_errors//:errors",
-        "@com_github_stretchr_testify//assert",
     ],
 )

pkg/server/structlogging/hot_ranges_log.go

+51-35
@@ -84,7 +84,6 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
 		})
 
 		ticker := time.NewTicker(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
-		defer ticker.Stop()
 
 		for {
 			select {
@@ -93,43 +92,60 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
-					continue
-				}
-				resp, err := s.sServer.HotRangesV2(ctx,
-					&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
-				if err != nil {
-					log.Warningf(ctx, "failed to get hot ranges: %s", err)
-					continue
-				}
-				var events []logpb.EventPayload
-				ts := timeutil.Now().UnixNano()
-
-				for _, r := range resp.Ranges {
-					hrEvent := &eventpb.HotRangesStats{
-						RangeID: int64(r.RangeID),
-						Qps: r.QPS,
-						Databases: r.Databases,
-						Tables: r.Tables,
-						Indexes: r.Indexes,
-						SchemaName: r.SchemaName,
-						CPUTimePerSecond: r.CPUTimePerSecond,
-						ReadBytesPerSecond: r.ReadBytesPerSecond,
-						WriteBytesPerSecond: r.WriteBytesPerSecond,
-						ReadsPerSecond: r.ReadsPerSecond,
-						WritesPerSecond: r.WritesPerSecond,
-						LeaseholderNodeID: int32(r.LeaseholderNodeID),
-						CommonEventDetails: logpb.CommonEventDetails{
-							Timestamp: ts,
-						},
-					}
-					events = append(events, hrEvent)
-				}
-				logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
-
+				s.maybeLogHotRanges(ctx, stopper)
 			case <-intervalChangedChan:
 				ticker.Reset(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
 			}
 		}
 	})
 }
+
+// maybeLogHotRanges is a small helper function which couples the
+// functionality of checking whether to log and logging.
+func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
+	if s.shouldLog() {
+		s.logHotRanges(ctx, stopper)
+	}
+}
+
+// shouldLog checks the below conditions to see whether it should emit logs.
+// - Is the cluster setting server.telemetry.hot_ranges_stats.enabled true?
+func (s *hotRangesLoggingScheduler) shouldLog() bool {
+	return TelemetryHotRangesStatsEnabled.Get(&s.st.SV)
+}
+
+// logHotRanges collects the hot ranges from this node's status server and
+// sends them to the TELEMETRY log channel.
+func (s *hotRangesLoggingScheduler) logHotRanges(ctx context.Context, stopper *stop.Stopper) {
+	resp, err := s.sServer.HotRangesV2(ctx,
+		&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
+	if err != nil {
+		log.Warningf(ctx, "failed to get hot ranges: %s", err)
+		return
+	}
+
+	var events []logpb.EventPayload
+	ts := timeutil.Now().UnixNano()
+
+	for _, r := range resp.Ranges {
+		hrEvent := &eventpb.HotRangesStats{
+			RangeID: int64(r.RangeID),
+			Qps: r.QPS,
+			Databases: r.Databases,
+			Tables: r.Tables,
+			Indexes: r.Indexes,
+			SchemaName: r.SchemaName,
+			CPUTimePerSecond: r.CPUTimePerSecond,
+			ReadBytesPerSecond: r.ReadBytesPerSecond,
+			WriteBytesPerSecond: r.WriteBytesPerSecond,
+			ReadsPerSecond: r.ReadsPerSecond,
+			WritesPerSecond: r.WritesPerSecond,
+			LeaseholderNodeID: int32(r.LeaseholderNodeID),
+			CommonEventDetails: logpb.CommonEventDetails{
+				Timestamp: ts,
			},
+		}
+		events = append(events, hrEvent)
+	}
+	logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
+}
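
The restructuring above is what the commit message means by testability: shouldLog is now a pure read of the cluster setting, logHotRanges does the status-server call and event emission, and maybeLogHotRanges only glues the two together, so the gating decision can be exercised without driving the ticker loop. A rough sketch of that idea with stand-in types, not the actual structlogging scheduler or its test helpers:

```go
package main

import "fmt"

// scheduler is a cut-down stand-in for the hot ranges logging scheduler: an
// enabled flag plays the role of the cluster setting and a counter stands in
// for the emitted telemetry events.
type scheduler struct {
	enabled bool
	logged  int
}

// shouldLog mirrors the extracted decision: it only consults configuration.
func (s *scheduler) shouldLog() bool { return s.enabled }

// logHotRanges stands in for the collection and emission work.
func (s *scheduler) logHotRanges() { s.logged++ }

// maybeLogHotRanges couples the decision and the action, as in the diff above.
func (s *scheduler) maybeLogHotRanges() {
	if s.shouldLog() {
		s.logHotRanges()
	}
}

func main() {
	// With the decision factored out, it can be exercised directly: no ticker,
	// stopper, or status server needs to be in the picture.
	for _, enabled := range []bool{false, true} {
		s := &scheduler{enabled: enabled}
		s.maybeLogHotRanges()
		fmt.Printf("enabled=%v -> logged %d time(s)\n", enabled, s.logged)
	}
}
```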
