
Commit ab85aff

structlogging: restructure hot range logger for testability
This change does a few things to improve the testability of the hot ranges logger. These include:

__Logger__:
* The introduction of a shouldLog function, which determines whether the system should log or not.
* The breakout of the logging action into its own function.

__Tests__:
* The addition of a setup and teardown utility for the hot range logger.
* The breakout of the default case and a timed case.

Fixes: #142995
Epic: CRDB-43150
1 parent d404b01 commit ab85aff
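To make the restructuring concrete before the diff, here is a minimal, self-contained Go sketch of the pattern the commit moves toward: a shouldLog guard, a separate logging action, and a loop that can be driven either by a timer or by a manual tick channel. All names and durations below are illustrative, not CockroachDB code.

package main

import (
	"fmt"
	"time"
)

// scheduler mirrors the shape of the refactor: a guard (shouldLog), an
// action (logHotRanges), and a loop that can be ticked by a timer or by a
// manual channel so tests don't have to wait on wall-clock intervals.
type scheduler struct {
	enabled    bool
	lastLogged time.Time
}

func (s *scheduler) shouldLog() bool { return s.enabled }

func (s *scheduler) logHotRanges() {
	s.lastLogged = time.Now()
	fmt.Println("logged hot ranges at", s.lastLogged.Format(time.RFC3339Nano))
}

func (s *scheduler) maybeLogHotRanges() {
	if s.shouldLog() {
		s.logHotRanges()
	}
}

func main() {
	manualTick := make(chan struct{})
	s := &scheduler{enabled: true}
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	done := time.After(200 * time.Millisecond)

	// A "test" forcing an immediate tick without waiting on the timer.
	go func() { manualTick <- struct{}{} }()

	for {
		select {
		case <-ticker.C:
			s.maybeLogHotRanges()
		case <-manualTick:
			s.maybeLogHotRanges()
		case <-done:
			return
		}
	}
}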

File tree

pkg/server/structlogging/BUILD.bazel
pkg/server/structlogging/hot_ranges_log.go
pkg/server/structlogging/hot_ranges_log_test.go

3 files changed: +151 -115 lines changed


pkg/server/structlogging/BUILD.bazel

-1

@@ -50,7 +50,6 @@ go_test(
         "//pkg/util/log/logpb",
         "//pkg/util/randutil",
         "//pkg/util/syncutil",
-        "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//assert",
     ],
 )

pkg/server/structlogging/hot_ranges_log.go

+72 -38

@@ -24,6 +24,13 @@ import (
 // ReportTopHottestRanges limits the number of ranges to be reported per iteration
 const ReportTopHottestRanges = 5
 
+// HotRangeLogManualTicker is a channel that can be used to force the hot range
+// logging task to tick.
+// Within normal operation, there will only be one function listening to this
+// ticker, but in the tests there may be multiple "nodes" within the process.
+// Tests will then need to send multiple requests to trigger all the nodes.
+var HotRangeLogManualTicker = make(chan struct{}, 0)
+
 var TelemetryHotRangesStatsInterval = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"server.telemetry.hot_ranges_stats.interval",
@@ -50,9 +57,10 @@ var TelemetryHotRangesStatsLoggingDelay = settings.RegisterDurationSetting(
 // hotRangesLoggingScheduler is responsible for logging index usage stats
 // on a scheduled interval.
 type hotRangesLoggingScheduler struct {
-	ie      sql.InternalExecutor
-	sServer serverpb.TenantStatusServer
-	st      *cluster.Settings
+	ie         sql.InternalExecutor
+	sServer    serverpb.TenantStatusServer
+	st         *cluster.Settings
+	lastLogged time.Time
 }
 
 // StartHotRangesLoggingScheduler starts the capture index usage statistics logging scheduler.
@@ -84,7 +92,6 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
 		})
 
 		ticker := time.NewTicker(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
-		defer ticker.Stop()
 
 		for {
 			select {
@@ -93,43 +100,70 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
-					continue
-				}
-				resp, err := s.sServer.HotRangesV2(ctx,
-					&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
-				if err != nil {
-					log.Warningf(ctx, "failed to get hot ranges: %s", err)
-					continue
-				}
-				var events []logpb.EventPayload
-				ts := timeutil.Now().UnixNano()
-
-				for _, r := range resp.Ranges {
-					hrEvent := &eventpb.HotRangesStats{
-						RangeID:             int64(r.RangeID),
-						Qps:                 r.QPS,
-						Databases:           r.Databases,
-						Tables:              r.Tables,
-						Indexes:             r.Indexes,
-						SchemaName:          r.SchemaName,
-						CPUTimePerSecond:    r.CPUTimePerSecond,
-						ReadBytesPerSecond:  r.ReadBytesPerSecond,
-						WriteBytesPerSecond: r.WriteBytesPerSecond,
-						ReadsPerSecond:      r.ReadsPerSecond,
-						WritesPerSecond:     r.WritesPerSecond,
-						LeaseholderNodeID:   int32(r.LeaseholderNodeID),
-						CommonEventDetails: logpb.CommonEventDetails{
-							Timestamp: ts,
-						},
-					}
-					events = append(events, hrEvent)
-				}
-				logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
-
+				s.maybeLogHotRanges(ctx, stopper)
+			case <-HotRangeLogManualTicker:
+				s.maybeLogHotRanges(ctx, stopper)
 			case <-intervalChangedChan:
 				ticker.Reset(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
 			}
 		}
 	})
 }
+
+// maybeLogHotRanges is a small helper function which couples the
+// functionality of checking whether to log and logging, with setting the
+// lastLogged timestamp.
+func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
+	if s.shouldLog() {
+		s.logHotRanges(ctx, stopper)
+	}
+}
+
+// shouldLog checks the below conditions to see whether it should emit logs.
+//   - Is the cluster setting server.telemetry.hot_ranges_stats.enabled true?
+func (s *hotRangesLoggingScheduler) shouldLog() bool {
+	if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
+		return false
+	}
+	return true
+}
+
+// logHotRanges collects the hot ranges from this node's status server and
+// sends them to the TELEMETRY log channel.
+func (s *hotRangesLoggingScheduler) logHotRanges(ctx context.Context, stopper *stop.Stopper) {
+	// early exit conditions
+	if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
+		return
+	}
+	resp, err := s.sServer.HotRangesV2(ctx,
+		&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
+	if err != nil {
+		log.Warningf(ctx, "failed to get hot ranges: %s", err)
+		return
+	}
+
+	var events []logpb.EventPayload
+	ts := timeutil.Now().UnixNano()
+
+	for _, r := range resp.Ranges {
+		hrEvent := &eventpb.HotRangesStats{
+			RangeID:             int64(r.RangeID),
+			Qps:                 r.QPS,
+			Databases:           r.Databases,
+			Tables:              r.Tables,
+			Indexes:             r.Indexes,
+			SchemaName:          r.SchemaName,
+			CPUTimePerSecond:    r.CPUTimePerSecond,
+			ReadBytesPerSecond:  r.ReadBytesPerSecond,
+			WriteBytesPerSecond: r.WriteBytesPerSecond,
+			ReadsPerSecond:      r.ReadsPerSecond,
+			WritesPerSecond:     r.WritesPerSecond,
+			LeaseholderNodeID:   int32(r.LeaseholderNodeID),
+			CommonEventDetails: logpb.CommonEventDetails{
+				Timestamp: ts,
+			},
+		}
		events = append(events, hrEvent)
+	}
+	logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
+}
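A side note on the manual ticker introduced above: the channel is unbuffered, so each send rendezvouses with exactly one listening scheduler loop, which is why the comment in the diff says tests with multiple in-process "nodes" must send multiple requests. A minimal standalone sketch of that behavior (illustrative names only, not CockroachDB code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	manualTick := make(chan struct{})
	var wg sync.WaitGroup

	// Two in-process "nodes", each running its own loop that waits on the
	// shared, unbuffered ticker channel.
	for node := 1; node <= 2; node++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-manualTick // rendezvous with exactly one send from the "test"
			fmt.Printf("node %d ticked\n", id)
		}(node)
	}

	// The test must send one tick per listening node.
	manualTick <- struct{}{}
	manualTick <- struct{}{}
	wg.Wait()
}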

pkg/server/structlogging/hot_ranges_log_test.go

+79 -76

@@ -8,26 +8,24 @@ package structlogging_test
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"regexp"
+	"slices"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
 	"github.com/cockroachdb/cockroach/pkg/server/structlogging"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
-	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
-	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
 )
@@ -74,21 +72,12 @@
 	spy.mu.logs = nil
 }
 
-// TestHotRangesStatsTenants tests that hot ranges stats are logged per node.
-// The test will ensure each node contains 5 distinct range replicas for hot
-// ranges logging. Each node should thus log 5 distinct range ids.
-func TestHotRangesStats(t *testing.T) {
-	defer leaktest.AfterTest(t)()
+func setupHotRangesLogTest(
+	t *testing.T, ctx context.Context,
+) (serverutils.ApplicationLayerInterface, *hotRangesLogSpy, func()) {
 	sc := log.ScopeWithoutShowLogs(t)
-	defer sc.Close(t)
-
-	skip.UnderRace(t)
-
-	ctx := context.Background()
-	spy := hotRangesLogSpy{t: t}
-	defer log.InterceptWith(ctx, &spy)()
-
-	tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
+	spy := &hotRangesLogSpy{t: t}
+	tc := serverutils.StartCluster(t, 1, base.TestClusterArgs{
 		ReplicationMode: base.ReplicationManual,
 		ServerArgs: base.TestServerArgs{
 			DefaultTestTenant: base.TestControlsTenantsExplicitly,
@@ -101,72 +90,86 @@ func TestHotRangesStats(t *testing.T) {
 			},
 		},
 	})
-	defer tc.Stopper().Stop(ctx)
-
-	db := tc.ServerConn(0)
-	sqlutils.CreateTable(
-		t, db, "foo",
-		"k INT PRIMARY KEY, v INT",
-		300,
-		sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(2)),
-	)
-
-	// Ensure both of node 1 and 2 have 5 distinct replicas from the table.
-	tableDesc := desctestutils.TestingGetPublicTableDescriptor(
-		tc.Server(0).DB(), keys.SystemSQLCodec, "test", "foo")
-	tc.SplitTable(t, tableDesc, []serverutils.SplitPoint{
-		{TargetNodeIdx: 1, Vals: []interface{}{100}},
-		{TargetNodeIdx: 1, Vals: []interface{}{120}},
-		{TargetNodeIdx: 1, Vals: []interface{}{140}},
-		{TargetNodeIdx: 1, Vals: []interface{}{160}},
-		{TargetNodeIdx: 1, Vals: []interface{}{180}},
-		{TargetNodeIdx: 2, Vals: []interface{}{200}},
-		{TargetNodeIdx: 2, Vals: []interface{}{220}},
-		{TargetNodeIdx: 2, Vals: []interface{}{240}},
-		{TargetNodeIdx: 2, Vals: []interface{}{260}},
-		{TargetNodeIdx: 2, Vals: []interface{}{280}},
-	})
 
-	// query table
-	for i := 0; i < 300; i++ {
-		db := tc.ServerConn(0)
-		sqlutils.MakeSQLRunner(db).Query(t, `SELECT * FROM test.foo`)
+	leakChecker := leaktest.AfterTest(t)
+	logInterceptor := log.InterceptWith(ctx, spy)
+	stopper := tc.Stopper()
+	teardown := func() {
+		stopper.Stop(ctx)
+		sc.Close(t)
+		logInterceptor()
+		leakChecker()
 	}
 
-	// Skip node 1 since it will contain many more replicas.
-	// We only need to check nodes 2 and 3 to see that the nodes are logging their local hot ranges.
-	rangeIDs := make(map[int64]struct{})
-	for _, i := range []int{1, 2} {
-		spy.Reset()
-		ts := tc.ApplicationLayer(i)
-		structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, true)
-		structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, time.Second)
-		structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)
-
-		testutils.SucceedsSoon(t, func() error {
-			logs := spy.Logs()
-			if len(logs) < 5 {
-				return errors.New("waiting for hot ranges to be logged")
-			}
+	ts := tc.ApplicationLayer(0)
+	return ts, spy, teardown
+}
 
-			return nil
-		})
-		structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, 1*time.Hour)
+// TestHotRangesStatsTenants tests that hot ranges stats are logged per node.
+// The test will ensure each node contains 5 distinct range replicas for hot
+// ranges logging. Each node should thus log 5 distinct range ids.
+func TestHotRangesStats(t *testing.T) {
+	skip.UnderRace(t)
+	ctx := context.Background()
+	ts, spy, teardown := setupHotRangesLogTest(t, ctx)
+	defer teardown()
+
+	structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, true)
+	structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond)
+	structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)
+
+	structlogging.HotRangeLogManualTicker <- struct{}{}
+	testutils.SucceedsSoon(t, func() error {
+		logs := spy.Logs()
 
-		// Get first 5 logs since the logging loop may have fired multiple times.
-		// We should have gotten 5 distinct range ids, one for each split point above.
-		logs := spy.Logs()[:5]
+		// Depend on a range which we don't expect to go anywhere.
 		for _, l := range logs {
-			assert.Equal(t, l.Databases, []string{"‹test›"})
-			assert.Equal(t, l.Tables, []string{"‹foo›"})
-			assert.Equal(t, l.Indexes, []string{"‹foo_pkey›"})
-			_, ok := rangeIDs[l.RangeID]
-			if ok {
-				t.Fatalf(`Logged ranges should be unique per node for this test.
-found range on node %d and node %d: %s %s %s %s %d`, i, l.LeaseholderNodeID, l.Databases, l.SchemaName, l.Tables, l.Indexes, l.RangeID)
+			if !slices.Equal(l.Databases, []string{"‹system›"}) {
+				continue
 			}
-			rangeIDs[l.RangeID] = struct{}{}
+			if !slices.Equal(l.Tables, []string{"‹sqlliveness›"}) {
+				continue
+			}
+			if !slices.Equal(l.Indexes, []string{"‹primary›"}) {
+				continue
+			}
+			return nil
 		}
+		return errors.New("waited too long for the synthetic data")
+	})
+}
 
+func TestHotRangeLogIntervalSetting(t *testing.T) {
+	skip.UnderRace(t)
+	ctx := context.Background()
+	ts, spy, teardown := setupHotRangesLogTest(t, ctx)
+	defer teardown()
+
+	intervalDuration := 50 * time.Millisecond
+	start := time.Now()
+	structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, true)
+	structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, intervalDuration)
+	structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)
+
+	// verify that there's no logged hot ranges, despite the system ticking
+	structlogging.HotRangeLogManualTicker <- struct{}{}
+	if time.Since(start) > intervalDuration {
+		// stress tests can cause scheduling delays, so that little code takes a
+		// long time to run. we can bail if the above took longer than the expected
+		// duration.
+		return
 	}
+
+	// verify no logs were sent.
+	assert.Zero(t, len(spy.Logs()))
+
+	// sleep for the duration, retrigger the logger and verify that logs were sent.
+	time.Sleep(intervalDuration * 2)
+	testutils.SucceedsSoon(t, func() error {
+		structlogging.HotRangeLogManualTicker <- struct{}{}
+		if len(spy.Logs()) == 0 {
+			return errors.New("no logs")
+		}
+		return nil
+	})
 }
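One design choice in TestHotRangeLogIntervalSetting worth noting is the wall-clock guard: the test measures how long its own setup and manual tick took, and bails out before asserting "no logs yet" if that exceeded the logging interval, which avoids flakes on slow or overloaded machines. A minimal, self-contained sketch of the same guard (names and durations are illustrative, not the test's actual code):

package main

import (
	"fmt"
	"time"
)

// doQuickSetup stands in for work that is expected to complete well inside a
// single logging interval (settings overrides, a manual tick, etc.).
func doQuickSetup() { time.Sleep(time.Millisecond) }

func main() {
	interval := 50 * time.Millisecond
	start := time.Now()

	doQuickSetup()

	if time.Since(start) > interval {
		// Under stress or CI scheduling delays even trivial code can overrun
		// the interval; bail out rather than make a flaky assertion.
		fmt.Println("machine too slow; skipping the zero-logs assertion")
		return
	}
	fmt.Println("within the interval; safe to assert nothing was logged yet")
}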
