Skip to content

Commit 11d09c7

Browse files
committed
Enable Thanos Query Stats Propagation & cache response headers
Thanos Query gives us some really nice detailed internal stats, but thanos-query-frontend annoyingly clobbers them by trying to parse the response as a Prometheus response, which has different fields and a different structure. In addition, thanos-query-frontend doesn't even pass the &stats=all parameter through to the downstream if you try to request it. This change fixes both issues by propagating the stats request parameter to the downstream and by decoding/encoding the Thanos Query stats structure properly. As an extra, we also set a response header on a cache hit so that upstreams can make use of this information. Signed-off-by: milesbryant <[email protected]>
1 parent 0577661 commit 11d09c7

File tree

10 files changed

+633
-300
lines changed

10 files changed

+633
-300
lines changed

cmd/thanos/query_frontend.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,6 @@ func registerQueryFrontend(app *extkingpin.App) {
145145

146146
cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
147147
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
148-
cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
149-
"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)
150148

151149
cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
152150
" and both flags cannot be used at the same time. "+
@@ -313,7 +311,7 @@ func runQueryFrontend(
313311
return err
314312
}
315313

316-
roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
314+
roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
317315
if err != nil {
318316
return errors.Wrap(err, "setup downstream roundtripper")
319317
}

internal/cortex/frontend/downstream_roundtripper.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,17 @@ import (
1313

1414
// RoundTripper that forwards requests to downstream URL.
1515
type downstreamRoundTripper struct {
16-
downstreamURL *url.URL
17-
transport http.RoundTripper
18-
queryStatsEnabled bool
16+
downstreamURL *url.URL
17+
transport http.RoundTripper
1918
}
2019

21-
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
20+
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
2221
u, err := url.Parse(downstreamURL)
2322
if err != nil {
2423
return nil, err
2524
}
2625

27-
return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
26+
return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
2827
}
2928

3029
func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
@@ -37,13 +36,6 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
3736
}
3837
}
3938

40-
if d.queryStatsEnabled {
41-
// add &stats query param to get thanos-query to add query statistics to log
42-
q := r.URL.Query()
43-
q.Set("stats", "true")
44-
r.URL.RawQuery = q.Encode()
45-
}
46-
4739
r.URL.Scheme = d.downstreamURL.Scheme
4840
r.URL.Host = d.downstreamURL.Host
4941
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)

internal/cortex/frontend/transport/handler.go

+1-89
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ package transport
66
import (
77
"bytes"
88
"context"
9-
"encoding/json"
109
"errors"
1110
"fmt"
12-
"github.com/prometheus/client_golang/prometheus/promauto"
1311
"github.com/prometheus/prometheus/util/stats"
1412
"io"
1513
"net/http"
@@ -44,7 +42,6 @@ var (
4442
type HandlerConfig struct {
4543
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
4644
MaxBodySize int64 `yaml:"max_body_size"`
47-
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
4845
}
4946

5047
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -68,27 +65,6 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
6865
roundTripper: roundTripper,
6966
}
7067

71-
if cfg.QueryStatsEnabled {
72-
h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
73-
Name: "thanos_query_frontend_query_seconds",
74-
Help: "Total amount of wall clock time spend processing queries.",
75-
Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360},
76-
}, []string{"user"})
77-
78-
h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
79-
Name: "thanos_query_frontend_query_total_fetched_samples",
80-
Help: "Number of samples touched to execute a query.",
81-
Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000},
82-
}, []string{"user"})
83-
84-
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
85-
h.querySeconds.DeleteLabelValues(user)
86-
h.querySamplesTotal.DeleteLabelValues(user)
87-
})
88-
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
89-
_ = h.activeUsers.StartAsync(context.Background())
90-
}
91-
9268
return h
9369
}
9470

@@ -129,38 +105,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
129105

130106
w.WriteHeader(resp.StatusCode)
131107

132-
var respBuf bytes.Buffer
133-
if f.cfg.QueryStatsEnabled {
134-
// Buffer the response body for query stat tracking later
135-
resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
136-
}
137-
138108
// log copy response body error so that we will know even though success response code returned
139109
bytesCopied, err := io.Copy(w, resp.Body)
140110
if err != nil && !errors.Is(err, syscall.EPIPE) {
141111
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
142112
}
143113

144-
if f.cfg.QueryStatsEnabled {
145-
// Parse the stats field out of the response body
146-
var statsResponse ResponseWithStats
147-
if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
148-
if statsResponse.Data.Stats != nil {
149-
queryString = f.parseRequestQueryString(r, buf)
150-
f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
151-
} else {
152-
// Don't fail the request if the stats are nil, just log a warning
153-
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
154-
}
155-
} else {
156-
// Don't fail the request if the stats are nil, just log a warning
157-
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
158-
}
159-
}
160-
161114
// Check whether we should parse the query string.
162115
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
163-
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
116+
if shouldReportSlowQuery {
164117
queryString = f.parseRequestQueryString(r, buf)
165118
}
166119

@@ -203,47 +156,6 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
203156
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
204157
}
205158

206-
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
207-
remoteUser, _, _ := r.BasicAuth()
208-
209-
// Log stats.
210-
logMessage := []interface{}{
211-
"msg", "query stats",
212-
"component", "query-frontend",
213-
"method", r.Method,
214-
"path", r.URL.Path,
215-
"remote_user", remoteUser,
216-
"remote_addr", r.RemoteAddr,
217-
"response_time", queryResponseTime,
218-
"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
219-
"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
220-
"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
221-
"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
222-
"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
223-
"query_timings_result_sort_time", stats.Timings.ResultSortTime,
224-
}
225-
if stats.Samples != nil {
226-
samples := stats.Samples
227-
228-
logMessage = append(logMessage, []interface{}{
229-
"total_queryable_samples", samples.TotalQueryableSamples,
230-
"peak_samples", samples.PeakSamples,
231-
}...)
232-
}
233-
234-
logMessage = append(logMessage, formatQueryString(queryString)...)
235-
236-
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
237-
238-
// Record metrics.
239-
if f.querySeconds != nil {
240-
f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
241-
}
242-
if f.querySamplesTotal != nil && stats.Samples != nil {
243-
f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
244-
}
245-
}
246-
247159
func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
248160
// Use previously buffered body.
249161
r.Body = io.NopCloser(&bodyBuf)

internal/cortex/querier/querier.go

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type Config struct {
2121
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
2222
QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"`
2323
AtModifierEnabled bool `yaml:"at_modifier_enabled"`
24-
EnablePerStepStats bool `yaml:"per_step_stats_enabled"`
2524

2625
// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
2726
QueryStoreAfter time.Duration `yaml:"query_store_after"`

internal/cortex/querier/queryrange/query_range.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,23 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
665665
func StatsMerge(resps []Response) *PrometheusResponseStats {
666666
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
667667
hasStats := false
668+
669+
result := &PrometheusResponseStats{
670+
Timings: &PrometheusResponseStats_Timings{
671+
EvalTotalTime: 0,
672+
ResultSortTime: 0,
673+
QueryPreparationTime: 0,
674+
InnerEvalTime: 0,
675+
ExecQueueTime: 0,
676+
ExecTotalTime: 0,
677+
},
678+
Samples: &PrometheusResponseStats_Samples{
679+
TotalQueryableSamples: 0,
680+
PeakSamples: 0,
681+
TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{},
682+
},
683+
}
684+
668685
for _, resp := range resps {
669686
stats := resp.GetStats()
670687
if stats == nil {
@@ -679,6 +696,18 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
679696
for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
680697
output[s.GetTimestampMs()] = s
681698
}
699+
700+
result.Timings.EvalTotalTime += stats.Timings.EvalTotalTime
701+
result.Timings.ResultSortTime += stats.Timings.ResultSortTime
702+
result.Timings.QueryPreparationTime += stats.Timings.QueryPreparationTime
703+
result.Timings.InnerEvalTime += stats.Timings.InnerEvalTime
704+
result.Timings.ExecQueueTime += stats.Timings.ExecQueueTime
705+
result.Timings.ExecTotalTime += stats.Timings.ExecTotalTime
706+
707+
result.Samples.TotalQueryableSamples += stats.Samples.TotalQueryableSamples
708+
if stats.Samples.PeakSamples > result.Samples.PeakSamples {
709+
result.Samples.PeakSamples = stats.Samples.PeakSamples
710+
}
682711
}
683712

684713
if !hasStats {
@@ -692,10 +721,8 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
692721

693722
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
694723

695-
result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
696724
for _, key := range keys {
697725
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
698-
result.Samples.TotalQueryableSamples += output[key].Value
699726
}
700727

701728
return result

0 commit comments

Comments (0)