Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Support for profiles export #37567

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
61b2441
bootstrap profiles export
dmathieu Jan 23, 2025
47cd96b
parse multiple json docs
dmathieu Jan 28, 2025
d9d3ecc
introduce profiles transform
dmathieu Jan 28, 2025
6a9ff93
add tests for the transform method
dmathieu Jan 29, 2025
26bfe92
Merge branch 'main' into elasticsearch-profiles
dmathieu Jan 29, 2025
bd8ad31
fix go version/toolchain
dmathieu Jan 29, 2025
fb15046
add benchmark
dmathieu Jan 30, 2025
8c10496
upgrade ebpf-profiler
dmathieu Jan 30, 2025
a0d06b9
Merge branch 'main' into elasticsearch-profiles
dmathieu Jan 30, 2025
3cd4c32
add changelog entry
dmathieu Jan 30, 2025
50c7270
fix exporterhelper version
dmathieu Jan 30, 2025
37b2084
rename callback to pushData
dmathieu Jan 30, 2025
aac1e33
fix SerializeProfile comment
dmathieu Jan 30, 2025
88c1268
push to the all events index
dmathieu Jan 30, 2025
280c04d
fix go.mod again
dmathieu Jan 30, 2025
0729b43
Update exporter/elasticsearchexporter/internal/serializer/otelseriali…
dmathieu Jan 30, 2025
a915fbd
fix tests with removed fallback frame type
dmathieu Jan 30, 2025
3e0d86b
fix core versions
dmathieu Jan 30, 2025
e28c595
fix toolchain again
dmathieu Jan 30, 2025
f21e659
tidy integrationtests
dmathieu Jan 30, 2025
3e77ddb
Merge branch 'main' into elasticsearch-profiles
dmathieu Jan 31, 2025
5a1ed12
try keeping go version and toolchain
dmathieu Jan 31, 2025
94d1b32
fix versions
dmathieu Jan 31, 2025
306fe2d
upgrade ebpf-profiler
dmathieu Jan 31, 2025
f67901b
Merge branch 'main' into elasticsearch-profiles
dmathieu Jan 31, 2025
81e972e
Merge branch 'main' into elasticsearch-profiles
dmathieu Feb 3, 2025
21c8b07
use dynamic start of week
dmathieu Feb 3, 2025
7e4c20e
Update exporter/elasticsearchexporter/exporter.go
dmathieu Feb 4, 2025
7598c5d
remove unused ProfilesDynamicIndex
dmathieu Feb 4, 2025
86f6f97
Merge branch 'main' into elasticsearch-profiles
dmathieu Feb 4, 2025
65abc70
remove unused ProfilesIndex config
dmathieu Feb 4, 2025
b35d599
Merge branch 'main' into elasticsearch-profiles
dmathieu Feb 5, 2025
ed02380
Update exporter/elasticsearchexporter/internal/serializer/otelseriali…
dmathieu Feb 5, 2025
b644b8b
Merge branch 'main' into elasticsearch-profiles
dmathieu Feb 6, 2025
150ead6
Merge branch 'main' into elasticsearch-profiles
dmathieu Feb 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-profiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add profiles support to elasticsearch exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37567]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics |
| Stability | [development]: metrics, profiles |
| | [beta]: traces, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Felasticsearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Felasticsearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Felasticsearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Felasticsearch) |
Expand Down
21 changes: 11 additions & 10 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
defaultDataStreamTypeProfiles = "profiles"
)

func getFromAttributes(name string, defaultValue string, attributeMaps ...pcommon.Map) (string, bool) {
Expand Down
60 changes: 60 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

Expand Down Expand Up @@ -494,3 +496,61 @@ func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string
}
return v.AsString()
}

func (e *elasticsearchExporter) pushProfilesData(ctx context.Context, pd pprofile.Profiles) error {
e.wg.Add(1)
defer e.wg.Done()

session, err := e.bulkIndexer.StartSession(ctx)
if err != nil {
return err
}
defer session.End()

var errs []error
rps := pd.ResourceProfiles()
for i := 0; i < rps.Len(); i++ {
rp := rps.At(i)
resource := rp.Resource()
sps := rp.ScopeProfiles()
for j := 0; j < sps.Len(); j++ {
sp := sps.At(j)
scope := sp.Scope()
p := sp.Profiles()
for k := 0; k < p.Len(); k++ {
if err := e.pushProfileRecord(ctx, resource, p.At(k), scope, session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}

if errors.Is(err, ErrInvalidTypeForBodyMapMode) {
e.Logger.Warn("dropping profile record", zap.Error(err))
continue
}

errs = append(errs, err)
}
}
}
}

if err := session.Flush(ctx); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
errs = append(errs, err)
}
return errors.Join(errs...)
}

func (e *elasticsearchExporter) pushProfileRecord(
ctx context.Context,
resource pcommon.Resource,
record pprofile.Profile,
scope pcommon.InstrumentationScope,
bulkIndexerSession bulkIndexerSession,
) error {
return e.model.encodeProfile(resource, scope, record, func(buf *bytes.Buffer, docID, index string) error {
return bulkIndexerSession.Add(ctx, index, docID, buf, nil)
dmathieu marked this conversation as resolved.
Show resolved Hide resolved
})
}
34 changes: 30 additions & 4 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper"
"go.opentelemetry.io/collector/exporter/xexporter"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
)
Expand All @@ -30,12 +32,13 @@ const (

// NewFactory creates a factory for Elastic exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
return xexporter.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithLogs(createLogsExporter, metadata.LogsStability),
exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
xexporter.WithLogs(createLogsExporter, metadata.LogsStability),
xexporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
xexporter.WithTraces(createTracesExporter, metadata.TracesStability),
xexporter.WithProfiles(createProfilesExporter, metadata.ProfilesStability),
)
}

Expand Down Expand Up @@ -163,6 +166,29 @@ func createTracesExporter(ctx context.Context,
)
}

// createProfilesExporter creates a new exporter for profiles.
//
// Profiles are directly indexed into Elasticsearch.
func createProfilesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (xexporter.Profiles, error) {
cf := cfg.(*Config)

handleDeprecatedConfig(cf, set.Logger)

exporter := newExporter(cf, set, "", false)

return xexporterhelper.NewProfilesExporter(
ctx,
set,
cfg,
exporter.pushProfilesData,
exporterhelperOptions(cf, exporter.Start, exporter.Shutdown)...,
)
}

func exporterhelperOptions(
cfg *Config,
start component.StartFunc,
Expand Down
21 changes: 16 additions & 5 deletions exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter

go 1.22.0
go 1.22.2

require (
github.com/cenkalti/backoff/v4 v4.3.0
Expand All @@ -22,19 +22,26 @@ require (
go.opentelemetry.io/collector/confmap v1.25.0
go.opentelemetry.io/collector/consumer v1.25.0
go.opentelemetry.io/collector/exporter v0.119.0
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.0
go.opentelemetry.io/collector/exporter/exportertest v0.119.0
go.opentelemetry.io/collector/exporter/xexporter v0.119.0
go.opentelemetry.io/collector/extension/auth/authtest v0.119.0
go.opentelemetry.io/collector/pdata v1.25.0
go.opentelemetry.io/collector/pdata/pprofile v0.119.0
go.opentelemetry.io/collector/semconv v0.119.0
go.opentelemetry.io/ebpf-profiler v0.0.0-20250131144501-9205feec476e
go.opentelemetry.io/otel v1.34.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)

require (
github.com/armon/go-radix v1.0.0 // indirect
github.com/cilium/ebpf v0.16.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.17.0 // indirect
github.com/elastic/go-freelru v0.16.0 // indirect
github.com/elastic/go-sysinfo v1.7.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -48,9 +55,11 @@ require (
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -60,8 +69,10 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.elastic.co/apm/module/apmzap/v2 v2.6.2 // indirect
go.elastic.co/apm/v2 v2.6.2 // indirect
go.elastic.co/fastjson v1.4.0 // indirect
Expand All @@ -71,30 +82,30 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.119.0 // indirect
go.opentelemetry.io/collector/config/configtls v1.25.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.119.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.119.0 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.119.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.119.0 // indirect
go.opentelemetry.io/collector/exporter/xexporter v0.119.0 // indirect
go.opentelemetry.io/collector/extension v0.119.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect
go.opentelemetry.io/collector/extension/xextension v0.119.0 // indirect
go.opentelemetry.io/collector/featuregate v1.25.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.119.0 // indirect
go.opentelemetry.io/collector/receiver v0.119.0 // indirect
go.opentelemetry.io/collector/receiver/receivertest v0.119.0 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.4 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading