Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - running CI #42793

Draft
wants to merge 39 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1bef168
wip
AndersonQ Jan 30, 2025
c1802e8
wip
AndersonQ Jan 31, 2025
5734db9
WIP - metrics added to input_metrics.json on filestream
AndersonQ Feb 3, 2025
df0bb94
wip - remove logs when publishing events
AndersonQ Feb 4, 2025
3520ff9
wip
AndersonQ Feb 4, 2025
f275100
wip
AndersonQ Feb 5, 2025
dbd124e
WIP - read and write to the beat instance scoped metric namespace
AndersonQ Feb 5, 2025
44d316f
wip
AndersonQ Feb 6, 2025
534fd00
clean up
AndersonQ Feb 6, 2025
632c1a9
fix tests
AndersonQ Feb 6, 2025
5544f4b
license headers
AndersonQ Feb 6, 2025
007b2d0
add tests
AndersonQ Feb 7, 2025
9266d0b
clean up
AndersonQ Feb 7, 2025
8c0d9da
using stream_id instead of input_id
AndersonQ Feb 7, 2025
bcb0925
add TODO
AndersonQ Feb 10, 2025
9752245
add bench
AndersonQ Feb 10, 2025
2a69020
.
AndersonQ Feb 10, 2025
c4afef9
passing down inputID
AndersonQ Feb 10, 2025
514287d
add integration test
AndersonQ Feb 11, 2025
75dcabe
add total events per input, clean up and refactor
AndersonQ Feb 11, 2025
e4ffbf1
add tests for the inputs/ monitoring endpoint
AndersonQ Feb 11, 2025
689d62b
add inputID when creating the pipeline client
AndersonQ Feb 11, 2025
a93b906
passing down the input context on awss3 input
AndersonQ Feb 11, 2025
4281c7e
wip
AndersonQ Feb 11, 2025
7f2c5b4
wip - where to add tests
AndersonQ Feb 11, 2025
6984ad3
wip
AndersonQ Feb 12, 2025
4bd2b6e
wip: cel: integration tests
AndersonQ Feb 13, 2025
56cbe18
add cel integration tests and move tests to x-pack
AndersonQ Feb 14, 2025
6a0d179
add httpjson test
AndersonQ Feb 14, 2025
f48ee59
clean up
AndersonQ Feb 17, 2025
8d09ef5
fix tests
AndersonQ Feb 17, 2025
b9991e2
add license header
AndersonQ Feb 17, 2025
c0348b4
add changelog
AndersonQ Feb 17, 2025
9fc4e2e
add docs for each input
AndersonQ Feb 17, 2025
999c1e2
making the linter happy
AndersonQ Feb 19, 2025
a504e9f
add eventID to publisher.Event
AndersonQ Feb 19, 2025
f1568b3
add new methods to collect metric by input
AndersonQ Feb 19, 2025
6ddf4bb
added metrics to ES output - so far so good
AndersonQ Feb 19, 2025
3330e9a
add ObserverInputAware interface
AndersonQ Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Replace default Ubuntu-based images with UBI-minimal-based ones {pull}42150[42150]
- Fix templates and docs to use correct `--` version of command line arguments. {issue}42038[42038] {pull}42060[42060]
- removed support for a single `-` to precede multi-letter command line arguments. Use `--` instead. {issue}42117[42117] {pull}42209[42209]
- add per-input metrics to libbeat pipeline client {pull}42618[42618]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}

if b.API != nil {
if err = inputmon.AttachHandler(b.API.Router()); err != nil {
if err = inputmon.AttachHandler(b.Info, b.API.Router()); err != nil {
return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON()
data, err := inputmon.MetricSnapshotJSON(b.Info)
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
Expand Down
4 changes: 4 additions & 0 deletions filebeat/docs/inputs/input-filestream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ observe the activity of the input. Note that metrics from processors are not inc
| `messages_truncated_total` | Total number of messages truncated.
| `bytes_processed_total` | Total number of bytes processed.
| `events_processed_total` | Total number of events processed.
| `events_pipeline_total` | Total number of events processed by the publishing pipeline.
| `events_pipeline_failed_total` | Total number of events failed at the publishing pipeline.
| `events_pipeline_filtered_total` | Total number of events filtered at the publishing pipeline.
| `events_pipeline_published_total` | Total number of events published at the publishing pipeline.
| `processing_errors_total` | Total number of processing errors.
| `processing_time` | Histogram of the elapsed time to process messages (expressed in nanoseconds).
|=======
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func startHarvester(
defer releaseResource(resource)

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
InputID: ctx.ID,
EventListener: newInputACKHandler(hg.ackCH),
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
InputID: ctx.ID,
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (si configuredInput) Run(ctx v2.Context, pipeline beat.PipelineConnector) (
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
InputID: ctx.ID,
PublishMode: beat.DefaultGuarantees,
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type Client interface {
// ClientConfig defines common configuration options one can pass to
// Pipeline.ConnectWith to control the clients behavior and provide ACK support.
type ClientConfig struct {
// InputID is the input ID of the input using the client. The InputID is
// used to aggregate pipeline metrics per input.
InputID string

PublishMode PublishMode

Processing ProcessingConfig
Expand Down
10 changes: 6 additions & 4 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,11 +588,13 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

var publisher *pipeline.Pipeline
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
BeatRegistry: b.Info.Monitoring.Namespace.GetRegistry(),
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
// here
outputFactory := b.makeOutputFactory(b.Config.Output)
settings := pipeline.Settings{
// Since now publisher is closed on Stop, we want to give some
Expand Down
57 changes: 46 additions & 11 deletions libbeat/monitoring/inputmon/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"

"github.com/elastic/beats/v7/libbeat/beat"
libbeatmonitoring "github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand All @@ -36,19 +38,31 @@ const (
)

type handler struct {
registry *monitoring.Registry
registryDataset *monitoring.Registry
registryInternalInputs *monitoring.Registry
}

// AttachHandler attaches an HTTP handler to the given mux.Router to handle
// requests to /inputs.
func AttachHandler(r *mux.Router) error {
return attachHandler(r, globalRegistry())
func AttachHandler(beatInfo beat.Info, r *mux.Router) error {
intInputsReg := beatInfo.Monitoring.Namespace.GetRegistry().
GetRegistry(libbeatmonitoring.RegistryNameInternalInputs)
if intInputsReg == nil {
intInputsReg = beatInfo.Monitoring.Namespace.GetRegistry().
NewRegistry(libbeatmonitoring.RegistryNameInternalInputs)
}

return attachHandler(r, globalRegistry(), intInputsReg)
}

func attachHandler(r *mux.Router, registry *monitoring.Registry) error {
h := &handler{registry: registry}
func attachHandler(r *mux.Router, datasetReg, intInputsReg *monitoring.Registry) error {
r = r.PathPrefix(route).Subrouter()
return r.StrictSlash(true).Handle("/", validationHandler("GET", []string{"pretty", "type"}, h.allInputs)).GetError()

h := &handler{
registryDataset: datasetReg,
registryInternalInputs: intInputsReg,
}
return r.StrictSlash(true).Handle("/", validationHandler(http.MethodGet, []string{"pretty", "type"}, h.allInputs)).GetError()
}

func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {
Expand All @@ -64,14 +78,15 @@ func (h *handler) allInputs(w http.ResponseWriter, req *http.Request) {
return
}

filtered := filteredSnapshot(h.registry, requestedType)
filtered := filteredSnapshot(
h.registryDataset, h.registryInternalInputs, requestedType)

w.Header().Set(contentType, applicationJSON)
serveJSON(w, filtered, requestedPretty)
}

func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(r, monitoring.Full, false)
func filteredSnapshot(dataset, intInputs *monitoring.Registry, requestedType string) []map[string]any {
metrics := monitoring.CollectStructSnapshot(dataset, monitoring.Full, false)

filtered := make([]map[string]any, 0, len(metrics))
for _, ifc := range metrics {
Expand All @@ -81,19 +96,39 @@ func filteredSnapshot(r *monitoring.Registry, requestedType string) []map[string
}

// Require all entries to have an 'input' and 'id' to be accessed through this API.
if id, ok := m["id"].(string); !ok || id == "" {
id, ok := m["id"].(string)
if !ok || id == "" {
continue
}

if inputType, ok := m["input"].(string); !ok || (requestedType != "" && !strings.EqualFold(inputType, requestedType)) {
if inputType, ok := m["input"].(string); !ok || (requestedType != "" &&
!strings.EqualFold(inputType, requestedType)) {
continue
}

// merge metrics stored in the internal namespace if any is found
mergeInternalMetrics(intInputs, id, m)

filtered = append(filtered, m)
}
return filtered
}

// mergeInternalMetrics looks for a registry identified by id in the internal
// registry. If found, all the metrics are merged into m, if not, m is not
// changed.
func mergeInternalMetrics(internal *monitoring.Registry, id string, m map[string]any) {
reg := internal.GetRegistry(id)
if reg == nil {
return
}

intInput := monitoring.CollectStructSnapshot(reg, monitoring.Full, false)
for k, v := range intInput {
m[k] = v
}
}

func serveJSON(w http.ResponseWriter, value any, pretty bool) {
w.Header().Set(contentType, applicationJSON)
enc := json.NewEncoder(w)
Expand Down
9 changes: 7 additions & 2 deletions libbeat/monitoring/inputmon/httphandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand All @@ -53,7 +54,9 @@ var testCases = []TestCase{
}

func TestHandler(t *testing.T) {
namespace := monitoring.GetNamespace("TestHandler")
parent := monitoring.NewRegistry()

reg, _ := NewInputRegistry("foo", "123abc", parent)
monitoring.NewInt(reg, "gauge").Set(13344)

Expand All @@ -68,7 +71,9 @@ func TestHandler(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()

if err := attachHandler(r, parent); err != nil {
beatInfo := beat.Info{}
beatInfo.Monitoring.Namespace = namespace
if err := attachHandler(r, parent, monitoring.NewRegistry()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -116,7 +121,7 @@ func BenchmarkHandlers(b *testing.B) {
monitoring.NewInt(reg, "gauge").Set(int64(i))
}

h := &handler{registry: reg}
h := &handler{registryDataset: reg}

b.Run("allInputs", func(b *testing.B) {
req := httptest.NewRequest(http.MethodGet, "/inputs/", nil)
Expand Down
22 changes: 19 additions & 3 deletions libbeat/monitoring/inputmon/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/gofrs/uuid/v5"

"github.com/elastic/beats/v7/libbeat/beat"
libbeatmonitoring "github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)
Expand Down Expand Up @@ -81,7 +83,21 @@ func globalRegistry() *monitoring.Registry {
}

// MetricSnapshotJSON returns a snapshot of the input metric values from the
// global 'dataset' monitoring namespace encoded as a JSON array (pretty formatted).
func MetricSnapshotJSON() ([]byte, error) {
return json.MarshalIndent(filteredSnapshot(globalRegistry(), ""), "", " ")
// global 'dataset' and libbeatmonitoring.RegistryNameInternalInputs monitoring
// namespace from the beatInfo instance. It returns a pretty formated JSON array
// as a byte slice.
func MetricSnapshotJSON(beatInfo beat.Info) ([]byte, error) {
intReg := beatInfo.Monitoring.Namespace.GetRegistry().
GetRegistry(libbeatmonitoring.RegistryNameInternalInputs)
if intReg == nil {
intReg = beatInfo.Monitoring.Namespace.GetRegistry().
NewRegistry(libbeatmonitoring.RegistryNameInternalInputs)
}
return json.MarshalIndent(
filteredSnapshot(
globalRegistry(),
intReg,
""),
"",
" ")
}
44 changes: 37 additions & 7 deletions libbeat/monitoring/inputmon/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
libbeatmonitoring "github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -87,17 +89,45 @@ func TestMetricSnapshotJSON(t *testing.T) {
require.NoError(t, globalRegistry().Clear())
})

r, cancel := NewInputRegistry("test", "my-id", nil)
defer cancel()
monitoring.NewInt(r, "foo_total").Set(100)

jsonBytes, err := MetricSnapshotJSON()
inputID := "input-with-pipeline-metrics"
r1, cancel1 := NewInputRegistry("test", inputID, nil)
defer cancel1()
monitoring.NewInt(r1, "foo1_total").Set(100)

r2, cancel2 := NewInputRegistry(
"test", "input-without-pipeline-metrics", nil)
defer cancel2()
monitoring.NewInt(r2, "foo2_total").Set(100)

// this metric should not be reported
r3 := globalRegistry().NewRegistry("another-registry")
monitoring.NewInt(r3, "foo3_total").Set(100)

// this metric should not be reported
r4 := globalRegistry().NewRegistry("yet-another-registry")
monitoring.NewString(r4, "id").Set("some-id")
monitoring.NewInt(r3, "foo3_total").Set(100)

bInfo := beat.Info{}
bInfo.Monitoring.Namespace = monitoring.GetNamespace("TestMetricSnapshotJSON")
intInputReg := bInfo.Monitoring.Namespace.GetRegistry().
NewRegistry(libbeatmonitoring.RegistryNameInternalInputs).
NewRegistry(inputID)
monitoring.NewInt(intInputReg, "events_pipeline_total").Set(100)

jsonBytes, err := MetricSnapshotJSON(bInfo)
require.NoError(t, err)

const expected = `[
{
"foo_total": 100,
"id": "my-id",
"events_pipeline_total": 100,
"foo1_total": 100,
"id": "input-with-pipeline-metrics",
"input": "test"
},
{
"foo2_total": 100,
"id": "input-without-pipeline-metrics",
"input": "test"
}
]`
Expand Down
22 changes: 22 additions & 0 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitoring

const (
RegistryNameInternalInputs = "internal.inputs"
)
Loading
Loading