structlogging: restructure hot range logger for testability #142996

Merged
merged 1 commit on Mar 26, 2025
5 changes: 0 additions & 5 deletions pkg/server/structlogging/BUILD.bazel
@@ -32,25 +32,20 @@ go_test(
deps = [
":structlogging",
"//pkg/base",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/plan",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/log/logpb",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
],
)
86 changes: 51 additions & 35 deletions pkg/server/structlogging/hot_ranges_log.go
@@ -84,7 +84,6 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
})

ticker := time.NewTicker(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
defer ticker.Stop()

for {
select {
@@ -93,43 +92,60 @@ func (s *hotRangesLoggingScheduler) start(ctx context.Context, stopper *stop.Sto
case <-ctx.Done():
return
case <-ticker.C:
if !TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
continue
}
resp, err := s.sServer.HotRangesV2(ctx,
&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
if err != nil {
log.Warningf(ctx, "failed to get hot ranges: %s", err)
continue
}
var events []logpb.EventPayload
ts := timeutil.Now().UnixNano()

for _, r := range resp.Ranges {
hrEvent := &eventpb.HotRangesStats{
RangeID: int64(r.RangeID),
Qps: r.QPS,
Databases: r.Databases,
Tables: r.Tables,
Indexes: r.Indexes,
SchemaName: r.SchemaName,
CPUTimePerSecond: r.CPUTimePerSecond,
ReadBytesPerSecond: r.ReadBytesPerSecond,
WriteBytesPerSecond: r.WriteBytesPerSecond,
ReadsPerSecond: r.ReadsPerSecond,
WritesPerSecond: r.WritesPerSecond,
LeaseholderNodeID: int32(r.LeaseholderNodeID),
CommonEventDetails: logpb.CommonEventDetails{
Timestamp: ts,
},
}
events = append(events, hrEvent)
}
logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))

s.maybeLogHotRanges(ctx, stopper)
case <-intervalChangedChan:
ticker.Reset(TelemetryHotRangesStatsInterval.Get(&s.st.SV))
}
}
})
}

// maybeLogHotRanges is a small helper function which couples the
// functionality of checking whether to log and logging.
func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
if s.shouldLog() {
s.logHotRanges(ctx, stopper)
}
}

// shouldLog checks the below conditions to see whether it should emit logs.
// - Is the cluster setting server.telemetry.hot_ranges_stats.enabled true?
func (s *hotRangesLoggingScheduler) shouldLog() bool {
return TelemetryHotRangesStatsEnabled.Get(&s.st.SV)
}
Comment on lines +111 to +115

Member:
Maybe this is a stylistic preference, but this function just checks 1 cluster setting and there's also only one caller of shouldLog as far as I can tell. Maybe we can just inline this call in maybeLogHotRanges?
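
For reference, the inlined version being suggested here would look roughly like this (a sketch of the reviewer's suggestion, not a change made in this PR):

func (s *hotRangesLoggingScheduler) maybeLogHotRanges(ctx context.Context, stopper *stop.Stopper) {
    // The single cluster-setting check from shouldLog, inlined.
    if TelemetryHotRangesStatsEnabled.Get(&s.st.SV) {
        s.logHotRanges(ctx, stopper)
    }
}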

Contributor Author:
No, your question is exactly correct - for now this function acts only as a passthrough. Part of the reasoning behind this is the additional checks that will shortly follow this PR. In the future the function will look something like:

func (s *hotRangesLoggingScheduler) shouldLog(lastLogTime time.Time, topReplicaCPU float64) bool {
    settingIntervalDuration := IntervalSetting.Get(&s.st.SV)
    settingEnabled := EnabledSetting.Get(&s.st.SV)
    if settingEnabled && time.Since(lastLogTime) > settingIntervalDuration {
        return true
    }
    if topReplicaCPU > LogCPUThreshold {
        return true
    }
    return false
}

We're going to move the ticker interval to something small like 1m. From there we aim to log at the cluster setting's interval, or when the node in question goes over a certain load.
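
The "goes over a certain load" input could come from the same HotRangesV2 response that logHotRanges already fetches, e.g. the largest CPUTimePerSecond among the returned ranges. A hypothetical helper for computing that input (not part of this PR; the serverpb.HotRangesResponseV2 type name is assumed from the status server API used above):

func topReplicaCPU(resp *serverpb.HotRangesResponseV2) float64 {
    // Return the CPU usage of the hottest local replica so a caller can
    // compare it against a logging threshold such as LogCPUThreshold.
    var top float64
    for _, r := range resp.Ranges {
        if r.CPUTimePerSecond > top {
            top = r.CPUTimePerSecond
        }
    }
    return top
}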


// logHotRanges collects the hot ranges from this node's status server and
// sends them to the TELEMETRY log channel.
func (s *hotRangesLoggingScheduler) logHotRanges(ctx context.Context, stopper *stop.Stopper) {
resp, err := s.sServer.HotRangesV2(ctx,
&serverpb.HotRangesRequest{NodeID: "local", PageSize: ReportTopHottestRanges})
if err != nil {
log.Warningf(ctx, "failed to get hot ranges: %s", err)
return
}

var events []logpb.EventPayload
ts := timeutil.Now().UnixNano()

for _, r := range resp.Ranges {
hrEvent := &eventpb.HotRangesStats{
RangeID: int64(r.RangeID),
Qps: r.QPS,
Databases: r.Databases,
Tables: r.Tables,
Indexes: r.Indexes,
SchemaName: r.SchemaName,
CPUTimePerSecond: r.CPUTimePerSecond,
ReadBytesPerSecond: r.ReadBytesPerSecond,
WriteBytesPerSecond: r.WriteBytesPerSecond,
ReadsPerSecond: r.ReadsPerSecond,
WritesPerSecond: r.WritesPerSecond,
LeaseholderNodeID: int32(r.LeaseholderNodeID),
CommonEventDetails: logpb.CommonEventDetails{
Timestamp: ts,
},
}
events = append(events, hrEvent)
}
logutil.LogEventsWithDelay(ctx, events, stopper, TelemetryHotRangesStatsLoggingDelay.Get(&s.st.SV))
}
124 changes: 45 additions & 79 deletions pkg/server/structlogging/hot_ranges_log_test.go
@@ -8,27 +8,24 @@ package structlogging_test
import (
"context"
"encoding/json"
"errors"
"regexp"
"slices"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/server/structlogging"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
)

type hotRangesLogSpy struct {
@@ -74,21 +71,12 @@ func (spy *hotRangesLogSpy) Reset() {
spy.mu.logs = nil
}

// TestHotRangesStatsTenants tests that hot ranges stats are logged per node.
// The test will ensure each node contains 5 distinct range replicas for hot
// ranges logging. Each node should thus log 5 distinct range ids.
func TestHotRangesStats(t *testing.T) {
defer leaktest.AfterTest(t)()
func setupHotRangesLogTest(
t *testing.T, ctx context.Context,
) (serverutils.ApplicationLayerInterface, *hotRangesLogSpy, func()) {
sc := log.ScopeWithoutShowLogs(t)
defer sc.Close(t)

skip.UnderRace(t)

ctx := context.Background()
spy := hotRangesLogSpy{t: t}
defer log.InterceptWith(ctx, &spy)()

tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
spy := &hotRangesLogSpy{t: t}
tc := serverutils.StartCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
@@ -101,72 +89,50 @@ func TestHotRangesStats(t *testing.T) {
},
},
})
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0)
sqlutils.CreateTable(
t, db, "foo",
"k INT PRIMARY KEY, v INT",
300,
sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(2)),
)

// Ensure both of node 1 and 2 have 5 distinct replicas from the table.
tableDesc := desctestutils.TestingGetPublicTableDescriptor(
tc.Server(0).DB(), keys.SystemSQLCodec, "test", "foo")
tc.SplitTable(t, tableDesc, []serverutils.SplitPoint{
{TargetNodeIdx: 1, Vals: []interface{}{100}},
{TargetNodeIdx: 1, Vals: []interface{}{120}},
{TargetNodeIdx: 1, Vals: []interface{}{140}},
{TargetNodeIdx: 1, Vals: []interface{}{160}},
{TargetNodeIdx: 1, Vals: []interface{}{180}},
{TargetNodeIdx: 2, Vals: []interface{}{200}},
{TargetNodeIdx: 2, Vals: []interface{}{220}},
{TargetNodeIdx: 2, Vals: []interface{}{240}},
{TargetNodeIdx: 2, Vals: []interface{}{260}},
{TargetNodeIdx: 2, Vals: []interface{}{280}},
})

// query table
for i := 0; i < 300; i++ {
db := tc.ServerConn(0)
sqlutils.MakeSQLRunner(db).Query(t, `SELECT * FROM test.foo`)
leakChecker := leaktest.AfterTest(t)
logInterceptor := log.InterceptWith(ctx, spy)
stopper := tc.Stopper()
teardown := func() {
stopper.Stop(ctx)
sc.Close(t)
logInterceptor()
leakChecker()
}

// Skip node 1 since it will contain many more replicas.
// We only need to check nodes 2 and 3 to see that the nodes are logging their local hot ranges.
rangeIDs := make(map[int64]struct{})
for _, i := range []int{1, 2} {
spy.Reset()
ts := tc.ApplicationLayer(i)
structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, true)
structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, time.Second)
structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)

testutils.SucceedsSoon(t, func() error {
logs := spy.Logs()
if len(logs) < 5 {
return errors.New("waiting for hot ranges to be logged")
}
ts := tc.ApplicationLayer(0)
return ts, spy, teardown
}

return nil
})
structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, 1*time.Hour)
// TestHotRangeLogger tests that hot ranges stats are logged per node.
// The test will ensure each node contains 5 distinct range replicas for hot
// ranges logging. Each node should thus log 5 distinct range ids.
func TestHotRangeLogger(t *testing.T) {
skip.UnderRace(t)
ctx := context.Background()
ts, spy, teardown := setupHotRangesLogTest(t, ctx)
defer teardown()

// Get first 5 logs since the logging loop may have fired multiple times.
// We should have gotten 5 distinct range ids, one for each split point above.
logs := spy.Logs()[:5]
structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, true)
structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond)
structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)

testutils.SucceedsSoon(t, func() error {
logs := spy.Logs()

// Look for a descriptor, which we always expect to exist in the system.
for _, l := range logs {
assert.Equal(t, l.Databases, []string{"‹test›"})
assert.Equal(t, l.Tables, []string{"‹foo›"})
assert.Equal(t, l.Indexes, []string{"‹foo_pkey›"})
_, ok := rangeIDs[l.RangeID]
if ok {
t.Fatalf(`Logged ranges should be unique per node for this test.
found range on node %d and node %d: %s %s %s %s %d`, i, l.LeaseholderNodeID, l.Databases, l.SchemaName, l.Tables, l.Indexes, l.RangeID)
if !slices.Equal(l.Databases, []string{"‹system›"}) {
continue
}
if !slices.Equal(l.Tables, []string{"‹sqlliveness›"}) {
continue
}
rangeIDs[l.RangeID] = struct{}{}
if !slices.Equal(l.Indexes, []string{"‹primary›"}) {
continue
}
return nil
}

}
return errors.New("waited too long for the synthetic data")
})
}
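
Since the stated goal of this change is testability, follow-up tests can now build directly on setupHotRangesLogTest. A hypothetical example of such a test (not part of this PR; the sleep-based negative check is only illustrative, and a real test would want a firmer synchronization point):

func TestHotRangeLoggerDisabled(t *testing.T) {
    skip.UnderRace(t)
    ctx := context.Background()
    ts, spy, teardown := setupHotRangesLogTest(t, ctx)
    defer teardown()

    // Keep the ticker firing quickly, but leave the cluster setting disabled;
    // the scheduler should not emit any hot range logs.
    structlogging.TelemetryHotRangesStatsEnabled.Override(ctx, &ts.ClusterSettings().SV, false)
    structlogging.TelemetryHotRangesStatsInterval.Override(ctx, &ts.ClusterSettings().SV, time.Millisecond)
    structlogging.TelemetryHotRangesStatsLoggingDelay.Override(ctx, &ts.ClusterSettings().SV, 0*time.Millisecond)

    spy.Reset()
    time.Sleep(50 * time.Millisecond)
    assert.Empty(t, spy.Logs())
}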