Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{filebeat/winlogbeat} Extract runner logic for eventlog #42736

Merged
merged 9 commits into from
Feb 20, 2025
Merged
Binary file removed filebeat/filebeat_windows_amd64.syso
Binary file not shown.
136 changes: 32 additions & 104 deletions filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,39 @@
package winlog

import (
"errors"
"fmt"
"io"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/eventlog"
conf "github.com/elastic/elastic-agent-libs/config"
)

type eventlogRunner struct{}

const pluginName = "winlog"

type publisher struct {
cursorPub cursor.Publisher
}

func (pub *publisher) Publish(records []eventlog.Record) error {
for _, record := range records {
event := record.ToEvent()
if err := pub.cursorPub.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
return err
}
}
return nil
}

type winlogInput struct{}

// Plugin create a stateful input Plugin collecting logs from Windows Event Logs.
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
return input.Plugin{
Expand All @@ -65,119 +77,35 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
}

sources := []cursor.Source{eventLog}
return sources, eventlogRunner{}, nil
return sources, winlogInput{}, nil
}

func (eventlogRunner) Name() string { return pluginName }
func (winlogInput) Name() string { return pluginName }

func (eventlogRunner) Test(source cursor.Source, ctx input.TestContext) error {
api := source.(eventlog.EventLog)
func (in winlogInput) Test(source cursor.Source, ctx input.TestContext) error {
api, _ := source.(eventlog.EventLog)
err := api.Open(checkpoint.EventLogState{})
if err != nil {
return fmt.Errorf("failed to open %q: %w", api.Channel(), err)
}
return api.Close()
}

func (eventlogRunner) Run(
func (in winlogInput) Run(
ctx input.Context,
source cursor.Source,
cursor cursor.Cursor,
publisher cursor.Publisher,
pub cursor.Publisher,
) error {
api := source.(eventlog.EventLog)
api, _ := source.(eventlog.EventLog)
log := ctx.Logger.With("eventlog", source.Name(), "channel", api.Channel())

// setup closing the API if either the run function is signaled asynchronously
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() {
if err := api.Close(); err != nil {
log.Errorw("Error while closing Windows Event Log access", "error", err)
}
})
defer cancelFn()

// Flag used to detect repeat "channel not found" errors, eliminating log spam.
channelNotFoundErrDetected := false

runLoop:
for {
//nolint:nilerr // only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

evtCheckpoint := initCheckpoint(log, cursor)
openErr := api.Open(evtCheckpoint)

switch {
case eventlog.IsRecoverable(openErr):
log.Errorw("Encountered recoverable error when opening Windows Event Log", "error", openErr)
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case !api.IsFile() && eventlog.IsChannelNotFound(openErr):
if !channelNotFoundErrDetected {
log.Errorw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
} else {
log.Debugw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
}
channelNotFoundErrDetected = true
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case openErr != nil:
return fmt.Errorf("failed to open Windows Event Log channel %q: %w", api.Channel(), openErr)
}
channelNotFoundErrDetected = false

log.Debug("Windows Event Log opened successfully")

// read loop
for cancelCtx.Err() == nil {
records, err := api.Read()
if eventlog.IsRecoverable(err) {
log.Errorw("Encountered recoverable error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
log.Errorw("Encountered channel not found error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}

if err != nil {
if errors.Is(err, io.EOF) {
log.Debugw("End of Winlog event stream reached", "error", err)
return nil
}

//nolint:nilerr // only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
}

log.Errorw("Error occurred while reading from Windows Event Log", "error", err)
return err
}
if len(records) == 0 {
_ = timed.Wait(cancelCtx, time.Second)
continue
}

for _, record := range records {
event := record.ToEvent()
if err := publisher.Publish(event, record.Offset); err != nil {
// Publisher indicates disconnect when returning an error.
// stop trying to publish records and quit
return err
}
}
}
}
return eventlog.Run(
ctxtool.FromCanceller(ctx.Cancelation),
api,
initCheckpoint(log, cursor),
&publisher{cursorPub: pub},
log,
)
}

func initCheckpoint(log *logp.Logger, cursor cursor.Cursor) checkpoint.EventLogState {
Expand Down
117 changes: 24 additions & 93 deletions winlogbeat/beater/eventlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package beater

import (
"errors"
"io"
"time"
"context"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
Expand Down Expand Up @@ -54,6 +52,19 @@ type eventLoggerConfig struct {
KeepNull bool `config:"keep_null"`
}

type publisher struct {
client beat.Client
eventACKer *eventACKer
}

func (p *publisher) Publish(records []eventlog.Record) error {
p.eventACKer.Add(len(records))
for _, lr := range records {
p.client.Publish(lr.ToEvent())
}
return nil
}

func newEventLogger(
beatInfo beat.Info,
source eventlog.EventLog,
Expand Down Expand Up @@ -122,98 +133,18 @@ func (e *eventLogger) run(
client.Close()
}()

defer func() {
e.log.Info("Stop processing.")

if err := api.Close(); err != nil {
e.log.Warnw("Close() error.", "error", err)
return
}
ctx, cancelFn := context.WithCancel(context.Background())
go func() {
<-done
cancelFn()
}()

// Flag used to detect repeat "channel not found" errors, eliminating log spam.
channelNotFoundErrDetected := false

runLoop:
for stop := false; !stop; {
select {
case <-done:
return
default:
}

err = api.Open(state)

switch {
case eventlog.IsRecoverable(err):
e.log.Warnw("Open() encountered recoverable error. Trying again...", "error", err, "channel", api.Channel())
time.Sleep(time.Second * 5)
continue
case !api.IsFile() && eventlog.IsChannelNotFound(err):
if !channelNotFoundErrDetected {
e.log.Warnw("Open() encountered channel not found error. Trying again...", "error", err, "channel", api.Channel())
} else {
e.log.Debugw("Open() encountered channel not found error. Trying again...", "error", err, "channel", api.Channel())
}
channelNotFoundErrDetected = true
time.Sleep(time.Second * 5)
continue
case err != nil:
e.log.Warnw("Open() error. No events will be read from this source.", "error", err, "channel", api.Channel())
return
}
channelNotFoundErrDetected = false

e.log.Debug("Opened successfully.")

for !stop {
select {
case <-done:
return
default:
}

// Read from the event.
records, err := api.Read()
if eventlog.IsRecoverable(err) {
e.log.Warnw("Read() encountered recoverable error. Reopening handle...", "error", err, "channel", api.Channel())
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}
if !api.IsFile() && eventlog.IsChannelNotFound(err) {
e.log.Warnw("Read() encountered channel not found error for channel %q. Reopening handle...", "error", err, "channel", api.Channel())
if resetErr := api.Reset(); resetErr != nil {
e.log.Warnw("Reset() error.", "error", err)
}
continue runLoop
}

if err != nil {
if errors.Is(err, io.EOF) {
// Graceful stop.
stop = true
} else {
e.log.Warnw("Read() error.", "error", err, "channel", api.Channel())
return
}
}

e.log.Debugf("Read() returned %d records.", len(records))
if len(records) == 0 {
time.Sleep(time.Second)
if stop {
return
}
continue
}

eventACKer.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
}
}
publisher := &publisher{
client: client,
eventACKer: eventACKer,
}
if err := eventlog.Run(ctx, api, state, publisher, e.log); err != nil {
e.log.Error(err)
}
}

Expand Down
Loading
Loading