Commit

Return stats response and cache headers
Signed-off-by: milesbryant <[email protected]>
milesbxf committed Nov 15, 2024
1 parent 0577661 commit a5a0575
Showing 11 changed files with 660 additions and 191 deletions.
17 changes: 10 additions & 7 deletions cmd/thanos/query_frontend.go
@@ -146,7 +146,8 @@ func registerQueryFrontend(app *extkingpin.App) {
 	cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
 		"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
 	cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
-		"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)
+		"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.QueryStatsEnabled)
+	cmd.Flag("query-frontend.cache-queryable-samples-stats", "True to enable caching of queryable samples stats in the cache. ").Default("false").BoolVar(&cfg.CacheQueryableSamplesStats)

 	cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
 		" and both flags cannot be used at the same time. "+
@@ -268,8 +269,9 @@ func runQueryFrontend(
 			return errors.Wrap(err, "initializing the query range cache config")
 		}
 		cfg.QueryRangeConfig.ResultsCacheConfig = &queryrange.ResultsCacheConfig{
-			Compression: cfg.CacheCompression,
-			CacheConfig: *cacheConfig,
+			Compression:                cfg.CacheCompression,
+			CacheQueryableSamplesStats: cfg.CacheQueryableSamplesStats,
+			CacheConfig:                *cacheConfig,
 		}
 	}

@@ -283,8 +285,9 @@ func runQueryFrontend(
 			return errors.Wrap(err, "initializing the labels cache config")
 		}
 		cfg.LabelsConfig.ResultsCacheConfig = &queryrange.ResultsCacheConfig{
-			Compression: cfg.CacheCompression,
-			CacheConfig: *cacheConfig,
+			Compression:                cfg.CacheCompression,
+			CacheQueryableSamplesStats: cfg.CacheQueryableSamplesStats,
+			CacheConfig:                *cacheConfig,
 		}
 	}

@@ -313,7 +316,7 @@ func runQueryFrontend(
 		return err
 	}

-	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
+	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.QueryStatsEnabled)
 	if err != nil {
 		return errors.Wrap(err, "setup downstream roundtripper")
 	}
@@ -322,7 +325,7 @@ func runQueryFrontend(
 	roundTripper = tripperWare(roundTripper)

 	// Create the query frontend transport.
-	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, nil)
+	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, nil, cfg.QueryStatsEnabled)
 	if cfg.CompressResponses {
 		handler = gzhttp.GzipHandler(handler)
 	}
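Taken together, the changes in this file move the query-stats settings off CortexHandlerConfig and onto the frontend's own config, and add a second flag for caching queryable-samples stats. A minimal, self-contained sketch of how two such flags are wired with kingpin, mirroring the pattern above — the package name, import path, and demo config type are assumptions, not the real Thanos wiring:

package main

import (
	"fmt"

	"gopkg.in/alecthomas/kingpin.v2"
)

// frontendConfig is an illustrative stand-in for the frontend's config struct;
// only the two fields touched by this commit are shown.
type frontendConfig struct {
	QueryStatsEnabled          bool
	CacheQueryableSamplesStats bool
}

func main() {
	cfg := &frontendConfig{}
	app := kingpin.New("demo", "flag wiring sketch")
	app.Flag("query-frontend.query-stats-enabled",
		"True to enable query statistics tracking.").Default("false").BoolVar(&cfg.QueryStatsEnabled)
	app.Flag("query-frontend.cache-queryable-samples-stats",
		"True to enable caching of queryable samples stats.").Default("false").BoolVar(&cfg.CacheQueryableSamplesStats)
	kingpin.MustParse(app.Parse([]string{
		"--query-frontend.query-stats-enabled",
		"--query-frontend.cache-queryable-samples-stats",
	}))
	fmt.Println(cfg.QueryStatsEnabled, cfg.CacheQueryableSamplesStats) // true true
}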
7 changes: 0 additions & 7 deletions internal/cortex/frontend/downstream_roundtripper.go
@@ -37,13 +37,6 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
 		}
 	}

-	if d.queryStatsEnabled {
-		// add &stats query param to get thanos-query to add query statistics to log
-		q := r.URL.Query()
-		q.Set("stats", "true")
-		r.URL.RawQuery = q.Encode()
-	}
-
 	r.URL.Scheme = d.downstreamURL.Scheme
 	r.URL.Host = d.downstreamURL.Host
 	r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
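The deleted block was what asked the downstream querier for statistics, by appending a stats=true parameter to every proxied request; with this commit that responsibility no longer lives in the round tripper (the replacement wiring is in files not shown in this excerpt). For reference, a self-contained sketch of the technique the removed code used — the URL and function name here are illustrative:

package main

import (
	"fmt"
	"net/http"
)

// addStatsParam appends stats=true so a Prometheus-compatible query API
// includes execution statistics in its response.
func addStatsParam(r *http.Request) {
	q := r.URL.Query()
	q.Set("stats", "true")
	r.URL.RawQuery = q.Encode()
}

func main() {
	r, err := http.NewRequest(http.MethodGet, "http://example.local/api/v1/query_range?query=up", nil)
	if err != nil {
		panic(err)
	}
	addStatsParam(r)
	fmt.Println(r.URL.String()) // ...query=up&stats=true
}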
19 changes: 10 additions & 9 deletions internal/cortex/frontend/transport/handler.go
@@ -44,7 +44,6 @@ var (
 type HandlerConfig struct {
 	LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
 	MaxBodySize          int64         `yaml:"max_body_size"`
-	QueryStatsEnabled    bool          `yaml:"query_stats_enabled"`
 }

 // Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -58,17 +57,19 @@ type Handler struct {
 	querySeconds      *prometheus.HistogramVec
 	querySamplesTotal *prometheus.HistogramVec
 	activeUsers       *util.ActiveUsersCleanupService
+	queryStatsEnabled bool
 }

 // NewHandler creates a new frontend handler.
-func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
+func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer, queryStatsEnabled bool) http.Handler {
 	h := &Handler{
-		cfg:          cfg,
-		log:          log,
-		roundTripper: roundTripper,
+		cfg:               cfg,
+		log:               log,
+		roundTripper:      roundTripper,
+		queryStatsEnabled: queryStatsEnabled,
 	}

-	if cfg.QueryStatsEnabled {
+	if queryStatsEnabled {
 		h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name: "thanos_query_frontend_query_seconds",
 			Help: "Total amount of wall clock time spend processing queries.",
@@ -130,7 +131,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(resp.StatusCode)

 	var respBuf bytes.Buffer
-	if f.cfg.QueryStatsEnabled {
+	if f.queryStatsEnabled {
 		// Buffer the response body for query stat tracking later
 		resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
 	}
@@ -141,7 +142,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
 	}

-	if f.cfg.QueryStatsEnabled {
+	if f.queryStatsEnabled {
 		// Parse the stats field out of the response body
 		var statsResponse ResponseWithStats
 		if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
@@ -160,7 +161,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

 	// Check whether we should parse the query string.
 	shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
-	if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
+	if shouldReportSlowQuery || f.queryStatsEnabled {
 		queryString = f.parseRequestQueryString(r, buf)
 	}

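With QueryStatsEnabled now passed directly to NewHandler, the handler tees the response body into respBuf and unmarshals it into ResponseWithStats to pull the stats field back out. ResponseWithStats itself isn't shown in this excerpt; the sketch below uses an illustrative stand-in that assumes the standard Prometheus /api/v1 response layout, where stats sits under data:

package main

import (
	"encoding/json"
	"fmt"
)

// responseWithStats is an illustrative stand-in for the ResponseWithStats type
// referenced in the diff; only the fields needed to read sample stats appear.
type responseWithStats struct {
	Data struct {
		Stats struct {
			Samples struct {
				TotalQueryableSamples int64 `json:"totalQueryableSamples"`
				PeakSamples           int64 `json:"peakSamples"`
			} `json:"samples"`
		} `json:"stats"`
	} `json:"data"`
}

func main() {
	body := []byte(`{"status":"success","data":{"resultType":"vector","result":[],"stats":{"samples":{"totalQueryableSamples":1000,"peakSamples":10}}}}`)
	var resp responseWithStats
	if err := json.Unmarshal(body, &resp); err == nil {
		fmt.Println("queryable samples:", resp.Data.Stats.Samples.TotalQueryableSamples) // 1000
	}
}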
1 change: 0 additions & 1 deletion internal/cortex/querier/querier.go
@@ -21,7 +21,6 @@ type Config struct {
 	QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
 	QueryStoreForLabels  bool          `yaml:"query_store_for_labels_enabled"`
 	AtModifierEnabled    bool          `yaml:"at_modifier_enabled"`
-	EnablePerStepStats   bool          `yaml:"per_step_stats_enabled"`

 	// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
 	QueryStoreAfter time.Duration `yaml:"query_store_after"`
31 changes: 29 additions & 2 deletions internal/cortex/querier/queryrange/query_range.go
@@ -665,6 +665,23 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
 func StatsMerge(resps []Response) *PrometheusResponseStats {
 	output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
 	hasStats := false
+
+	result := &PrometheusResponseStats{
+		Timings: &PrometheusResponseStats_Timings{
+			EvalTotalTime:        0,
+			ResultSortTime:       0,
+			QueryPreparationTime: 0,
+			InnerEvalTime:        0,
+			ExecQueueTime:        0,
+			ExecTotalTime:        0,
+		},
+		Samples: &PrometheusResponseStats_Samples{
+			TotalQueryableSamples:        0,
+			PeakSamples:                  0,
+			TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{},
+		},
+	}
+
 	for _, resp := range resps {
 		stats := resp.GetStats()
 		if stats == nil {
@@ -679,6 +696,18 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
 		for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
 			output[s.GetTimestampMs()] = s
 		}
+
+		result.Timings.EvalTotalTime += stats.Timings.EvalTotalTime
+		result.Timings.ResultSortTime += stats.Timings.ResultSortTime
+		result.Timings.QueryPreparationTime += stats.Timings.QueryPreparationTime
+		result.Timings.InnerEvalTime += stats.Timings.InnerEvalTime
+		result.Timings.ExecQueueTime += stats.Timings.ExecQueueTime
+		result.Timings.ExecTotalTime += stats.Timings.ExecTotalTime
+
+		result.Samples.TotalQueryableSamples += stats.Samples.TotalQueryableSamples
+		if stats.Samples.PeakSamples > result.Samples.PeakSamples {
+			result.Samples.PeakSamples = stats.Samples.PeakSamples
+		}
 	}

 	if !hasStats {
@@ -692,10 +721,8 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {

 	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

-	result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
 	for _, key := range keys {
 		result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
-		result.Samples.TotalQueryableSamples += output[key].Value
 	}

 	return result
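The merge semantics StatsMerge now implements: timing fields are summed across the partial responses, PeakSamples takes the maximum, per-step samples are deduplicated by timestamp (a later response wins a collision) and re-sorted, and the queryable-sample total is summed per response rather than recomputed from the steps. A self-contained sketch of those rules with simplified stand-in types — the real function operates on the generated PrometheusResponseStats protobuf types:

package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the generated protobuf types.
type stepStat struct {
	TimestampMs int64
	Value       int64
}

type queryStats struct {
	ExecTotalTime         float64 // one of six timing fields; all merge the same way
	TotalQueryableSamples int64
	PeakSamples           int64
	PerStep               []stepStat
}

func merge(parts []queryStats) queryStats {
	out := queryStats{}
	byTs := map[int64]stepStat{}
	for _, p := range parts {
		out.ExecTotalTime += p.ExecTotalTime                 // timings: summed
		out.TotalQueryableSamples += p.TotalQueryableSamples // totals: summed
		if p.PeakSamples > out.PeakSamples {                 // peak: max
			out.PeakSamples = p.PeakSamples
		}
		for _, s := range p.PerStep { // per-step: dedupe by timestamp, later wins
			byTs[s.TimestampMs] = s
		}
	}
	for _, s := range byTs {
		out.PerStep = append(out.PerStep, s)
	}
	sort.Slice(out.PerStep, func(i, j int) bool {
		return out.PerStep[i].TimestampMs < out.PerStep[j].TimestampMs
	})
	return out
}

func main() {
	merged := merge([]queryStats{
		{ExecTotalTime: 0.2, TotalQueryableSamples: 100, PeakSamples: 10, PerStep: []stepStat{{1000, 50}, {2000, 50}}},
		{ExecTotalTime: 0.3, TotalQueryableSamples: 200, PeakSamples: 40, PerStep: []stepStat{{2000, 60}, {3000, 140}}},
	})
	fmt.Printf("%+v\n", merged)
}

One observable design choice: summing TotalQueryableSamples per response while deduplicating overlapping steps can double-count any overlap, whereas the removed line recomputed the total from the deduplicated steps; with the frontend's non-overlapping range splits the two should agree.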
(Diff for the remaining 6 changed files not shown.)
