Skip to content

Commit a455a68

Browse files
committed
update to use cilium parser with no-op getters
1 parent 5bc2c73 commit a455a68

File tree

2 files changed

+43
-260
lines changed

2 files changed

+43
-260
lines changed

pkg/plugin/cilium/cilium_linux.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,9 @@ func (c *cilium) monitorLoop(ctx context.Context) error {
117117
c.l.Error("Failed to decode payload", zap.Error(err))
118118
return err
119119
}
120-
fl, err := c.p.Decode(&pl)
120+
ev, err := c.p.Decode(&pl)
121121
if err == nil {
122-
c.l.Debug("Decoded flow", zap.Any("flow", fl))
123-
event := &v1.Event{
124-
Event: fl,
125-
Timestamp: fl.GetTime(),
126-
}
127-
c.externalChannel <- event
122+
c.externalChannel <- ev
128123
} else {
129124
c.l.Warn("Failed to decode to flow", zap.Error(err))
130125
}

pkg/plugin/cilium/parser.go

+41-253
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,28 @@
11
package cilium
22

33
import (
4-
"bytes"
5-
"encoding/binary"
64
"errors"
75
"fmt"
8-
"net"
96
"time"
107

11-
"github.com/cilium/cilium/api/v1/flow"
12-
"github.com/cilium/cilium/pkg/byteorder"
8+
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
9+
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
10+
hp "github.com/cilium/cilium/pkg/hubble/parser"
1311
parserErrors "github.com/cilium/cilium/pkg/hubble/parser/errors"
14-
"github.com/cilium/cilium/pkg/monitor"
12+
hptestutils "github.com/cilium/cilium/pkg/hubble/testutils"
1513
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
1614
"github.com/cilium/cilium/pkg/monitor/payload"
1715
"github.com/google/gopacket"
1816
"github.com/google/gopacket/layers"
19-
"github.com/microsoft/retina/pkg/utils"
17+
"github.com/google/uuid"
18+
"github.com/sirupsen/logrus"
2019
"go.uber.org/zap"
2120
)
2221

2322
const ErrNotImplemented = "Error, not implemented for type: %s"
2423

2524
var (
26-
ErrEmptyData = errors.New("empty data")
27-
ErrNoPacketData = errors.New("no packet data")
28-
ErrNoLayer4 = errors.New("no layer 4")
25+
ErrEmptyData = errors.New("empty data")
2926
)
3027

3128
func (p *parser) Init() {
@@ -47,190 +44,60 @@ func (p *parser) Init() {
4744
// - MessageTypeAgent: api.AgentNotify
4845
//
4946
// Perf Events
50-
// - MessageTypeDrop: monitor.DropNotify
47+
// - MessageTypeDrop: monitor.DropNotify
5148
// - MessageTypeDebug: monitor.DebugMsg
52-
// - MessageTypeCapture: monitor.DebugCapture
49+
// - MessageTypeCapture: monitor.DebugCapture
5350
// - MessageTypeTrace: monitor.TraceNotify
5451
// - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify
5552
//
5653
// Reference hubble/parser/threefour
57-
func (p *parser) Decode(pl *payload.Payload) (*flow.Flow, error) {
54+
func (p *parser) Decode(pl *payload.Payload) (*v1.Event, error) {
5855
switch pl.Type {
5956
case payload.EventSample:
6057
data := pl.Data
6158
if len(data) == 0 {
6259
return nil, ErrEmptyData
6360
}
6461

65-
var eventType uint8 = data[0]
66-
var packetOffset int
67-
var obsPoint flow.TraceObservationPoint
68-
var dropNotify *monitor.DropNotify
69-
var traceNotify *monitor.TraceNotify
70-
var policyVerdict *monitor.PolicyVerdictNotify
71-
// var debugCap *monitor.DebugCapture
72-
var eventSubType uint8
73-
// var authType flow.AuthType
74-
// prefix := fmt.Sprintf("CPU %02d:", pl.CPU)
75-
76-
switch eventType {
77-
// Agent Events
78-
case monitorAPI.MessageTypeAccessLog:
79-
// These are l7 events
80-
// We have dns from ig. We are missing other such as kafka, icmp, etc
81-
// buf := bytes.NewBuffer(data[1:])
82-
// dec := gob.NewDecoder(buf)
83-
// lr := monitor.LogRecordNotify{}
84-
// if err := dec.Decode(&lr); err != nil {
85-
// p.l.Error("Failed to decode log record notify", zap.Error(err))
86-
// return parserErrors.ErrEventSkipped
87-
// }
88-
// p.l.Info("Access log event")
89-
// lr.DumpJSON()
90-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeAccessLog)
91-
case monitorAPI.MessageTypeAgent:
92-
// buf := bytes.NewBuffer(data[1:])
93-
// dec := gob.NewDecoder(buf)
94-
// an := monitorAPI.AgentNotify{}
95-
// if err := dec.Decode(&an); err != nil {
96-
// p.l.Error("Failed to decode agent notify", zap.Error(err))
97-
// return parserErrors.ErrEventSkipped
98-
// }
99-
// p.l.Info("Agent event")
100-
// an.DumpJSON()
101-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeAgent)
102-
// PerfEvents .. L34
103-
case monitorAPI.MessageTypeDrop:
104-
dn := monitor.DropNotify{}
105-
if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil {
106-
p.l.Error("Failed to decode drop notify", zap.Error(err))
107-
return nil, parserErrors.ErrEventSkipped
108-
}
109-
eventSubType = dn.SubType
110-
packetOffset = monitor.DropNotifyLen
111-
dropNotify = &dn
112-
case monitorAPI.MessageTypeTrace:
113-
tn := monitor.TraceNotify{}
114-
if err := monitor.DecodeTraceNotify(data, &tn); err != nil {
115-
p.l.Error("Failed to decode trace notify", zap.Error(err))
116-
return nil, parserErrors.ErrEventSkipped
117-
}
118-
eventSubType = tn.ObsPoint
119-
if tn.ObsPoint == 0 {
120-
obsPoint = flow.TraceObservationPoint_TO_ENDPOINT
121-
} else {
122-
obsPoint = flow.TraceObservationPoint(tn.ObsPoint)
123-
}
124-
packetOffset = (int)(tn.DataOffset())
125-
traceNotify = &tn
126-
case monitorAPI.MessageTypePolicyVerdict:
127-
pn := monitor.PolicyVerdictNotify{}
128-
if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil {
129-
p.l.Error("Failed to decode policy verdict notify", zap.Error(err))
130-
return nil, parserErrors.ErrEventSkipped
131-
}
132-
eventSubType = pn.SubType
133-
packetOffset = monitor.PolicyVerdictNotifyLen
134-
// authType = flow.AuthType(pn.AuthType)
135-
policyVerdict = &pn
136-
// p.l.Info("Policy verdict event")
137-
// pn.DumpInfo(data, false)
138-
case monitorAPI.MessageTypeCapture:
139-
// dc := monitor.DebugCapture{}
140-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dc); err != nil {
141-
// p.l.Error("Failed to decode debug capture", zap.Error(err))
142-
// }
143-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameCapture)
144-
// PerfEvents
145-
case monitorAPI.MessageTypeDebug:
146-
// dm := monitor.DebugMsg{}
147-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dm); err != nil {
148-
// p.l.Error("Failed to decode debug message", zap.Error(err))
149-
// }
150-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameDebug)
151-
case monitorAPI.MessageTypeTraceSock:
152-
// tn := monitor.TraceSockNotify{}
153-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &tn); err != nil {
154-
// p.l.Error("Failed to decode trace sock notify", zap.Error(err))
155-
// }
156-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameTraceSock)
157-
case monitorAPI.MessageTypeRecCapture:
158-
// no op for now
159-
// rc := monitor.RecorderCapture{}
160-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &rc); err != nil {
161-
// p.l.Error("Failed to decode recorder capture", zap.Error(err))
162-
// }
163-
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameRecCapture)
164-
default:
165-
p.l.Debug("Unknown event", zap.Any("data", data))
166-
return nil, parserErrors.ErrUnknownEventType
62+
p, err := hp.New(logrus.WithField("cilium", "parser"),
63+
// We use noOp getters here since we will use our own custom parser in hubble
64+
&hptestutils.NoopEndpointGetter,
65+
&hptestutils.NoopIdentityGetter,
66+
&hptestutils.NoopDNSGetter,
67+
&hptestutils.NoopIPGetter,
68+
&hptestutils.NoopServiceGetter,
69+
&hptestutils.NoopLinkGetter,
70+
&hptestutils.NoopPodMetadataGetter,
71+
)
72+
if err != nil {
73+
return nil, err
16774
}
75+
var eventType uint8 = data[0]
16876

169-
p.packet.Lock()
170-
defer p.packet.Unlock()
171-
// Decode the layers
172-
if len(data) <= packetOffset {
173-
p.l.Error("No packet data found")
174-
return nil, ErrNoPacketData
77+
monEvent := &observerTypes.MonitorEvent{
78+
Timestamp: time.Now(),
79+
UUID: uuid.New(),
17580
}
176-
177-
if len(data[packetOffset:]) > 0 {
178-
err := p.packet.decLayer.DecodeLayers(data[packetOffset:], &p.packet.Layers)
81+
switch eventType {
82+
case monitorAPI.MessageTypeAccessLog, monitorAPI.MessageTypeAgent:
83+
// AgentEvents correlate to cilium agent events
84+
// This also includes access logs, which are Log Records
85+
// Log Records can be DNS traces for CNP related pods
86+
monEvent.Payload = &observerTypes.AgentEvent{}
87+
return nil, fmt.Errorf(ErrNotImplemented, monitorAPI.MessageTypeNameAgent)
88+
case monitorAPI.MessageTypeDrop, monitorAPI.MessageTypeTrace, monitorAPI.MessageTypePolicyVerdict, monitorAPI.MessageTypeCapture:
89+
perfEvent := &observerTypes.PerfEvent{}
90+
perfEvent.Data = data
91+
perfEvent.CPU = pl.CPU
92+
monEvent.Payload = perfEvent
93+
event, err := p.Decode(monEvent)
17994
if err != nil {
18095
return nil, err
18196
}
182-
} else {
183-
// Truncate layers to avoid accidental re-use.
184-
p.packet.Layers = p.packet.Layers[:0]
185-
}
186-
eth, ip, l4, srcIP, dstIP, srcPort, dstPort := decodeLayers(p.packet)
187-
188-
var protocol uint8
189-
if l4 != nil {
190-
switch l4.Protocol.(type) {
191-
case *flow.Layer4_TCP:
192-
protocol = 6
193-
case *flow.Layer4_UDP:
194-
protocol = 17
195-
}
196-
} else {
197-
return nil, ErrNoLayer4
198-
}
199-
200-
newFl := utils.ToFlow(
201-
time.Now().Unix(),
202-
*srcIP,
203-
*dstIP,
204-
srcPort,
205-
dstPort,
206-
protocol,
207-
uint32(obsPoint),
208-
getVerdict(dropNotify, traceNotify, policyVerdict),
209-
)
210-
newFl.IP.IpVersion = ip.IpVersion
211-
newFl.EventType = &flow.CiliumEventType{
212-
Type: int32(eventType),
213-
SubType: int32(eventSubType),
214-
}
215-
newFl.Ethernet = eth
216-
var drVal uint32
217-
switch {
218-
case dropNotify != nil:
219-
drVal = uint32(dropNotify.SubType)
220-
case policyVerdict != nil && policyVerdict.Verdict < 0:
221-
// if the flow was dropped, verdict equals the negative of the drop reason
222-
drVal = uint32(-policyVerdict.Verdict)
97+
return event, nil
22398
default:
224-
drVal = 0
99+
return nil, parserErrors.ErrUnknownEventType
225100
}
226-
newFl.DropReason = drVal
227-
newFl.DropReasonDesc = flow.DropReason(newFl.DropReason)
228-
// Decode ICMPv4/v6
229-
// Decode SCTP
230-
// Decode ProxyPort
231-
// Decode NIC
232-
// Decode IsReply
233-
return newFl, nil
234101
case payload.RecordLost:
235102
p.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost))
236103
return nil, fmt.Errorf("Record lost for cilium event: %d", pl.Lost)
@@ -239,82 +106,3 @@ func (p *parser) Decode(pl *payload.Payload) (*flow.Flow, error) {
239106
return nil, parserErrors.ErrUnknownEventType
240107
}
241108
}
242-
243-
func decodeLayers(packet *packet) (
244-
ethernet *flow.Ethernet,
245-
ip *flow.IP,
246-
l4 *flow.Layer4,
247-
sourceIP, destinationIP *net.IP,
248-
sourcePort, destinationPort uint32,
249-
// tcp summary
250-
) {
251-
for _, typ := range packet.Layers {
252-
// summary = typ.String()
253-
switch typ {
254-
case layers.LayerTypeEthernet:
255-
ethernet = &flow.Ethernet{
256-
Source: packet.Ethernet.SrcMAC.String(),
257-
Destination: packet.Ethernet.DstMAC.String(),
258-
}
259-
case layers.LayerTypeIPv4:
260-
sourceIP = &packet.IPv4.SrcIP
261-
destinationIP = &packet.IPv4.DstIP
262-
ip = &flow.IP{
263-
Source: sourceIP.String(),
264-
Destination: destinationIP.String(),
265-
IpVersion: flow.IPVersion_IPv4,
266-
}
267-
case layers.LayerTypeTCP:
268-
l4 = &flow.Layer4{
269-
Protocol: &flow.Layer4_TCP{
270-
TCP: &flow.TCP{
271-
SourcePort: uint32(packet.TCP.SrcPort),
272-
DestinationPort: uint32(packet.TCP.DstPort),
273-
Flags: &flow.TCPFlags{
274-
FIN: packet.TCP.FIN, SYN: packet.TCP.SYN, RST: packet.TCP.RST,
275-
PSH: packet.TCP.PSH, ACK: packet.TCP.ACK, URG: packet.TCP.URG,
276-
ECE: packet.TCP.ECE, CWR: packet.TCP.CWR, NS: packet.TCP.NS,
277-
},
278-
},
279-
},
280-
}
281-
sourcePort = uint32(packet.TCP.SrcPort)
282-
destinationPort = uint32(packet.TCP.DstPort)
283-
// TODO: TCP Summary
284-
case layers.LayerTypeUDP:
285-
l4 = &flow.Layer4{
286-
Protocol: &flow.Layer4_UDP{
287-
UDP: &flow.UDP{
288-
SourcePort: uint32(packet.UDP.SrcPort),
289-
DestinationPort: uint32(packet.UDP.DstPort),
290-
},
291-
},
292-
}
293-
sourcePort = uint32(packet.UDP.SrcPort)
294-
destinationPort = uint32(packet.UDP.DstPort)
295-
}
296-
}
297-
298-
return
299-
}
300-
301-
func getVerdict(dn *monitor.DropNotify, tn *monitor.TraceNotify, pn *monitor.PolicyVerdictNotify) flow.Verdict {
302-
switch {
303-
case dn != nil:
304-
return flow.Verdict_DROPPED
305-
case tn != nil:
306-
return flow.Verdict_FORWARDED
307-
case pn != nil:
308-
if pn.Verdict < 0 {
309-
return flow.Verdict_DROPPED
310-
}
311-
if pn.Verdict > 0 {
312-
return flow.Verdict_REDIRECTED
313-
}
314-
if pn.IsTrafficAudited() {
315-
return flow.Verdict_AUDIT
316-
}
317-
return flow.Verdict_FORWARDED
318-
}
319-
return flow.Verdict_VERDICT_UNKNOWN
320-
}

0 commit comments

Comments
 (0)