Skip to content

Commit

Permalink
feat: use scenario params and option to parse flows file
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Gregory <[email protected]>
  • Loading branch information
huntergregory committed Aug 5, 2024
1 parent 44a0760 commit 4455721
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 30 deletions.
9 changes: 5 additions & 4 deletions ai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const kubeconfigPath = "/home/hunter/.kube/config"

func main() {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
// log.SetLevel(logrus.DebugLevel)

log.Info("starting app...")

Expand All @@ -36,6 +36,7 @@ func main() {

// configure LM (language model)
// model := lm.NewEchoModel()
// log.Info("initialized echo model")
model, err := lm.NewAzureOpenAI()
if err != nil {
log.WithError(err).Fatal("failed to create Azure OpenAI model")
Expand All @@ -52,9 +53,9 @@ func handleChat(log logrus.FieldLogger, config *rest.Config, clientset *kubernet

h := flowscenario.NewHandler(log, config, clientset, model)
params := &flowscenario.ScenarioParams{
Scenario: flowscenario.AnyScenario,
Namespace1: "frontend",
Namespace2: "backend",
Scenario: flowscenario.DropScenario,
Namespace1: "default",
Namespace2: "default",
}

ctx := context.TODO()
Expand Down
101 changes: 76 additions & 25 deletions ai/pkg/retrieval/flows/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package flows

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"time"

"github.com/microsoft/retina/ai/pkg/retrieval/flows/client"
"github.com/microsoft/retina/ai/pkg/util"

flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
Expand All @@ -20,12 +21,13 @@ import (
)

type Retriever struct {
log logrus.FieldLogger
config *rest.Config
clientset *kubernetes.Clientset
initialized bool
client *client.Client
flows []*flowpb.Flow
log logrus.FieldLogger
config *rest.Config
clientset *kubernetes.Clientset
initialized bool
client *client.Client
readFromFile bool
flows []*flowpb.Flow
}

func NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset) *Retriever {
Expand All @@ -36,7 +38,16 @@ func NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubern
}
}

func (r *Retriever) UseFile() {
r.readFromFile = true
}

func (r *Retriever) Init() error {
if r.readFromFile {
r.log.Info("using flows from file")
return nil
}

client, err := client.New()
if err != nil {
return fmt.Errorf("failed to create grpc client. %v", err)
Expand All @@ -49,17 +60,22 @@ func (r *Retriever) Init() error {
return nil
}

func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, error) {
func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest) ([]*flowpb.Flow, error) {
if r.readFromFile {
flows, err := readFlowsFromFile("flows.json")
if err != nil {
return nil, fmt.Errorf("failed to read flows from file. %v", err)
}

return flows, nil
}

if !r.initialized {
if err := r.Init(); err != nil {
return nil, fmt.Errorf("failed to initialize. %v", err)
}
}

// translate parameters to flow request
// TODO: use parameters
req := flowsRequest()

// port-forward to hubble-relay
portForwardCtx, portForwardCancel := context.WithCancel(ctx)
defer portForwardCancel()
Expand All @@ -73,7 +89,10 @@ func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow,
// observe flows
observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second)
defer observeCancel()
flows, err := r.observeFlowsGRPC(observeCtx, req, maxFlows)

// FIXME don't use maxFlows anymore? check for EOF? then remove this constant: MaxFlowsToAnalyze
maxFlows := req.Number
flows, err := r.observeFlowsGRPC(observeCtx, req, int(maxFlows))
if err != nil {
return nil, fmt.Errorf("failed to observe flows over grpc. %v", err)
}
Expand All @@ -84,19 +103,13 @@ func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow,
_ = cmd.Wait()
r.log.Info("stopped port-forward")

return flows, nil
}

func flowsRequest() *observerpb.GetFlowsRequest {
return &observerpb.GetFlowsRequest{
Number: util.MaxFlowsFromHubbleRelay,
Follow: false,
Whitelist: []*flowpb.FlowFilter{},
Blacklist: nil,
Since: nil,
Until: nil,
First: false,
r.log.Info("saving flows to JSON")
if err := saveFlowsToJSON(flows, "flows.json"); err != nil {
r.log.WithError(err).Error("failed to save flows to JSON")
return nil, err
}

return flows, nil
}

func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest, maxFlows int) ([]*flowpb.Flow, error) {
Expand Down Expand Up @@ -164,3 +177,41 @@ func (r *Retriever) handleFlow(f *flowpb.Flow) {

r.flows = append(r.flows, f)
}

func saveFlowsToJSON(flows []*flowpb.Flow, filename string) error {
for _, f := range flows {
// to avoid getting an error:
// failed to encode JSON: json: error calling MarshalJSON for type *flow.Flow: proto:\u00a0google.protobuf.Any: unable to resolve \"type.googleapis.com/utils.RetinaMetadata\": not found
f.Extensions = nil
}

file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("failed to create file: %w", err)
}
defer file.Close()

encoder := json.NewEncoder(file)
encoder.SetIndent("", " ") // optional: to make the JSON output pretty
if err := encoder.Encode(flows); err != nil {
return fmt.Errorf("failed to encode JSON: %w", err)
}

return nil
}

func readFlowsFromFile(filename string) ([]*flowpb.Flow, error) {
file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

var flows []*flowpb.Flow
decoder := json.NewDecoder(file)
if err := decoder.Decode(&flows); err != nil {
return nil, fmt.Errorf("failed to decode JSON: %w", err)
}

return flows, nil
}
110 changes: 109 additions & 1 deletion ai/pkg/scenarios/flows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows"
"github.com/microsoft/retina/ai/pkg/util"

flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -55,11 +57,14 @@ func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHisto
h.log.Info("handling flows scenario...")

// get flows
// h.r.UseFile()

if err := h.r.Init(); err != nil {
return "", fmt.Errorf("error initializing flow retriever: %w", err)
}

flows, err := h.r.Observe(ctx, util.MaxFlowsToAnalyze)
req := flowsRequest(params)
flows, err := h.r.Observe(ctx, req)
if err != nil {
return "", fmt.Errorf("error observing flows: %w", err)
}
Expand All @@ -86,3 +91,106 @@ func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHisto

return resp, nil
}

// TODO DNS should not have a destination Pod (except maybe a specific coredns pod)
func flowsRequest(params *ScenarioParams) *observerpb.GetFlowsRequest {
req := &observerpb.GetFlowsRequest{
Number: util.MaxFlowsFromHubbleRelay,
Follow: true,
}

if len(params.Nodes) == 0 {
params.Nodes = nil
}

protocol := []string{"TCP", "UDP"}
if params.Scenario == DnsScenario {
protocol = []string{"DNS"}
}

if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" {
req.Whitelist = []*flowpb.FlowFilter{
{
NodeName: params.Nodes,
Protocol: protocol,
},
}

return req
}

var prefix1 []string
if params.Namespace1 != "" || params.PodPrefix1 != "" {
prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1))
}

var prefix2 []string
if params.Namespace2 != "" || params.PodPrefix2 != "" {
prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2))
}

filterDirection1 := &flowpb.FlowFilter{
NodeName: params.Nodes,
SourcePod: prefix1,
DestinationPod: prefix2,
Protocol: protocol,
}

filterDirection2 := &flowpb.FlowFilter{
NodeName: params.Nodes,
SourcePod: prefix2,
DestinationPod: prefix1,
Protocol: protocol,
}

// filterPod1ToIP := &flowpb.FlowFilter{
// NodeName: params.Nodes,
// SourcePod: prefix1,
// DestinationIp: []string{"10.224.1.214"},
// Protocol: protocol,
// }

// filterPod1FromIP := &flowpb.FlowFilter{
// NodeName: params.Nodes,
// SourceIp: []string{"10.224.1.214"},
// DestinationPod: prefix1,
// Protocol: protocol,
// }

// includes services
// world := []string{"reserved:world"}

// filterPod1ToWorld := &flowpb.FlowFilter{
// NodeName: params.Nodes,
// SourcePod: prefix1,
// DestinationLabel: world,
// Protocol: protocol,
// }

// filterPod1FromWorld := &flowpb.FlowFilter{
// NodeName: params.Nodes,
// SourceLabel: world,
// DestinationPod: prefix1,
// Protocol: protocol,
// }

req.Whitelist = []*flowpb.FlowFilter{
filterDirection1,
filterDirection2,
// filterPod1FromIP,
// filterPod1ToIP,
}

req.Whitelist = nil

req.Blacklist = []*flowpb.FlowFilter{
{
SourcePod: []string{"kube-system/"},
},
{
DestinationPod: []string{"kube-system/"},
},
}

return req
}

0 comments on commit 4455721

Please sign in to comment.