diff --git a/.chloggen/fix-flush-short.yaml b/.chloggen/fix-flush-short.yaml
new file mode 100644
index 000000000000..7e9df8662ee6
--- /dev/null
+++ b/.chloggen/fix-flush-short.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: filelogreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix issue where flushed tokens could be truncated.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35042]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go
index f314e4aacae8..ef35230bce36 100644
--- a/pkg/stanza/fileconsumer/internal/reader/factory.go
+++ b/pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -20,6 +20,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
 )
 
@@ -56,7 +57,11 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
 	if err != nil {
 		return nil, err
 	}
-	m := &Metadata{Fingerprint: fp, FileAttributes: attributes}
+	m := &Metadata{
+		Fingerprint:    fp,
+		FileAttributes: attributes,
+		TokenLenState:  &tokenlen.State{},
+	}
 	if f.FlushTimeout > 0 {
 		m.FlushState = &flush.State{LastDataChange: time.Now()}
 	}
@@ -64,6 +69,11 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
 }
 
 func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
+	// Ensure TokenLenState is initialized; metadata restored from a run before this field existed may not have it
+	if m.TokenLenState == nil {
+		m.TokenLenState = &tokenlen.State{}
+	}
+
 	r = &Reader{
 		Metadata:             m,
 		set:                  f.TelemetrySettings,
@@ -77,6 +87,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		includeFileRecordNum: f.IncludeFileRecordNumber,
 		compression:          f.Compression,
 		acquireFSLock:        f.AcquireFSLock,
+		emitFunc:             f.EmitFunc,
 	}
 	r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))
 
@@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		r.Offset = info.Size()
 	}
 
-	flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
+	tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
+	flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
 	r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)
-	r.emitFunc = f.EmitFunc
+
 	if f.HeaderConfig != nil && !m.HeaderFinalized {
 		r.headerSplitFunc = f.HeaderConfig.SplitFunc
 		r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig)
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go
index 3a591574fbc6..838add80cfb0 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -21,6 +21,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
 )
 
 type Metadata struct {
@@ -30,6 +31,7 @@ type Metadata struct {
 	FileAttributes  map[string]any
 	HeaderFinalized bool
 	FlushState      *flush.State
+	TokenLenState   *tokenlen.State
 }
 
 // Reader manages a single file
@@ -177,7 +179,14 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
 
 func (r *Reader) readContents(ctx context.Context) {
 	// Create the scanner to read the contents of the file.
-	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
+	bufferSize := r.initialBufferSize
+	if r.TokenLenState.MinimumLength > bufferSize {
+		// If we previously saw a potential token larger than the default buffer,
+		// size the buffer to be at least one byte larger so we can see if there's more data
+		bufferSize = r.TokenLenState.MinimumLength + 1
+	}
+
+	s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
 
 	// Iterate over the contents of the file.
 	for {
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_test.go b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
index d7d4871f43e7..e7ff78f8a4ff 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader_test.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader_test.go
@@ -189,6 +189,7 @@ func TestFingerprintChangeSize(t *testing.T) {
 func TestFlushPeriodEOF(t *testing.T) {
 	tempDir := t.TempDir()
 	temp := filetest.OpenTemp(t, tempDir)
+	// Create a long enough initial token so the scanner can't read the whole file at once
 	aContentLength := 2 * 16 * 1024
 	content := []byte(strings.Repeat("a", aContentLength))
 
@@ -223,3 +224,101 @@
 	r.ReadToEnd(context.Background())
 	sink.ExpectToken(t, []byte{'b'})
 }
+
+func TestUnterminatedLongLogEntry(t *testing.T) {
+	tempDir := t.TempDir()
+	temp := filetest.OpenTemp(t, tempDir)
+
+	// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+	content := filetest.TokenWithLength(20 * 1024) // 20KB
+	_, err := temp.WriteString(string(content))    // no newline
+	require.NoError(t, err)
+
+	// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+	// a few times during a call to ReadToEnd.
+	clock := internaltime.NewAlwaysIncreasingClock()
+	internaltime.Now = clock.Now
+	internaltime.Since = clock.Since
+	defer func() {
+		internaltime.Now = time.Now
+		internaltime.Since = time.Since
+	}()
+
+	// Use a long flush period to ensure it does not expire DURING a ReadToEnd
+	flushPeriod := time.Second
+
+	f, sink := testFactory(t, withFlushPeriod(flushPeriod))
+	fp, err := f.NewFingerprint(temp)
+	require.NoError(t, err)
+	r, err := f.NewReader(temp, fp)
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), r.Offset)
+
+	// First ReadToEnd should not emit anything as the flush period hasn't expired
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after the timer has expired
+	clock.Advance(2 * flushPeriod)
+
+	// Second ReadToEnd should emit the full, untruncated token
+	r.ReadToEnd(context.Background())
+	sink.ExpectToken(t, content)
+
+	sink.ExpectNoCalls(t)
+}
+
+func TestUnterminatedLogEntryGrows(t *testing.T) {
+	tempDir := t.TempDir()
+	temp := filetest.OpenTemp(t, tempDir)
+
+	// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+	content := filetest.TokenWithLength(20 * 1024) // 20KB
+	_, err := temp.WriteString(string(content))    // no newline
+	require.NoError(t, err)
+
+	// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+	// a few times during a call to ReadToEnd.
+	clock := internaltime.NewAlwaysIncreasingClock()
+	internaltime.Now = clock.Now
+	internaltime.Since = clock.Since
+	defer func() {
+		internaltime.Now = time.Now
+		internaltime.Since = time.Since
+	}()
+
+	// Use a long flush period to ensure it does not expire DURING a ReadToEnd
+	flushPeriod := time.Second
+
+	f, sink := testFactory(t, withFlushPeriod(flushPeriod))
+	fp, err := f.NewFingerprint(temp)
+	require.NoError(t, err)
+	r, err := f.NewReader(temp, fp)
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), r.Offset)
+
+	// First ReadToEnd should not emit anything as the flush period hasn't expired
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after the timer has expired
+	clock.Advance(2 * flushPeriod)
+
+	// Write additional unterminated content to ensure it is all picked up in the same token.
+	// The flusher should notice the new data and not return anything on the next call.
+	additionalContent := filetest.TokenWithLength(1024)
+	_, err = temp.WriteString(string(additionalContent)) // no newline
+	require.NoError(t, err)
+
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after the timer has expired
+	clock.Advance(2 * flushPeriod)
+
+	// Finally, since we haven't seen new data, the flusher should emit the token
+	r.ReadToEnd(context.Background())
+	sink.ExpectToken(t, append(content, additionalContent...))
+
+	sink.ExpectNoCalls(t)
+}
diff --git a/pkg/stanza/internal/time/time.go b/pkg/stanza/internal/time/time.go
index 34269bbf3fbf..6d57c781aaf3 100644
--- a/pkg/stanza/internal/time/time.go
+++ b/pkg/stanza/internal/time/time.go
@@ -35,3 +35,7 @@ func (c AlwaysIncreasingClock) Since(t time.Time) time.Duration {
 	c.FakeClock.Advance(time.Nanosecond)
 	return c.FakeClock.Since(t)
 }
+
+func (c AlwaysIncreasingClock) Advance(d time.Duration) {
+	c.FakeClock.Advance(d)
+}
diff --git a/pkg/stanza/tokenlen/tokenlen.go b/pkg/stanza/tokenlen/tokenlen.go
new file mode 100644
index 000000000000..326e64c3a950
--- /dev/null
+++ b/pkg/stanza/tokenlen/tokenlen.go
@@ -0,0 +1,36 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package tokenlen // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
+
+import "bufio"
+
+// State tracks the potential length of a token before any terminator checking.
+type State struct {
+	MinimumLength int
+}
+
+// Func wraps a bufio.SplitFunc to track potential token lengths.
+// It records the length of the data seen before delegating to the wrapped function.
+func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+	if s == nil {
+		return splitFunc
+	}
+
+	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		// Note the potential token length, but don't update the state until we know
+		// whether or not a token is actually returned.
+		potentialLen := len(data)
+
+		advance, token, err = splitFunc(data, atEOF)
+		if advance == 0 && token == nil && err == nil {
+			// The splitFunc is asking for more data. Remember how much
+			// we saw previously so the buffer can be sized appropriately.
+			s.MinimumLength = potentialLen
+		} else {
+			// A token was returned. This state represented that token, so clear it.
+			s.MinimumLength = 0
+		}
+		return advance, token, err
+	}
+}
diff --git a/pkg/stanza/tokenlen/tokenlen_test.go b/pkg/stanza/tokenlen/tokenlen_test.go
new file mode 100644
index 000000000000..c91cd628d305
--- /dev/null
+++ b/pkg/stanza/tokenlen/tokenlen_test.go
@@ -0,0 +1,91 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package tokenlen
+
+import (
+	"bufio"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestTokenLenState_Func(t *testing.T) {
+	cases := []struct {
+		name          string
+		input         []byte
+		atEOF         bool
+		expectedLen   int
+		expectedToken []byte
+		expectedAdv   int
+		expectedErr   error
+	}{
+		{
+			name:        "no token yet",
+			input:       []byte("partial"),
+			atEOF:       false,
+			expectedLen: len("partial"),
+		},
+		{
+			name:          "complete token",
+			input:         []byte("complete\ntoken"),
+			atEOF:         false,
+			expectedLen:   0, // should clear state after finding a token
+			expectedToken: []byte("complete"),
+			expectedAdv:   len("complete\n"),
+		},
+		{
+			name:        "growing token",
+			input:       []byte("growing"),
+			atEOF:       false,
+			expectedLen: len("growing"),
+		},
+		{
+			name:          "flush at EOF",
+			input:         []byte("flush"),
+			atEOF:         true,
+			expectedLen:   0, // should clear state after flushing
+			expectedToken: []byte("flush"),
+			expectedAdv:   len("flush"),
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			state := &State{}
+			splitFunc := state.Func(bufio.ScanLines)
+
+			adv, token, err := splitFunc(tc.input, tc.atEOF)
+			require.Equal(t, tc.expectedErr, err)
+			require.Equal(t, tc.expectedToken, token)
+			require.Equal(t, tc.expectedAdv, adv)
+			require.Equal(t, tc.expectedLen, state.MinimumLength)
+		})
+	}
+}
+
+func TestTokenLenState_GrowingToken(t *testing.T) {
+	state := &State{}
+	splitFunc := state.Func(bufio.ScanLines)
+
+	// First call with a partial token
+	adv, token, err := splitFunc([]byte("part"), false)
+	require.NoError(t, err)
+	require.Nil(t, token)
+	require.Equal(t, 0, adv)
+	require.Equal(t, len("part"), state.MinimumLength)
+
+	// Second call with a longer partial token
+	adv, token, err = splitFunc([]byte("partial"), false)
+	require.NoError(t, err)
+	require.Nil(t, token)
+	require.Equal(t, 0, adv)
+	require.Equal(t, len("partial"), state.MinimumLength)
+
+	// Final call with a complete token
+	adv, token, err = splitFunc([]byte("partial\ntoken"), false)
+	require.NoError(t, err)
+	require.Equal(t, []byte("partial"), token)
+	require.Equal(t, len("partial\n"), adv)
+	require.Equal(t, 0, state.MinimumLength) // state should be cleared after emitting the token
+}