diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c55cda3fcf..e2e852d93a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108) - Support for stdoutlog exporter in `go.opentelemetry.io/contrib/config`. (#5850) +- Add `go.opentelemetry.io/contrib/processors/isolate` module. + This module provides an isolating log processor. (#5861) ### Removed diff --git a/CODEOWNERS b/CODEOWNERS index 077f34e7d6c..f9f5c7080f6 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -57,6 +57,7 @@ instrumentation/net/http/otelhttp/ @open-te instrumentation/runtime/ @open-telemetry/go-approvers @MadVikingGod processors/baggagecopy @open-telemetry/go-approvers @codeboten @MikeGoldsmith +processors/isolate @open-telemetry/go-approvers @pellared processors/minsev @open-telemetry/go-approvers @MrAlias propagators/autoprop/ @open-telemetry/go-approvers @MrAlias diff --git a/processors/isolate/example_test.go b/processors/isolate/example_test.go new file mode 100644 index 00000000000..4d8347485ce --- /dev/null +++ b/processors/isolate/example_test.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package isolate_test + +import ( + "go.opentelemetry.io/contrib/processors/isolate" + "go.opentelemetry.io/otel/sdk/log" +) + +func Example() { + // Log processing pipelines that process and emit telemetry. + var p1 log.Processor + var p2 log.Processor + var p3 log.Processor + + // Register the processors using + // isolate.NewLogProcessor and the log.WithProcessor option + // so that the log records are not shared between pipelines. + _ = log.NewLoggerProvider( + log.WithProcessor(isolate.NewLogProcessor(p1)), + log.WithProcessor(isolate.NewLogProcessor(p2)), + log.WithProcessor(isolate.NewLogProcessor(p3)), + ) +} diff --git a/processors/isolate/go.mod b/processors/isolate/go.mod new file mode 100644 index 00000000000..33c12f6bafc --- /dev/null +++ b/processors/isolate/go.mod @@ -0,0 +1,23 @@ +module go.opentelemetry.io/contrib/processors/isolate + +go 1.21 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel/log v0.4.0 + go.opentelemetry.io/otel/sdk/log v0.4.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/sys v0.22.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/processors/isolate/go.sum b/processors/isolate/go.sum new file mode 100644 index 00000000000..20c9445e1d9 --- /dev/null +++ b/processors/isolate/go.sum @@ -0,0 +1,33 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/log v0.4.0 h1:/vZ+3Utqh18e8TPjuc3ecg284078KWrR8BRz+PQAj3o= +go.opentelemetry.io/otel/log v0.4.0/go.mod h1:DhGnQvky7pHy82MIRV43iXh3FlKN8UUKftn0KbLOq6I= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/log v0.4.0 h1:1mMI22L82zLqf6KtkjrRy5BbagOTWdJsqMY/HSqILAA= +go.opentelemetry.io/otel/sdk/log v0.4.0/go.mod h1:AYJ9FVF0hNOgAVzUG/ybg/QttnXhUePWAupmCqtdESo= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processors/isolate/processor.go b/processors/isolate/processor.go new file mode 100644 index 00000000000..aa68e24c8df --- /dev/null +++ b/processors/isolate/processor.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package isolate provides an isolating processor that can be used to +// configure independent processing pipelines. +package isolate // import "go.opentelemetry.io/contrib/processors/isolate" + +import ( + "context" + + "go.opentelemetry.io/otel/sdk/log" +) + +// NewLogProcessor returns a new [LogProcessor] that wraps the downstream +// [log.Processor]. +// +// If downstream is nil a default No-Op [log.Processor] is used. The returned +// processor will not be enabled for nor emit any records. +func NewLogProcessor(downstream log.Processor) *LogProcessor { + if downstream == nil { + downstream = defaultProcessor + } + return &LogProcessor{Processor: downstream} +} + +// LogProcessor is an [log.Processor] implementation clones the received log +// records in order to no share mutable data with subsequent registered processors. +// +// If the wrapped [log.Processor] is nil, calls to the LogProcessor methods +// will panic. +// +// Use [NewLogProcessor] to create a new LogProcessor that ensures +// no panics. +type LogProcessor struct { + log.Processor +} + +// Compile time assertion that LogProcessor implements log.Processor. +var _ log.Processor = (*LogProcessor)(nil) + +// OnEmit clones the record and calls the wrapped downstream processor. +func (p *LogProcessor) OnEmit(ctx context.Context, record log.Record) error { + record = record.Clone() + return p.Processor.OnEmit(ctx, record) +} + +// Enabled clones the record and calls the wrapped downstream processor. +func (p *LogProcessor) Enabled(ctx context.Context, record log.Record) bool { + record = record.Clone() + return p.Processor.Enabled(ctx, record) +} + +var defaultProcessor = noopProcessor{} + +type noopProcessor struct{} + +func (p noopProcessor) OnEmit(context.Context, log.Record) error { return nil } +func (p noopProcessor) Enabled(context.Context, log.Record) bool { return false } +func (p noopProcessor) Shutdown(context.Context) error { return nil } +func (p noopProcessor) ForceFlush(context.Context) error { return nil } diff --git a/processors/isolate/processor_test.go b/processors/isolate/processor_test.go new file mode 100644 index 00000000000..34b6ae81a62 --- /dev/null +++ b/processors/isolate/processor_test.go @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package isolate + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + logapi "go.opentelemetry.io/otel/log" + "go.opentelemetry.io/otel/sdk/log" +) + +const testAttrCount = 10 + +var testCtx = context.WithValue(context.Background(), "k", "v") //nolint // Simplify for testing. + +func TestLogProcessorOnEmit(t *testing.T) { + wrapped := &processor{ReturnErr: assert.AnError} + + p := NewLogProcessor(wrapped) + + var r log.Record + for i := 0; i < testAttrCount; i++ { + r.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + assert.ErrorIs(t, p.OnEmit(testCtx, r), assert.AnError) + + // Assert passthrough of the arguments. + if assert.Len(t, wrapped.OnEmitCalls, 1) { + assert.Equal(t, testCtx, wrapped.OnEmitCalls[0].Ctx) + assert.Equal(t, r, wrapped.OnEmitCalls[0].Record) + } + + // Assert that the record is not being affected by subsequent modifications. + r.AddAttributes(logapi.String("foo", "bar")) + assert.Equal(t, testAttrCount, wrapped.OnEmitCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications") +} + +func TestLogProcessorEnabled(t *testing.T) { + wrapped := &processor{} + + p := NewLogProcessor(wrapped) + + var r log.Record + for i := 0; i < testAttrCount; i++ { + r.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + assert.True(t, p.Enabled(testCtx, r)) + + // Assert passthrough of the arguments. + if assert.Len(t, wrapped.EnabledCalls, 1) { + assert.Equal(t, testCtx, wrapped.EnabledCalls[0].Ctx) + assert.Equal(t, r, wrapped.EnabledCalls[0].Record) + } + + // Assert that the record is not being affected by subsequent modifications. + r.AddAttributes(logapi.String("foo", "bar")) + assert.Equal(t, testAttrCount, wrapped.EnabledCalls[0].Record.AttributesLen(), "should be isolated from subsequent modifications") +} + +type args struct { + Ctx context.Context + Record log.Record +} + +type processor struct { + ReturnErr error + + OnEmitCalls []args + EnabledCalls []args + ForceFlushCalls []context.Context + ShutdownCalls []context.Context +} + +func (p *processor) OnEmit(ctx context.Context, r log.Record) error { + p.OnEmitCalls = append(p.OnEmitCalls, args{ctx, r}) + return p.ReturnErr +} + +func (p *processor) Enabled(ctx context.Context, r log.Record) bool { + p.EnabledCalls = append(p.EnabledCalls, args{ctx, r}) + return true +} + +func (p *processor) Shutdown(ctx context.Context) error { + p.ShutdownCalls = append(p.ShutdownCalls, ctx) + return p.ReturnErr +} + +func (p *processor) ForceFlush(ctx context.Context) error { + p.ForceFlushCalls = append(p.ForceFlushCalls, ctx) + return p.ReturnErr +} + +func BenchmarkLogProcessor(b *testing.B) { + var ok bool + var err error + + var r log.Record + r.SetBody(logapi.StringValue("message")) + + var rWithShared log.Record + for i := 0; i < testAttrCount; i++ { + rWithShared.AddAttributes(logapi.Int(strconv.Itoa(i), i)) + } + + testCases := []struct { + desc string + r log.Record + }{ + { + desc: "Record without shared data", + r: r, + }, + { + desc: "Record with shared data", + r: rWithShared, + }, + } + + p := NewLogProcessor(noopProcessor{}) + + for _, tc := range testCases { + b.Run(tc.desc, func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + ok = p.Enabled(testCtx, tc.r) + err = p.OnEmit(testCtx, tc.r) + } + }) + } + + _, _ = ok, err +} diff --git a/versions.yaml b/versions.yaml index bc852e73a15..82f7e170f41 100644 --- a/versions.yaml +++ b/versions.yaml @@ -82,6 +82,7 @@ module-sets: version: v0.1.0 modules: - go.opentelemetry.io/contrib/processors/baggagecopy + - go.opentelemetry.io/contrib/processors/isolate - go.opentelemetry.io/contrib/processors/minsev experimental-detectors: version: v0.0.1