tracing,execinfrapb: include child metadata in trace agg metas #143090

Merged 3 commits on Mar 27, 2025

In brief: TracingAggregatorEvents gains a span_totals map carrying per-operation span counts and cumulative durations from each node's trace recording; job resumers retain the full message per component rather than only its events map; and job execution-detail files are now grouped under a shared timestamp prefix.
pkg/backup/backup_job.go (1 addition, 1 deletion)

@@ -332,7 +332,7 @@ func backup(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil
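Note: this same one-line change repeats in the restore, logical replication, and stream ingestion resumers below. The point is that `perNodeAggregatorStats` now retains the full `TracingAggregatorEvents` message per component instead of only its `Events` map, so the new `SpanTotals` field survives until flush time. A minimal sketch of the shape change (the `resumerMu` struct here is hypothetical shorthand; the type names come from this PR):

```go
package sketch // hypothetical package, for illustration only

import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"

type resumerMu struct {
	// Before: map[execinfrapb.ComponentID]map[string][]byte kept only agg.Events
	// and dropped the rest of the message.
	// After: the whole proto is stored by value, so SpanTotals rides along.
	perNodeAggregatorStats map[execinfrapb.ComponentID]execinfrapb.TracingAggregatorEvents
}

func (mu *resumerMu) record(id execinfrapb.ComponentID, agg *execinfrapb.TracingAggregatorEvents) {
	// Dereference to store by value, matching the `*agg` assignment in the diff.
	mu.perNodeAggregatorStats[id] = *agg
}
```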
pkg/backup/restore_job.go (1 addition, 1 deletion)

@@ -498,7 +498,7 @@ func restore(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil
pkg/crosscluster/logical/logical_replication_job.go (1 addition, 1 deletion)

@@ -751,7 +751,7 @@ func (rh *rowHandler) handleTraceAgg(agg *execinfrapb.TracingAggregatorEvents) {
 	// aggregate.
 	rh.r.mu.Lock()
 	defer rh.r.mu.Unlock()
-	rh.r.mu.perNodeAggregatorStats[componentID] = agg.Events
+	rh.r.mu.perNodeAggregatorStats[componentID] = *agg
 }

 func (rh *rowHandler) handleMeta(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
pkg/crosscluster/physical/stream_ingestion_dist.go (1 addition, 1 deletion)

@@ -172,7 +172,7 @@ func startDistIngestion(
 		// Update the running aggregate of the component with the latest received
 		// aggregate.
 		resumer.mu.Lock()
-		resumer.mu.perNodeAggregatorStats[componentID] = agg.Events
+		resumer.mu.perNodeAggregatorStats[componentID] = *agg
 		resumer.mu.Unlock()
 	}
 	return nil
pkg/jobs/adopt.go (1 addition, 1 deletion)

@@ -76,7 +76,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
 	}

 	resumerTraceFilename := fmt.Sprintf("%s/resumer-trace/%s",
-		r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
+		timeutil.Now().Format("20060102_150405.00"), r.ID().String())
 	td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
 	if err := r.db.Txn(dumpCtx, func(ctx context.Context, txn isql.Txn) error {
 		return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID)
pkg/server/job_profiler.go (3 additions, 3 deletions)

@@ -127,7 +127,7 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
 		log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
 		return
 	}
-	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
+	filename := fmt.Sprintf("%s/job-goroutines.txt", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID)
 	}); err != nil {
@@ -146,7 +146,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
 	}
 	if row != nil && row[0] != tree.DNull {
 		dspDiagramURL := string(tree.MustBeDString(row[0]))
-		filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
+		filename := fmt.Sprintf("%s/distsql-plan.html", timeutil.Now().Format("20060102_150405.00"))
 		if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 			return jobs.WriteExecutionDetailFile(ctx, filename,
 				[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
@@ -172,7 +172,7 @@ func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
 		return
 	}

-	filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
+	filename := fmt.Sprintf("%s/trace.zip", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID)
 	}); err != nil {
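Note: together with the adopt.go change above, every execution-detail artifact now lands under a shared timestamp prefix (`<ts>/job-goroutines.txt`, `<ts>/distsql-plan.html`, `<ts>/trace.zip`, `<ts>/resumer-trace/<node>`) instead of carrying the timestamp as an infix. A quick sketch of the resulting names (timestamp illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the Format("20060102_150405.00") calls in the diffs above.
	ts := time.Date(2025, 3, 27, 10, 30, 5, 120_000_000, time.UTC).Format("20060102_150405.00")
	for _, f := range []string{"job-goroutines.txt", "distsql-plan.html", "trace.zip"} {
		// All files from one profiling pass group under a single prefix.
		fmt.Printf("%s/%s\n", ts, f) // e.g. 20250327_103005.12/trace.zip
	}
}
```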
pkg/sql/execinfrapb/data.proto (2 additions)

@@ -379,4 +379,6 @@ message TracingAggregatorEvents {
     (gogoproto.customname) = "FlowID",
     (gogoproto.customtype) = "FlowID"];
   map<string, bytes> events = 3;
+
+  map<string, util.tracing.tracingpb.OperationMetadata> span_totals = 4 [(gogoproto.nullable) = false];
 }
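Note: with `(gogoproto.nullable) = false`, the generated Go field is `SpanTotals map[string]tracingpb.OperationMetadata` (values, not pointers), keyed by span operation name. A hedged sketch of consuming it; `Count` and `Duration` are the fields the flush code below prints:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

func main() {
	// A hand-built message for illustration; in the PR this arrives as
	// producer metadata from a remote processor.
	agg := &execinfrapb.TracingAggregatorEvents{
		SpanTotals: map[string]tracingpb.OperationMetadata{
			"backup.ssts": {Count: 3, Duration: 2 * time.Second},
		},
	}
	for op, md := range agg.SpanTotals {
		fmt.Printf("%-40s (%d): %s\n", op, md.Count, md.Duration)
	}
}
```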
pkg/sql/jobs_profiler_execution_details_test.go (15 additions, 12 deletions)

@@ -378,12 +378,12 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	files := listExecutionDetails(t, s, jobspb.JobID(importJobID))

 	patterns := []string{
-		"distsql\\..*\\.html",
+		".*/distsql-plan.html",
 	}
 	if !s.DeploymentMode().IsExternal() {
-		patterns = append(patterns, "goroutines\\..*\\.txt")
+		patterns = append(patterns, ".*/job-goroutines.txt")
 	}
-	patterns = append(patterns, "trace\\..*\\.zip")
+	patterns = append(patterns, ".*/trace.zip")

 	require.Len(t, files, len(patterns))
 	for i, pattern := range patterns {
@@ -426,17 +426,19 @@
 		return nil
 	})
 	patterns = []string{
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
-		"[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
-		"distsql\\..*\\.html",
-		"distsql\\..*\\.html",
+		".*/distsql-plan.html",
+		".*/distsql-plan.html",
 	}
 	if !s.DeploymentMode().IsExternal() {
-		patterns = append(patterns, "goroutines\\..*\\.txt", "goroutines\\..*\\.txt")
+		patterns = append(patterns, ".*/job-goroutines.txt", ".*/job-goroutines.txt")
 	}
-	patterns = append(patterns, "trace\\..*\\.zip", "trace\\..*\\.zip")
+	patterns = append(patterns,
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
+		"[0-9_.]*/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt",
+	)
+	patterns = append(patterns, ".*/trace.zip", ".*/trace.zip")
 	for i, pattern := range patterns {
 		require.Regexp(t, pattern, files[i])
 	}
@@ -465,8 +467,9 @@ func listExecutionDetails(

 	edResp := serverpb.ListJobProfilerExecutionDetailsResponse{}
 	require.NoError(t, protoutil.Unmarshal(body, &edResp))
+	// Sort the responses with the variable date/time digits in the prefix removed.
 	sort.Slice(edResp.Files, func(i, j int) bool {
-		return edResp.Files[i] < edResp.Files[j]
+		return strings.TrimLeft(edResp.Files[i], "0123456789_.") < strings.TrimLeft(edResp.Files[j], "0123456789_.")
 	})
 	return edResp.Files
 }
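Note: `strings.TrimLeft` strips any leading run of characters drawn from the cutset, so comparison effectively starts at the first `/` and files sort by name rather than by their per-run timestamp. A quick illustration:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	for _, f := range []string{
		"20250327_103005.12/trace.zip",
		"20250326_221840.07/distsql-plan.html",
	} {
		// Digits, '_', and '.' are trimmed from the front before comparing.
		fmt.Println(strings.TrimLeft(f, "0123456789_."))
	}
	// Output:
	// /trace.zip
	// /distsql-plan.html
}
```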
pkg/util/bulk/BUILD.bazel (1 addition)

@@ -18,5 +18,6 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
+        "//pkg/util/tracing/tracingpb",
     ],
 )
pkg/util/bulk/aggregator_stats.go (85 additions, 53 deletions)

@@ -9,6 +9,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"sort"

 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -19,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 )

 // ConstructTracingAggregatorProducerMeta constructs a ProducerMetadata that
@@ -45,25 +47,37 @@ func ConstructTracingAggregatorProducerMeta(
 		}
 	})

+	sp := tracing.SpanFromContext(ctx)
+	if sp != nil {
+		recType := sp.RecordingType()
+		if recType != tracingpb.RecordingOff {
+			aggEvents.SpanTotals = sp.GetFullTraceRecording(recType).Root.ChildrenMetadata
+		}
+	}
 	return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
 }

 // ComponentAggregatorStats is a mapping from a component to all the Aggregator
 // Stats collected for that component.
-type ComponentAggregatorStats map[execinfrapb.ComponentID]map[string][]byte
+type ComponentAggregatorStats map[execinfrapb.ComponentID]execinfrapb.TracingAggregatorEvents

 // DeepCopy takes a deep copy of the component aggregator stats map.
 func (c ComponentAggregatorStats) DeepCopy() ComponentAggregatorStats {
 	mapCopy := make(ComponentAggregatorStats, len(c))
 	for k, v := range c {
-		innerMap := make(map[string][]byte, len(v))
-		for k2, v2 := range v {
+		copied := v
+		copied.Events = make(map[string][]byte, len(v.Events))
+		copied.SpanTotals = make(map[string]tracingpb.OperationMetadata, len(v.SpanTotals))
+		for k2, v2 := range v.Events {
 			// Create a copy of the byte slice to avoid modifying the original data.
 			dataCopy := make([]byte, len(v2))
 			copy(dataCopy, v2)
-			innerMap[k2] = dataCopy
+			copied.Events[k2] = dataCopy
 		}
-		mapCopy[k] = innerMap
+		for k2, v2 := range v.SpanTotals {
+			copied.SpanTotals[k2] = v2
+		}
+		mapCopy[k] = copied
 	}
 	return mapCopy
 }
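Note: `copied := v` copies the struct by value, which is exactly why both map fields are re-made and refilled above; a plain struct copy would still alias the source's `Events` and `SpanTotals` maps. A tiny self-contained illustration of the hazard (hypothetical type, not from the PR):

```go
package main

import "fmt"

// shallow demonstrates why DeepCopy re-makes the inner maps.
type shallow struct{ events map[string][]byte }

func main() {
	a := shallow{events: map[string][]byte{"k": []byte("v")}}
	b := a // the struct is copied, but b.events still aliases a.events
	b.events["k"] = []byte("mutated")
	fmt.Println(string(a.events["k"])) // prints "mutated"
}
```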
@@ -82,58 +96,76 @@ func FlushTracingAggregatorStats(
 	db isql.DB,
 	perNodeAggregatorStats ComponentAggregatorStats,
 ) error {
-	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent)
-		asOf := timeutil.Now().Format("20060102_150405.00")
-
-		var clusterWideSummary bytes.Buffer
-		for component, nameToEvent := range perNodeAggregatorStats {
-			clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n\n",
-				component.SQLInstanceID.String(), component.FlowID.String()))
-			for name, event := range nameToEvent {
-				// Write a proto file per tag. This machine-readable file can be consumed
-				// by other places we want to display this information egs: annotated
-				// DistSQL diagrams, DBConsole etc.
-				filename := fmt.Sprintf("%s/%s",
-					component.SQLInstanceID.String(), asOf)
-				msg, err := protoreflect.DecodeMessage(name, event)
-				if err != nil {
-					clusterWideSummary.WriteString(fmt.Sprintf("invalid protocol message: %v", err))
-					// If we failed to decode the event write the error to the file and
-					// carry on.
-					continue
-				}
-
-				if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, txn, jobID); err != nil {
-					return err
-				}
-
-				// Construct a single text file that contains information on a per-node
-				// basis as well as a cluster-wide aggregate.
-				clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", name))
-
-				aggEvent := msg.(tracing.AggregatorEvent)
-				clusterWideSummary.WriteString(aggEvent.String())
-				clusterWideSummary.WriteString("\n")
-
-				if _, ok := clusterWideAggregatorStats[name]; ok {
-					clusterWideAggregatorStats[name].Combine(aggEvent)
-				} else {
-					clusterWideAggregatorStats[name] = aggEvent
-				}
-			}
-		}
-
-		for tag, event := range clusterWideAggregatorStats {
-			clusterWideSummary.WriteString("## Cluster-wide\n\n")
-			clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", tag))
-			clusterWideSummary.WriteString(event.String())
-		}
-
-		// Ensure the file always has a trailing newline, regardless of whether or
-		// not the loops above wrote anything.
-		clusterWideSummary.WriteString("\n")
-		filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf)
-		return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), txn, jobID)
-	})
+	clusterWideSpanStats := make(map[string]tracingpb.OperationMetadata)
+	clusterWideAggregatorStats := make(map[string]tracing.AggregatorEvent)
+	ids := make([]execinfrapb.ComponentID, 0, len(perNodeAggregatorStats))
+
+	for component := range perNodeAggregatorStats {
+		ids = append(ids, component)
+	}
+	sort.Slice(ids, func(i, j int) bool { return ids[i].SQLInstanceID < ids[j].SQLInstanceID })
+
+	// Write a summary for each node to a buffer. While doing so, accumulate a
+	// cluster-wide summary as well to be written to a second buffer below.
+	var perNode bytes.Buffer
+	fmt.Fprintf(&perNode, "# Per-component Details (%d)\n", len(perNodeAggregatorStats))
+	for _, component := range ids {
+		nodeStats := perNodeAggregatorStats[component]
+		fmt.Fprintf(&perNode, "# SQL Instance ID: %s (%s); Flow/proc ID: %s/%d\n\n",
+			component.SQLInstanceID, component.Region, component.FlowID, component.ID)
+
+		// Print span stats.
+		perNode.WriteString("## Span Totals\n\n")
+		for name, stats := range nodeStats.SpanTotals {
+			fmt.Fprintf(&perNode, "- %-40s (%d):\t%s\n", name, stats.Count, stats.Duration)
+		}
+		perNode.WriteString("\n")
+
+		// Add span stats to the cluster-wide span stats.
+		for spanName, totals := range nodeStats.SpanTotals {
+			clusterWideSpanStats[spanName] = clusterWideSpanStats[spanName].Combine(totals)
+		}
+
+		perNode.WriteString("## Aggregate Stats\n\n")
+		for name, event := range nodeStats.Events {
+			msg, err := protoreflect.DecodeMessage(name, event)
+			if err != nil {
+				continue
+			}
+			aggEvent := msg.(tracing.AggregatorEvent)
+			fmt.Fprintf(&perNode, "- %s:\n%s\n\n", name, aggEvent)
+
+			// Populate the cluster-wide aggregator stats.
+			if _, ok := clusterWideAggregatorStats[name]; ok {
+				clusterWideAggregatorStats[name].Combine(aggEvent)
+			} else {
+				clusterWideAggregatorStats[name] = aggEvent
+			}
+		}
+		perNode.WriteString("\n")
+	}
+
+	// Write the cluster-wide summary.
+	var combined bytes.Buffer
+	combined.WriteString("# Cluster-wide\n\n")
+	combined.WriteString("## Span Totals\n\n")
+	for name, stats := range clusterWideSpanStats {
+		fmt.Fprintf(&combined, " - %-40s (%d):\t%s\n", name, stats.Count, stats.Duration)
+	}
+	combined.WriteString("\n")
+	combined.WriteString("## Aggregate Stats\n\n")
+	for name, ev := range clusterWideAggregatorStats {
+		fmt.Fprintf(&combined, " - %s:\n%s\n", name, ev)
+	}
+	combined.WriteString("\n")
+
+	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		asOf := timeutil.Now().Format("20060102_150405.00")
+		combinedFilename := fmt.Sprintf("%s/trace-stats-cluster-wide.txt", asOf)
+		perNodeFilename := fmt.Sprintf("%s/trace-stats-by-node.txt", asOf)
+		if err := jobs.WriteExecutionDetailFile(ctx, combinedFilename, combined.Bytes(), txn, jobID); err != nil {
+			return err
+		}
+		return jobs.WriteExecutionDetailFile(ctx, perNodeFilename, perNode.Bytes(), txn, jobID)
+	})
 }
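Note: the cluster-wide rollup leans on `OperationMetadata.Combine` being usable on the map's zero value, so no existence check is needed before the first sample arrives (unlike the `AggregatorEvent` branch, which must special-case the first event). A minimal sketch of the accumulation pattern, assuming `Combine` sums counts and durations as the printed totals above imply:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

func main() {
	totals := make(map[string]tracingpb.OperationMetadata)
	for _, sample := range []tracingpb.OperationMetadata{
		{Count: 2, Duration: 150 * time.Millisecond},
		{Count: 1, Duration: 50 * time.Millisecond},
	} {
		// Combining into the zero value handles the missing-key case for free.
		totals["backup.sst_writer"] = totals["backup.sst_writer"].Combine(sample)
	}
	fmt.Println(totals["backup.sst_writer"].Count) // 3
}
```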
pkg/util/tracing/span.go (10 additions)

@@ -315,6 +315,16 @@ func (sp *Span) GetRecording(recType tracingpb.RecordingType) tracingpb.Recording
 	return sp.i.GetRecording(recType, false /* finishing */)
 }

+// GetFullTraceRecording returns the span's full recording, including detached
+// children, as a Trace. It returns an empty Trace if the span has already
+// finished or is not recording.
+func (sp *Span) GetFullTraceRecording(recType tracingpb.RecordingType) Trace {
+	if sp.detectUseAfterFinish() {
+		return Trace{}
+	}
+	if sp.RecordingType() == tracingpb.RecordingOff {
+		return Trace{}
+	}
+	return sp.i.GetFullTraceRecording(recType)
+}
+
 // GetConfiguredRecording is like GetRecording, except the type of recording it
 // returns is the one that the span has been previously configured with.
 //
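Note: an end-to-end sketch of the new accessor (assumes the existing `NewTracer`, `WithRecording`, and `WithParent` APIs): start a recording span, finish a child, then read the root's `ChildrenMetadata`, which is precisely what ConstructTracingAggregatorProducerMeta copies into `SpanTotals`.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

func main() {
	tr := tracing.NewTracer()
	ctx, sp := tr.StartSpanCtx(context.Background(), "parent",
		tracing.WithRecording(tracingpb.RecordingVerbose))
	defer sp.Finish()

	_, child := tr.StartSpanCtx(ctx, "child", tracing.WithParent(sp))
	child.Finish() // the child's duration rolls up into the parent's recording

	// Per-operation counts and cumulative durations, keyed by operation name.
	trace := sp.GetFullTraceRecording(tracingpb.RecordingVerbose)
	for op, md := range trace.Root.ChildrenMetadata {
		fmt.Printf("%s: count=%d duration=%s\n", op, md.Count, md.Duration)
	}
}
```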
pkg/util/tracing/span_inner.go (6 additions)

@@ -65,6 +65,12 @@ func (s *spanInner) GetTraceRecording(recType tracingpb.RecordingType, finishing
 	return s.crdb.GetRecording(recType, finishing)
 }

+// GetFullTraceRecording returns the span's full recording, including detached
+// children, as a Trace. See GetTraceRecording and WithDetachedRecording.
+func (s *spanInner) GetFullTraceRecording(recType tracingpb.RecordingType) Trace {
+	return s.crdb.GetFullRecording(recType)
+}
+
 // GetRecording returns the span's recording.
 //
 // finishing indicates whether s is in the process of finishing. If it isn't,