Commit 64e8c6a

feat: dns scenario and other improvements

Signed-off-by: Hunter Gregory <[email protected]>

1 parent: a6c7c40

12 files changed: +437 -80 lines

ai/README.md (+1 -1)

@@ -4,7 +4,7 @@
 
 - Change into this *ai/* folder.
 - `go mod tidy ; go mod vendor`
-- Set your `kubeconfigPath` in *main.go*.
+- Modify the `defaultConfig` values in *main.go*
 - If using Azure OpenAI:
   - Make sure you're logged into your account/subscription in your terminal.
   - Specify environment variables for Deployment name and Endpoint URL. Get deployment from e.g. [https://oai.azure.com/portal/deployment](https://oai.azure.com/portal/deployment) and Endpoint from e.g. Deployment > Playground > Code.

ai/main.go (+74 -13)

@@ -1,47 +1,108 @@
 package main
 
 import (
+    "fmt"
+    "os/user"
+
     "github.com/microsoft/retina/ai/pkg/chat"
     "github.com/microsoft/retina/ai/pkg/lm"
+    "github.com/microsoft/retina/ai/pkg/scenarios"
+    "github.com/microsoft/retina/ai/pkg/scenarios/drops"
 
     "github.com/sirupsen/logrus"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
 )
 
-const kubeconfigPath = "/home/hunter/.kube/config"
+// TODO incorporate this code into a CLI tool someday
+
+type config struct {
+    // currently supports "echo" or "AOAI"
+    model string
+
+    // optional. defaults to ~/.kube/config
+    kubeconfigPath string
+
+    // retrieved flows are currently written to ./flows.json
+    useFlowsFromFile bool
 
-// const kubeconfigPath = "C:\\Users\\hgregory\\.kube\\config"
+    // eventually, the below should be optional once user input is implemented
+    question string
+    history  lm.ChatHistory
+
+    // eventually, the below should be optional once scenario selection is implemented
+    scenario   *scenarios.Definition
+    parameters map[string]string
+}
+
+var defaultConfig = &config{
+    model:            "echo", // echo or AOAI
+    useFlowsFromFile: false,
+    question:         "What's wrong with my app?",
+    history:          nil,
+    scenario:         drops.Definition, // drops.Definition or dns.Definition
+    parameters: map[string]string{
+        scenarios.Namespace1.Name: "default",
+        // scenarios.PodPrefix1.Name: "toolbox-pod",
+        // scenarios.Namespace2.Name: "default",
+        // scenarios.PodPrefix2.Name: "toolbox-pod",
+        // dns.DNSQuery.Name: "google.com",
+        // scenarios.Nodes.Name: "[node1,node2]",
+    },
+}
 
 func main() {
+    run(defaultConfig)
+}
+
+func run(cfg *config) {
     log := logrus.New()
     // log.SetLevel(logrus.DebugLevel)
 
     log.Info("starting app...")
 
     // retrieve configs
-    config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+    if cfg.kubeconfigPath == "" {
+        usr, err := user.Current()
+        if err != nil {
+            log.WithError(err).Fatal("failed to get current user")
+        }
+        cfg.kubeconfigPath = usr.HomeDir + "/.kube/config"
+    }
+
+    kconfig, err := clientcmd.BuildConfigFromFlags("", cfg.kubeconfigPath)
     if err != nil {
         log.WithError(err).Fatal("failed to get kubeconfig")
     }
 
-    clientset, err := kubernetes.NewForConfig(config)
+    clientset, err := kubernetes.NewForConfig(kconfig)
     if err != nil {
         log.WithError(err).Fatal("failed to create clientset")
     }
     log.Info("retrieved kubeconfig and clientset")
 
     // configure LM (language model)
-    // model := lm.NewEchoModel()
-    // log.Info("initialized echo model")
-    model, err := lm.NewAzureOpenAI()
-    if err != nil {
-        log.WithError(err).Fatal("failed to create Azure OpenAI model")
+    var model lm.Model
+    switch cfg.model {
+    case "echo":
+        model = lm.NewEchoModel()
+        log.Info("initialized echo model")
+    case "AOAI":
+        model, err = lm.NewAzureOpenAI()
+        if err != nil {
+            log.WithError(err).Fatal("failed to create Azure OpenAI model")
+        }
+        log.Info("initialized Azure OpenAI model")
+    default:
+        log.Fatalf("unsupported model: %s", cfg.model)
     }
-    log.Info("initialized Azure OpenAI model")
 
-    bot := chat.NewBot(log, config, clientset, model)
-    if err := bot.Loop(); err != nil {
-        log.WithError(err).Fatal("error running chat loop")
+    bot := chat.NewBot(log, kconfig, clientset, model, cfg.useFlowsFromFile)
+    newHistory, err := bot.HandleScenario(cfg.question, cfg.history, cfg.scenario, cfg.parameters)
+    if err != nil {
+        log.WithError(err).Fatal("error handling scenario")
     }
+
+    log.Info("handled scenario")
+    fmt.Println(newHistory[len(newHistory)-1].Assistant)
 }
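
Editor's note (not part of the commit): a minimal sketch of how `defaultConfig` could be pointed at the new DNS scenario, based on the commented-out parameters above. The question string and parameter values are illustrative assumptions, and it assumes the `config` type from *ai/main.go* plus an import of the dns scenario package.

// Hypothetical alternative to defaultConfig; requires importing
// "github.com/microsoft/retina/ai/pkg/scenarios/dns".
// The question and parameter values below are assumptions, not repo defaults.
var dnsScenarioConfig = &config{
    model:            "AOAI", // or "echo" for offline testing
    useFlowsFromFile: false,
    question:         "Why are DNS lookups failing in my namespace?", // illustrative
    history:          nil,
    scenario:         dns.Definition,
    parameters: map[string]string{
        scenarios.Namespace1.Name: "default",
        dns.DNSQuery.Name:         "google.com",
    },
}

Passing this to run (e.g. run(dnsScenarioConfig)) would exercise the DNS scenario instead of drops, assuming the Azure OpenAI environment variables from the README are set.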

ai/pkg/chat/chat.go (+57 -44)

@@ -7,41 +7,67 @@ import (
     "github.com/microsoft/retina/ai/pkg/lm"
     flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows"
     "github.com/microsoft/retina/ai/pkg/scenarios"
-    "github.com/microsoft/retina/ai/pkg/scenarios/dns"
-    "github.com/microsoft/retina/ai/pkg/scenarios/drops"
 
     "github.com/sirupsen/logrus"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
 )
 
-var (
-    definitions = []*scenarios.Definition{
-        drops.Definition,
-        dns.Definition,
-    }
-)
-
 type Bot struct {
-    log       logrus.FieldLogger
-    config    *rest.Config
-    clientset *kubernetes.Clientset
-    model     lm.Model
+    log           logrus.FieldLogger
+    config        *rest.Config
+    clientset     *kubernetes.Clientset
+    model         lm.Model
+    flowRetriever *flowretrieval.Retriever
 }
 
 // input log, config, clientset, model
-func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) *Bot {
-    return &Bot{
-        log:       log.WithField("component", "chat"),
-        config:    config,
-        clientset: clientset,
-        model:     model,
+func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model, useFlowsFromFile bool) *Bot {
+    b := &Bot{
+        log:           log.WithField("component", "chat"),
+        config:        config,
+        clientset:     clientset,
+        model:         model,
+        flowRetriever: flowretrieval.NewRetriever(log, config, clientset),
+    }
+
+    if useFlowsFromFile {
+        b.flowRetriever.UseFile()
     }
+
+    return b
 }
 
+func (b *Bot) HandleScenario(question string, history lm.ChatHistory, definition *scenarios.Definition, parameters map[string]string) (lm.ChatHistory, error) {
+    if definition == nil {
+        return history, fmt.Errorf("no scenario selected")
+    }
+
+    cfg := &scenarios.Config{
+        Log:           b.log,
+        Config:        b.config,
+        Clientset:     b.clientset,
+        Model:         b.model,
+        FlowRetriever: b.flowRetriever,
+    }
+
+    ctx := context.TODO()
+    response, err := definition.Handle(ctx, cfg, parameters, question, history)
+    if err != nil {
+        return history, fmt.Errorf("error handling scenario: %w", err)
+    }
+
+    history = append(history, lm.MessagePair{
+        User:      question,
+        Assistant: response,
+    })
+
+    return history, nil
+}
+
+// FIXME get user input and implement scenario selection
 func (b *Bot) Loop() error {
     var history lm.ChatHistory
-    flowRetriever := flowretrieval.NewRetriever(b.log, b.config, b.clientset)
 
     for {
         // TODO get user input
@@ -53,39 +79,26 @@ func (b *Bot) Loop() error {
             return fmt.Errorf("error selecting scenario: %w", err)
         }
 
-        // cfg.FlowRetriever.UseFile()
-
-        cfg := &scenarios.Config{
-            Log:           b.log,
-            Config:        b.config,
-            Clientset:     b.clientset,
-            Model:         b.model,
-            FlowRetriever: flowRetriever,
-        }
-
-        ctx := context.TODO()
-        response, err := definition.Handle(ctx, cfg, params, question, history)
+        newHistory, err := b.HandleScenario(question, history, definition, params)
        if err != nil {
            return fmt.Errorf("error handling scenario: %w", err)
        }
 
-        fmt.Println(response)
+        fmt.Println(newHistory[len(newHistory)-1].Assistant)
 
-        // TODO keep chat loop going
-        break
+        history = newHistory
     }
-
-    return nil
 }
 
+// FIXME fix prompts
 func (b *Bot) selectScenario(question string, history lm.ChatHistory) (*scenarios.Definition, map[string]string, error) {
-    // TODO use chat interface
-    // FIXME hard-coding the scenario and params for now
-    d := definitions[0]
-    params := map[string]string{
-        scenarios.Namespace1.Name: "default",
-        scenarios.Namespace2.Name: "default",
+    ctx := context.TODO()
+    response, err := b.model.Generate(ctx, selectionSystemPrompt, nil, selectionPrompt(question, history))
+    if err != nil {
+        return nil, nil, fmt.Errorf("error generating response: %w", err)
    }
 
-    return d, params, nil
+    // TODO parse response and return scenario definition and parameters
+    _ = response
+    return nil, nil, nil
 }
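
Editor's note (not part of the commit): a hedged sketch of driving the new Bot.HandleScenario entry point directly. It assumes log, kconfig, clientset, and model are built as in *ai/main.go* above; nothing here is repo code.

// Sketch only: log, kconfig, clientset, and model come from setup code
// like ai/main.go above; parameter values are illustrative.
bot := chat.NewBot(log, kconfig, clientset, model, false)

var history lm.ChatHistory
history, err := bot.HandleScenario(
    "What's wrong with my app?",
    history,          // nil on the first call
    drops.Definition, // or dns.Definition
    map[string]string{scenarios.Namespace1.Name: "default"},
)
if err != nil {
    log.WithError(err).Fatal("error handling scenario")
}
fmt.Println(history[len(history)-1].Assistant) // latest assistant reply

Because HandleScenario returns the appended history, callers such as Loop can thread the conversation forward across iterations.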

ai/pkg/chat/prompt.go (+30)

@@ -0,0 +1,30 @@
+package chat
+
+import (
+    "fmt"
+    "strings"
+
+    "github.com/microsoft/retina/ai/pkg/lm"
+    "github.com/microsoft/retina/ai/pkg/scenarios"
+    "github.com/microsoft/retina/ai/pkg/scenarios/dns"
+    "github.com/microsoft/retina/ai/pkg/scenarios/drops"
+)
+
+const selectionSystemPrompt = "Select a scenario"
+
+var (
+    definitions = []*scenarios.Definition{
+        drops.Definition,
+        dns.Definition,
+    }
+)
+
+func selectionPrompt(question string, history lm.ChatHistory) string {
+    // TODO include parameters etc. and reference the user chat as context
+    var sb strings.Builder
+    sb.WriteString("Select a scenario:\n")
+    for i, d := range definitions {
+        sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, d.Name))
+    }
+    return sb.String()
+}

ai/pkg/parse/flows/parser.go (+4)

@@ -34,6 +34,10 @@ func (p *Parser) Parse(flows []*flowpb.Flow) {
 }
 
 func (p *Parser) addFlow(f *flowpb.Flow) error {
+    if f == nil {
+        return nil
+    }
+
     src := f.GetSource()
     dst := f.GetDestination()
     if src == nil || dst == nil {

ai/pkg/retrieval/flows/client/client.go (+1 -1)

@@ -31,7 +31,7 @@ func New() (*Client, error) {
     tlsDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
 
     // FIXME make address part of a config
-    addr := ":5555"
+    addr := ":5557"
     connection, err := grpc.NewClient(addr, tlsDialOption, connectDialOption)
     if err != nil {
         return nil, fmt.Errorf("failed to dial %s: %w", addr, err)

ai/pkg/retrieval/flows/retriever.go (+12 -5)

@@ -83,13 +83,13 @@ func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest
     defer portForwardCancel()
 
     // FIXME make ports part of a config
-    cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5555:80")
+    cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5557:80")
     if err := cmd.Start(); err != nil {
         return nil, fmt.Errorf("failed to start port-forward. %v", err)
     }
 
     // observe flows
-    observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second)
+    observeCtx, observeCancel := context.WithTimeout(ctx, 15*time.Second)
     defer observeCancel()
 
     maxFlows := req.Number
@@ -120,18 +120,25 @@ func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest
     }
 
     r.flows = make([]*flowpb.Flow, 0)
+    var errReceiving error
     for {
         select {
         case <-ctx.Done():
             r.log.Info("context cancelled")
             return r.flows, nil
         default:
+            if errReceiving != nil {
+                // error receiving and context not done
+                // TODO handle error instead of returning error
+                return nil, fmt.Errorf("failed to receive flow. %v", err)
+            }
+
             r.log.WithField("flowCount", len(r.flows)).Debug("processing flow")
 
             getFlowResponse, err := stream.Recv()
             if err != nil {
-                // TODO handle error instead of returning error
-                return nil, fmt.Errorf("failed to receive flow. %v", err)
+                errReceiving = err
+                continue
             }
 
             f := getFlowResponse.GetFlow()
@@ -150,7 +157,7 @@ func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest
 // handleFlow logic is inspired by a snippet from Hubble UI
 // https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/flow_stream/flow_stream.go#L360-L395
 func (r *Retriever) handleFlow(f *flowpb.Flow) {
-    if f.GetL4() == nil || f.GetSource() == nil || f.GetDestination() == nil {
+    if (f.GetL7() == nil && f.GetL4() == nil) || f.GetSource() == nil || f.GetDestination() == nil {
         return
     }
 
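
Editor's note (not part of the commit): the receive loop above now stashes the stream.Recv error and only fails on the next pass if the context is still live, so a cancelled context returns the flows gathered so far. Below is a minimal self-contained sketch of that pattern with a stand-in receiver type; it is not the repo's code, and names like receiver and collect are hypothetical.

package sketch

import (
    "context"
    "fmt"
)

// receiver is a stand-in for the gRPC stream's Recv method.
type receiver interface {
    Recv() (string, error)
}

// collect mirrors the loop structure in observeFlowsGRPC: cancellation is
// checked first, and a stashed receive error only aborts the next iteration.
func collect(ctx context.Context, stream receiver) ([]string, error) {
    var items []string
    var errReceiving error
    for {
        select {
        case <-ctx.Done():
            return items, nil // context done: return what was gathered
        default:
            if errReceiving != nil {
                return nil, fmt.Errorf("failed to receive: %w", errReceiving)
            }
            item, err := stream.Recv()
            if err != nil {
                errReceiving = err
                continue // re-check ctx before giving up
            }
            items = append(items, item)
        }
    }
}
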
ai/pkg/scenarios/common.go (+1 -1)

@@ -27,7 +27,7 @@ var (
         Name:        "namespace2",
         DataType:    "string",
         Description: "Namespace 2",
-        Optional:    false,
+        Optional:    true,
         Regex:       k8sNameRegex,
     }
 
