diff --git a/cmd/hubble/daemon_linux.go b/cmd/hubble/daemon_linux.go index 7bc416377e9..2c4c82a6173 100644 --- a/cmd/hubble/daemon_linux.go +++ b/cmd/hubble/daemon_linux.go @@ -19,14 +19,11 @@ import ( "github.com/cilium/cilium/pkg/hive/cell" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" - observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types" "github.com/cilium/cilium/pkg/ipcache" "github.com/cilium/cilium/pkg/k8s" k8sClient "github.com/cilium/cilium/pkg/k8s/client" "github.com/cilium/cilium/pkg/k8s/watchers" - "github.com/cilium/cilium/pkg/monitor" monitoragent "github.com/cilium/cilium/pkg/monitor/agent" - monitorAPI "github.com/cilium/cilium/pkg/monitor/api" "github.com/cilium/cilium/pkg/node" "github.com/cilium/workerpool" @@ -148,18 +145,7 @@ func (d *Daemon) generateEvents(ctx context.Context) { return case event := <-d.eventChan: d.log.WithField("event", event).Debug("Sending event to monitor agent") - var err error - switch event.Event.(type) { - case monitor.LogRecordNotify: - err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAccessLog, event.Event) - case monitorAPI.AgentNotifyMessage: - err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAgent, event.Event) - case observerTypes.PerfEvent: - // send 1 here to signal bpf event - err = d.monitorAgent.SendEvent(1, event.Event) - default: - err = d.monitorAgent.SendEvent(0, event.Event) - } + err := d.monitorAgent.SendEvent(0, event) if err != nil { d.log.WithError(err).Error("Unable to send event to monitor agent") } diff --git a/pkg/hubble/common/decoder.go b/pkg/hubble/common/decoder.go index 1b98586fcd5..805d0c79aad 100644 --- a/pkg/hubble/common/decoder.go +++ b/pkg/hubble/common/decoder.go @@ -1 +1 @@ -package common \ No newline at end of file +package common diff --git a/pkg/monitoragent/monitoragent_linux.go b/pkg/monitoragent/monitoragent_linux.go index c224c984987..8a16addcf51 100644 --- a/pkg/monitoragent/monitoragent_linux.go +++ b/pkg/monitoragent/monitoragent_linux.go @@ -8,7 +8,6 @@ import ( "fmt" "github.com/cilium/cilium/api/v1/models" - observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types" "github.com/cilium/cilium/pkg/lock" "github.com/cilium/cilium/pkg/monitor/agent/consumer" "github.com/cilium/cilium/pkg/monitor/agent/listener" @@ -65,19 +64,7 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error { // While we want to avoid marshalling events if there are no active // listeners, there's no need to check for active consumers ahead of time. - log.Info("SendEvent called") - // perfEvents are bpf datapath events (reserved for 0-128), where 0 = unspecified - if typ > 0 && typ <= 128 { - log.Info("SendEvent called with PERF event") - event, ok := event.(observerTypes.PerfEvent) - if !ok { - return fmt.Errorf("unexpected event type for perf event") - } - a.sendPerfEvent(event) - } else { - log.Info("SendEvent called with AGENT event") - a.notifyAgentEvent(typ, event) - } + a.notifyAgentEvent(typ, event) // do not marshal notifications if there are no active listeners if !a.hasListeners() { @@ -111,20 +98,6 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error { return nil } -func (a *monitorAgent) sendPerfEvent(event observerTypes.PerfEvent) { - a.Lock() - defer a.Unlock() - a.notifyPerfEventLocked(event.Data, event.CPU) -} - -// notifyPerfEventLocked notifies all consumers about a perf event. -// The caller must hold the monitor lock. -func (a *monitorAgent) notifyPerfEventLocked(data []byte, cpu int) { - for mc := range a.consumers { - mc.NotifyPerfEvent(data, cpu) - } -} - func (a *monitorAgent) RegisterNewListener(newListener listener.MonitorListener) { if a == nil || newListener == nil { return diff --git a/pkg/plugin/cilium/cilium_linux.go b/pkg/plugin/cilium/cilium_linux.go index 47fa4b3b155..c471b18716d 100644 --- a/pkg/plugin/cilium/cilium_linux.go +++ b/pkg/plugin/cilium/cilium_linux.go @@ -5,7 +5,6 @@ package cilium import ( - "bytes" "context" "encoding/gob" "errors" @@ -14,16 +13,12 @@ import ( "time" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" - observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types" - "github.com/cilium/cilium/pkg/monitor" - monitorAPI "github.com/cilium/cilium/pkg/monitor/api" "github.com/cilium/cilium/pkg/monitor/payload" kcfg "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/enricher" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/plugin/api" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -51,6 +46,10 @@ func (c *cilium) Compile(ctx context.Context) error { } func (c *cilium) Init() error { + c.p = &parser{ + l: c.l, + } + c.p.Init() c.l.Info("Initialized cilium plugin") return nil } @@ -118,94 +117,17 @@ func (c *cilium) monitorLoop(ctx context.Context) error { c.l.Error("Failed to decode payload", zap.Error(err)) return err } - // monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc)) - - // hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor. - // specifically, the hubble consumer adds the event to the observer's event channel - - // Agent Events - // - MessageTypeAccessLog: accesslog.LogRecord - // - MessageTypeAgent: api.AgentNotify - // Perf Events - // - MessageTypeDrop: monitor.DropNotify - // - MessageTypeDebug: monitor.DebugMsg - // - MessageTypeCapture: monitor.DebugCapture - // - MessageTypeTrace: monitor.TraceNotify - // - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify - - c.l.Debug("Received cilium event", zap.Int("type", pl.Type), zap.Any("payload", pl.Data)) - switch pl.Type { - case payload.EventSample: - data := pl.Data - messageType := data[0] - switch messageType { - // Agent Events - case monitorAPI.MessageTypeAccessLog: - buf := bytes.NewBuffer(data[1:]) - dec := gob.NewDecoder(buf) - lr := monitor.LogRecordNotify{} - if err := dec.Decode(&lr); err != nil { - c.l.Error("Failed to decode log record notify", zap.Error(err)) - continue - } - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: lr} - case monitorAPI.MessageTypeAgent: - buf := bytes.NewBuffer(data[1:]) - dec := gob.NewDecoder(buf) - an := monitorAPI.AgentNotifyMessage{} - if err := dec.Decode(&an); err != nil { - c.l.Error("Failed to decode agent notify", zap.Error(err)) - continue - } - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: an} - // Perf events - // case monitorAPI.MessageTypeDrop: - // dn := monitor.DropNotify{} - // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil { - // c.l.Error("Failed to decode drop notify", zap.Error(err)) - // continue - // } - // c.l.Info("Drop event", zap.Any("data", dn)) - // case monitorAPI.MessageTypeTrace: - // tn := monitor.TraceNotify{} - // if err := monitor.DecodeTraceNotify(data, &tn); err != nil { - // c.l.Error("Failed to decode trace notify", zap.Error(err)) - // continue - // } - // c.l.Info("Trace event", zap.Any("data", tn)) - // case monitorAPI.MessageTypePolicyVerdict: - // pn := monitor.PolicyVerdictNotify{} - // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil { - // c.l.Error("Failed to decode policy verdict notify", zap.Error(err)) - // continue - // } - // c.l.Info("Policy verdict event", zap.Any("data", pn)) - // case monitorAPI.MessageTypeDebug: - // c.l.Info("Debug event", zap.Any("data", data)) - // case monitorAPI.MessageTypeCapture: - // c.l.Info("Capture event", zap.Any("data", data)) - // case monitorAPI.MessageTypeRecCapture: - // c.l.Info("Recorder capture event", zap.Any("data", data)) - // case monitorAPI.MessageTypeTraceSock: - // c.l.Info("Trace sock event", zap.Any("data", data)) - // default: - // c.l.Info("Unknown event", zap.Any("data", data)) - default: - ev := observerTypes.PerfEvent{ - CPU: pl.CPU, - Data: pl.Data, - } - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: ev} + fl, err := c.p.Decode(&pl) + if err == nil { + c.l.Debug("Decoded flow", zap.Any("flow", fl)) + event := &v1.Event{ + Event: fl, + Timestamp: fl.GetTime(), } - case payload.RecordLost: - c.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost)) - default: - c.l.Warn("Unknown event type", zap.Int("type", pl.Type)) - continue + c.externalChannel <- event + } else { + c.l.Warn("Failed to decode to flow", zap.Error(err)) } - // if newMA, ok := c.monitorAgent.(AgentV2); ok { - // newMA.SendPerfEvent(record) - // } } } } diff --git a/pkg/plugin/cilium/parser.go b/pkg/plugin/cilium/parser.go new file mode 100644 index 00000000000..a4ef8d87557 --- /dev/null +++ b/pkg/plugin/cilium/parser.go @@ -0,0 +1,320 @@ +package cilium + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "net" + "time" + + "github.com/cilium/cilium/api/v1/flow" + "github.com/cilium/cilium/pkg/byteorder" + parserErrors "github.com/cilium/cilium/pkg/hubble/parser/errors" + "github.com/cilium/cilium/pkg/monitor" + monitorAPI "github.com/cilium/cilium/pkg/monitor/api" + "github.com/cilium/cilium/pkg/monitor/payload" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/microsoft/retina/pkg/utils" + "go.uber.org/zap" +) + +const ErrNotImplemented = "Error, not implemented for type: %s" + +var ( + ErrEmptyData = errors.New("empty data") + ErrNoPacketData = errors.New("no packet data") + ErrNoLayer4 = errors.New("no layer 4") +) + +func (p *parser) Init() { + packet := &packet{} + packet.decLayer = gopacket.NewDecodingLayerParser( + layers.LayerTypeEthernet, &packet.Ethernet, + &packet.IPv4, &packet.IPv6, + &packet.TCP, &packet.UDP, + ) + packet.decLayer.IgnoreUnsupported = true + p.packet = packet +} + +// monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc)) +// hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor. +// specifically, the hubble consumer adds the event to the observer's event channel +// Agent Events +// - MessageTypeAccessLog: accesslog.LogRecord +// - MessageTypeAgent: api.AgentNotify +// +// Perf Events +// - MessageTypeDrop: monitor.DropNotify +// - MessageTypeDebug: monitor.DebugMsg +// - MessageTypeCapture: monitor.DebugCapture +// - MessageTypeTrace: monitor.TraceNotify +// - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify +// +// Reference hubble/parser/threefour +func (p *parser) Decode(pl *payload.Payload) (*flow.Flow, error) { + switch pl.Type { + case payload.EventSample: + data := pl.Data + if len(data) == 0 { + return nil, ErrEmptyData + } + + var eventType uint8 = data[0] + var packetOffset int + var obsPoint flow.TraceObservationPoint + var dropNotify *monitor.DropNotify + var traceNotify *monitor.TraceNotify + var policyVerdict *monitor.PolicyVerdictNotify + // var debugCap *monitor.DebugCapture + var eventSubType uint8 + // var authType flow.AuthType + // prefix := fmt.Sprintf("CPU %02d:", pl.CPU) + + switch eventType { + // Agent Events + case monitorAPI.MessageTypeAccessLog: + // These are l7 events + // We have dns from ig. We are missing other such as kafka, icmp, etc + // buf := bytes.NewBuffer(data[1:]) + // dec := gob.NewDecoder(buf) + // lr := monitor.LogRecordNotify{} + // if err := dec.Decode(&lr); err != nil { + // p.l.Error("Failed to decode log record notify", zap.Error(err)) + // return parserErrors.ErrEventSkipped + // } + // p.l.Info("Access log event") + // lr.DumpJSON() + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeAccessLog) + case monitorAPI.MessageTypeAgent: + // buf := bytes.NewBuffer(data[1:]) + // dec := gob.NewDecoder(buf) + // an := monitorAPI.AgentNotify{} + // if err := dec.Decode(&an); err != nil { + // p.l.Error("Failed to decode agent notify", zap.Error(err)) + // return parserErrors.ErrEventSkipped + // } + // p.l.Info("Agent event") + // an.DumpJSON() + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeAgent) + // PerfEvents .. L34 + case monitorAPI.MessageTypeDrop: + dn := monitor.DropNotify{} + if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil { + p.l.Error("Failed to decode drop notify", zap.Error(err)) + return nil, parserErrors.ErrEventSkipped + } + eventSubType = dn.SubType + packetOffset = monitor.DropNotifyLen + dropNotify = &dn + case monitorAPI.MessageTypeTrace: + tn := monitor.TraceNotify{} + if err := monitor.DecodeTraceNotify(data, &tn); err != nil { + p.l.Error("Failed to decode trace notify", zap.Error(err)) + return nil, parserErrors.ErrEventSkipped + } + eventSubType = tn.ObsPoint + if tn.ObsPoint == 0 { + obsPoint = flow.TraceObservationPoint_TO_ENDPOINT + } else { + obsPoint = flow.TraceObservationPoint(tn.ObsPoint) + } + packetOffset = (int)(tn.DataOffset()) + traceNotify = &tn + case monitorAPI.MessageTypePolicyVerdict: + pn := monitor.PolicyVerdictNotify{} + if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil { + p.l.Error("Failed to decode policy verdict notify", zap.Error(err)) + return nil, parserErrors.ErrEventSkipped + } + eventSubType = pn.SubType + packetOffset = monitor.PolicyVerdictNotifyLen + // authType = flow.AuthType(pn.AuthType) + policyVerdict = &pn + // p.l.Info("Policy verdict event") + // pn.DumpInfo(data, false) + case monitorAPI.MessageTypeCapture: + // dc := monitor.DebugCapture{} + // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dc); err != nil { + // p.l.Error("Failed to decode debug capture", zap.Error(err)) + // } + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameCapture) + // PerfEvents + case monitorAPI.MessageTypeDebug: + // dm := monitor.DebugMsg{} + // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dm); err != nil { + // p.l.Error("Failed to decode debug message", zap.Error(err)) + // } + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameDebug) + case monitorAPI.MessageTypeTraceSock: + // tn := monitor.TraceSockNotify{} + // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &tn); err != nil { + // p.l.Error("Failed to decode trace sock notify", zap.Error(err)) + // } + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameTraceSock) + case monitorAPI.MessageTypeRecCapture: + // no op for now + // rc := monitor.RecorderCapture{} + // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &rc); err != nil { + // p.l.Error("Failed to decode recorder capture", zap.Error(err)) + // } + return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameRecCapture) + default: + p.l.Debug("Unknown event", zap.Any("data", data)) + return nil, parserErrors.ErrUnknownEventType + } + + p.packet.Lock() + defer p.packet.Unlock() + // Decode the layers + if len(data) <= packetOffset { + p.l.Error("No packet data found") + return nil, ErrNoPacketData + } + + if len(data[packetOffset:]) > 0 { + err := p.packet.decLayer.DecodeLayers(data[packetOffset:], &p.packet.Layers) + if err != nil { + return nil, err + } + } else { + // Truncate layers to avoid accidental re-use. + p.packet.Layers = p.packet.Layers[:0] + } + eth, ip, l4, srcIP, dstIP, srcPort, dstPort := decodeLayers(p.packet) + + var protocol uint8 + if l4 != nil { + switch l4.Protocol.(type) { + case *flow.Layer4_TCP: + protocol = 6 + case *flow.Layer4_UDP: + protocol = 17 + } + } else { + return nil, ErrNoLayer4 + } + + newFl := utils.ToFlow( + time.Now().Unix(), + *srcIP, + *dstIP, + srcPort, + dstPort, + protocol, + uint32(obsPoint), + getVerdict(dropNotify, traceNotify, policyVerdict), + ) + newFl.IP.IpVersion = ip.IpVersion + newFl.EventType = &flow.CiliumEventType{ + Type: int32(eventType), + SubType: int32(eventSubType), + } + newFl.Ethernet = eth + var drVal uint32 + switch { + case dropNotify != nil: + drVal = uint32(dropNotify.SubType) + case policyVerdict != nil && policyVerdict.Verdict < 0: + // if the flow was dropped, verdict equals the negative of the drop reason + drVal = uint32(-policyVerdict.Verdict) + default: + drVal = 0 + } + newFl.DropReason = drVal + newFl.DropReasonDesc = flow.DropReason(newFl.DropReason) + // Decode ICMPv4/v6 + // Decode SCTP + // Decode ProxyPort + // Decode NIC + // Decode IsReply + return newFl, nil + case payload.RecordLost: + p.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost)) + return nil, fmt.Errorf("Record lost for cilium event: %d", pl.Lost) + default: + p.l.Warn("Unknown event type", zap.Int("type", pl.Type)) + return nil, parserErrors.ErrUnknownEventType + } +} + +func decodeLayers(packet *packet) ( + ethernet *flow.Ethernet, + ip *flow.IP, + l4 *flow.Layer4, + sourceIP, destinationIP *net.IP, + sourcePort, destinationPort uint32, + // tcp summary +) { + for _, typ := range packet.Layers { + // summary = typ.String() + switch typ { + case layers.LayerTypeEthernet: + ethernet = &flow.Ethernet{ + Source: packet.Ethernet.SrcMAC.String(), + Destination: packet.Ethernet.DstMAC.String(), + } + case layers.LayerTypeIPv4: + sourceIP = &packet.IPv4.SrcIP + destinationIP = &packet.IPv4.DstIP + ip = &flow.IP{ + Source: sourceIP.String(), + Destination: destinationIP.String(), + IpVersion: flow.IPVersion_IPv4, + } + case layers.LayerTypeTCP: + l4 = &flow.Layer4{ + Protocol: &flow.Layer4_TCP{ + TCP: &flow.TCP{ + SourcePort: uint32(packet.TCP.SrcPort), + DestinationPort: uint32(packet.TCP.DstPort), + Flags: &flow.TCPFlags{ + FIN: packet.TCP.FIN, SYN: packet.TCP.SYN, RST: packet.TCP.RST, + PSH: packet.TCP.PSH, ACK: packet.TCP.ACK, URG: packet.TCP.URG, + ECE: packet.TCP.ECE, CWR: packet.TCP.CWR, NS: packet.TCP.NS, + }, + }, + }, + } + sourcePort = uint32(packet.TCP.SrcPort) + destinationPort = uint32(packet.TCP.DstPort) + // TODO: TCP Summary + case layers.LayerTypeUDP: + l4 = &flow.Layer4{ + Protocol: &flow.Layer4_UDP{ + UDP: &flow.UDP{ + SourcePort: uint32(packet.UDP.SrcPort), + DestinationPort: uint32(packet.UDP.DstPort), + }, + }, + } + sourcePort = uint32(packet.UDP.SrcPort) + destinationPort = uint32(packet.UDP.DstPort) + } + } + + return +} + +func getVerdict(dn *monitor.DropNotify, tn *monitor.TraceNotify, pn *monitor.PolicyVerdictNotify) flow.Verdict { + switch { + case dn != nil: + return flow.Verdict_DROPPED + case tn != nil: + return flow.Verdict_FORWARDED + case pn != nil: + if pn.Verdict < 0 { + return flow.Verdict_DROPPED + } + if pn.Verdict > 0 { + return flow.Verdict_REDIRECTED + } + if pn.IsTrafficAudited() { + return flow.Verdict_AUDIT + } + return flow.Verdict_FORWARDED + } + return flow.Verdict_VERDICT_UNKNOWN +} diff --git a/pkg/plugin/cilium/types_linux.go b/pkg/plugin/cilium/types_linux.go index 901a3e61e5a..5a2920f265b 100644 --- a/pkg/plugin/cilium/types_linux.go +++ b/pkg/plugin/cilium/types_linux.go @@ -6,6 +6,9 @@ import ( "net" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" + "github.com/cilium/proxy/pkg/lock" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" kcfg "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/enricher" "github.com/microsoft/retina/pkg/log" @@ -22,4 +25,25 @@ type cilium struct { enricher enricher.EnricherInterface externalChannel chan *v1.Event connection net.Conn + p *parser +} + +type parser struct { + l *log.ZapLogger + packet *packet +} + +// re-usable packet to avoid reallocating gopacket datastructures +type packet struct { + lock.Mutex + decLayer *gopacket.DecodingLayerParser + Layers []gopacket.LayerType + layers.Ethernet + layers.IPv4 + layers.IPv6 + // layers.ICMPv4 + // layers.ICMPv6 + layers.TCP + layers.UDP + // layers.SCTP }