Skip to content

Commit

Permalink
{filebeat/winlogbeat} Extract runner logic for eventlog (#42736)
Browse files Browse the repository at this point in the history
* Extract runner logic for eventlog

* do not use unnecessary runner struct

* fix lint

* Recover file

(cherry picked from commit a28ac4c)
  • Loading branch information
marc-gr authored and mergify[bot] committed Feb 20, 2025
1 parent 29a97e7 commit f2c1f78
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 197 deletions.
136 changes: 32 additions & 104 deletions filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,39 @@
package winlog

import (
"errors"
"fmt"
"io"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/eventlog"
conf "github.com/elastic/elastic-agent-libs/config"
)

type eventlogRunner struct{}

const pluginName = "winlog"

type publisher struct {
cursorPub cursor.Publisher
}

func (pub *publisher) Publish(records []eventlog.Record) error {
for _, record := range records {
event := record.ToEvent()
if err := pub.cursorPub.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
return err
}
}
return nil
}

type winlogInput struct{}

// Plugin create a stateful input Plugin collecting logs from Windows Event Logs.
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
return input.Plugin{
Expand All @@ -65,119 +77,35 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
}

sources := []cursor.Source{eventLog}
return sources, eventlogRunner{}, nil
return sources, winlogInput{}, nil
}

func (eventlogRunner) Name() string { return pluginName }
func (winlogInput) Name() string { return pluginName }

func (eventlogRunner) Test(source cursor.Source, ctx input.TestContext) error {
api := source.(eventlog.EventLog)
func (in winlogInput) Test(source cursor.Source, ctx input.TestContext) error {
api, _ := source.(eventlog.EventLog)
err := api.Open(checkpoint.EventLogState{})
if err != nil {
return fmt.Errorf("failed to open %q: %w", api.Channel(), err)
}
return api.Close()
}

func (eventlogRunner) Run(
func (in winlogInput) Run(
ctx input.Context,
source cursor.Source,
cursor cursor.Cursor,
publisher cursor.Publisher,
pub cursor.Publisher,
) error {
api := source.(eventlog.EventLog)
api, _ := source.(eventlog.EventLog)
log := ctx.Logger.With("eventlog", source.Name(), "channel", api.Channel())

// setup closing the API if either the run function is signaled asynchronously
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() {
if err := api.Close(); err != nil {
log.Errorw("Error while closing Windows Event Log access", "error", err)
}
})
defer cancelFn()

// Flag used to detect repeat "channel not found" errors, eliminating log spam.
channelNotFoundErrDetected := false

runLoop:
for {
//nolint:nilerr // only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

evtCheckpoint := initCheckpoint(log, cursor)
openErr := api.Open(evtCheckpoint)

switch {
case eventlog.IsRecoverable(openErr):
log.Errorw("Encountered recoverable error when opening Windows Event Log", "error", openErr)
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case !api.IsFile() && eventlog.IsChannelNotFound(openErr):
if !channelNotFoundErrDetected {
log.Errorw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
} else {
log.Debugw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
}
channelNotFoundErrDetected = true
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case openErr != nil:
return fmt.Errorf("failed to open Windows Event Log channel %q: %w", api.Channel(), openErr)
}
channelNotFoundErrDetected = false

log.Debug("Windows Event Log opened successfully")

// read loop
for cancelCtx.Err() == nil {
records, err := api.Read()
if eventlog.IsRecoverable(err) {
log.Errorw("Encountered recoverable error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
log.Errorw("Encountered channel not found error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}

if err != nil {
if errors.Is(err, io.EOF) {
log.Debugw("End of Winlog event stream reached", "error", err)
return nil
}

//nolint:nilerr // only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

log.Errorw("Error occurred while reading from Windows Event Log", "error", err)
return err
}
if len(records) == 0 {
_ = timed.Wait(cancelCtx, time.Second)
continue
}

for _, record := range records {
event := record.ToEvent()
if err := publisher.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
return err
}
}
}
}
return eventlog.Run(
ctxtool.FromCanceller(ctx.Cancelation),
api,
initCheckpoint(log, cursor),
&publisher{cursorPub: pub},
log,
)
}

func initCheckpoint(log *logp.Logger, cursor cursor.Cursor) checkpoint.EventLogState {
Expand Down
117 changes: 24 additions & 93 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package beater

import (
"errors"
"io"
"time"
"context"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
Expand Down Expand Up @@ -54,6 +52,19 @@ type eventLoggerConfig struct {
KeepNull bool `config:"keep_null"`
}

type publisher struct {
client beat.Client
eventACKer *eventACKer
}

func (p *publisher) Publish(records []eventlog.Record) error {
p.eventACKer.Add(len(records))
for _, lr := range records {
p.client.Publish(lr.ToEvent())
}
return nil
}

func newEventLogger(
beatInfo beat.Info,
source eventlog.EventLog,
Expand Down Expand Up @@ -122,98 +133,18 @@ func (e *eventLogger) run(
client.Close()
}()

defer func() {
e.log.Info("Stop processing.")

if err := api.Close(); err != nil {
e.log.Warnw("Close() error.", "error", err)
return
}
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
<-done
cancelFn()
}()

// Flag used to detect repeat "channel not found" errors, eliminating log spam.
channelNotFoundErrDetected := false

runLoop:
for stop := false; !stop; {
select {
case <-done:
return
default:
}

err = api.Open(state)

switch {
case eventlog.IsRecoverable(err):
e.log.Warnw("Open() encountered recoverable error. Trying again...", "error", err, "channel", api.Channel())
time.Sleep(time.Second * 5)
continue
case !api.IsFile() && eventlog.IsChannelNotFound(err):
if !channelNotFoundErrDetected {
e.log.Warnw("Open() encountered channel not found error. Trying again...", "error", err, "channel", api.Channel())
} else {
e.log.Debugw("Open() encountered channel not found error. Trying again...", "error", err, "channel", api.Channel())
}
channelNotFoundErrDetected = true
time.Sleep(time.Second * 5)
continue
case err != nil:
e.log.Warnw("Open() error. No events will be read from this source.", "error", err, "channel", api.Channel())
return
}
channelNotFoundErrDetected = false

e.log.Debug("Opened successfully.")

for !stop {
select {
case <-done:
return
default:
}

// Read from the event.
records, err := api.Read()
if eventlog.IsRecoverable(err) {
e.log.Warnw("Read() encountered recoverable error. Reopening handle...", "error", err, "channel", api.Channel())
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
e.log.Warnw("Read() encountered channel not found error for channel %q. Reopening handle...", "error", err, "channel", api.Channel())
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}

if err != nil {
if errors.Is(err, io.EOF) {
// Graceful stop.
stop = true
} else {
e.log.Warnw("Read() error.", "error", err, "channel", api.Channel())
return
}
}

e.log.Debugf("Read() returned %d records.", len(records))
if len(records) == 0 {
time.Sleep(time.Second)
if stop {
return
}
continue
}

eventACKer.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
}
}
publisher := &publisher{
client: client,
eventACKer: eventACKer,
}
if err := eventlog.Run(ctx, api, state, publisher, e.log); err != nil {
e.log.Error(err)
}
}

Expand Down
Loading

0 comments on commit f2c1f78

Please sign in to comment.