From c6ffaa2e1ce04f7f8e1742ba59cf9f7e8a4ec6b9 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Thu, 23 Jan 2025 18:04:16 +0000 Subject: [PATCH 1/3] pktmon testing --- pkg/plugin/pktmon/pktmon_windows.go | 9 +++++++-- pkg/plugin/pktmon/pktmon_windows_test.go | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 pkg/plugin/pktmon/pktmon_windows_test.go diff --git a/pkg/plugin/pktmon/pktmon_windows.go b/pkg/plugin/pktmon/pktmon_windows.go index 4b0759f292..55d20c5ae6 100644 --- a/pkg/plugin/pktmon/pktmon_windows.go +++ b/pkg/plugin/pktmon/pktmon_windows.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap/zapio" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) @@ -161,11 +162,15 @@ func (p *Plugin) Start(ctx context.Context) error { g.Go(func() error { for { err := p.GetFlow(ctx) - if _, ok := status.FromError(err); ok { + if stat, ok := status.FromError(err); ok { + if stat.Code() == codes.Internal { + p.l.Error("failed to get flow, retriable:", zap.Error(err)) + } p.l.Error("failed to get flow, retriable:", zap.Error(err)) continue + } else { + return errors.Wrapf(err, "failed to get flow") } - return errors.Wrapf(err, "failed to get flow, unrecoverable") } }) diff --git a/pkg/plugin/pktmon/pktmon_windows_test.go b/pkg/plugin/pktmon/pktmon_windows_test.go new file mode 100644 index 0000000000..753cd4439c --- /dev/null +++ b/pkg/plugin/pktmon/pktmon_windows_test.go @@ -0,0 +1,22 @@ +package pktmon + +import ( + "context" + "testing" + + "github.com/microsoft/retina/pkg/config" +) + +func TestStart(t *testing.T) { + // TestStart tests the Start function. + t.Run("TestStart", func(t *testing.T) { + // Create a new Plugin. + p := New(&config.Config{}) + // Start the Plugin. + err := p.Start(context.Background()) + // Check if the error is nil. + if err != nil { + t.Errorf("got %v, want nil", err) + } + }) +} From 735a74e2d35642a712275ba56adadaa33274dd9c Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Thu, 23 Jan 2025 13:28:02 -0800 Subject: [PATCH 2/3] skip known error messages in pktmon grpc --- pkg/plugin/pktmon/pktmon_grpc_windows.go | 163 ++++++++++++++++++++++ pkg/plugin/pktmon/pktmon_windows.go | 164 +++++------------------ pkg/plugin/pktmon/pktmon_windows_test.go | 70 ++++++++-- 3 files changed, 255 insertions(+), 142 deletions(-) create mode 100644 pkg/plugin/pktmon/pktmon_grpc_windows.go diff --git a/pkg/plugin/pktmon/pktmon_grpc_windows.go b/pkg/plugin/pktmon/pktmon_grpc_windows.go new file mode 100644 index 0000000000..06300a27e3 --- /dev/null +++ b/pkg/plugin/pktmon/pktmon_grpc_windows.go @@ -0,0 +1,163 @@ +package pktmon + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + + "github.com/pkg/errors" + + observerv1 "github.com/cilium/cilium/api/v1/observer" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/utils" + "go.uber.org/zap" + "go.uber.org/zap/zapio" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + ErrNilStream = errors.New("stream is nil") +) + +type GRPCManager interface { + SetupStream() error + StartStream(ctx context.Context) error + ReceiveFromStream() (*observerv1.GetFlowsResponse, error) + + RunPktMonServer(ctx context.Context) error + Stop() error +} + +type PktmonGRPCManager struct { + grpcClient *GRPCClient + stream observerv1.Observer_GetFlowsClient + l *log.ZapLogger + + pktmonCmd *exec.Cmd + stdWriter *zapio.Writer + errWriter *zapio.Writer +} + +func (p *PktmonGRPCManager) SetupStream() error { + var err error + fn := func() error { + p.l.Info("creating pktmon client") + p.grpcClient, err = newGRPCClient() + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client before getting flows") + } + + return nil + } + err = utils.Retry(fn, connectionRetryAttempts) + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client") + } + + return nil +} + +func (p *PktmonGRPCManager) StartStream(ctx context.Context) error { + if p.grpcClient == nil { + return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") + } + + var err error + fn := func() error { + p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) + if err != nil { + return errors.Wrapf(err, "failed to open pktmon stream") + } + return nil + } + err = utils.Retry(fn, connectionRetryAttempts) + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client") + } + + return nil +} + +func (p *PktmonGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { + if p.stream == nil { + return nil, errors.Wrapf(ErrNilStream, "unable to receive from stream") + } + + return p.stream.Recv() +} + +func newGRPCClient() (*GRPCClient, error) { + retryPolicy := map[string]any{ + "methodConfig": []map[string]any{ + { + "waitForReady": true, + "retryPolicy": map[string]any{ + "MaxAttempts": connectionRetryAttempts, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": []string{"UNAVAILABLE"}, + }, + }, + }, + } + + bytes, err := json.Marshal(retryPolicy) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal retry policy") + } + + retryPolicyStr := string(bytes) + + conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial pktmon server:") + } + + return &GRPCClient{observerv1.NewObserverClient(conn)}, nil +} + +func (p *PktmonGRPCManager) RunPktMonServer(ctx context.Context) error { + p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} + defer p.stdWriter.Close() + p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} + defer p.errWriter.Close() + + pwd, err := os.Getwd() + if err != nil { + return errors.Wrapf(err, "failed to get current working directory for pktmon") + } + + cmd := pwd + "\\" + "controller-pktmon.exe" + + p.pktmonCmd = exec.CommandContext(ctx, cmd) + p.pktmonCmd.Dir = pwd + p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) + p.pktmonCmd.Env = os.Environ() + p.pktmonCmd.Stdout = p.stdWriter + p.pktmonCmd.Stderr = p.errWriter + + p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) + + // block this thread, and should it ever return, it's a problem + err = p.pktmonCmd.Run() + if err != nil { + return errors.Wrapf(err, "pktmon server exited when it should not have") + } + + // we never want to return happy from this + return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") +} + +func (p *PktmonGRPCManager) Stop() error { + if p.pktmonCmd != nil { + err := p.pktmonCmd.Process.Kill() + if err != nil { + return errors.Wrapf(err, "failed to kill pktmon server during stop") + } + } + return nil +} diff --git a/pkg/plugin/pktmon/pktmon_windows.go b/pkg/plugin/pktmon/pktmon_windows.go index 55d20c5ae6..edd0311400 100644 --- a/pkg/plugin/pktmon/pktmon_windows.go +++ b/pkg/plugin/pktmon/pktmon_windows.go @@ -2,10 +2,7 @@ package pktmon import ( "context" - "encoding/json" - "fmt" - "os" - "os/exec" + "strings" "github.com/pkg/errors" @@ -18,11 +15,8 @@ import ( "github.com/microsoft/retina/pkg/plugin/registry" "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" - "go.uber.org/zap/zapio" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) @@ -44,12 +38,8 @@ type Plugin struct { enricher enricher.EnricherInterface externalChannel chan *v1.Event l *log.ZapLogger - pktmonCmd *exec.Cmd - stdWriter *zapio.Writer - errWriter *zapio.Writer - grpcClient *GRPCClient - stream observerv1.Observer_GetFlowsClient + grpcManager GRPCManager } func init() { @@ -63,6 +53,9 @@ func New(*kcfg.Config) registry.Plugin { } func (p *Plugin) Init() error { + p.grpcManager = &PktmonGRPCManager{ + l: p.l, + } return nil } @@ -74,69 +67,6 @@ type GRPCClient struct { observerv1.ObserverClient } -func newGRPCClient() (*GRPCClient, error) { - retryPolicy := map[string]any{ - "methodConfig": []map[string]any{ - { - "waitForReady": true, - "retryPolicy": map[string]any{ - "MaxAttempts": connectionRetryAttempts, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": []string{"UNAVAILABLE"}, - }, - }, - }, - } - - bytes, err := json.Marshal(retryPolicy) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal retry policy") - } - - retryPolicyStr := string(bytes) - - conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial pktmon server:") - } - - return &GRPCClient{observerv1.NewObserverClient(conn)}, nil -} - -func (p *Plugin) RunPktMonServer(ctx context.Context) error { - p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} - defer p.stdWriter.Close() - p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} - defer p.errWriter.Close() - - pwd, err := os.Getwd() - if err != nil { - return errors.Wrapf(err, "failed to get current working directory for pktmon") - } - - cmd := pwd + "\\" + "controller-pktmon.exe" - - p.pktmonCmd = exec.CommandContext(ctx, cmd) - p.pktmonCmd.Dir = pwd - p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) - p.pktmonCmd.Env = os.Environ() - p.pktmonCmd.Stdout = p.stdWriter - p.pktmonCmd.Stderr = p.errWriter - - p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) - - // block this thread, and should it ever return, it's a problem - err = p.pktmonCmd.Run() - if err != nil { - return errors.Wrapf(err, "pktmon server exited when it should not have") - } - - // we never want to return happy from this - return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") -} - func (p *Plugin) Start(ctx context.Context) error { p.enricher = enricher.Instance() if p.enricher == nil { @@ -146,14 +76,14 @@ func (p *Plugin) Start(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - err := p.RunPktMonServer(ctx) + err := p.grpcManager.RunPktMonServer(ctx) if err != nil { return errors.Wrapf(err, "pktmon server exited") } return nil }) - err := p.SetupStream() + err := p.grpcManager.SetupStream() if err != nil { return errors.Wrapf(err, "failed to setup initial pktmon stream") } @@ -163,65 +93,41 @@ func (p *Plugin) Start(ctx context.Context) error { for { err := p.GetFlow(ctx) if stat, ok := status.FromError(err); ok { - if stat.Code() == codes.Internal { - p.l.Error("failed to get flow, retriable:", zap.Error(err)) + + if stat.Code() == codes.Unavailable && strings.Contains(err.Error(), "frame too large") { // so far it's the only retriable error + p.l.Error("failed to get flow, frame issue:", zap.Error(err)) + continue + } + + // commonly seen with: + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = unexpected EOF"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = received 65576-bytes data exceeding the limit 65535 bytes"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = grpc: failed to unmarshal the received message: proto: cannot parse invalid wire-format data"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = grpc: failed to unmarshal the received message: string field contains invalid UTF-8"} + // These errors don't impact subsequent messages received, and don't impact session connectivity, + // so we can log them and ignore them for further investigation without restarting + if stat.Code() == codes.Internal && + (strings.Contains(err.Error(), "unexpected EOF") || + strings.Contains(err.Error(), "exceeding the limit") || + strings.Contains(err.Error(), "cannot parse invalid wire-format data") || + strings.Contains(err.Error(), "string field contains invalid UTF-8")) { + + p.l.Error("failed to get flow, internal error:", zap.Error(err)) + continue } - p.l.Error("failed to get flow, retriable:", zap.Error(err)) - continue - } else { - return errors.Wrapf(err, "failed to get flow") } + return errors.Wrapf(err, "failed to get flow") } }) return g.Wait() } -func (p *Plugin) SetupStream() error { - var err error - fn := func() error { - p.l.Info("creating pktmon client") - p.grpcClient, err = newGRPCClient() - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client before getting flows") - } - - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - -func (p *Plugin) StartStream(ctx context.Context) error { - if p.grpcClient == nil { - return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") - } - - var err error - fn := func() error { - p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) - if err != nil { - return errors.Wrapf(err, "failed to open pktmon stream") - } - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - func (p *Plugin) GetFlow(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - err := p.StartStream(ctx) + err := p.grpcManager.StartStream(ctx) if err != nil { return errors.Wrapf(err, "failed to setup pktmon stream") } @@ -231,7 +137,7 @@ func (p *Plugin) GetFlow(ctx context.Context) error { case <-ctx.Done(): return errors.Wrapf(ctx.Err(), "pktmon plugin context done") default: - event, err := p.stream.Recv() + event, err := p.grpcManager.ReceiveFromStream() if err != nil { return errors.Wrapf(err, "failed to receive pktmon event") } @@ -273,13 +179,9 @@ func (p *Plugin) SetupChannel(ch chan *v1.Event) error { } func (p *Plugin) Stop() error { - if p.pktmonCmd != nil { - err := p.pktmonCmd.Process.Kill() - if err != nil { - return errors.Wrapf(err, "failed to kill pktmon server during stop") - } + if p.grpcManager != nil { + return errors.Wrapf(p.grpcManager.Stop(), "failed to stop pktmon") } - return nil } diff --git a/pkg/plugin/pktmon/pktmon_windows_test.go b/pkg/plugin/pktmon/pktmon_windows_test.go index 753cd4439c..c78a1b1fe5 100644 --- a/pkg/plugin/pktmon/pktmon_windows_test.go +++ b/pkg/plugin/pktmon/pktmon_windows_test.go @@ -4,19 +4,67 @@ import ( "context" "testing" - "github.com/microsoft/retina/pkg/config" + observerv1 "github.com/cilium/cilium/api/v1/observer" + "github.com/microsoft/retina/pkg/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +type TestGRPCManager struct { + streamErrorIndex int + streamErrors []error +} + +func (p *TestGRPCManager) SetupStream() error { + return nil +} + +func (p *TestGRPCManager) RunPktMonServer(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (p *TestGRPCManager) StartStream(ctx context.Context) error { + return nil +} + +func (p *TestGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { + err := p.streamErrors[p.streamErrorIndex] + p.streamErrorIndex++ + return nil, err +} + +func (p *TestGRPCManager) Stop() error { + return nil +} + func TestStart(t *testing.T) { - // TestStart tests the Start function. - t.Run("TestStart", func(t *testing.T) { - // Create a new Plugin. - p := New(&config.Config{}) - // Start the Plugin. - err := p.Start(context.Background()) - // Check if the error is nil. - if err != nil { - t.Errorf("got %v, want nil", err) + opts := log.GetDefaultLogOpts() + log.SetupZapLogger(opts) + p := &Plugin{ + l: log.Logger().Named("test"), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p.grpcManager = &TestGRPCManager{ + streamErrors: []error{ + status.Errorf(codes.Unavailable, "frame too large"), + status.Errorf(codes.Internal, "unexpected EOF"), + status.Errorf(codes.Internal, "exceeding the limit"), + status.Errorf(codes.Internal, "cannot parse invalid wire-format data"), + status.Errorf(codes.Internal, "string field contains invalid UTF-8"), + status.Errorf(codes.Canceled, "context canceled"), + }, + } + + // Start the Plugin. + err := p.Start(ctx) + // Check if the error is nil. + if stat, ok := status.FromError(err); ok { + if stat.Code() != codes.Canceled { + t.Errorf("expected %v, got %v", codes.Canceled, stat.Code()) } - }) + } } From d24986ecfd3c7eec6712812c712b33388be873a2 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Thu, 23 Jan 2025 14:28:23 -0800 Subject: [PATCH 3/3] address lints --- pkg/plugin/pktmon/pktmon_grpc_windows.go | 22 ++++++++++------------ pkg/plugin/pktmon/pktmon_windows.go | 2 +- pkg/plugin/pktmon/pktmon_windows_test.go | 9 ++++++--- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/plugin/pktmon/pktmon_grpc_windows.go b/pkg/plugin/pktmon/pktmon_grpc_windows.go index 06300a27e3..c63e068b5a 100644 --- a/pkg/plugin/pktmon/pktmon_grpc_windows.go +++ b/pkg/plugin/pktmon/pktmon_grpc_windows.go @@ -18,9 +18,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var ( - ErrNilStream = errors.New("stream is nil") -) +var ErrNilStream = errors.New("stream is nil") type GRPCManager interface { SetupStream() error @@ -31,7 +29,7 @@ type GRPCManager interface { Stop() error } -type PktmonGRPCManager struct { +type WindowsGRPCManager struct { grpcClient *GRPCClient stream observerv1.Observer_GetFlowsClient l *log.ZapLogger @@ -41,7 +39,7 @@ type PktmonGRPCManager struct { errWriter *zapio.Writer } -func (p *PktmonGRPCManager) SetupStream() error { +func (p *WindowsGRPCManager) SetupStream() error { var err error fn := func() error { p.l.Info("creating pktmon client") @@ -60,7 +58,7 @@ func (p *PktmonGRPCManager) SetupStream() error { return nil } -func (p *PktmonGRPCManager) StartStream(ctx context.Context) error { +func (p *WindowsGRPCManager) StartStream(ctx context.Context) error { if p.grpcClient == nil { return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") } @@ -81,12 +79,12 @@ func (p *PktmonGRPCManager) StartStream(ctx context.Context) error { return nil } -func (p *PktmonGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { +func (p *WindowsGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { if p.stream == nil { return nil, errors.Wrapf(ErrNilStream, "unable to receive from stream") } - return p.stream.Recv() + return p.stream.Recv() //nolint:wrapcheck // wrapcheck is disabled because we want to return the error as is } func newGRPCClient() (*GRPCClient, error) { @@ -112,15 +110,15 @@ func newGRPCClient() (*GRPCClient, error) { retryPolicyStr := string(bytes) - conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) + conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) if err != nil { - return nil, errors.Wrapf(err, "failed to dial pktmon server:") + return nil, errors.Wrapf(err, "failed to dial pktmon server") } return &GRPCClient{observerv1.NewObserverClient(conn)}, nil } -func (p *PktmonGRPCManager) RunPktMonServer(ctx context.Context) error { +func (p *WindowsGRPCManager) RunPktMonServer(ctx context.Context) error { p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} defer p.stdWriter.Close() p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} @@ -152,7 +150,7 @@ func (p *PktmonGRPCManager) RunPktMonServer(ctx context.Context) error { return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") } -func (p *PktmonGRPCManager) Stop() error { +func (p *WindowsGRPCManager) Stop() error { if p.pktmonCmd != nil { err := p.pktmonCmd.Process.Kill() if err != nil { diff --git a/pkg/plugin/pktmon/pktmon_windows.go b/pkg/plugin/pktmon/pktmon_windows.go index edd0311400..6688596d80 100644 --- a/pkg/plugin/pktmon/pktmon_windows.go +++ b/pkg/plugin/pktmon/pktmon_windows.go @@ -53,7 +53,7 @@ func New(*kcfg.Config) registry.Plugin { } func (p *Plugin) Init() error { - p.grpcManager = &PktmonGRPCManager{ + p.grpcManager = &WindowsGRPCManager{ l: p.l, } return nil diff --git a/pkg/plugin/pktmon/pktmon_windows_test.go b/pkg/plugin/pktmon/pktmon_windows_test.go index c78a1b1fe5..1451957739 100644 --- a/pkg/plugin/pktmon/pktmon_windows_test.go +++ b/pkg/plugin/pktmon/pktmon_windows_test.go @@ -6,6 +6,7 @@ import ( observerv1 "github.com/cilium/cilium/api/v1/observer" "github.com/microsoft/retina/pkg/log" + "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -24,7 +25,7 @@ func (p *TestGRPCManager) RunPktMonServer(ctx context.Context) error { return nil } -func (p *TestGRPCManager) StartStream(ctx context.Context) error { +func (p *TestGRPCManager) StartStream(_ context.Context) error { return nil } @@ -40,7 +41,9 @@ func (p *TestGRPCManager) Stop() error { func TestStart(t *testing.T) { opts := log.GetDefaultLogOpts() - log.SetupZapLogger(opts) + _, err := log.SetupZapLogger(opts) + require.NoError(t, err) + p := &Plugin{ l: log.Logger().Named("test"), } @@ -60,7 +63,7 @@ func TestStart(t *testing.T) { } // Start the Plugin. - err := p.Start(ctx) + err = p.Start(ctx) // Check if the error is nil. if stat, ok := status.FromError(err); ok { if stat.Code() != codes.Canceled {