Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return the jsonpath of json detected fields #16861

Merged
merged 12 commits into from
Mar 25, 2025
462 changes: 264 additions & 198 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
@@ -529,6 +529,7 @@ message DetectedField {
uint64 cardinality = 3;
repeated string parsers = 4 [(gogoproto.jsontag) = "parsers"];
bytes sketch = 5 [(gogoproto.jsontag) = "sketch,omitempty"];
repeated string jsonPath = 6 [(gogoproto.jsontag) = "jsonPath,omitempty"]; // Array representing original JSON path (e.g., ["user", "id"] for field "user_id")
}

message DetectedLabelsRequest {
18 changes: 18 additions & 0 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
@@ -141,6 +141,7 @@ type BaseLabelsBuilder struct {
parserKeyHints ParserHint // label key hints for metric queries that allows to limit parser extractions to only this list of labels.
without, noLabels bool
referencedStructuredMetadata bool
jsonPaths map[string][]string // Maps label names to their original JSON paths

resultCache map[uint64]LabelsResult
*hasher
@@ -176,6 +177,7 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint
parserKeyHints: parserKeyHints,
noLabels: noLabels,
without: without,
jsonPaths: make(map[string][]string),
}
}

@@ -413,6 +415,22 @@ func (b *LabelsBuilder) Add(category LabelCategory, labels ...labels.Label) *Lab
return b
}

// SetJSONPath records the original JSON path parts a label was derived from,
// e.g. ["user", "id"] for the sanitized label "user_id". It returns the
// builder to allow chaining.
func (b *LabelsBuilder) SetJSONPath(labelName string, jsonPath []string) *LabelsBuilder {
	// Lazily initialize the map: only NewBaseLabelsBuilderWithGrouping is seen
	// to allocate jsonPaths, so builders constructed elsewhere would otherwise
	// panic here on a write to a nil map.
	if b.jsonPaths == nil {
		b.jsonPaths = make(map[string][]string)
	}
	b.jsonPaths[labelName] = jsonPath
	return b
}

// GetJSONPath returns the original JSON path parts recorded for labelName,
// or nil when no path is available.
func (b *LabelsBuilder) GetJSONPath(labelName string) []string {
	// A missing key yields the zero value for []string, which is nil —
	// exactly the "not available" result callers expect, so no ok-check
	// is needed.
	return b.jsonPaths[labelName]
}

// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *LabelsBuilder) labels(categories ...LabelCategory) labels.Labels {
2 changes: 1 addition & 1 deletion pkg/logql/log/labels_test.go
Original file line number Diff line number Diff line change
@@ -489,7 +489,7 @@ func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "level", "info")
filter := NewStringLabelFilter(matcher)
stages := []Stage{
NewJSONParser(),
NewJSONParser(false),
filter,
}
ex, err := NewLineSampleExtractor(CountExtractor, stages, []string{}, false, false)
2 changes: 1 addition & 1 deletion pkg/logql/log/metrics_extraction_test.go
Original file line number Diff line number Diff line change
@@ -265,7 +265,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
}

func Test_Extract_ExpectedLabels(t *testing.T) {
ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser()}, NoopStage))
ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser(false)}, NoopStage))

f, lbs, ok := ex.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, `{"duration":"20ms","foo":"json"}`)
require.True(t, ok)
91 changes: 68 additions & 23 deletions pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"unicode/utf8"
"unsafe"

"github.com/grafana/jsonparser"

@@ -51,18 +52,22 @@ var (
)

type JSONParser struct {
prefixBuffer []byte // buffer used to build json keys
lbs *LabelsBuilder
prefixBuffer [][]byte // buffer used to build json keys
lbs *LabelsBuilder
captureJSONPath bool

keys internedStringSet
parserHints ParserHint
keys internedStringSet
parserHints ParserHint
sanitizedPrefixBuffer []byte
}

// NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
func NewJSONParser() *JSONParser {
func NewJSONParser(captureJSONPath bool) *JSONParser {
return &JSONParser{
prefixBuffer: make([]byte, 0, 1024),
keys: internedStringSet{},
prefixBuffer: [][]byte{},
keys: internedStringSet{},
captureJSONPath: captureJSONPath,
sanitizedPrefixBuffer: make([]byte, 0, 64),
}
}

@@ -121,18 +126,18 @@ func (j *JSONParser) parseObject(key, value []byte, dataType jsonparser.ValueTyp

// nextKeyPrefix load the next prefix in the buffer and tells if it should be processed based on hints.
func (j *JSONParser) nextKeyPrefix(key []byte) bool {
// first add the spacer if needed.
if len(j.prefixBuffer) != 0 {
j.prefixBuffer = append(j.prefixBuffer, byte(jsonSpacer))
}
j.prefixBuffer = appendSanitized(j.prefixBuffer, key)
return j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.prefixBuffer))
j.prefixBuffer = append(j.prefixBuffer, key)

sanitized := j.buildSanitizedPrefixFromBuffer()
return j.lbs.ParserLabelHints().ShouldExtractPrefix(
string(sanitized),
)
}

func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.ValueType) error {
// the first time we use the field as label key.
if len(j.prefixBuffer) == 0 {
key, ok := j.keys.Get(key, func() (string, bool) {
sanitizedKey, ok := j.keys.Get(key, func() (string, bool) {
field := sanitizeLabelKey(string(key), true)
if j.lbs.BaseHas(field) {
field = field + duplicateSuffix
@@ -145,8 +150,12 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu
if !ok {
return nil
}
j.lbs.Set(ParsedLabel, key, readValue(value, dataType))
if !j.parserHints.ShouldContinueParsingLine(key, j.lbs) {
j.lbs.Set(ParsedLabel, sanitizedKey, readValue(value, dataType))
if j.captureJSONPath {
j.lbs.SetJSONPath(sanitizedKey, []string{string(key)})
}

if !j.parserHints.ShouldContinueParsingLine(sanitizedKey, j.lbs) {
return errLabelDoesNotMatch
}
return nil
@@ -156,32 +165,68 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu

// snapshot the current prefix position
prefixLen := len(j.prefixBuffer)
j.prefixBuffer = append(j.prefixBuffer, byte(jsonSpacer))
j.prefixBuffer = appendSanitized(j.prefixBuffer, key)
keyString, ok := j.keys.Get(j.prefixBuffer, func() (string, bool) {
if j.lbs.BaseHas(string(j.prefixBuffer)) {
j.prefixBuffer = append(j.prefixBuffer, duplicateSuffix...)
j.prefixBuffer = append(j.prefixBuffer, key)

sanitized := j.buildSanitizedPrefixFromBuffer()
keyString, ok := j.keys.Get(sanitized, func() (string, bool) {
if j.lbs.BaseHas(string(sanitized)) {
j.prefixBuffer[prefixLen] = append(key, duplicateSuffix...)
}
if !j.parserHints.ShouldExtract(string(j.prefixBuffer)) {

keyPrefix := j.buildSanitizedPrefixFromBuffer()
if !j.parserHints.ShouldExtract(string(keyPrefix)) {
return "", false
}

return string(j.prefixBuffer), true
return string(keyPrefix), true
})

if j.captureJSONPath {
jsonPath := j.buildJSONPathFromPrefixBuffer()
j.lbs.SetJSONPath(keyString, jsonPath)
}

// reset the prefix position
j.prefixBuffer = j.prefixBuffer[:prefixLen]
if !ok {
return nil
}

j.lbs.Set(ParsedLabel, keyString, readValue(value, dataType))

if !j.parserHints.ShouldContinueParsingLine(keyString, j.lbs) {
return errLabelDoesNotMatch
}
return nil
}

// buildSanitizedPrefixFromBuffer flattens the accumulated key parts in
// j.prefixBuffer into a single sanitized label prefix joined by jsonSpacer,
// skipping whitespace-only parts. The returned slice aliases
// j.sanitizedPrefixBuffer and is only valid until the next call.
func (j *JSONParser) buildSanitizedPrefixFromBuffer() []byte {
	j.sanitizedPrefixBuffer = j.sanitizedPrefixBuffer[:0]

	for _, part := range j.prefixBuffer {
		if len(bytes.TrimSpace(part)) == 0 {
			continue
		}

		// Insert the separator only once something has been written. A
		// non-empty buffer already implies this is not the first kept part,
		// so the previous extra `i > 0` guard was redundant and is dropped.
		if len(j.sanitizedPrefixBuffer) > 0 {
			j.sanitizedPrefixBuffer = append(j.sanitizedPrefixBuffer, byte(jsonSpacer))
		}
		j.sanitizedPrefixBuffer = appendSanitized(j.sanitizedPrefixBuffer, part)
	}

	return j.sanitizedPrefixBuffer
}

func (j *JSONParser) buildJSONPathFromPrefixBuffer() []string {
jsonPath := make([]string, 0, len(j.prefixBuffer))
for _, part := range j.prefixBuffer {
partStr := unsafe.String(unsafe.SliceData(part), len(part)) // #nosec G103 -- we know the string is not mutated
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parts of the path will not change, so I've optimized this to use unsafe.String, which saves ~20% in terms of allocations.

jsonPath = append(jsonPath, partStr)
}

return jsonPath
}

// RequiredLabelNames returns the stream labels this stage depends on; the
// JSON parser reads only the log line itself, so it requires none.
func (j *JSONParser) RequiredLabelNames() []string {
	return []string{}
}

func readValue(v []byte, dataType jsonparser.ValueType) string {
239 changes: 226 additions & 13 deletions pkg/logql/log/parser_test.go
Original file line number Diff line number Diff line change
@@ -13,11 +13,12 @@ import (

func Test_jsonParser_Parse(t *testing.T) {
tests := []struct {
name string
line []byte
lbs labels.Labels
want labels.Labels
hints ParserHint
name string
line []byte
lbs labels.Labels
want labels.Labels
wantJSONPath map[string][]string
hints ParserHint
}{
{
"multi depth",
@@ -28,43 +29,64 @@ func Test_jsonParser_Parse(t *testing.T) {
"pod_uuid", "foo",
"pod_deployment_ref", "foobar",
),
map[string][]string{
"app": {"app"},
"namespace": {"namespace"},
"pod_uuid": {"pod", "uuid"},
"pod_deployment_ref": {"pod", "deployment", "ref"},
},
NoParserHints(),
},
{
"numeric",
{"numeric",
[]byte(`{"counter":1, "price": {"_net_":5.56909}}`),
labels.EmptyLabels(),
labels.FromStrings("counter", "1",
"price__net_", "5.56909",
),
map[string][]string{
"counter": {"counter"},
"price__net_": {"price", "_net_"},
},
NoParserHints(),
},
{
"whitespace key value",
[]byte(`{" ": {"foo":"bar"}}`),
labels.EmptyLabels(),
labels.FromStrings("foo", "bar"),
map[string][]string{
"foo": {" ", "foo"},
},
NoParserHints(),
},
{
"whitespace key and whitespace subkey values",
[]byte(`{" ": {" ":"bar"}}`),
labels.EmptyLabels(),
labels.FromStrings("", "bar"),
map[string][]string{
"": {" ", " "},
},
NoParserHints(),
},
{
"whitespace key and empty subkey values",
[]byte(`{" ": {"":"bar"}}`),
labels.EmptyLabels(),
labels.FromStrings("", "bar"),
map[string][]string{
"": {" ", ""},
},
NoParserHints(),
},
{
"empty key and empty subkey values",
[]byte(`{"": {"":"bar"}}`),
labels.EmptyLabels(),
labels.FromStrings("", "bar"),
map[string][]string{
"": {"", ""},
},
NoParserHints(),
},
{
@@ -75,6 +97,11 @@ func Test_jsonParser_Parse(t *testing.T) {
"price__net_", "5.56909",
"foo", `foo\"bar`,
),
map[string][]string{
"counter": {"counter"},
"price__net_": {"price", "_net_"},
"foo": {"foo"},
},
NoParserHints(),
},
{
@@ -85,20 +112,41 @@ func Test_jsonParser_Parse(t *testing.T) {
"price__net_", "5.56909",
"foo", " ",
),
map[string][]string{
"counter": {"counter"},
"price__net_": {"price", "_net_"},
"foo": {"foo"},
},
NoParserHints(),
},
{
"skip arrays",
[]byte(`{"counter":1, "price": {"net_":["10","20"]}}`),
labels.EmptyLabels(),
labels.FromStrings("counter", "1"),
map[string][]string{
"counter": {"counter"},
},
NoParserHints(),
},
{
"bad key replaced",
[]byte(`{"cou-nter":1}`),
labels.EmptyLabels(),
labels.FromStrings("cou_nter", "1"),
map[string][]string{
"cou_nter": {"cou-nter"},
},
NoParserHints(),
},
{
"nested bad key replaced",
[]byte(`{"foo":{"cou-nter":1}}"`),
labels.EmptyLabels(),
labels.FromStrings("foo_cou_nter", "1"),
map[string][]string{
"foo_cou_nter": {"foo", "cou-nter"},
},
NoParserHints(),
},
{
@@ -108,6 +156,7 @@ func Test_jsonParser_Parse(t *testing.T) {
labels.FromStrings("__error__", "JSONParserErr",
"__error_details__", "Value looks like object, but can't find closing '}' symbol",
),
map[string][]string{},
NoParserHints(),
},
{
@@ -118,6 +167,7 @@ func Test_jsonParser_Parse(t *testing.T) {
"__error_details__", "Value looks like object, but can't find closing '}' symbol",
"__preserve_error__", "true",
),
map[string][]string{},
NewParserHint([]string{"__error__"}, nil, false, true, "", nil),
},
{
@@ -131,16 +181,30 @@ func Test_jsonParser_Parse(t *testing.T) {
"next_err", "false",
"pod_deployment_ref", "foobar",
),
map[string][]string{
"app_extracted": {"app"},
"namespace": {"namespace"},
"pod_uuid": {"pod", "uuid"},
"next_err": {"next", "err"},
"pod_deployment_ref": {"pod", "deployment", "ref"},
},
NoParserHints(),
},
}
for _, tt := range tests {
j := NewJSONParser()
j := NewJSONParser(true)
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilderWithGrouping(nil, tt.hints, false, false).ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = j.Process(0, tt.line, b)
require.Equal(t, tt.want, b.LabelsResult().Labels())

// Check JSON paths if provided
if len(tt.wantJSONPath) > 0 {
for k, parts := range tt.wantJSONPath {
require.Equal(t, parts, b.GetJSONPath(k), "incorrect json path parts for key %s", k)
}
}
})
}
}
@@ -164,7 +228,7 @@ func TestKeyShortCircuit(t *testing.T) {
p Stage
LabelFilterParseHint *labels.Matcher
}{
{"json", jsonLine, NewJSONParser(), labels.MustNewMatcher(labels.MatchEqual, "response_latency_seconds", "nope")},
{"json", jsonLine, NewJSONParser(false), labels.MustNewMatcher(labels.MatchEqual, "response_latency_seconds", "nope")},
{"unpack", packedLike, NewUnpackParser(), labels.MustNewMatcher(labels.MatchEqual, "pod", "nope")},
{"logfmt", logfmtLine, NewLogfmtParser(false, false), labels.MustNewMatcher(labels.MatchEqual, "info", "nope")},
{"regex greedy", nginxline, mustStage(NewRegexpParser(`GET (?P<path>.*?)/\?`)), labels.MustNewMatcher(labels.MatchEqual, "path", "nope")},
@@ -205,7 +269,7 @@ func TestLabelShortCircuit(t *testing.T) {
p Stage
line []byte
}{
{"json", NewJSONParser(), simpleJsn},
{"json", NewJSONParser(false), simpleJsn},
{"logfmt", NewLogfmtParser(false, false), logFmt},
{"logfmt-expression", mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")}, false)), logFmt},
}
@@ -651,8 +715,8 @@ func Benchmark_Parser(b *testing.B) {
LabelParseHints []string // hints to reduce label extractions.
LabelFilterParseHint *labels.Matcher
}{
{"json", jsonLine, NewJSONParser(), []string{"response_latency_seconds"}, labels.MustNewMatcher(labels.MatchEqual, "the_real_ip", "nope")},
{"jsonParser-not json line", nginxline, NewJSONParser(), []string{"response_latency_seconds"}, labels.MustNewMatcher(labels.MatchEqual, "the_real_ip", "nope")},
{"json", jsonLine, NewJSONParser(false), []string{"response_latency_seconds"}, labels.MustNewMatcher(labels.MatchEqual, "the_real_ip", "nope")},
{"jsonParser-not json line", nginxline, NewJSONParser(false), []string{"response_latency_seconds"}, labels.MustNewMatcher(labels.MatchEqual, "the_real_ip", "nope")},
{"unpack", packedLike, NewUnpackParser(), []string{"pod"}, labels.MustNewMatcher(labels.MatchEqual, "app", "nope")},
{"unpack-not json line", nginxline, NewUnpackParser(), []string{"pod"}, labels.MustNewMatcher(labels.MatchEqual, "app", "nope")},
{"logfmt", logfmtLine, NewLogfmtParser(false, false), []string{"info", "throughput", "org_id"}, labels.MustNewMatcher(labels.MatchEqual, "latency", "nope")},
@@ -699,6 +763,155 @@ func Benchmark_Parser(b *testing.B) {
}
}

// Benchmark_Parser_JSONPath benchmarks JSON parsing with path capture enabled
// and, after each timed loop, verifies the expected JSON paths were recorded
// for the extracted labels under three parser-hint configurations.
func Benchmark_Parser_JSONPath(b *testing.B) {
	lbs := labels.FromStrings("cluster", "qa-us-central1",
		"namespace", "qa",
		"filename", "/var/log/pods/ingress-nginx_nginx-ingress-controller-7745855568-blq6t_1f8962ef-f858-4188-a573-ba276a3cacc3/ingress-nginx/0.log",
		"job", "ingress-nginx/nginx-ingress-controller",
		"name", "nginx-ingress-controller",
		"pod", "nginx-ingress-controller-7745855568-blq6t",
		"pod_template_hash", "7745855568",
		"stream", "stdout",
	)

	jsonLine := `{
		"invalid": "a\\xc5z",
		"proxy_protocol_addr": "",
		"remote_addr": "3.112.221.14",
		"remote_user": "",
		"upstream_addr": "10.12.15.234:5000",
		"the_real_ip": "3.112.221.14",
		"timestamp": "2020-12-11T16:20:07+00:00",
		"protocol": "HTTP/1.1",
		"upstream_name": "hosted-grafana-hosted-grafana-api-80",
		"request": {
			"id": "c8eacb6053552c0cd1ae443bc660e140",
			"time": "0.001",
			"method": "GET",
			"host": "hg-api-qa-us-central1.grafana.net",
			"uri": "/",
			"size" : "128",
			"user_agent":"worldping-api-",
			"referer": ""
		},
		"response": {
			"status": 200,
			"upstream_status": "200",
			"size": "1155",
			"size_sent": "265",
			"latency_seconds": "0.001"
		}
	}`
	for _, tt := range []struct {
		name                 string
		line                 string
		s                    Stage
		LabelParseHints      []string // hints to reduce label extractions.
		LabelFilterParseHint *labels.Matcher
	}{
		{"json", jsonLine, NewJSONParser(true), []string{"response_latency_seconds"}, labels.MustNewMatcher(labels.MatchEqual, "the_real_ip", "nope")},
	} {
		b.Run(tt.name, func(b *testing.B) {
			line := []byte(tt.line)
			b.Run("no labels hints", func(b *testing.B) {
				b.ReportAllocs()
				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
				for n := 0; n < b.N; n++ {
					builder.Reset()
					_, _ = tt.s.Process(0, line, builder)
					builder.LabelsResult()
				}
				// With no hints, every field in the line should have a path.
				expectedJSONPath := map[string][]string{
					"invalid":                  {"invalid"},
					"proxy_protocol_addr":      {"proxy_protocol_addr"},
					"remote_addr":              {"remote_addr"},
					"remote_user":              {"remote_user"},
					"upstream_addr":            {"upstream_addr"},
					"the_real_ip":              {"the_real_ip"},
					"timestamp":                {"timestamp"},
					"protocol":                 {"protocol"},
					"upstream_name":            {"upstream_name"},
					"request_id":               {"request", "id"},
					"request_time":             {"request", "time"},
					"request_method":           {"request", "method"},
					"request_host":             {"request", "host"},
					"request_uri":              {"request", "uri"},
					"request_size":             {"request", "size"},
					"request_user_agent":       {"request", "user_agent"},
					"request_referer":          {"request", "referer"},
					"response_status":          {"response", "status"},
					"response_upstream_status": {"response", "upstream_status"},
					"response_size":            {"response", "size"},
					"response_size_sent":       {"response", "size_sent"},
					"response_latency_seconds": {"response", "latency_seconds"},
				}

				for k, parts := range expectedJSONPath {
					require.Equal(b, parts, builder.GetJSONPath(k), "incorrect json path parts for key %s", k)
				}
			})

			b.Run("labels hints", func(b *testing.B) {
				b.ReportAllocs()
				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
				builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil)

				for n := 0; n < b.N; n++ {
					builder.Reset()
					_, _ = tt.s.Process(0, line, builder)
					builder.LabelsResult()
				}

				expectedJSONPath := map[string][]string{
					"proxy_protocol_addr":      {"proxy_protocol_addr"},
					"remote_addr":              {"remote_addr"},
					"remote_user":              {"remote_user"},
					"upstream_addr":            {"upstream_addr"},
					"the_real_ip":              {"the_real_ip"},
					"protocol":                 {"protocol"},
					"upstream_name":            {"upstream_name"},
					"response_status":          {"response", "status"},
					"invalid":                  {"invalid"},
					"timestamp":                {"timestamp"},
					"response_upstream_status": {"response", "upstream_status"},
					"response_size":            {"response", "size"},
					"response_size_sent":       {"response", "size_sent"},
					"response_latency_seconds": {"response", "latency_seconds"},
				}

				for k, parts := range expectedJSONPath {
					require.Equal(b, parts, builder.GetJSONPath(k), "incorrect json path parts for key %s", k)
				}
			})

			b.Run("inline stages", func(b *testing.B) {
				b.ReportAllocs()
				stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)}
				builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
				// FIX: the metric-name argument previously carried the stray
				// literal ", nil" (a copy-paste artifact); other call sites
				// pass "" here.
				builder.parserKeyHints = NewParserHint(nil, nil, false, false, "", stages)
				for n := 0; n < b.N; n++ {
					builder.Reset()
					_, _ = tt.s.Process(0, line, builder)
					builder.LabelsResult()
				}

				expectedJSONPath := map[string][]string{
					"invalid":             {"invalid"},
					"proxy_protocol_addr": {"proxy_protocol_addr"},
					"remote_addr":         {"remote_addr"},
					"remote_user":         {"remote_user"},
					"upstream_addr":       {"upstream_addr"},
					"the_real_ip":         {"the_real_ip"},
				}

				for k, parts := range expectedJSONPath {
					require.Equal(b, parts, builder.GetJSONPath(k), "incorrect json path parts for key %s", k)
				}
			})
		})
	}
}

func BenchmarkKeyExtraction(b *testing.B) {
simpleJsn := []byte(`{
"data": "Click Here",
@@ -720,7 +933,7 @@ func BenchmarkKeyExtraction(b *testing.B) {
p Stage
line []byte
}{
{"json", NewJSONParser(), simpleJsn},
{"json", NewJSONParser(false), simpleJsn},
{"logfmt", NewLogfmtParser(false, false), logFmt},
{"logfmt-expression", mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")}, false)), logFmt},
}
10 changes: 5 additions & 5 deletions pkg/logql/log/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -327,7 +327,7 @@ func TestDropLabelsPipeline(t *testing.T) {
"drop __error__",
[]Stage{
NewLogfmtParser(true, false),
NewJSONParser(),
NewJSONParser(false),
NewDropLabels([]NamedLabelMatcher{
{
nil,
@@ -364,7 +364,7 @@ func TestDropLabelsPipeline(t *testing.T) {
"drop __error__ with matching value",
[]Stage{
NewLogfmtParser(true, false),
NewJSONParser(),
NewJSONParser(false),
NewDropLabels([]NamedLabelMatcher{
{
labels.MustNewMatcher(labels.MatchEqual, logqlmodel.ErrorLabel, errLogfmt),
@@ -593,7 +593,7 @@ func Benchmark_Pipeline(b *testing.B) {
NewNumericLabelFilter(LabelFilterEqual, "status", 200.0),
),
mustNewLabelsFormatter([]LabelFmt{NewRenameLabelFmt("caller_foo", "caller"), NewTemplateLabelFmt("new", "{{.query_type}}:{{.range_type}}")}),
NewJSONParser(),
NewJSONParser(false),
NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, logqlmodel.ErrorLabel, errJSON)),
newMustLineFormatter("Q=>{{.query}},D=>{{.duration}}"),
}
@@ -735,11 +735,11 @@ func invalidJSONBenchmark(b *testing.B, parser Stage) {
}

func BenchmarkJSONParser(b *testing.B) {
jsonBenchmark(b, NewJSONParser())
jsonBenchmark(b, NewJSONParser(false))
}

func BenchmarkJSONParserInvalidLine(b *testing.B) {
invalidJSONBenchmark(b, NewJSONParser())
invalidJSONBenchmark(b, NewJSONParser(false))
}

func BenchmarkJSONExpressionParser(b *testing.B) {
2 changes: 1 addition & 1 deletion pkg/logql/syntax/ast.go
Original file line number Diff line number Diff line change
@@ -752,7 +752,7 @@ func (e *LineParserExpr) Accept(v RootVisitor) { v.VisitLabelParser(e) }
func (e *LineParserExpr) Stage() (log.Stage, error) {
switch e.Op {
case OpParserTypeJSON:
return log.NewJSONParser(), nil
return log.NewJSONParser(false), nil
case OpParserTypeRegexp:
return log.NewRegexpParser(e.Param)
case OpParserTypeUnpack:
2 changes: 1 addition & 1 deletion pkg/logql/syntax/ast_test.go
Original file line number Diff line number Diff line change
@@ -879,7 +879,7 @@ func Test_parserExpr_Parser(t *testing.T) {
wantErr bool
wantPanic bool
}{
{"json", OpParserTypeJSON, "", log.NewJSONParser(), false, false},
{"json", OpParserTypeJSON, "", log.NewJSONParser(false), false, false},
{"unpack", OpParserTypeUnpack, "", log.NewUnpackParser(), false, false},
{"pattern", OpParserTypePattern, "<foo> bar <buzz>", mustNewPatternParser("<foo> bar <buzz>"), false, false},
{"pattern err", OpParserTypePattern, "bar", nil, true, true},
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
@@ -1130,7 +1130,7 @@ func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]s

line := entry.Line
parser := "json"
jsonParser := logql_log.NewJSONParser()
jsonParser := logql_log.NewJSONParser(true)
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
lbls.Reset()
18 changes: 10 additions & 8 deletions pkg/querier/queryrange/detected_fields.go
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ func NewDetectedFieldsHandler(
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
JsonPath: v.jsonPath,
}

fieldCount++
@@ -226,20 +227,15 @@ type parsedFields struct {
sketch *hyperloglog.Sketch
fieldType logproto.DetectedFieldType
parsers []string
jsonPath []string // Original JSON path as an array of components (e.g., ["user", "id"] for field "user_id")
}

func newParsedFields(parsers []string) *parsedFields {
return &parsedFields{
sketch: hyperloglog.New(),
fieldType: logproto.DetectedFieldString,
parsers: parsers,
}
}

func newParsedLabels() *parsedFields {
return &parsedFields{
sketch: hyperloglog.New(),
fieldType: logproto.DetectedFieldString,
jsonPath: nil,
}
}

@@ -341,6 +337,12 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
}
}

// If we parsed with JSON, check for a JSON path
if slices.Contains(parsers, "json") {
// Get the JSON path if it exists
df.jsonPath = entryLbls.GetJSONPath(k)
}

detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
@@ -404,7 +406,7 @@ func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]s

line := entry.Line
parser := "json"
jsonParser := logql_log.NewJSONParser()
jsonParser := logql_log.NewJSONParser(true)
_, jsonSuccess := jsonParser.Process(0, []byte(line), lblBuilder)
if !jsonSuccess || lblBuilder.HasErr() {
lblBuilder.Reset()
180 changes: 180 additions & 0 deletions pkg/querier/queryrange/detected_fields_test.go
Original file line number Diff line number Diff line change
@@ -993,6 +993,7 @@ func logHandler(stream logproto.Stream) base.Handler {
})
}

// TODO(twhitney): Is this releated to the now deprecated Querier endpoint?
func TestQuerier_DetectedFields(t *testing.T) {
limits := fakeLimits{
maxSeries: math.MaxInt32,
@@ -1446,3 +1447,182 @@ func BenchmarkQuerierDetectedFields(b *testing.B) {
require.True(b, ok)
}
}

// TestNestedJSONFieldDetection verifies that parseDetectedFields records both
// the detected field type and the original JSON path (the pre-sanitization
// key parts) for labels extracted from nested JSON log lines.
func TestNestedJSONFieldDetection(t *testing.T) {
	t.Run("correctly detects nested JSON fields", func(t *testing.T) {
		now := time.Now()

		nestedJSONLines := []push.Entry{
			{
				Timestamp: now,
				Line: `{
					"user":{
						"id":123,
						"name":"alice",
						"settings":{
							"theme":"dark",
							"notifications":true
						}
					},
					"app":{
						"version":"1.0",
						"metrics":{
							"cpu":45.6,
							"memory":"1.5GB"
						}
					}
				}`,
				StructuredMetadata: []push.LabelAdapter{},
			},
			{
				Timestamp: now,
				Line: `{
					"user":{
						"id":456,
						"name":"bob",
						"settings":{
							"theme":"light",
							"notifications":false
						}
					},
					"app":{
						"version":"1.0",
						"metrics":{
							"cpu":32.1,
							"memory":"2.0GB"
						}
					}
				}`,
				StructuredMetadata: []push.LabelAdapter{},
			},
		}

		nestedJSONLbls := `{cluster="test-cluster", job="json-test"}`
		nestedJSONMetric, err := parser.ParseMetric(nestedJSONLbls)
		require.NoError(t, err)

		nestedJSONStream := push.Stream{
			Labels:  nestedJSONLbls,
			Entries: nestedJSONLines,
			Hash:    nestedJSONMetric.Hash(),
		}

		df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{nestedJSONStream}))

		// Test for nested fields: each flattened label should carry the type
		// inferred from its values across both entries.
		expectedNestedFieldTypes := map[string]logproto.DetectedFieldType{
			"user_id":                     logproto.DetectedFieldInt,
			"user_name":                   logproto.DetectedFieldString,
			"user_settings_theme":         logproto.DetectedFieldString,
			"user_settings_notifications": logproto.DetectedFieldBoolean,
			"app_version":                 logproto.DetectedFieldFloat,
			"app_metrics_cpu":             logproto.DetectedFieldFloat,
			"app_metrics_memory":          logproto.DetectedFieldBytes,
		}

		// Each flattened label maps back to its original, unsanitized key parts.
		expectFieldsToPaths := map[string][]string{
			"user_id":                     {"user", "id"},
			"user_name":                   {"user", "name"},
			"user_settings_theme":         {"user", "settings", "theme"},
			"user_settings_notifications": {"user", "settings", "notifications"},
			"app_version":                 {"app", "version"},
			"app_metrics_cpu":             {"app", "metrics", "cpu"},
			"app_metrics_memory":          {"app", "metrics", "memory"},
		}

		for field, expectedType := range expectedNestedFieldTypes {
			require.Contains(t, df, field, "Missing expected nested field: %s", field)
			require.Equal(t, expectedType, df[field].fieldType, "Wrong type for field %s", field)
		}

		for field, expectedPath := range expectFieldsToPaths {
			require.Contains(t, df, field, "Missing expected nested field: %s", field)
			require.Equal(t, expectedPath, df[field].jsonPath, "Wrong json path for field %s", field)
		}
	})

	t.Run("correctly detects sanitized JSON fields, including difficult keys", func(t *testing.T) {
		now := time.Now()

		// NOTE(review): several objects below contain trailing commas —
		// presumably to exercise the JSON parser's leniency; confirm intended.
		nestedJSONLines := []push.Entry{
			{
				Timestamp: now,
				Line: `{
					"user":{
						"id":123,
						"name":"alice",
						"settings":{
							"theme":"dark",
							"notifications":true,
						}
					},
					"app-id": "abc",
					"app_name": "foo",
					"app": {
						"terrible/key/name": "four",
					},
					"other.bad.key.name": "three",
					"key with spaces": "space",
					"nested key with spaces": {
						"nest": "thermostat",
					}
				}`,
				StructuredMetadata: []push.LabelAdapter{},
			},
			{
				Timestamp: now,
				Line: `{
					"user":{
						"id":456,
						"name":"bob",
						"settings":{
							"theme":"light",
							"notifications":false
						},
					},
					"app-id": "xyz",
					"app_name": "bar",
					"app": {
						"terrible/key/name": "four",
					},
					"other.bad.key.name": "five",
					"key with spaces": "blank",
					"nested key with spaces": {
						"nest": "protect",
					}
				}`,
				StructuredMetadata: []push.LabelAdapter{},
			},
		}

		nestedJSONLbls := `{cluster="test-cluster", job="json-test"}`
		nestedJSONMetric, err := parser.ParseMetric(nestedJSONLbls)
		require.NoError(t, err)

		nestedJSONStream := push.Stream{
			Labels:  nestedJSONLbls,
			Entries: nestedJSONLines,
			Hash:    nestedJSONMetric.Hash(),
		}

		df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{nestedJSONStream}))

		// Keys with dashes, slashes, dots, and spaces are sanitized into
		// underscore-joined labels, while the recorded path preserves the
		// original key text verbatim.
		expectFieldsToPaths := map[string][]string{
			"user_id":                     {"user", "id"},
			"user_name":                   {"user", "name"},
			"user_settings_theme":         {"user", "settings", "theme"},
			"user_settings_notifications": {"user", "settings", "notifications"},
			"app_id":                      {"app-id"},
			"app_name":                    {"app_name"},
			"app_terrible_key_name":       {"app", "terrible/key/name"},
			"other_bad_key_name":          {"other.bad.key.name"},
			"key_with_spaces":             {"key with spaces"},
			"nested_key_with_spaces_nest": {"nested key with spaces", "nest"},
		}

		for field, expectedPath := range expectFieldsToPaths {
			require.Contains(t, df, field, "Missing expected nested field: %s", field)
			require.Equal(t, expectedPath, df[field].jsonPath, "Wrong json path for field %s", field)
		}
	})
}
}