From a94c64282bc8dfa6d81478431ff230291011c61f Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Thu, 18 Jul 2024 12:40:24 -0700 Subject: [PATCH 1/8] feat: automated data retrieval and analysis Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- .gitignore | 2 +- ai/go.mod | 61 +++++++ ai/go.sum | 173 ++++++++++++++++++++ ai/main.go | 65 ++++++++ ai/pkg/analysis/flows/analyzer.go | 26 +++ ai/pkg/analysis/flows/parser.go | 90 ++++++++++ ai/pkg/analysis/flows/prompt.go | 13 ++ ai/pkg/analysis/flows/types.go | 30 ++++ ai/pkg/lm/azure-openai.go | 99 +++++++++++ ai/pkg/lm/echo.go | 23 +++ ai/pkg/lm/model.go | 14 ++ ai/pkg/retrieval/flows/client/client.go | 44 +++++ ai/pkg/retrieval/flows/retriever.go | 165 +++++++++++++++++++ ai/pkg/scenarios/flows/handler.go | 87 ++++++++++ ai/test/integration/lm/azure-openai_test.go | 30 ++++ 15 files changed, 921 insertions(+), 1 deletion(-) create mode 100644 ai/go.mod create mode 100644 ai/go.sum create mode 100644 ai/main.go create mode 100644 ai/pkg/analysis/flows/analyzer.go create mode 100644 ai/pkg/analysis/flows/parser.go create mode 100644 ai/pkg/analysis/flows/prompt.go create mode 100644 ai/pkg/analysis/flows/types.go create mode 100644 ai/pkg/lm/azure-openai.go create mode 100644 ai/pkg/lm/echo.go create mode 100644 ai/pkg/lm/model.go create mode 100644 ai/pkg/retrieval/flows/client/client.go create mode 100644 ai/pkg/retrieval/flows/retriever.go create mode 100644 ai/pkg/scenarios/flows/handler.go create mode 100644 ai/test/integration/lm/azure-openai_test.go diff --git a/.gitignore b/.gitignore index 27e2f7832c..630958033a 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,7 @@ *.out # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # Go workspace file go.work diff --git a/ai/go.mod b/ai/go.mod new file mode 100644 index 0000000000..e8637780d5 --- /dev/null +++ b/ai/go.mod @@ -0,0 +1,61 @@ +module github.com/microsoft/retina/ai + +go 1.22.3 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 + github.com/cilium/cilium v1.15.7 + github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529 + github.com/sirupsen/logrus v1.9.3 + google.golang.org/grpc v1.65.0 + k8s.io/client-go v0.30.3 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/imdario/mergo v0.3.16 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + 
github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect + github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/oauth2 v0.20.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/term v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.30.3 // indirect + k8s.io/apimachinery v0.30.3 // indirect + k8s.io/klog/v2 v2.120.1 // indirect + k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect + k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect +) diff --git a/ai/go.sum b/ai/go.sum new file mode 100644 index 0000000000..0565622d6b --- /dev/null +++ b/ai/go.sum @@ -0,0 +1,173 @@ +github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0 h1:FQOmDxJj1If0D0khZR00MDa2Eb+k9BBsSaK7cEbLwkk= +github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0/go.mod h1:X0+PSrHOZdTjkiEhgv53HS5gplbzVVl2jd6hQRYSS3c= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/cilium/cilium v1.15.7 h1:7LwGfAW/fR/VFcm6zlESjE2Ut5vJWe+kdWq3RNJrNRc= +github.com/cilium/cilium v1.15.7/go.mod h1:6Ml8eeyWjMJKDeadutWhn5NibMps0H+yLOgfKBoHTUs= +github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529 h1:Ilb4HpEErh/A48torRF6lnYVRD18hF6yt4aKuH/DZFo= +github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529/go.mod h1:rOV4LLOdYuQK/XPYZyUuDeUgeadVLx/LqmGvYmPWDiY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= 
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= +github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= +github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod 
h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= +github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace h1:9PNP1jnUjRhfmGMlkXHjYPishpcw4jpSt/V/xYY3FMA= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= +golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= +golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= +k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= +k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= +k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= +k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod 
h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/ai/main.go b/ai/main.go new file mode 100644 index 0000000000..e6a02114e2 --- /dev/null +++ b/ai/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + + "github.com/microsoft/retina/ai/pkg/lm" + flowscenario "github.com/microsoft/retina/ai/pkg/scenarios/flows" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +const kubeconfigPath = "C:\\Users\\hgregory\\.kube\\config" // "/home/hunter/.kube/config" + +func main() { + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + log.Info("starting app...") + + // retrieve configs + config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if err != nil { + log.WithError(err).Fatal("failed to get kubeconfig") + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.WithError(err).Fatal("failed to create clientset") + } + log.Info("retrieved kubeconfig and clientset") + + // configure LM (language model) + // model := lm.NewEchoModel() + model, err := lm.NewAzureOpenAI() + if err != nil { + log.WithError(err).Fatal("failed to create Azure OpenAI model") + } + log.Info("initialized Azure OpenAI model") + + handleChat(log, config, clientset, model) +} + +// pretend there's input from chat interface +func handleChat(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) { + question := "what's wrong with my app?" + var chat lm.ChatHistory + + h := flowscenario.NewHandler(log, config, clientset, model) + params := &flowscenario.ScenarioParams{ + Scenario: flowscenario.AnyScenario, + Namespace1: "frontend", + Namespace2: "backend", + } + + ctx := context.TODO() + response, err := h.Handle(ctx, question, chat, params) + if err != nil { + log.WithError(err).Fatal("error running flows scenario") + } + + _ = response +} diff --git a/ai/pkg/analysis/flows/analyzer.go b/ai/pkg/analysis/flows/analyzer.go new file mode 100644 index 0000000000..7a30ab2450 --- /dev/null +++ b/ai/pkg/analysis/flows/analyzer.go @@ -0,0 +1,26 @@ +package flows + +import ( + "context" + "fmt" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/sirupsen/logrus" +) + +type Analyzer struct { + log logrus.FieldLogger + model lm.Model +} + +func NewAnalyzer(log logrus.FieldLogger, model lm.Model) *Analyzer { + return &Analyzer{ + log: logrus.WithField("component", "flow-analyzer"), + model: model, + } +} + +func (a *Analyzer) Analyze(ctx context.Context, query string, chat lm.ChatHistory, summary FlowSummary) (string, error) { + message := fmt.Sprintf(messagePromptTemplate, query, summary.FormatForLM()) + return a.model.Generate(ctx, systemPrompt, chat, message) +} diff --git a/ai/pkg/analysis/flows/parser.go b/ai/pkg/analysis/flows/parser.go new file mode 100644 index 0000000000..6c22498fcc --- /dev/null +++ b/ai/pkg/analysis/flows/parser.go @@ -0,0 +1,90 @@ +package flows + +import ( + "fmt" + "sort" + + flowpb "github.com/cilium/cilium/api/v1/flow" + "github.com/sirupsen/logrus" +) + +type Parser struct { + log logrus.FieldLogger + summary FlowSummary +} + +func NewParser(log logrus.FieldLogger) *Parser { + return &Parser{ + log: log.WithField("component", "flow-parser"), + summary: make(map[string]*Connection), + } +} + +func (p *Parser) Summary() FlowSummary { + return p.summary +} + +func (p *Parser) Parse(flows []*flowpb.Flow) { + for _, flow := range flows { + err := p.addFlow(flow) + if err != nil { + p.log.WithError(err).WithField("flow", 
flow).Error("failed to add flow") + } + } +} + +func (p *Parser) addFlow(f *flowpb.Flow) error { + src := f.GetSource() + dst := f.GetDestination() + if src == nil || dst == nil { + // empty flow. ignore + return nil + } + + srcName, err := endpointName(src) + if err != nil { + return fmt.Errorf("error getting source name: %w", err) + } + + dstName, err := endpointName(dst) + if err != nil { + return fmt.Errorf("error getting destination name: %w", err) + } + + // Ensure pod1 is alphabetically before pod2 + pods := []string{srcName, dstName} + sort.Strings(pods) + pod1, pod2 := pods[0], pods[1] + key := pod1 + "#" + pod2 + + conn, exists := p.summary[key] + if !exists { + conn = &Connection{ + Pod1: pod1, + Pod2: pod2, + Key: key, + Flows: []*flowpb.Flow{}, + } + p.summary[key] = conn + } + + conn.Flows = append(conn.Flows, f) + return nil +} + +func endpointName(ep *flowpb.Endpoint) (string, error) { + name := ep.GetPodName() + if name != "" { + return name, nil + } + + lbls := ep.GetLabels() + if len(lbls) == 0 { + return "", ErrNoEndpointName + } + // should be a reserved label like: + // reserved:world + // reserved:host + // reserved:kube-apiserver + return ep.GetLabels()[0], nil +} diff --git a/ai/pkg/analysis/flows/prompt.go b/ai/pkg/analysis/flows/prompt.go new file mode 100644 index 0000000000..cd265a875f --- /dev/null +++ b/ai/pkg/analysis/flows/prompt.go @@ -0,0 +1,13 @@ +package flows + +const ( + systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.` + + // first parameter is the user's question + // second parameter is the user's network flow logs + messagePromptTemplate = `%s + + "summary of network flow logs": +%s +` +) diff --git a/ai/pkg/analysis/flows/types.go b/ai/pkg/analysis/flows/types.go new file mode 100644 index 0000000000..da837b8026 --- /dev/null +++ b/ai/pkg/analysis/flows/types.go @@ -0,0 +1,30 @@ +package flows + +import ( + "errors" + "fmt" + "strings" + + flowpb "github.com/cilium/cilium/api/v1/flow" +) + +var ErrNoEndpointName = errors.New("no endpoint name") + +type Connection struct { + Pod1 string + Pod2 string + Key string + Flows []*flowpb.Flow +} + +type FlowSummary map[string]*Connection + +func (fs FlowSummary) FormatForLM() string { + connStrings := make([]string, 0, len(fs)) + for _, conn := range fs { + connString := fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) + connStrings = append(connStrings, connString) + } + + return strings.Join(connStrings, "\n") +} diff --git a/ai/pkg/lm/azure-openai.go b/ai/pkg/lm/azure-openai.go new file mode 100644 index 0000000000..5800b287cf --- /dev/null +++ b/ai/pkg/lm/azure-openai.go @@ -0,0 +1,99 @@ +package lm + +import ( + "context" + "fmt" + "os" + "regexp" + + "github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" +) + +const ( + endpointPattern = `^https://[a-zA-Z0-9-]+\.openai\.azure\.com/?$` + deploymentPattern = `^[a-zA-Z0-9-]+$` +) + +var ( + endpointRegex = regexp.MustCompile(endpointPattern) + deploymentRegex = regexp.MustCompile(deploymentPattern) + ErrNoCompletions = fmt.Errorf("no completions returned") + ErrNoMessage = fmt.Errorf("no message included in completion") +) + +type AzureOpenAI struct { + modelDeployment string + client *azopenai.Client 
+} + +// $env:AOAI_COMPLETIONS_ENDPOINT = Read-Host 'Enter AOAI_COMPLETIONS_ENDPOINT' +// $env:AOAI_DEPLOYMENT_NAME = Read-Host 'Enter AOAI_DEPLOYMENT_NAME' + +func NewAzureOpenAI() (*AzureOpenAI, error) { + aoai := &AzureOpenAI{} + + // Ex: "https://.openai.azure.com" + azureOpenAIEndpoint := os.Getenv("AOAI_COMPLETIONS_ENDPOINT") + if !endpointRegex.MatchString(azureOpenAIEndpoint) { + return nil, fmt.Errorf("invalid Azure OpenAI endpoint. must follow pattern: %s", endpointPattern) + } + + modelDeployment := os.Getenv("AOAI_DEPLOYMENT_NAME") + if !deploymentRegex.MatchString(modelDeployment) { + return nil, fmt.Errorf("invalid Azure OpenAI deployment name. must follow pattern: %s", deploymentPattern) + } + aoai.modelDeployment = modelDeployment + + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, fmt.Errorf("failed to get Azure credentials: %w", err) + } + + // NOTE: this constructor creates a client that connects to an Azure OpenAI endpoint. + // To connect to the public OpenAI endpoint, use azopenai.NewClientForOpenAI (requires an OpenAI API key). + client, err := azopenai.NewClient(azureOpenAIEndpoint, cred, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure OpenAI client: %w", err) + } + aoai.client = client + + return aoai, nil +} + +func (m *AzureOpenAI) Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) { + messages := []azopenai.ChatRequestMessageClassification{ + &azopenai.ChatRequestSystemMessage{Content: to.Ptr(systemPrompt)}, + } + for _, pair := range chat { + messages = append(messages, &azopenai.ChatRequestUserMessage{Content: azopenai.NewChatRequestUserMessageContent(pair.User)}) + messages = append(messages, &azopenai.ChatRequestAssistantMessage{Content: to.Ptr(pair.Assistant)}) + } + messages = append(messages, &azopenai.ChatRequestUserMessage{Content: azopenai.NewChatRequestUserMessageContent(message)}) + + chatOptions := azopenai.ChatCompletionsOptions{ + Messages: messages, + MaxTokens: to.Ptr(int32(2048)), + N: to.Ptr(int32(1)), + Temperature: to.Ptr(float32(0.0)), + DeploymentName: &m.modelDeployment, + } + resp, err := m.client.GetChatCompletions(ctx, chatOptions, nil) + + if err != nil { + return "", fmt.Errorf("failed to get completions: %w", err) + } + + if len(resp.Choices) == 0 { + return "", ErrNoCompletions + } + + choice := resp.Choices[0] + if choice.Message == nil || choice.Message.Content == nil { + return "", ErrNoMessage + } + + // TODO check ContentFilterResultsForChoice. And CompletionsFinishReason? 
+ return *choice.Message.Content, nil +} diff --git a/ai/pkg/lm/echo.go b/ai/pkg/lm/echo.go new file mode 100644 index 0000000000..24f53b37b4 --- /dev/null +++ b/ai/pkg/lm/echo.go @@ -0,0 +1,23 @@ +package lm + +import ( + "context" + "fmt" + "strings" +) + +// EchoModel is a mock model that echoes the prompt back +type EchoModel struct{} + +func NewEchoModel() *EchoModel { + return &EchoModel{} +} + +func (m *EchoModel) Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) { + chatStrings := make([]string, 0, len(chat)) + for _, pair := range chat { + chatStrings = append(chatStrings, fmt.Sprintf("USER: %s\nASSISTANT: %s\n", pair.User, pair.Assistant)) + } + resp := fmt.Sprintf("systemPrompt: %s\nhistory: %s\nmessage: %s", systemPrompt, strings.Join(chatStrings, "\n"), message) + return resp, nil +} diff --git a/ai/pkg/lm/model.go b/ai/pkg/lm/model.go new file mode 100644 index 0000000000..b848e30fb7 --- /dev/null +++ b/ai/pkg/lm/model.go @@ -0,0 +1,14 @@ +package lm + +import "context" + +type MessagePair struct { + User string + Assistant string +} + +type ChatHistory []MessagePair + +type Model interface { + Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) +} diff --git a/ai/pkg/retrieval/flows/client/client.go b/ai/pkg/retrieval/flows/client/client.go new file mode 100644 index 0000000000..f0137dadc2 --- /dev/null +++ b/ai/pkg/retrieval/flows/client/client.go @@ -0,0 +1,44 @@ +package client + +import ( + "fmt" + "time" + + observerv1 "github.com/cilium/cilium/api/v1/observer" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" +) + +type Client struct { + observerv1.ObserverClient +} + +func New() (*Client, error) { + // TODO rethink the dial opts + // starting with opts seen at https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/relay_client/connection_props.go#L34-L70 + connectParams := grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 1.0 * time.Second, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 7 * time.Second, + }, + MinConnectTimeout: 5 * time.Second, + } + connectDialOption := grpc.WithConnectParams(connectParams) + + tlsDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + + // FIXME make address part of a config + addr := ":5555" + connection, err := grpc.NewClient(addr, tlsDialOption, connectDialOption) + if err != nil { + return nil, fmt.Errorf("failed to dial %s: %w", addr, err) + } + + client := &Client{ + ObserverClient: observerv1.NewObserverClient(connection), + } + return client, nil +} diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go new file mode 100644 index 0000000000..31f05c8735 --- /dev/null +++ b/ai/pkg/retrieval/flows/retriever.go @@ -0,0 +1,165 @@ +package flows + +import ( + "context" + "fmt" + "os/exec" + "time" + + "github.com/microsoft/retina/ai/pkg/retrieval/flows/client" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" + "github.com/cilium/hubble-ui/backend/domain/labels" + "github.com/cilium/hubble-ui/backend/domain/service" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Retriever struct { + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + initialized bool + client *client.Client + flows []*flowpb.Flow +} + +func 
NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset) *Retriever { + return &Retriever{ + log: log.WithField("component", "flow-retriever"), + config: config, + clientset: clientset, + } +} + +func (r *Retriever) Init() error { + client, err := client.New() + if err != nil { + return fmt.Errorf("failed to create grpc client. %v", err) + } + + r.log.Info("initialized grpc client") + + r.client = client + r.initialized = true + return nil +} + +func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, error) { + if !r.initialized { + if err := r.Init(); err != nil { + return nil, fmt.Errorf("failed to initialize. %v", err) + } + } + + // translate parameters to flow request + // TODO: use parameters + req := flowsRequest() + + // port-forward to hubble-relay + portForwardCtx, portForwardCancel := context.WithCancel(ctx) + defer portForwardCancel() + + // FIXME make ports part of a config + cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5555:80") + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start port-forward. %v", err) + } + + // observe flows + observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second) + defer observeCancel() + flows, err := r.observeFlowsGRPC(observeCtx, req, maxFlows) + if err != nil { + return nil, fmt.Errorf("failed to observe flows over grpc. %v", err) + } + + // stop the port-forward + portForwardCancel() + // will error with "exit status 1" because of context cancellation + _ = cmd.Wait() + r.log.Info("stopped port-forward") + + return flows, nil +} + +func flowsRequest() *observerpb.GetFlowsRequest { + return &observerpb.GetFlowsRequest{ + Number: 200, + Follow: false, + Whitelist: []*flowpb.FlowFilter{}, + Blacklist: nil, + Since: nil, + Until: nil, + First: false, + } +} + +func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest, maxFlows int) ([]*flowpb.Flow, error) { + stream, err := r.client.GetFlows(ctx, req, grpc.WaitForReady(true)) + if err != nil { + return nil, fmt.Errorf("failed to get flow stream. %v", err) + } + + r.flows = make([]*flowpb.Flow, 0) + for { + select { + case <-ctx.Done(): + r.log.Info("context cancelled") + return r.flows, nil + default: + r.log.WithField("flowCount", len(r.flows)).Debug("processing flow") + + getFlowResponse, err := stream.Recv() + if err != nil { + // TODO handle error instead of returning error + return nil, fmt.Errorf("failed to receive flow. %v", err) + } + + f := getFlowResponse.GetFlow() + if f == nil { + continue + } + + r.handleFlow(f) + if len(r.flows) >= maxFlows { + return r.flows, nil + } + } + } +} + +// handleFlow logic is inspired by a snippet from Hubble UI +// https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/flow_stream/flow_stream.go#L360-L395 +func (r *Retriever) handleFlow(f *flowpb.Flow) { + if f.GetL4() == nil || f.GetSource() == nil || f.GetDestination() == nil { + return + } + + sourceId, destId := service.IdsFromFlowProto(f) + if sourceId == "0" || destId == "0" { + r.log.Warn("invalid (zero) identity in source / dest services") + // TODO print offending flow? + return + } + + // TODO: workaround to hide flows/services which are showing as "World", + // but actually they are k8s services without initialized pods. + // Appropriate fix is to construct and show special service map cards + // and show these flows in special way inside flows table. 
+ if f.GetDestination() != nil { + destService := f.GetDestinationService() + destLabelsProps := labels.Props(f.GetDestination().GetLabels()) + destNames := f.GetDestinationNames() + isDestOutside := destLabelsProps.IsWorld || len(destNames) > 0 + + if destService != nil && isDestOutside { + return + } + } + + r.flows = append(r.flows, f) +} diff --git a/ai/pkg/scenarios/flows/handler.go b/ai/pkg/scenarios/flows/handler.go new file mode 100644 index 0000000000..285a1ce4fc --- /dev/null +++ b/ai/pkg/scenarios/flows/handler.go @@ -0,0 +1,87 @@ +package flows + +import ( + "context" + "fmt" + "time" + + flowanalysis "github.com/microsoft/retina/ai/pkg/analysis/flows" + "github.com/microsoft/retina/ai/pkg/lm" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type FlowScenario string + +const ( + AnyScenario FlowScenario = "Any" + DropScenario FlowScenario = "Drops" + DnsScenario FlowScenario = "DNS" +) + +type ScenarioParams struct { + Scenario FlowScenario + + // parameters (all optional?) + DnsQuery string + Nodes []string + Namespace1 string + PodPrefix1 string + Namespace2 string + PodPrefix2 string +} + +type Handler struct { + log logrus.FieldLogger + r *flowretrieval.Retriever + p *flowanalysis.Parser + a *flowanalysis.Analyzer +} + +func NewHandler(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) *Handler { + return &Handler{ + log: log.WithField("component", "flow-handler"), + r: flowretrieval.NewRetriever(log, config, clientset), + p: flowanalysis.NewParser(log), + a: flowanalysis.NewAnalyzer(log, model), + } +} + +func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHistory, params *ScenarioParams) (string, error) { + h.log.Info("handling flows scenario...") + + // get flows + if err := h.r.Init(); err != nil { + return "", fmt.Errorf("error initializing flow retriever: %w", err) + } + + flows, err := h.r.Observe(ctx, 50) + if err != nil { + return "", fmt.Errorf("error observing flows: %w", err) + } + h.log.Info("observed flows") + + // analyze flows + h.p.Parse(flows) + summary := h.p.Summary() + + analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + resp, err := h.a.Analyze(analyzeCtx, question, chat, summary) + if err != nil { + return "", fmt.Errorf("error analyzing flows: %w", err) + } + h.log.Info("analyzed flows") + + // temporary printing + fmt.Println("flow summary:") + fmt.Println(summary.FormatForLM()) + fmt.Println() + fmt.Println("response:") + fmt.Println(resp) + + return resp, nil +} diff --git a/ai/test/integration/lm/azure-openai_test.go b/ai/test/integration/lm/azure-openai_test.go new file mode 100644 index 0000000000..746a3526f3 --- /dev/null +++ b/ai/test/integration/lm/azure-openai_test.go @@ -0,0 +1,30 @@ +package lmtest + +import ( + "context" + "fmt" + "testing" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/sirupsen/logrus" +) + +func TestAzureOpenAICompletion(t *testing.T) { + log := logrus.New() + + // configure LM (language model) + // model := lm.NewEchoModel() + model, err := lm.NewAzureOpenAI() + if err != nil { + log.WithError(err).Fatal("failed to create Azure OpenAI model") + } + log.Info("initialized Azure OpenAI model") + + resp, err := model.Generate(context.TODO(), `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. 
Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.`, nil, `summary of network flow logs: + abc`) + if err != nil { + log.WithError(err).Fatal("error calling llm") + } + log.Info("called llm") + fmt.Println(resp) +} From 50432737bc263c049819944afa5032ad1b18e55f Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Fri, 19 Jul 2024 10:30:10 -0700 Subject: [PATCH 2/8] docs: README for usage Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/README.md | 21 +++++++++++++++++++++ ai/main.go | 4 +++- ai/pkg/lm/azure-openai.go | 9 ++++++--- 3 files changed, 30 insertions(+), 4 deletions(-) create mode 100644 ai/README.md diff --git a/ai/README.md b/ai/README.md new file mode 100644 index 0000000000..ccd4401080 --- /dev/null +++ b/ai/README.md @@ -0,0 +1,21 @@ +# Retina AI + +## Usage + +- Change into this *ai/* folder. +- `go mod tidy ; go mod vendor` +- Set your `kubeconfigPath` in *main.go*. +- If using Azure OpenAI: + - Make sure you're logged into your account/subscription in your terminal. + - Specify environment variables for Deployment name and Endpoint URL. Get deployment from e.g. [https://oai.azure.com/portal/deployment](https://oai.azure.com/portal/deployment) and Endpoint from e.g. Deployment > Playground > Code. + - Linux: + - `read -p "Enter AOAI_COMPLETIONS_ENDPOINT: " AOAI_COMPLETIONS_ENDPOINT && export AOAI_COMPLETIONS_ENDPOINT=$AOAI_COMPLETIONS_ENDPOINT` + - `read -p "Enter AOAI_DEPLOYMENT_NAME: " AOAI_DEPLOYMENT_NAME && export AOAI_DEPLOYMENT_NAME=$AOAI_DEPLOYMENT_NAME` + - Windows: + - `$env:AOAI_COMPLETIONS_ENDPOINT = Read-Host 'Enter AOAI_COMPLETIONS_ENDPOINT'` + - `$env:AOAI_DEPLOYMENT_NAME = Read-Host 'Enter AOAI_DEPLOYMENT_NAME'` +- `go run main.go` + +## Development + +Modify prompts in the folders within *pkg/analysis/* (e.g. *pkg/analysis/flows/prompt.go* or *analyzer.go*) diff --git a/ai/main.go b/ai/main.go index e6a02114e2..7bece13705 100644 --- a/ai/main.go +++ b/ai/main.go @@ -12,7 +12,9 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -const kubeconfigPath = "C:\\Users\\hgregory\\.kube\\config" // "/home/hunter/.kube/config" +const kubeconfigPath = "/home/hunter/.kube/config" + +// const kubeconfigPath = "C:\\Users\\hgregory\\.kube\\config" func main() { log := logrus.New() diff --git a/ai/pkg/lm/azure-openai.go b/ai/pkg/lm/azure-openai.go index 5800b287cf..0eb43e5186 100644 --- a/ai/pkg/lm/azure-openai.go +++ b/ai/pkg/lm/azure-openai.go @@ -28,19 +28,22 @@ type AzureOpenAI struct { client *azopenai.Client } -// $env:AOAI_COMPLETIONS_ENDPOINT = Read-Host 'Enter AOAI_COMPLETIONS_ENDPOINT' -// $env:AOAI_DEPLOYMENT_NAME = Read-Host 'Enter AOAI_DEPLOYMENT_NAME' - func NewAzureOpenAI() (*AzureOpenAI, error) { aoai := &AzureOpenAI{} // Ex: "https://.openai.azure.com" azureOpenAIEndpoint := os.Getenv("AOAI_COMPLETIONS_ENDPOINT") + if azureOpenAIEndpoint == "" { + return nil, fmt.Errorf("set endpoint with environment variable AOAI_COMPLETIONS_ENDPOINT") + } if !endpointRegex.MatchString(azureOpenAIEndpoint) { return nil, fmt.Errorf("invalid Azure OpenAI endpoint. must follow pattern: %s", endpointPattern) } modelDeployment := os.Getenv("AOAI_DEPLOYMENT_NAME") + if modelDeployment == "" { + return nil, fmt.Errorf("set model deployment name with environment variable AOAI_DEPLOYMENT_NAME") + } if !deploymentRegex.MatchString(modelDeployment) { return nil, fmt.Errorf("invalid Azure OpenAI deployment name. 
must follow pattern: %s", deploymentPattern) } From c8528c8841765b671c1ea307bbefb3f6e48c1b63 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Fri, 19 Jul 2024 10:34:25 -0700 Subject: [PATCH 3/8] flow summary shows drops/forwards and increase flow limit Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/pkg/analysis/flows/types.go | 26 ++++++++++++++++++++++++++ ai/pkg/retrieval/flows/retriever.go | 3 ++- ai/pkg/scenarios/flows/handler.go | 3 ++- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/ai/pkg/analysis/flows/types.go b/ai/pkg/analysis/flows/types.go index da837b8026..4ffffcc218 100644 --- a/ai/pkg/analysis/flows/types.go +++ b/ai/pkg/analysis/flows/types.go @@ -20,11 +20,37 @@ type Connection struct { type FlowSummary map[string]*Connection func (fs FlowSummary) FormatForLM() string { + // FIXME hacky right now + forwards := fs.connStrings(flowpb.Verdict_FORWARDED) + drops := fs.connStrings(flowpb.Verdict_DROPPED) + other := fs.connStrings(flowpb.Verdict_VERDICT_UNKNOWN) + + return fmt.Sprintf("SUCCESSFUL CONNECTIONS:\n%s\n\nDROPPED CONNECTIONS:\n%s\n\nOTHER CONNECTIONS:\n%s", forwards, drops, other) +} + +func (fs FlowSummary) connStrings(verdict flowpb.Verdict) string { connStrings := make([]string, 0, len(fs)) for _, conn := range fs { + match := false + for _, f := range conn.Flows { + // FIXME hacky right now + if f.GetVerdict() == verdict || (verdict == flowpb.Verdict_VERDICT_UNKNOWN && f.GetVerdict() != flowpb.Verdict_FORWARDED && f.GetVerdict() != flowpb.Verdict_DROPPED) { + match = true + break + } + } + + if !match { + continue + } + connString := fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) connStrings = append(connStrings, connString) } + if len(connStrings) == 0 { + return "none" + } + return strings.Join(connStrings, "\n") } diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go index 31f05c8735..e22ff14747 100644 --- a/ai/pkg/retrieval/flows/retriever.go +++ b/ai/pkg/retrieval/flows/retriever.go @@ -7,6 +7,7 @@ import ( "time" "github.com/microsoft/retina/ai/pkg/retrieval/flows/client" + "github.com/microsoft/retina/ai/pkg/util" flowpb "github.com/cilium/cilium/api/v1/flow" observerpb "github.com/cilium/cilium/api/v1/observer" @@ -88,7 +89,7 @@ func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, func flowsRequest() *observerpb.GetFlowsRequest { return &observerpb.GetFlowsRequest{ - Number: 200, + Number: util.MaxFlowsFromHubbleRelay, Follow: false, Whitelist: []*flowpb.FlowFilter{}, Blacklist: nil, diff --git a/ai/pkg/scenarios/flows/handler.go b/ai/pkg/scenarios/flows/handler.go index 285a1ce4fc..8c182115ad 100644 --- a/ai/pkg/scenarios/flows/handler.go +++ b/ai/pkg/scenarios/flows/handler.go @@ -8,6 +8,7 @@ import ( flowanalysis "github.com/microsoft/retina/ai/pkg/analysis/flows" "github.com/microsoft/retina/ai/pkg/lm" flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/util" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" @@ -58,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHisto return "", fmt.Errorf("error initializing flow retriever: %w", err) } - flows, err := h.r.Observe(ctx, 50) + flows, err := h.r.Observe(ctx, util.MaxFlowsToAnalyze) if err != nil { return "", fmt.Errorf("error observing flows: %w", err) } From 
52b096085af4fc0c621eb309d6d19cae1a68fa21 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:39:50 -0700 Subject: [PATCH 4/8] fix: formatting Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/pkg/analysis/flows/parser.go | 2 +- ai/pkg/analysis/flows/prompt.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ai/pkg/analysis/flows/parser.go b/ai/pkg/analysis/flows/parser.go index 6c22498fcc..2862e5a0c6 100644 --- a/ai/pkg/analysis/flows/parser.go +++ b/ai/pkg/analysis/flows/parser.go @@ -86,5 +86,5 @@ func endpointName(ep *flowpb.Endpoint) (string, error) { // reserved:world // reserved:host // reserved:kube-apiserver - return ep.GetLabels()[0], nil + return lbls[0], nil } diff --git a/ai/pkg/analysis/flows/prompt.go b/ai/pkg/analysis/flows/prompt.go index cd265a875f..fac5113207 100644 --- a/ai/pkg/analysis/flows/prompt.go +++ b/ai/pkg/analysis/flows/prompt.go @@ -7,7 +7,7 @@ const ( // second parameter is the user's network flow logs messagePromptTemplate = `%s - "summary of network flow logs": +"summary of network flow logs": %s ` ) From d2e27d55943882b4dcd626e451b45e1c455fdd73 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:41:11 -0700 Subject: [PATCH 5/8] feat: hack for tracking rst and synack Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/pkg/analysis/flows/types.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/ai/pkg/analysis/flows/types.go b/ai/pkg/analysis/flows/types.go index 4ffffcc218..3fa5a4f66b 100644 --- a/ai/pkg/analysis/flows/types.go +++ b/ai/pkg/analysis/flows/types.go @@ -44,7 +44,29 @@ func (fs FlowSummary) connStrings(verdict flowpb.Verdict) string { continue } - connString := fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) + connString := "" + if verdict == flowpb.Verdict_FORWARDED && conn.Flows[0].L4.GetTCP() != nil { + successful := false + rst := false + for _, f := range conn.Flows { + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetSYN() && f.L4.GetTCP().GetFlags().GetACK() { + successful = true + continue + } + + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetRST() { + rst = true + continue + } + } + _ = successful + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d. 
Was Reset: %v", conn.Pod1, conn.Pod2, len(conn.Flows), rst) + + } else { + + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) + } + connStrings = append(connStrings, connString) } From ed24f3a84335ef71ddc8fb796d8bbfc99a6aa7d2 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Thu, 25 Jul 2024 11:44:08 -0700 Subject: [PATCH 6/8] feat: use scenario params and option to parse flows file Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/main.go | 9 ++- ai/pkg/retrieval/flows/retriever.go | 101 ++++++++++++++++++------- ai/pkg/scenarios/flows/handler.go | 110 +++++++++++++++++++++++++++- 3 files changed, 190 insertions(+), 30 deletions(-) diff --git a/ai/main.go b/ai/main.go index 7bece13705..d058fb19a0 100644 --- a/ai/main.go +++ b/ai/main.go @@ -18,7 +18,7 @@ const kubeconfigPath = "/home/hunter/.kube/config" func main() { log := logrus.New() - log.SetLevel(logrus.DebugLevel) + // log.SetLevel(logrus.DebugLevel) log.Info("starting app...") @@ -36,6 +36,7 @@ func main() { // configure LM (language model) // model := lm.NewEchoModel() + // log.Info("initialized echo model") model, err := lm.NewAzureOpenAI() if err != nil { log.WithError(err).Fatal("failed to create Azure OpenAI model") @@ -52,9 +53,9 @@ func handleChat(log logrus.FieldLogger, config *rest.Config, clientset *kubernet h := flowscenario.NewHandler(log, config, clientset, model) params := &flowscenario.ScenarioParams{ - Scenario: flowscenario.AnyScenario, - Namespace1: "frontend", - Namespace2: "backend", + Scenario: flowscenario.DropScenario, + Namespace1: "default", + Namespace2: "default", } ctx := context.TODO() diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go index e22ff14747..aeb0922c0b 100644 --- a/ai/pkg/retrieval/flows/retriever.go +++ b/ai/pkg/retrieval/flows/retriever.go @@ -2,12 +2,13 @@ package flows import ( "context" + "encoding/json" "fmt" + "os" "os/exec" "time" "github.com/microsoft/retina/ai/pkg/retrieval/flows/client" - "github.com/microsoft/retina/ai/pkg/util" flowpb "github.com/cilium/cilium/api/v1/flow" observerpb "github.com/cilium/cilium/api/v1/observer" @@ -20,12 +21,13 @@ import ( ) type Retriever struct { - log logrus.FieldLogger - config *rest.Config - clientset *kubernetes.Clientset - initialized bool - client *client.Client - flows []*flowpb.Flow + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + initialized bool + client *client.Client + readFromFile bool + flows []*flowpb.Flow } func NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset) *Retriever { @@ -36,7 +38,16 @@ func NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubern } } +func (r *Retriever) UseFile() { + r.readFromFile = true +} + func (r *Retriever) Init() error { + if r.readFromFile { + r.log.Info("using flows from file") + return nil + } + client, err := client.New() if err != nil { return fmt.Errorf("failed to create grpc client. %v", err) @@ -49,17 +60,22 @@ func (r *Retriever) Init() error { return nil } -func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, error) { +func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest) ([]*flowpb.Flow, error) { + if r.readFromFile { + flows, err := readFlowsFromFile("flows.json") + if err != nil { + return nil, fmt.Errorf("failed to read flows from file. 
%v", err) + } + + return flows, nil + } + if !r.initialized { if err := r.Init(); err != nil { return nil, fmt.Errorf("failed to initialize. %v", err) } } - // translate parameters to flow request - // TODO: use parameters - req := flowsRequest() - // port-forward to hubble-relay portForwardCtx, portForwardCancel := context.WithCancel(ctx) defer portForwardCancel() @@ -73,7 +89,10 @@ func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, // observe flows observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second) defer observeCancel() - flows, err := r.observeFlowsGRPC(observeCtx, req, maxFlows) + + // FIXME don't use maxFlows anymore? check for EOF? then remove this constant: MaxFlowsToAnalyze + maxFlows := req.Number + flows, err := r.observeFlowsGRPC(observeCtx, req, int(maxFlows)) if err != nil { return nil, fmt.Errorf("failed to observe flows over grpc. %v", err) } @@ -84,19 +103,13 @@ func (r *Retriever) Observe(ctx context.Context, maxFlows int) ([]*flowpb.Flow, _ = cmd.Wait() r.log.Info("stopped port-forward") - return flows, nil -} - -func flowsRequest() *observerpb.GetFlowsRequest { - return &observerpb.GetFlowsRequest{ - Number: util.MaxFlowsFromHubbleRelay, - Follow: false, - Whitelist: []*flowpb.FlowFilter{}, - Blacklist: nil, - Since: nil, - Until: nil, - First: false, + r.log.Info("saving flows to JSON") + if err := saveFlowsToJSON(flows, "flows.json"); err != nil { + r.log.WithError(err).Error("failed to save flows to JSON") + return nil, err } + + return flows, nil } func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest, maxFlows int) ([]*flowpb.Flow, error) { @@ -164,3 +177,41 @@ func (r *Retriever) handleFlow(f *flowpb.Flow) { r.flows = append(r.flows, f) } + +func saveFlowsToJSON(flows []*flowpb.Flow, filename string) error { + for _, f := range flows { + // to avoid getting an error: + // failed to encode JSON: json: error calling MarshalJSON for type *flow.Flow: proto:\u00a0google.protobuf.Any: unable to resolve \"type.googleapis.com/utils.RetinaMetadata\": not found + f.Extensions = nil + } + + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") // optional: to make the JSON output pretty + if err := encoder.Encode(flows); err != nil { + return fmt.Errorf("failed to encode JSON: %w", err) + } + + return nil +} + +func readFlowsFromFile(filename string) ([]*flowpb.Flow, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var flows []*flowpb.Flow + decoder := json.NewDecoder(file) + if err := decoder.Decode(&flows); err != nil { + return nil, fmt.Errorf("failed to decode JSON: %w", err) + } + + return flows, nil +} diff --git a/ai/pkg/scenarios/flows/handler.go b/ai/pkg/scenarios/flows/handler.go index 8c182115ad..71d64fd5e0 100644 --- a/ai/pkg/scenarios/flows/handler.go +++ b/ai/pkg/scenarios/flows/handler.go @@ -10,6 +10,8 @@ import ( flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" "github.com/microsoft/retina/ai/pkg/util" + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -55,11 +57,14 @@ func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHisto h.log.Info("handling flows scenario...") 
// get flows + // h.r.UseFile() + if err := h.r.Init(); err != nil { return "", fmt.Errorf("error initializing flow retriever: %w", err) } - flows, err := h.r.Observe(ctx, util.MaxFlowsToAnalyze) + req := flowsRequest(params) + flows, err := h.r.Observe(ctx, req) if err != nil { return "", fmt.Errorf("error observing flows: %w", err) } @@ -86,3 +91,106 @@ func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHisto return resp, nil } + +// TODO DNS should not have a destination Pod (except maybe a specific coredns pod) +func flowsRequest(params *ScenarioParams) *observerpb.GetFlowsRequest { + req := &observerpb.GetFlowsRequest{ + Number: util.MaxFlowsFromHubbleRelay, + Follow: true, + } + + if len(params.Nodes) == 0 { + params.Nodes = nil + } + + protocol := []string{"TCP", "UDP"} + if params.Scenario == DnsScenario { + protocol = []string{"DNS"} + } + + if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { + req.Whitelist = []*flowpb.FlowFilter{ + { + NodeName: params.Nodes, + Protocol: protocol, + }, + } + + return req + } + + var prefix1 []string + if params.Namespace1 != "" || params.PodPrefix1 != "" { + prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) + } + + var prefix2 []string + if params.Namespace2 != "" || params.PodPrefix2 != "" { + prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) + } + + filterDirection1 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix1, + DestinationPod: prefix2, + Protocol: protocol, + } + + filterDirection2 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix2, + DestinationPod: prefix1, + Protocol: protocol, + } + + // filterPod1ToIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationIp: []string{"10.224.1.214"}, + // Protocol: protocol, + // } + + // filterPod1FromIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceIp: []string{"10.224.1.214"}, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + // includes services + // world := []string{"reserved:world"} + + // filterPod1ToWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationLabel: world, + // Protocol: protocol, + // } + + // filterPod1FromWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceLabel: world, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + req.Whitelist = []*flowpb.FlowFilter{ + filterDirection1, + filterDirection2, + // filterPod1FromIP, + // filterPod1ToIP, + } + + req.Whitelist = nil + + req.Blacklist = []*flowpb.FlowFilter{ + { + SourcePod: []string{"kube-system/"}, + }, + { + DestinationPod: []string{"kube-system/"}, + }, + } + + return req +} From 3c95618ee9b9eaf45d55ebbdd81203a86e2c1dc4 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:48:04 -0700 Subject: [PATCH 7/8] refactor: foundations for chatbot (code divided into scenario selection, scenario handling, and data retrieval) Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/README.md | 4 - ai/main.go | 29 +- ai/pkg/analysis/flows/analyzer.go | 26 -- ai/pkg/analysis/flows/types.go | 78 ------ ai/pkg/chat/chat.go | 91 +++++++ ai/pkg/lm/azure-openai.go | 4 +- ai/pkg/lm/echo.go | 6 +- ai/pkg/lm/model.go | 2 +- ai/pkg/{analysis => parse}/flows/parser.go | 16 +- ai/pkg/parse/flows/types.go | 149 
+++++++++++ ai/pkg/retrieval/flows/retriever.go | 3 +- ai/pkg/scenarios/common.go | 49 ++++ ai/pkg/scenarios/dns/dns.go | 39 +++ ai/pkg/scenarios/drops/drops.go | 251 ++++++++++++++++++ .../flows => scenarios/drops}/prompt.go | 2 +- ai/pkg/scenarios/flows/handler.go | 196 -------------- ai/pkg/scenarios/types.go | 95 +++++++ 17 files changed, 695 insertions(+), 345 deletions(-) delete mode 100644 ai/pkg/analysis/flows/analyzer.go delete mode 100644 ai/pkg/analysis/flows/types.go create mode 100644 ai/pkg/chat/chat.go rename ai/pkg/{analysis => parse}/flows/parser.go (83%) create mode 100644 ai/pkg/parse/flows/types.go create mode 100644 ai/pkg/scenarios/common.go create mode 100644 ai/pkg/scenarios/dns/dns.go create mode 100644 ai/pkg/scenarios/drops/drops.go rename ai/pkg/{analysis/flows => scenarios/drops}/prompt.go (96%) delete mode 100644 ai/pkg/scenarios/flows/handler.go create mode 100644 ai/pkg/scenarios/types.go diff --git a/ai/README.md b/ai/README.md index ccd4401080..5dafe7fb7e 100644 --- a/ai/README.md +++ b/ai/README.md @@ -15,7 +15,3 @@ - `$env:AOAI_COMPLETIONS_ENDPOINT = Read-Host 'Enter AOAI_COMPLETIONS_ENDPOINT'` - `$env:AOAI_DEPLOYMENT_NAME = Read-Host 'Enter AOAI_DEPLOYMENT_NAME'` - `go run main.go` - -## Development - -Modify prompts in the folders within *pkg/analysis/* (e.g. *pkg/analysis/flows/prompt.go* or *analyzer.go*) diff --git a/ai/main.go b/ai/main.go index d058fb19a0..d891c05cc3 100644 --- a/ai/main.go +++ b/ai/main.go @@ -1,14 +1,11 @@ package main import ( - "context" - + "github.com/microsoft/retina/ai/pkg/chat" "github.com/microsoft/retina/ai/pkg/lm" - flowscenario "github.com/microsoft/retina/ai/pkg/scenarios/flows" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -43,26 +40,8 @@ func main() { } log.Info("initialized Azure OpenAI model") - handleChat(log, config, clientset, model) -} - -// pretend there's input from chat interface -func handleChat(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) { - question := "what's wrong with my app?" 
- var chat lm.ChatHistory - - h := flowscenario.NewHandler(log, config, clientset, model) - params := &flowscenario.ScenarioParams{ - Scenario: flowscenario.DropScenario, - Namespace1: "default", - Namespace2: "default", + bot := chat.NewBot(log, config, clientset, model) + if err := bot.Loop(); err != nil { + log.WithError(err).Fatal("error running chat loop") } - - ctx := context.TODO() - response, err := h.Handle(ctx, question, chat, params) - if err != nil { - log.WithError(err).Fatal("error running flows scenario") - } - - _ = response } diff --git a/ai/pkg/analysis/flows/analyzer.go b/ai/pkg/analysis/flows/analyzer.go deleted file mode 100644 index 7a30ab2450..0000000000 --- a/ai/pkg/analysis/flows/analyzer.go +++ /dev/null @@ -1,26 +0,0 @@ -package flows - -import ( - "context" - "fmt" - - "github.com/microsoft/retina/ai/pkg/lm" - "github.com/sirupsen/logrus" -) - -type Analyzer struct { - log logrus.FieldLogger - model lm.Model -} - -func NewAnalyzer(log logrus.FieldLogger, model lm.Model) *Analyzer { - return &Analyzer{ - log: logrus.WithField("component", "flow-analyzer"), - model: model, - } -} - -func (a *Analyzer) Analyze(ctx context.Context, query string, chat lm.ChatHistory, summary FlowSummary) (string, error) { - message := fmt.Sprintf(messagePromptTemplate, query, summary.FormatForLM()) - return a.model.Generate(ctx, systemPrompt, chat, message) -} diff --git a/ai/pkg/analysis/flows/types.go b/ai/pkg/analysis/flows/types.go deleted file mode 100644 index 3fa5a4f66b..0000000000 --- a/ai/pkg/analysis/flows/types.go +++ /dev/null @@ -1,78 +0,0 @@ -package flows - -import ( - "errors" - "fmt" - "strings" - - flowpb "github.com/cilium/cilium/api/v1/flow" -) - -var ErrNoEndpointName = errors.New("no endpoint name") - -type Connection struct { - Pod1 string - Pod2 string - Key string - Flows []*flowpb.Flow -} - -type FlowSummary map[string]*Connection - -func (fs FlowSummary) FormatForLM() string { - // FIXME hacky right now - forwards := fs.connStrings(flowpb.Verdict_FORWARDED) - drops := fs.connStrings(flowpb.Verdict_DROPPED) - other := fs.connStrings(flowpb.Verdict_VERDICT_UNKNOWN) - - return fmt.Sprintf("SUCCESSFUL CONNECTIONS:\n%s\n\nDROPPED CONNECTIONS:\n%s\n\nOTHER CONNECTIONS:\n%s", forwards, drops, other) -} - -func (fs FlowSummary) connStrings(verdict flowpb.Verdict) string { - connStrings := make([]string, 0, len(fs)) - for _, conn := range fs { - match := false - for _, f := range conn.Flows { - // FIXME hacky right now - if f.GetVerdict() == verdict || (verdict == flowpb.Verdict_VERDICT_UNKNOWN && f.GetVerdict() != flowpb.Verdict_FORWARDED && f.GetVerdict() != flowpb.Verdict_DROPPED) { - match = true - break - } - } - - if !match { - continue - } - - connString := "" - if verdict == flowpb.Verdict_FORWARDED && conn.Flows[0].L4.GetTCP() != nil { - successful := false - rst := false - for _, f := range conn.Flows { - if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetSYN() && f.L4.GetTCP().GetFlags().GetACK() { - successful = true - continue - } - - if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetRST() { - rst = true - continue - } - } - _ = successful - connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d. 
Was Reset: %v", conn.Pod1, conn.Pod2, len(conn.Flows), rst) - - } else { - - connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) - } - - connStrings = append(connStrings, connString) - } - - if len(connStrings) == 0 { - return "none" - } - - return strings.Join(connStrings, "\n") -} diff --git a/ai/pkg/chat/chat.go b/ai/pkg/chat/chat.go new file mode 100644 index 0000000000..55e2d5daa1 --- /dev/null +++ b/ai/pkg/chat/chat.go @@ -0,0 +1,91 @@ +package chat + +import ( + "context" + "fmt" + + "github.com/microsoft/retina/ai/pkg/lm" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/scenarios" + "github.com/microsoft/retina/ai/pkg/scenarios/dns" + "github.com/microsoft/retina/ai/pkg/scenarios/drops" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +var ( + definitions = []*scenarios.Definition{ + drops.Definition, + dns.Definition, + } +) + +type Bot struct { + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + model lm.Model +} + +// input log, config, clientset, model +func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) *Bot { + return &Bot{ + log: log.WithField("component", "chat"), + config: config, + clientset: clientset, + model: model, + } +} + +func (b *Bot) Loop() error { + var history lm.ChatHistory + flowRetriever := flowretrieval.NewRetriever(b.log, b.config, b.clientset) + + for { + // TODO get user input + question := "what's wrong with my app?" + + // select scenario and get parameters + definition, params, err := b.selectScenario(question, history) + if err != nil { + return fmt.Errorf("error selecting scenario: %w", err) + } + + // cfg.FlowRetriever.UseFile() + + cfg := &scenarios.Config{ + Log: b.log, + Config: b.config, + Clientset: b.clientset, + Model: b.model, + FlowRetriever: flowRetriever, + } + + ctx := context.TODO() + response, err := definition.Handle(ctx, cfg, params, question, history) + if err != nil { + return fmt.Errorf("error handling scenario: %w", err) + } + + fmt.Println(response) + + // TODO keep chat loop going + break + } + + return nil +} + +func (b *Bot) selectScenario(question string, history lm.ChatHistory) (*scenarios.Definition, map[string]string, error) { + // TODO use chat interface + // FIXME hard-coding the scenario and params for now + d := definitions[0] + params := map[string]string{ + scenarios.Namespace1.Name: "default", + scenarios.Namespace2.Name: "default", + } + + return d, params, nil +} diff --git a/ai/pkg/lm/azure-openai.go b/ai/pkg/lm/azure-openai.go index 0eb43e5186..0fd846c205 100644 --- a/ai/pkg/lm/azure-openai.go +++ b/ai/pkg/lm/azure-openai.go @@ -65,11 +65,11 @@ func NewAzureOpenAI() (*AzureOpenAI, error) { return aoai, nil } -func (m *AzureOpenAI) Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) { +func (m *AzureOpenAI) Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) { messages := []azopenai.ChatRequestMessageClassification{ &azopenai.ChatRequestSystemMessage{Content: to.Ptr(systemPrompt)}, } - for _, pair := range chat { + for _, pair := range history { messages = append(messages, &azopenai.ChatRequestUserMessage{Content: azopenai.NewChatRequestUserMessageContent(pair.User)}) messages = append(messages, &azopenai.ChatRequestAssistantMessage{Content: to.Ptr(pair.Assistant)}) } 
diff --git a/ai/pkg/lm/echo.go b/ai/pkg/lm/echo.go index 24f53b37b4..e4c1143212 100644 --- a/ai/pkg/lm/echo.go +++ b/ai/pkg/lm/echo.go @@ -13,9 +13,9 @@ func NewEchoModel() *EchoModel { return &EchoModel{} } -func (m *EchoModel) Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) { - chatStrings := make([]string, 0, len(chat)) - for _, pair := range chat { +func (m *EchoModel) Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) { + chatStrings := make([]string, 0, len(history)) + for _, pair := range history { chatStrings = append(chatStrings, fmt.Sprintf("USER: %s\nASSISTANT: %s\n", pair.User, pair.Assistant)) } resp := fmt.Sprintf("systemPrompt: %s\nhistory: %s\nmessage: %s", systemPrompt, strings.Join(chatStrings, "\n"), message) diff --git a/ai/pkg/lm/model.go b/ai/pkg/lm/model.go index b848e30fb7..7e9eaca3d9 100644 --- a/ai/pkg/lm/model.go +++ b/ai/pkg/lm/model.go @@ -10,5 +10,5 @@ type MessagePair struct { type ChatHistory []MessagePair type Model interface { - Generate(ctx context.Context, systemPrompt string, chat ChatHistory, message string) (string, error) + Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) } diff --git a/ai/pkg/analysis/flows/parser.go b/ai/pkg/parse/flows/parser.go similarity index 83% rename from ai/pkg/analysis/flows/parser.go rename to ai/pkg/parse/flows/parser.go index 2862e5a0c6..bea8789791 100644 --- a/ai/pkg/analysis/flows/parser.go +++ b/ai/pkg/parse/flows/parser.go @@ -9,19 +9,19 @@ import ( ) type Parser struct { - log logrus.FieldLogger - summary FlowSummary + log logrus.FieldLogger + connections Connections } func NewParser(log logrus.FieldLogger) *Parser { return &Parser{ - log: log.WithField("component", "flow-parser"), - summary: make(map[string]*Connection), + log: log.WithField("component", "flow-parser"), + connections: make(map[string]*Connection), } } -func (p *Parser) Summary() FlowSummary { - return p.summary +func (p *Parser) Connections() Connections { + return p.connections } func (p *Parser) Parse(flows []*flowpb.Flow) { @@ -57,7 +57,7 @@ func (p *Parser) addFlow(f *flowpb.Flow) error { pod1, pod2 := pods[0], pods[1] key := pod1 + "#" + pod2 - conn, exists := p.summary[key] + conn, exists := p.connections[key] if !exists { conn = &Connection{ Pod1: pod1, @@ -65,7 +65,7 @@ func (p *Parser) addFlow(f *flowpb.Flow) error { Key: key, Flows: []*flowpb.Flow{}, } - p.summary[key] = conn + p.connections[key] = conn } conn.Flows = append(conn.Flows, f) diff --git a/ai/pkg/parse/flows/types.go b/ai/pkg/parse/flows/types.go new file mode 100644 index 0000000000..02f6a8e833 --- /dev/null +++ b/ai/pkg/parse/flows/types.go @@ -0,0 +1,149 @@ +package flows + +import ( + "errors" + + flowpb "github.com/cilium/cilium/api/v1/flow" +) + +var ( + ErrNoEndpointName = errors.New("no endpoint name") + ErrNilEndpoint = errors.New("nil endpoint") +) + +type Connection struct { + Pod1 string + Pod2 string + Key string + + // UDP *UdpSummary + // TCP *TcpSummary + Flows []*flowpb.Flow +} + +type Connections map[string]*Connection + +// func + +// type UdpSummary struct { +// MinLatency time.Duration +// MaxLatency time.Duration +// AvgLatency time.Duration +// TotalPackets int +// TotalBytes int +// } + +// type TcpSummary struct { +// MinLatency time.Duration +// MaxLatency time.Duration +// AvgLatency time.Duration +// TotalPackets int +// TotalBytes int +// *TcpFlagSummary +// } + +// type TcpFlagSummary 
struct { +// SynCount int +// AckCount int +// SynAckCount int +// FinCount int +// RstCount int +// } + +// type FlowSummary map[string]*Connection + +// func (fs FlowSummary) Aggregate() { +// for _, conn := range fs { +// udpTimestamps := make(map[string][]*timestamppb.Timestamp) +// tcpTimestamps := make(map[string][]*timestamppb.Timestamp) +// for _, f := range conn.Flows { +// l4 := f.GetL4() +// if l4 == nil { +// continue +// } + +// udp := l4.GetUDP() +// if udp != nil { +// if conn.UDP == nil { +// conn.UDP = &UdpSummary{} +// } + +// conn.UDP.TotalPackets += 1 + +// src, err := endpointName(f.GetSource()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad src endpoint while aggregating: %w", err) +// } +// dst, err := endpointName(f.GetDestination()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad dst endpoint while aggregating: %w", err) +// } + +// tuple := fmt.Sprintf("%s:%d -> %s:%d", src, udp.GetSourcePort(), dst, udp.GetDestinationPort()) + +// time := f.GetTime() +// if time == nil { +// // FIXME warn and continue +// log.Fatalf("nil time while aggregating") +// } + +// udpTimestamps[tuple] = append(udpTimestamps[tuple], f.GetTime()) +// } + +// tcp := l4.GetTCP() +// if tcp != nil { +// if conn.TCP == nil { +// conn.TCP = &TcpSummary{} +// } + +// conn.TCP.TotalPackets += 1 + +// if conn.TCP.TcpFlagSummary == nil { +// conn.TCP.TcpFlagSummary = &TcpFlagSummary{} +// } + +// flags := tcp.GetFlags() +// if flags == nil { +// // FIXME warn and continue +// log.Fatalf("nil flags while aggregating") +// } + +// switch { +// case flags.SYN && flags.ACK: +// conn.TCP.TcpFlagSummary.SynAckCount += 1 +// case flags.SYN: +// conn.TCP.TcpFlagSummary.SynCount += 1 +// case flags.ACK: +// conn.TCP.TcpFlagSummary.AckCount += 1 +// case flags.FIN: +// conn.TCP.TcpFlagSummary.FinCount += 1 +// case flags.RST: +// conn.TCP.TcpFlagSummary.RstCount += 1 +// } + +// src, err := endpointName(f.GetSource()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad src endpoint while aggregating: %w", err) +// } +// dst, err := endpointName(f.GetDestination()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad dst endpoint while aggregating: %w", err) +// } + +// tuple := fmt.Sprintf("%s:%d -> %s:%d", src, udp.GetSourcePort(), dst, udp.GetDestinationPort()) + +// time := f.GetTime() +// if time == nil { +// // FIXME warn and continue +// log.Fatalf("nil time while aggregating") +// } + +// tcpTimestamps[tuple] = append(tcpTimestamps[tuple], f.GetTime()) +// } +// } +// } +// } diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go index aeb0922c0b..bb1cb18191 100644 --- a/ai/pkg/retrieval/flows/retriever.go +++ b/ai/pkg/retrieval/flows/retriever.go @@ -20,6 +20,8 @@ import ( "k8s.io/client-go/rest" ) +const MaxFlowsFromHubbleRelay = 30000 + type Retriever struct { log logrus.FieldLogger config *rest.Config @@ -90,7 +92,6 @@ func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second) defer observeCancel() - // FIXME don't use maxFlows anymore? check for EOF? 
then remove this constant: MaxFlowsToAnalyze maxFlows := req.Number flows, err := r.observeFlowsGRPC(observeCtx, req, int(maxFlows)) if err != nil { diff --git a/ai/pkg/scenarios/common.go b/ai/pkg/scenarios/common.go new file mode 100644 index 0000000000..4ec8713f9c --- /dev/null +++ b/ai/pkg/scenarios/common.go @@ -0,0 +1,49 @@ +package scenarios + +import "regexp" + +// common parameters +var ( + k8sNameRegex = regexp.MustCompile(`^[a-zA-Z][-a-zA-Z0-9]*$`) + nodesRegex = regexp.MustCompile(`^\[[a-zA-Z][-a-zA-Z0-9_,]*\]$`) + + Namespace1 = &ParameterSpec{ + Name: "namespace1", + DataType: "string", + Description: "Namespace 1", + Optional: false, + Regex: k8sNameRegex, + } + + PodPrefix1 = &ParameterSpec{ + Name: "podPrefix1", + DataType: "string", + Description: "Pod prefix 1", + Optional: true, + Regex: k8sNameRegex, + } + + Namespace2 = &ParameterSpec{ + Name: "namespace2", + DataType: "string", + Description: "Namespace 2", + Optional: false, + Regex: k8sNameRegex, + } + + PodPrefix2 = &ParameterSpec{ + Name: "podPrefix2", + DataType: "string", + Description: "Pod prefix 2", + Optional: true, + Regex: k8sNameRegex, + } + + Nodes = &ParameterSpec{ + Name: "nodes", + DataType: "[]string", + Description: "Nodes", + Optional: true, + Regex: nodesRegex, + } +) diff --git a/ai/pkg/scenarios/dns/dns.go b/ai/pkg/scenarios/dns/dns.go new file mode 100644 index 0000000000..36ab7424a0 --- /dev/null +++ b/ai/pkg/scenarios/dns/dns.go @@ -0,0 +1,39 @@ +package dns + +import ( + "context" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/microsoft/retina/ai/pkg/scenarios" +) + +var ( + Definition = scenarios.NewDefinition("DNS", "DNS", parameterSpecs, &handler{}) + + dnsQuery = &scenarios.ParameterSpec{ + Name: "dnsQuery", + DataType: "string", + Description: "DNS query", + Optional: true, + } + + parameterSpecs = []*scenarios.ParameterSpec{ + scenarios.Namespace1, + scenarios.Namespace2, + dnsQuery, + } +) + +// mirrored with parameterSpecs +type params struct { + Namespace1 string + Namespace2 string + DNSQuery string +} + +type handler struct{} + +func (h *handler) Handle(ctx context.Context, cfg *scenarios.Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) { + // TODO + return "", nil +} diff --git a/ai/pkg/scenarios/drops/drops.go b/ai/pkg/scenarios/drops/drops.go new file mode 100644 index 0000000000..8373748508 --- /dev/null +++ b/ai/pkg/scenarios/drops/drops.go @@ -0,0 +1,251 @@ +package drops + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/microsoft/retina/ai/pkg/lm" + flowparsing "github.com/microsoft/retina/ai/pkg/parse/flows" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/scenarios" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" +) + +var ( + Definition = scenarios.NewDefinition("DROPS", "DROPS", parameterSpecs, &handler{}) + + parameterSpecs = []*scenarios.ParameterSpec{ + scenarios.Namespace1, + scenarios.PodPrefix1, + scenarios.Namespace2, + scenarios.PodPrefix2, + scenarios.Nodes, + } +) + +// mirrored with parameterSpecs +type params struct { + Namespace1 string + PodPrefix1 string + Namespace2 string + PodPrefix2 string + Nodes []string +} + +type handler struct{} + +func (h *handler) Handle(ctx context.Context, cfg *scenarios.Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) { + l := cfg.Log.WithField("scenario", "drops") + l.Info("handling drops 
scenario...") + + if err := cfg.FlowRetriever.Init(); err != nil { + return "", fmt.Errorf("error initializing flow retriever: %w", err) + } + + params := ¶ms{ + Namespace1: anyToString(typedParams[scenarios.Namespace1.Name]), + PodPrefix1: anyToString(typedParams[scenarios.PodPrefix1.Name]), + Namespace2: anyToString(typedParams[scenarios.Namespace2.Name]), + PodPrefix2: anyToString(typedParams[scenarios.PodPrefix2.Name]), + Nodes: anyToStringSlice(typedParams[scenarios.Nodes.Name]), + } + + req := flowsRequest(params) + flows, err := cfg.FlowRetriever.Observe(ctx, req) + if err != nil { + return "", fmt.Errorf("error observing flows: %w", err) + } + l.Info("observed flows") + + // analyze flows + p := flowparsing.NewParser(l) + p.Parse(flows) + connections := p.Connections() + + formattedFlowLogs := formatFlowLogs(connections) + + message := fmt.Sprintf(messagePromptTemplate, question, formattedFlowLogs) + analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + resp, err := cfg.Model.Generate(analyzeCtx, systemPrompt, history, message) + if err != nil { + return "", fmt.Errorf("error analyzing flows: %w", err) + } + l.Info("analyzed flows") + + return resp, nil +} + +// cast to string without nil panics +func anyToString(a any) string { + if a == nil { + return "" + } + return a.(string) +} + +// cast to []string without nil panics +func anyToStringSlice(a any) []string { + if a == nil { + return nil + } + return a.([]string) +} + +func flowsRequest(params *params) *observerpb.GetFlowsRequest { + req := &observerpb.GetFlowsRequest{ + Number: flowretrieval.MaxFlowsFromHubbleRelay, + Follow: true, + } + + protocol := []string{"TCP", "UDP"} + + if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { + req.Whitelist = []*flowpb.FlowFilter{ + { + NodeName: params.Nodes, + Protocol: protocol, + }, + } + + return req + } + + var prefix1 []string + if params.Namespace1 != "" || params.PodPrefix1 != "" { + prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) + } + + var prefix2 []string + if params.Namespace2 != "" || params.PodPrefix2 != "" { + prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) + } + + filterDirection1 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix1, + DestinationPod: prefix2, + Protocol: protocol, + } + + filterDirection2 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix2, + DestinationPod: prefix1, + Protocol: protocol, + } + + // filterPod1ToIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationIp: []string{"10.224.1.214"}, + // Protocol: protocol, + // } + + // filterPod1FromIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceIp: []string{"10.224.1.214"}, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + // includes services + // world := []string{"reserved:world"} + + // filterPod1ToWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationLabel: world, + // Protocol: protocol, + // } + + // filterPod1FromWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceLabel: world, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + req.Whitelist = []*flowpb.FlowFilter{ + filterDirection1, + filterDirection2, + // filterPod1FromIP, + // filterPod1ToIP, + } + + req.Whitelist = nil + + req.Blacklist = []*flowpb.FlowFilter{ + { + SourcePod: 
[]string{"kube-system/"}, + }, + { + DestinationPod: []string{"kube-system/"}, + }, + } + + return req +} + +func formatFlowLogs(connections flowparsing.Connections) string { + // FIXME hacky right now + forwards := connStrings(connections, flowpb.Verdict_FORWARDED) + + drops := connStrings(connections, flowpb.Verdict_DROPPED) + other := connStrings(connections, flowpb.Verdict_VERDICT_UNKNOWN) + + return fmt.Sprintf("SUCCESSFUL CONNECTIONS:\n%s\n\nDROPPED CONNECTIONS:\n%s\n\nOTHER CONNECTIONS:\n%s", forwards, drops, other) +} + +func connStrings(connections flowparsing.Connections, verdict flowpb.Verdict) string { + connStrings := make([]string, 0, len(connections)) + for _, conn := range connections { + match := false + for _, f := range conn.Flows { + // FIXME hacky right now + if f.GetVerdict() == verdict || (verdict == flowpb.Verdict_VERDICT_UNKNOWN && f.GetVerdict() != flowpb.Verdict_FORWARDED && f.GetVerdict() != flowpb.Verdict_DROPPED) { + match = true + break + } + } + + if !match { + continue + } + + connString := "" + if verdict == flowpb.Verdict_FORWARDED && conn.Flows[0].L4.GetTCP() != nil { + successful := false + rst := false + for _, f := range conn.Flows { + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetSYN() && f.L4.GetTCP().GetFlags().GetACK() { + successful = true + continue + } + + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetRST() { + rst = true + continue + } + } + _ = successful + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d. Was Reset: %v", conn.Pod1, conn.Pod2, len(conn.Flows), rst) + + } else { + + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) + } + + connStrings = append(connStrings, connString) + } + + if len(connStrings) == 0 { + return "none" + } + + return strings.Join(connStrings, "\n") +} diff --git a/ai/pkg/analysis/flows/prompt.go b/ai/pkg/scenarios/drops/prompt.go similarity index 96% rename from ai/pkg/analysis/flows/prompt.go rename to ai/pkg/scenarios/drops/prompt.go index fac5113207..a6d0fd1f03 100644 --- a/ai/pkg/analysis/flows/prompt.go +++ b/ai/pkg/scenarios/drops/prompt.go @@ -1,4 +1,4 @@ -package flows +package drops const ( systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.` diff --git a/ai/pkg/scenarios/flows/handler.go b/ai/pkg/scenarios/flows/handler.go deleted file mode 100644 index 71d64fd5e0..0000000000 --- a/ai/pkg/scenarios/flows/handler.go +++ /dev/null @@ -1,196 +0,0 @@ -package flows - -import ( - "context" - "fmt" - "time" - - flowanalysis "github.com/microsoft/retina/ai/pkg/analysis/flows" - "github.com/microsoft/retina/ai/pkg/lm" - flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" - "github.com/microsoft/retina/ai/pkg/util" - - flowpb "github.com/cilium/cilium/api/v1/flow" - observerpb "github.com/cilium/cilium/api/v1/observer" - "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" -) - -type FlowScenario string - -const ( - AnyScenario FlowScenario = "Any" - DropScenario FlowScenario = "Drops" - DnsScenario FlowScenario = "DNS" -) - -type ScenarioParams struct { - Scenario FlowScenario - - // parameters (all optional?) 
- DnsQuery string - Nodes []string - Namespace1 string - PodPrefix1 string - Namespace2 string - PodPrefix2 string -} - -type Handler struct { - log logrus.FieldLogger - r *flowretrieval.Retriever - p *flowanalysis.Parser - a *flowanalysis.Analyzer -} - -func NewHandler(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) *Handler { - return &Handler{ - log: log.WithField("component", "flow-handler"), - r: flowretrieval.NewRetriever(log, config, clientset), - p: flowanalysis.NewParser(log), - a: flowanalysis.NewAnalyzer(log, model), - } -} - -func (h *Handler) Handle(ctx context.Context, question string, chat lm.ChatHistory, params *ScenarioParams) (string, error) { - h.log.Info("handling flows scenario...") - - // get flows - // h.r.UseFile() - - if err := h.r.Init(); err != nil { - return "", fmt.Errorf("error initializing flow retriever: %w", err) - } - - req := flowsRequest(params) - flows, err := h.r.Observe(ctx, req) - if err != nil { - return "", fmt.Errorf("error observing flows: %w", err) - } - h.log.Info("observed flows") - - // analyze flows - h.p.Parse(flows) - summary := h.p.Summary() - - analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - resp, err := h.a.Analyze(analyzeCtx, question, chat, summary) - if err != nil { - return "", fmt.Errorf("error analyzing flows: %w", err) - } - h.log.Info("analyzed flows") - - // temporary printing - fmt.Println("flow summary:") - fmt.Println(summary.FormatForLM()) - fmt.Println() - fmt.Println("response:") - fmt.Println(resp) - - return resp, nil -} - -// TODO DNS should not have a destination Pod (except maybe a specific coredns pod) -func flowsRequest(params *ScenarioParams) *observerpb.GetFlowsRequest { - req := &observerpb.GetFlowsRequest{ - Number: util.MaxFlowsFromHubbleRelay, - Follow: true, - } - - if len(params.Nodes) == 0 { - params.Nodes = nil - } - - protocol := []string{"TCP", "UDP"} - if params.Scenario == DnsScenario { - protocol = []string{"DNS"} - } - - if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { - req.Whitelist = []*flowpb.FlowFilter{ - { - NodeName: params.Nodes, - Protocol: protocol, - }, - } - - return req - } - - var prefix1 []string - if params.Namespace1 != "" || params.PodPrefix1 != "" { - prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) - } - - var prefix2 []string - if params.Namespace2 != "" || params.PodPrefix2 != "" { - prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) - } - - filterDirection1 := &flowpb.FlowFilter{ - NodeName: params.Nodes, - SourcePod: prefix1, - DestinationPod: prefix2, - Protocol: protocol, - } - - filterDirection2 := &flowpb.FlowFilter{ - NodeName: params.Nodes, - SourcePod: prefix2, - DestinationPod: prefix1, - Protocol: protocol, - } - - // filterPod1ToIP := &flowpb.FlowFilter{ - // NodeName: params.Nodes, - // SourcePod: prefix1, - // DestinationIp: []string{"10.224.1.214"}, - // Protocol: protocol, - // } - - // filterPod1FromIP := &flowpb.FlowFilter{ - // NodeName: params.Nodes, - // SourceIp: []string{"10.224.1.214"}, - // DestinationPod: prefix1, - // Protocol: protocol, - // } - - // includes services - // world := []string{"reserved:world"} - - // filterPod1ToWorld := &flowpb.FlowFilter{ - // NodeName: params.Nodes, - // SourcePod: prefix1, - // DestinationLabel: world, - // Protocol: protocol, - // } - - // filterPod1FromWorld := &flowpb.FlowFilter{ - // NodeName: 
params.Nodes, - // SourceLabel: world, - // DestinationPod: prefix1, - // Protocol: protocol, - // } - - req.Whitelist = []*flowpb.FlowFilter{ - filterDirection1, - filterDirection2, - // filterPod1FromIP, - // filterPod1ToIP, - } - - req.Whitelist = nil - - req.Blacklist = []*flowpb.FlowFilter{ - { - SourcePod: []string{"kube-system/"}, - }, - { - DestinationPod: []string{"kube-system/"}, - }, - } - - return req -} diff --git a/ai/pkg/scenarios/types.go b/ai/pkg/scenarios/types.go new file mode 100644 index 0000000000..0ae48fe17c --- /dev/null +++ b/ai/pkg/scenarios/types.go @@ -0,0 +1,95 @@ +package scenarios + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/microsoft/retina/ai/pkg/lm" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Definition struct { + Name string + Description string + Specs []*ParameterSpec + handler +} + +func NewDefinition(name, description string, specs []*ParameterSpec, handler handler) *Definition { + return &Definition{ + Name: name, + Description: description, + Specs: specs, + handler: handler, + } +} + +func (d *Definition) Handle(ctx context.Context, cfg *Config, rawParams map[string]string, question string, history lm.ChatHistory) (string, error) { + typedParams := make(map[string]any) + + // validate params + for _, p := range d.Specs { + raw, ok := rawParams[p.Name] + if !ok { + if !p.Optional { + return "", fmt.Errorf("missing required parameter %s", p.Name) + } + + continue + } + + if p.Regex != nil && !p.Regex.MatchString(raw) { + return "", fmt.Errorf("parameter %s does not match regex format", p.Name) + } + + switch p.DataType { + case "string": + typedParams[p.Name] = raw + case "int": + i, err := strconv.Atoi(raw) + if err != nil { + return "", fmt.Errorf("parameter %s is not an integer", p.Name) + } + typedParams[p.Name] = i + case "[]string": + // make sure the format is like [a,b,c] + if raw == "" || raw[0] != '[' || raw[len(raw)-1] != ']' || strings.Count(raw, "[") != 1 || strings.Count(raw, "]") != 1 { + return "", fmt.Errorf("invalid array format for parameter %s", p.Name) + } + // remove brackets + raw = raw[1 : len(raw)-1] + typedParams[p.Name] = strings.Split(raw, ",") + default: + return "", fmt.Errorf("unsupported data type %s", p.DataType) + } + } + + return d.handler.Handle(ctx, cfg, typedParams, question, history) +} + +type ParameterSpec struct { + Name string + DataType string + Description string + Optional bool + Regex *regexp.Regexp +} + +type handler interface { + Handle(ctx context.Context, cfg *Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) +} + +type Config struct { + Log logrus.FieldLogger + Config *rest.Config + Clientset *kubernetes.Clientset + Model lm.Model + FlowRetriever *flowretrieval.Retriever +} From aee063f77a5dca1e67db016b28cf5220fdbded55 Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Tue, 6 Aug 2024 16:33:57 -0700 Subject: [PATCH 8/8] feat: dns scenario and other improvements Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- ai/README.md | 2 +- ai/main.go | 87 ++++++++-- ai/pkg/chat/chat.go | 101 ++++++----- ai/pkg/chat/prompt.go | 30 ++++ ai/pkg/parse/flows/parser.go | 4 + ai/pkg/retrieval/flows/client/client.go | 2 +- ai/pkg/retrieval/flows/retriever.go | 17 +- ai/pkg/scenarios/common.go | 2 +- ai/pkg/scenarios/dns/dns.go | 
217 +++++++++++++++++++++++- ai/pkg/scenarios/dns/prompt.go | 31 ++++ ai/pkg/scenarios/drops/drops.go | 11 -- ai/pkg/scenarios/drops/prompt.go | 13 ++ 12 files changed, 437 insertions(+), 80 deletions(-) create mode 100644 ai/pkg/chat/prompt.go create mode 100644 ai/pkg/scenarios/dns/prompt.go diff --git a/ai/README.md b/ai/README.md index 5dafe7fb7e..f191bcf11d 100644 --- a/ai/README.md +++ b/ai/README.md @@ -4,7 +4,7 @@ - Change into this *ai/* folder. - `go mod tidy ; go mod vendor` -- Set your `kubeconfigPath` in *main.go*. +- Modify the `defaultConfig` values in *main.go* - If using Azure OpenAI: - Make sure you're logged into your account/subscription in your terminal. - Specify environment variables for Deployment name and Endpoint URL. Get deployment from e.g. [https://oai.azure.com/portal/deployment](https://oai.azure.com/portal/deployment) and Endpoint from e.g. Deployment > Playground > Code. diff --git a/ai/main.go b/ai/main.go index d891c05cc3..dec87bedd5 100644 --- a/ai/main.go +++ b/ai/main.go @@ -1,47 +1,108 @@ package main import ( + "fmt" + "os/user" + "github.com/microsoft/retina/ai/pkg/chat" "github.com/microsoft/retina/ai/pkg/lm" + "github.com/microsoft/retina/ai/pkg/scenarios" + "github.com/microsoft/retina/ai/pkg/scenarios/drops" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) -const kubeconfigPath = "/home/hunter/.kube/config" +// TODO incorporate this code into a CLI tool someday + +type config struct { + // currently supports "echo" or "AOAI" + model string + + // optional. defaults to ~/.kube/config + kubeconfigPath string + + // retrieved flows are currently written to ./flows.json + useFlowsFromFile bool -// const kubeconfigPath = "C:\\Users\\hgregory\\.kube\\config" + // eventually, the below should be optional once user input is implemented + question string + history lm.ChatHistory + + // eventually, the below should be optional once scenario selection is implemented + scenario *scenarios.Definition + parameters map[string]string +} + +var defaultConfig = &config{ + model: "echo", // echo or AOAI + useFlowsFromFile: false, + question: "What's wrong with my app?", + history: nil, + scenario: drops.Definition, // drops.Definition or dns.Definition + parameters: map[string]string{ + scenarios.Namespace1.Name: "default", + // scenarios.PodPrefix1.Name: "toolbox-pod", + // scenarios.Namespace2.Name: "default", + // scenarios.PodPrefix2.Name: "toolbox-pod", + // dns.DNSQuery.Name: "google.com", + // scenarios.Nodes.Name: "[node1,node2]", + }, +} func main() { + run(defaultConfig) +} + +func run(cfg *config) { log := logrus.New() // log.SetLevel(logrus.DebugLevel) log.Info("starting app...") // retrieve configs - config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) + if cfg.kubeconfigPath == "" { + usr, err := user.Current() + if err != nil { + log.WithError(err).Fatal("failed to get current user") + } + cfg.kubeconfigPath = usr.HomeDir + "/.kube/config" + } + + kconfig, err := clientcmd.BuildConfigFromFlags("", cfg.kubeconfigPath) if err != nil { log.WithError(err).Fatal("failed to get kubeconfig") } - clientset, err := kubernetes.NewForConfig(config) + clientset, err := kubernetes.NewForConfig(kconfig) if err != nil { log.WithError(err).Fatal("failed to create clientset") } log.Info("retrieved kubeconfig and clientset") // configure LM (language model) - // model := lm.NewEchoModel() - // log.Info("initialized echo model") - model, err := lm.NewAzureOpenAI() - if err != nil { - 
log.WithError(err).Fatal("failed to create Azure OpenAI model") + var model lm.Model + switch cfg.model { + case "echo": + model = lm.NewEchoModel() + log.Info("initialized echo model") + case "AOAI": + model, err = lm.NewAzureOpenAI() + if err != nil { + log.WithError(err).Fatal("failed to create Azure OpenAI model") + } + log.Info("initialized Azure OpenAI model") + default: + log.Fatalf("unsupported model: %s", cfg.model) } - log.Info("initialized Azure OpenAI model") - bot := chat.NewBot(log, config, clientset, model) - if err := bot.Loop(); err != nil { - log.WithError(err).Fatal("error running chat loop") + bot := chat.NewBot(log, kconfig, clientset, model, cfg.useFlowsFromFile) + newHistory, err := bot.HandleScenario(cfg.question, cfg.history, cfg.scenario, cfg.parameters) + if err != nil { + log.WithError(err).Fatal("error handling scenario") } + + log.Info("handled scenario") + fmt.Println(newHistory[len(newHistory)-1].Assistant) } diff --git a/ai/pkg/chat/chat.go b/ai/pkg/chat/chat.go index 55e2d5daa1..6ab39480e7 100644 --- a/ai/pkg/chat/chat.go +++ b/ai/pkg/chat/chat.go @@ -7,41 +7,67 @@ import ( "github.com/microsoft/retina/ai/pkg/lm" flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" "github.com/microsoft/retina/ai/pkg/scenarios" - "github.com/microsoft/retina/ai/pkg/scenarios/dns" - "github.com/microsoft/retina/ai/pkg/scenarios/drops" "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) -var ( - definitions = []*scenarios.Definition{ - drops.Definition, - dns.Definition, - } -) - type Bot struct { - log logrus.FieldLogger - config *rest.Config - clientset *kubernetes.Clientset - model lm.Model + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + model lm.Model + flowRetriever *flowretrieval.Retriever } // input log, config, clientset, model -func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model) *Bot { - return &Bot{ - log: log.WithField("component", "chat"), - config: config, - clientset: clientset, - model: model, +func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model, useFlowsFromFile bool) *Bot { + b := &Bot{ + log: log.WithField("component", "chat"), + config: config, + clientset: clientset, + model: model, + flowRetriever: flowretrieval.NewRetriever(log, config, clientset), + } + + if useFlowsFromFile { + b.flowRetriever.UseFile() } + + return b } +func (b *Bot) HandleScenario(question string, history lm.ChatHistory, definition *scenarios.Definition, parameters map[string]string) (lm.ChatHistory, error) { + if definition == nil { + return history, fmt.Errorf("no scenario selected") + } + + cfg := &scenarios.Config{ + Log: b.log, + Config: b.config, + Clientset: b.clientset, + Model: b.model, + FlowRetriever: b.flowRetriever, + } + + ctx := context.TODO() + response, err := definition.Handle(ctx, cfg, parameters, question, history) + if err != nil { + return history, fmt.Errorf("error handling scenario: %w", err) + } + + history = append(history, lm.MessagePair{ + User: question, + Assistant: response, + }) + + return history, nil +} + +// FIXME get user input and implement scenario selection func (b *Bot) Loop() error { var history lm.ChatHistory - flowRetriever := flowretrieval.NewRetriever(b.log, b.config, b.clientset) for { // TODO get user input @@ -53,39 +79,26 @@ func (b *Bot) Loop() error { return fmt.Errorf("error selecting scenario: %w", err) } - // cfg.FlowRetriever.UseFile() - - cfg 
:= &scenarios.Config{ - Log: b.log, - Config: b.config, - Clientset: b.clientset, - Model: b.model, - FlowRetriever: flowRetriever, - } - - ctx := context.TODO() - response, err := definition.Handle(ctx, cfg, params, question, history) + newHistory, err := b.HandleScenario(question, history, definition, params) if err != nil { return fmt.Errorf("error handling scenario: %w", err) } - fmt.Println(response) + fmt.Println(newHistory[len(newHistory)-1].Assistant) - // TODO keep chat loop going - break + history = newHistory } - - return nil } +// FIXME fix prompts func (b *Bot) selectScenario(question string, history lm.ChatHistory) (*scenarios.Definition, map[string]string, error) { - // TODO use chat interface - // FIXME hard-coding the scenario and params for now - d := definitions[0] - params := map[string]string{ - scenarios.Namespace1.Name: "default", - scenarios.Namespace2.Name: "default", + ctx := context.TODO() + response, err := b.model.Generate(ctx, selectionSystemPrompt, nil, selectionPrompt(question, history)) + if err != nil { + return nil, nil, fmt.Errorf("error generating response: %w", err) } - return d, params, nil + // TODO parse response and return scenario definition and parameters + _ = response + return nil, nil, nil } diff --git a/ai/pkg/chat/prompt.go b/ai/pkg/chat/prompt.go new file mode 100644 index 0000000000..527a31a81e --- /dev/null +++ b/ai/pkg/chat/prompt.go @@ -0,0 +1,30 @@ +package chat + +import ( + "fmt" + "strings" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/microsoft/retina/ai/pkg/scenarios" + "github.com/microsoft/retina/ai/pkg/scenarios/dns" + "github.com/microsoft/retina/ai/pkg/scenarios/drops" +) + +const selectionSystemPrompt = "Select a scenario" + +var ( + definitions = []*scenarios.Definition{ + drops.Definition, + dns.Definition, + } +) + +func selectionPrompt(question string, history lm.ChatHistory) string { + // TODO include parameters etc. and reference the user chat as context + var sb strings.Builder + sb.WriteString("Select a scenario:\n") + for i, d := range definitions { + sb.WriteString(fmt.Sprintf("%d. 
%s\n", i+1, d.Name)) + } + return sb.String() +} diff --git a/ai/pkg/parse/flows/parser.go b/ai/pkg/parse/flows/parser.go index bea8789791..8b885a9a31 100644 --- a/ai/pkg/parse/flows/parser.go +++ b/ai/pkg/parse/flows/parser.go @@ -34,6 +34,10 @@ func (p *Parser) Parse(flows []*flowpb.Flow) { } func (p *Parser) addFlow(f *flowpb.Flow) error { + if f == nil { + return nil + } + src := f.GetSource() dst := f.GetDestination() if src == nil || dst == nil { diff --git a/ai/pkg/retrieval/flows/client/client.go b/ai/pkg/retrieval/flows/client/client.go index f0137dadc2..eb8b2d14c7 100644 --- a/ai/pkg/retrieval/flows/client/client.go +++ b/ai/pkg/retrieval/flows/client/client.go @@ -31,7 +31,7 @@ func New() (*Client, error) { tlsDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) // FIXME make address part of a config - addr := ":5555" + addr := ":5557" connection, err := grpc.NewClient(addr, tlsDialOption, connectDialOption) if err != nil { return nil, fmt.Errorf("failed to dial %s: %w", addr, err) diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go index bb1cb18191..4340185a5f 100644 --- a/ai/pkg/retrieval/flows/retriever.go +++ b/ai/pkg/retrieval/flows/retriever.go @@ -83,13 +83,13 @@ func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest defer portForwardCancel() // FIXME make ports part of a config - cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5555:80") + cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5557:80") if err := cmd.Start(); err != nil { return nil, fmt.Errorf("failed to start port-forward. %v", err) } // observe flows - observeCtx, observeCancel := context.WithTimeout(ctx, 30*time.Second) + observeCtx, observeCancel := context.WithTimeout(ctx, 15*time.Second) defer observeCancel() maxFlows := req.Number @@ -120,18 +120,25 @@ func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlo } r.flows = make([]*flowpb.Flow, 0) + var errReceiving error for { select { case <-ctx.Done(): r.log.Info("context cancelled") return r.flows, nil default: + if errReceiving != nil { + // error receiving and context not done + // TODO handle error instead of returning error + return nil, fmt.Errorf("failed to receive flow. %v", err) + } + r.log.WithField("flowCount", len(r.flows)).Debug("processing flow") getFlowResponse, err := stream.Recv() if err != nil { - // TODO handle error instead of returning error - return nil, fmt.Errorf("failed to receive flow. 
%v", err) + errReceiving = err + continue } f := getFlowResponse.GetFlow() @@ -150,7 +157,7 @@ func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlo // handleFlow logic is inspired by a snippet from Hubble UI // https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/flow_stream/flow_stream.go#L360-L395 func (r *Retriever) handleFlow(f *flowpb.Flow) { - if f.GetL4() == nil || f.GetSource() == nil || f.GetDestination() == nil { + if (f.GetL7() == nil && f.GetL4() == nil) || f.GetSource() == nil || f.GetDestination() == nil { return } diff --git a/ai/pkg/scenarios/common.go b/ai/pkg/scenarios/common.go index 4ec8713f9c..6f458a7132 100644 --- a/ai/pkg/scenarios/common.go +++ b/ai/pkg/scenarios/common.go @@ -27,7 +27,7 @@ var ( Name: "namespace2", DataType: "string", Description: "Namespace 2", - Optional: false, + Optional: true, Regex: k8sNameRegex, } diff --git a/ai/pkg/scenarios/dns/dns.go b/ai/pkg/scenarios/dns/dns.go index 36ab7424a0..02efeca0ea 100644 --- a/ai/pkg/scenarios/dns/dns.go +++ b/ai/pkg/scenarios/dns/dns.go @@ -2,15 +2,22 @@ package dns import ( "context" + "fmt" + "time" "github.com/microsoft/retina/ai/pkg/lm" + flowparsing "github.com/microsoft/retina/ai/pkg/parse/flows" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" "github.com/microsoft/retina/ai/pkg/scenarios" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" ) var ( Definition = scenarios.NewDefinition("DNS", "DNS", parameterSpecs, &handler{}) - dnsQuery = &scenarios.ParameterSpec{ + DNSQuery = &scenarios.ParameterSpec{ Name: "dnsQuery", DataType: "string", Description: "DNS query", @@ -19,21 +26,223 @@ var ( parameterSpecs = []*scenarios.ParameterSpec{ scenarios.Namespace1, + scenarios.PodPrefix1, scenarios.Namespace2, - dnsQuery, + scenarios.PodPrefix2, + scenarios.Nodes, + DNSQuery, } ) // mirrored with parameterSpecs type params struct { Namespace1 string + PodPrefix1 string Namespace2 string + PodPrefix2 string + Nodes []string DNSQuery string } type handler struct{} func (h *handler) Handle(ctx context.Context, cfg *scenarios.Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) { - // TODO - return "", nil + l := cfg.Log.WithField("scenario", "drops") + l.Info("handling drops scenario...") + + if err := cfg.FlowRetriever.Init(); err != nil { + return "", fmt.Errorf("error initializing flow retriever: %w", err) + } + + params := ¶ms{ + Namespace1: anyToString(typedParams[scenarios.Namespace1.Name]), + PodPrefix1: anyToString(typedParams[scenarios.PodPrefix1.Name]), + Namespace2: anyToString(typedParams[scenarios.Namespace2.Name]), + PodPrefix2: anyToString(typedParams[scenarios.PodPrefix2.Name]), + Nodes: anyToStringSlice(typedParams[scenarios.Nodes.Name]), + } + + req := flowsRequest(params) + flows, err := cfg.FlowRetriever.Observe(ctx, req) + if err != nil { + return "", fmt.Errorf("error observing flows: %w", err) + } + l.Info("observed flows") + + // analyze flows + p := flowparsing.NewParser(l) + p.Parse(flows) + connections := p.Connections() + + formattedFlowLogs := formatFlowLogs(connections) + + message := fmt.Sprintf(messagePromptTemplate, question, formattedFlowLogs) + analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + resp, err := cfg.Model.Generate(analyzeCtx, systemPrompt, history, message) + if err != nil { + return "", fmt.Errorf("error analyzing flows: %w", err) + } + 
l.Info("analyzed flows") + + return resp, nil +} + +// cast to string without nil panics +func anyToString(a any) string { + if a == nil { + return "" + } + return a.(string) +} + +// cast to []string without nil panics +func anyToStringSlice(a any) []string { + if a == nil { + return nil + } + return a.([]string) +} + +// TODO handle dnsQuery param +func flowsRequest(params *params) *observerpb.GetFlowsRequest { + req := &observerpb.GetFlowsRequest{ + Number: flowretrieval.MaxFlowsFromHubbleRelay, + Follow: true, + } + + protocol := []string{"DNS"} + + if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { + req.Whitelist = []*flowpb.FlowFilter{ + { + NodeName: params.Nodes, + Protocol: protocol, + }, + } + + return req + } + + var prefix1 []string + if params.Namespace1 != "" || params.PodPrefix1 != "" { + prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) + } + + var prefix2 []string + if params.Namespace2 != "" || params.PodPrefix2 != "" { + prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) + } + + filterDirection1 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix1, + DestinationPod: prefix2, + Protocol: protocol, + } + + filterDirection2 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix2, + DestinationPod: prefix1, + Protocol: protocol, + } + + req.Whitelist = []*flowpb.FlowFilter{ + filterDirection1, + filterDirection2, + } + + return req +} + +func formatFlowLogs(connections flowparsing.Connections) string { + requestsWithoutResponse := make([]string, 0) + successfulResponses := make([]string, 0) + failedResponses := make([]string, 0) + for _, conn := range connections { + requests := make(map[string]struct{}) + responses := make(map[string]uint32) + for _, f := range conn.Flows { + if f.GetL7().GetDns() == nil { + continue + } + + dnsType := f.GetL7().Type.String() + + query := f.GetL7().GetDns().GetQuery() + switch dnsType { + case "REQUEST": + requests[query] = struct{}{} + case "RESPONSE": + responses[query] = f.GetL7().GetDns().GetRcode() + } + } + + for q := range requests { + if _, ok := responses[q]; !ok { + line := fmt.Sprintf("Pods: %s. query: %s", conn.Key, q) + requestsWithoutResponse = append(requestsWithoutResponse, line) + } + } + + for q, rcode := range responses { + if rcode == 0 { + line := fmt.Sprintf("Pods: %s. query: %s", conn.Key, q) + successfulResponses = append(successfulResponses, line) + } else { + line := fmt.Sprintf("Pods: %s. query: %s. 
error: %s", conn.Key, q, rcodeToErrorName(rcode)) + failedResponses = append(failedResponses, line) + } + } + } + + return fmt.Sprintf("SUCCESSFUL RESPONSES:\n%v\n\nRESPONSES WITH ERRORS:\n%v\n\nREQUESTS WITHOUT RESPONSES:\n%v", successfulResponses, failedResponses, requestsWithoutResponse) +} + +func rcodeToErrorName(rcode uint32) string { + switch rcode { + case 0: + return "NoError" + case 1: + return "FormErr" + case 2: + return "ServFail" + case 3: + return "NXDomain" + case 4: + return "NotImp" + case 5: + return "Refused" + case 6: + return "YXDomain" + case 7: + return "YXRRSet" + case 8: + return "NXRRSet" + case 9: + return "NotAuth" + case 10: + return "NotZone" + case 11: + return "DSOTYPENI" + case 16: + return "BADVERS/BADSIG" + case 17: + return "BADKEY" + case 18: + return "BADTIME" + case 19: + return "BADMODE" + case 20: + return "BADNAME" + case 21: + return "BADALG" + case 22: + return "BADTRUNC" + case 23: + return "BADCOOKIE" + default: + return "Unknown" + } } diff --git a/ai/pkg/scenarios/dns/prompt.go b/ai/pkg/scenarios/dns/prompt.go new file mode 100644 index 0000000000..c915cac6a2 --- /dev/null +++ b/ai/pkg/scenarios/dns/prompt.go @@ -0,0 +1,31 @@ +package dns + +// TODO implement below analysis logic in code and/or LM prompt + +/* + DNS ANALYSIS LOGIC + + Primary questions: + - Do any queries have failing responses? Which? + - Do any queries have no responses? Which? + - Which Pods are impacted by above? Which are not? + - Which core-dns Pods are impacted by above? Which are not? + - Is "reserved:world" responding with errors or responding at all? + + More questions: + - What kind of queries do we see (qualitatively)? + - Do we see any issue by DNS record type? + - Does number of IPs matter?? +*/ + +const ( + systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.` + + // first parameter is the user's question + // second parameter is the user's network flow logs + messagePromptTemplate = `%s + +"summary of network flow logs": +%s +` +) diff --git a/ai/pkg/scenarios/drops/drops.go b/ai/pkg/scenarios/drops/drops.go index 8373748508..9fc303ef09 100644 --- a/ai/pkg/scenarios/drops/drops.go +++ b/ai/pkg/scenarios/drops/drops.go @@ -177,17 +177,6 @@ func flowsRequest(params *params) *observerpb.GetFlowsRequest { // filterPod1ToIP, } - req.Whitelist = nil - - req.Blacklist = []*flowpb.FlowFilter{ - { - SourcePod: []string{"kube-system/"}, - }, - { - DestinationPod: []string{"kube-system/"}, - }, - } - return req } diff --git a/ai/pkg/scenarios/drops/prompt.go b/ai/pkg/scenarios/drops/prompt.go index a6d0fd1f03..3ab4fe3b8d 100644 --- a/ai/pkg/scenarios/drops/prompt.go +++ b/ai/pkg/scenarios/drops/prompt.go @@ -1,5 +1,18 @@ package drops +// TODO implement below analysis logic in code and/or LM prompt + +/* + DROPS ANALYSIS LOGIC + + Primary questions: + - Is there dropped traffic? What are the source, destination, protocol, and port? + - Are there TCP SYNs missing SYN-ACKs? What are the source, destination, and port? + - Same for TCP resets. + - Which connections are successful? Compare this to above. + - Are Nodes experiencing NIC issues? +*/ + const ( systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. 
Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.`