diff --git a/pkg/plugin/pktmon/pktmon_grpc_windows.go b/pkg/plugin/pktmon/pktmon_grpc_windows.go new file mode 100644 index 0000000000..c63e068b5a --- /dev/null +++ b/pkg/plugin/pktmon/pktmon_grpc_windows.go @@ -0,0 +1,161 @@ +package pktmon + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + + "github.com/pkg/errors" + + observerv1 "github.com/cilium/cilium/api/v1/observer" + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/utils" + "go.uber.org/zap" + "go.uber.org/zap/zapio" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ErrNilStream = errors.New("stream is nil") + +type GRPCManager interface { + SetupStream() error + StartStream(ctx context.Context) error + ReceiveFromStream() (*observerv1.GetFlowsResponse, error) + + RunPktMonServer(ctx context.Context) error + Stop() error +} + +type WindowsGRPCManager struct { + grpcClient *GRPCClient + stream observerv1.Observer_GetFlowsClient + l *log.ZapLogger + + pktmonCmd *exec.Cmd + stdWriter *zapio.Writer + errWriter *zapio.Writer +} + +func (p *WindowsGRPCManager) SetupStream() error { + var err error + fn := func() error { + p.l.Info("creating pktmon client") + p.grpcClient, err = newGRPCClient() + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client before getting flows") + } + + return nil + } + err = utils.Retry(fn, connectionRetryAttempts) + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client") + } + + return nil +} + +func (p *WindowsGRPCManager) StartStream(ctx context.Context) error { + if p.grpcClient == nil { + return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") + } + + var err error + fn := func() error { + p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) + if err != nil { + return errors.Wrapf(err, "failed to open pktmon stream") + } + return nil + } + err = utils.Retry(fn, connectionRetryAttempts) + if err != nil { + return errors.Wrapf(err, "failed to create pktmon client") + } + + return nil +} + +func (p *WindowsGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { + if p.stream == nil { + return nil, errors.Wrapf(ErrNilStream, "unable to receive from stream") + } + + return p.stream.Recv() //nolint:wrapcheck // wrapcheck is disabled because we want to return the error as is +} + +func newGRPCClient() (*GRPCClient, error) { + retryPolicy := map[string]any{ + "methodConfig": []map[string]any{ + { + "waitForReady": true, + "retryPolicy": map[string]any{ + "MaxAttempts": connectionRetryAttempts, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": []string{"UNAVAILABLE"}, + }, + }, + }, + } + + bytes, err := json.Marshal(retryPolicy) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal retry policy") + } + + retryPolicyStr := string(bytes) + + conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial pktmon server") + } + + return &GRPCClient{observerv1.NewObserverClient(conn)}, nil +} + +func (p *WindowsGRPCManager) RunPktMonServer(ctx context.Context) error { + p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} + defer p.stdWriter.Close() + p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} + defer p.errWriter.Close() + + pwd, err := os.Getwd() + if err != nil { + return errors.Wrapf(err, "failed to get current working directory for pktmon") + } + + cmd := pwd + "\\" + "controller-pktmon.exe" + + p.pktmonCmd = exec.CommandContext(ctx, cmd) + p.pktmonCmd.Dir = pwd + p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) + p.pktmonCmd.Env = os.Environ() + p.pktmonCmd.Stdout = p.stdWriter + p.pktmonCmd.Stderr = p.errWriter + + p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) + + // block this thread, and should it ever return, it's a problem + err = p.pktmonCmd.Run() + if err != nil { + return errors.Wrapf(err, "pktmon server exited when it should not have") + } + + // we never want to return happy from this + return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") +} + +func (p *WindowsGRPCManager) Stop() error { + if p.pktmonCmd != nil { + err := p.pktmonCmd.Process.Kill() + if err != nil { + return errors.Wrapf(err, "failed to kill pktmon server during stop") + } + } + return nil +} diff --git a/pkg/plugin/pktmon/pktmon_windows.go b/pkg/plugin/pktmon/pktmon_windows.go index 4b0759f292..6688596d80 100644 --- a/pkg/plugin/pktmon/pktmon_windows.go +++ b/pkg/plugin/pktmon/pktmon_windows.go @@ -2,10 +2,7 @@ package pktmon import ( "context" - "encoding/json" - "fmt" - "os" - "os/exec" + "strings" "github.com/pkg/errors" @@ -18,10 +15,8 @@ import ( "github.com/microsoft/retina/pkg/plugin/registry" "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" - "go.uber.org/zap/zapio" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -43,12 +38,8 @@ type Plugin struct { enricher enricher.EnricherInterface externalChannel chan *v1.Event l *log.ZapLogger - pktmonCmd *exec.Cmd - stdWriter *zapio.Writer - errWriter *zapio.Writer - grpcClient *GRPCClient - stream observerv1.Observer_GetFlowsClient + grpcManager GRPCManager } func init() { @@ -62,6 +53,9 @@ func New(*kcfg.Config) registry.Plugin { } func (p *Plugin) Init() error { + p.grpcManager = &WindowsGRPCManager{ + l: p.l, + } return nil } @@ -73,69 +67,6 @@ type GRPCClient struct { observerv1.ObserverClient } -func newGRPCClient() (*GRPCClient, error) { - retryPolicy := map[string]any{ - "methodConfig": []map[string]any{ - { - "waitForReady": true, - "retryPolicy": map[string]any{ - "MaxAttempts": connectionRetryAttempts, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": []string{"UNAVAILABLE"}, - }, - }, - }, - } - - bytes, err := json.Marshal(retryPolicy) - if err != nil { - return nil, errors.Wrapf(err, "failed to marshal retry policy") - } - - retryPolicyStr := string(bytes) - - conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial pktmon server:") - } - - return &GRPCClient{observerv1.NewObserverClient(conn)}, nil -} - -func (p *Plugin) RunPktMonServer(ctx context.Context) error { - p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} - defer p.stdWriter.Close() - p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} - defer p.errWriter.Close() - - pwd, err := os.Getwd() - if err != nil { - return errors.Wrapf(err, "failed to get current working directory for pktmon") - } - - cmd := pwd + "\\" + "controller-pktmon.exe" - - p.pktmonCmd = exec.CommandContext(ctx, cmd) - p.pktmonCmd.Dir = pwd - p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) - p.pktmonCmd.Env = os.Environ() - p.pktmonCmd.Stdout = p.stdWriter - p.pktmonCmd.Stderr = p.errWriter - - p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) - - // block this thread, and should it ever return, it's a problem - err = p.pktmonCmd.Run() - if err != nil { - return errors.Wrapf(err, "pktmon server exited when it should not have") - } - - // we never want to return happy from this - return errors.Wrapf(ErrUnexpectedExit, "pktmon server exited unexpectedly") -} - func (p *Plugin) Start(ctx context.Context) error { p.enricher = enricher.Instance() if p.enricher == nil { @@ -145,14 +76,14 @@ func (p *Plugin) Start(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - err := p.RunPktMonServer(ctx) + err := p.grpcManager.RunPktMonServer(ctx) if err != nil { return errors.Wrapf(err, "pktmon server exited") } return nil }) - err := p.SetupStream() + err := p.grpcManager.SetupStream() if err != nil { return errors.Wrapf(err, "failed to setup initial pktmon stream") } @@ -161,62 +92,42 @@ func (p *Plugin) Start(ctx context.Context) error { g.Go(func() error { for { err := p.GetFlow(ctx) - if _, ok := status.FromError(err); ok { - p.l.Error("failed to get flow, retriable:", zap.Error(err)) - continue + if stat, ok := status.FromError(err); ok { + + if stat.Code() == codes.Unavailable && strings.Contains(err.Error(), "frame too large") { // so far it's the only retriable error + p.l.Error("failed to get flow, frame issue:", zap.Error(err)) + continue + } + + // commonly seen with: + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = unexpected EOF"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = received 65576-bytes data exceeding the limit 65535 bytes"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = grpc: failed to unmarshal the received message: proto: cannot parse invalid wire-format data"} + // {"error":"failed to receive pktmon event: rpc error: code = Internal desc = grpc: failed to unmarshal the received message: string field contains invalid UTF-8"} + // These errors don't impact subsequent messages received, and don't impact session connectivity, + // so we can log them and ignore them for further investigation without restarting + if stat.Code() == codes.Internal && + (strings.Contains(err.Error(), "unexpected EOF") || + strings.Contains(err.Error(), "exceeding the limit") || + strings.Contains(err.Error(), "cannot parse invalid wire-format data") || + strings.Contains(err.Error(), "string field contains invalid UTF-8")) { + + p.l.Error("failed to get flow, internal error:", zap.Error(err)) + continue + } } - return errors.Wrapf(err, "failed to get flow, unrecoverable") + return errors.Wrapf(err, "failed to get flow") } }) return g.Wait() } -func (p *Plugin) SetupStream() error { - var err error - fn := func() error { - p.l.Info("creating pktmon client") - p.grpcClient, err = newGRPCClient() - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client before getting flows") - } - - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - -func (p *Plugin) StartStream(ctx context.Context) error { - if p.grpcClient == nil { - return errors.Wrapf(ErrNilGrpcClient, "unable to start stream") - } - - var err error - fn := func() error { - p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) - if err != nil { - return errors.Wrapf(err, "failed to open pktmon stream") - } - return nil - } - err = utils.Retry(fn, connectionRetryAttempts) - if err != nil { - return errors.Wrapf(err, "failed to create pktmon client") - } - - return nil -} - func (p *Plugin) GetFlow(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - err := p.StartStream(ctx) + err := p.grpcManager.StartStream(ctx) if err != nil { return errors.Wrapf(err, "failed to setup pktmon stream") } @@ -226,7 +137,7 @@ func (p *Plugin) GetFlow(ctx context.Context) error { case <-ctx.Done(): return errors.Wrapf(ctx.Err(), "pktmon plugin context done") default: - event, err := p.stream.Recv() + event, err := p.grpcManager.ReceiveFromStream() if err != nil { return errors.Wrapf(err, "failed to receive pktmon event") } @@ -268,13 +179,9 @@ func (p *Plugin) SetupChannel(ch chan *v1.Event) error { } func (p *Plugin) Stop() error { - if p.pktmonCmd != nil { - err := p.pktmonCmd.Process.Kill() - if err != nil { - return errors.Wrapf(err, "failed to kill pktmon server during stop") - } + if p.grpcManager != nil { + return errors.Wrapf(p.grpcManager.Stop(), "failed to stop pktmon") } - return nil } diff --git a/pkg/plugin/pktmon/pktmon_windows_test.go b/pkg/plugin/pktmon/pktmon_windows_test.go new file mode 100644 index 0000000000..1451957739 --- /dev/null +++ b/pkg/plugin/pktmon/pktmon_windows_test.go @@ -0,0 +1,73 @@ +package pktmon + +import ( + "context" + "testing" + + observerv1 "github.com/cilium/cilium/api/v1/observer" + "github.com/microsoft/retina/pkg/log" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type TestGRPCManager struct { + streamErrorIndex int + streamErrors []error +} + +func (p *TestGRPCManager) SetupStream() error { + return nil +} + +func (p *TestGRPCManager) RunPktMonServer(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (p *TestGRPCManager) StartStream(_ context.Context) error { + return nil +} + +func (p *TestGRPCManager) ReceiveFromStream() (*observerv1.GetFlowsResponse, error) { + err := p.streamErrors[p.streamErrorIndex] + p.streamErrorIndex++ + return nil, err +} + +func (p *TestGRPCManager) Stop() error { + return nil +} + +func TestStart(t *testing.T) { + opts := log.GetDefaultLogOpts() + _, err := log.SetupZapLogger(opts) + require.NoError(t, err) + + p := &Plugin{ + l: log.Logger().Named("test"), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p.grpcManager = &TestGRPCManager{ + streamErrors: []error{ + status.Errorf(codes.Unavailable, "frame too large"), + status.Errorf(codes.Internal, "unexpected EOF"), + status.Errorf(codes.Internal, "exceeding the limit"), + status.Errorf(codes.Internal, "cannot parse invalid wire-format data"), + status.Errorf(codes.Internal, "string field contains invalid UTF-8"), + status.Errorf(codes.Canceled, "context canceled"), + }, + } + + // Start the Plugin. + err = p.Start(ctx) + // Check if the error is nil. + if stat, ok := status.FromError(err); ok { + if stat.Code() != codes.Canceled { + t.Errorf("expected %v, got %v", codes.Canceled, stat.Code()) + } + } +}