Skip to content

Commit

Permalink
add ObserverInputAware interface
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Feb 19, 2025
1 parent c119524 commit ff362e9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
4 changes: 2 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Client struct {
indexSelector outputs.IndexSelector
pipelineSelector *outil.Selector

observer outputs.Observer
observer outputs.ObserverInputAware

// If deadLetterIndex is set, events with bulk-ingest errors will be
// forwarded to this index. Otherwise, they will be dropped.
Expand Down Expand Up @@ -549,7 +549,7 @@ func (client *Client) Test(d testing.Driver) {
client.conn.Test(d)
}

func (stats bulkResultStats) reportToObserver(ob outputs.Observer) {
func (stats bulkResultStats) reportToObserver(ob outputs.ObserverInputAware) {
ob.AckedEventsE(stats.acked)
ob.PermanentErrorsE(stats.nonIndexable)
ob.DuplicateEventsE(stats.duplicates)
Expand Down
5 changes: 3 additions & 2 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func (bm *batchMock) RetryEvents(events []publisher.Event) {
}

func TestPublish(t *testing.T) {

// TODO: extend to include per inout metrics
// include always events with and without inputID
makePublishTestClient := func(t *testing.T, url string) (*Client, *monitoring.Registry) {
reg := monitoring.NewRegistry()
internal := monitoring.NewRegistry()
internal := monitoring.GetNamespace("TestPublish").GetRegistry()
client, err := NewClient(
clientSettings{
observer: outputs.NewStats(reg, internal),
Expand Down
25 changes: 25 additions & 0 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ type Observer interface {
ReportLatency(time.Duration) // report the duration a send to the output takes
}

// Observer provides an interface used by outputs to report common events on
// documents/events being published and I/O workload.
// TODO: fix docs
type ObserverInputAware interface {
NewBatchE([]publisher.Event)

RetryableErrors(int) // report number of events with retryable errors
PermanentError(e publisher.Event)
PermanentErrorsE([]publisher.Event)
DuplicateEventsE([]publisher.Event)
DeadLetterEventsE([]publisher.Event)
AckedEvent(e publisher.Event)
AckedEventsE([]publisher.Event)
ErrTooMany(int) // report too many requests response

BatchSplit() // report a batch was split for being too large to ingest

WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read

ReportLatency(time.Duration) // report the duration a send to the output takes
}

type emptyObserver struct{}

var nilObserver = (*emptyObserver)(nil)
Expand Down

0 comments on commit ff362e9

Please sign in to comment.