diff --git a/.gitignore b/.gitignore index 27e2f7832c..630958033a 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,7 @@ *.out # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # Go workspace file go.work diff --git a/ai/README.md b/ai/README.md new file mode 100644 index 0000000000..f191bcf11d --- /dev/null +++ b/ai/README.md @@ -0,0 +1,17 @@ +# Retina AI + +## Usage + +- Change into this *ai/* folder. +- `go mod tidy ; go mod vendor` +- Modify the `defaultConfig` values in *main.go* +- If using Azure OpenAI: + - Make sure you're logged into your account/subscription in your terminal. + - Specify environment variables for Deployment name and Endpoint URL. Get deployment from e.g. [https://oai.azure.com/portal/deployment](https://oai.azure.com/portal/deployment) and Endpoint from e.g. Deployment > Playground > Code. + - Linux: + - `read -p "Enter AOAI_COMPLETIONS_ENDPOINT: " AOAI_COMPLETIONS_ENDPOINT && export AOAI_COMPLETIONS_ENDPOINT=$AOAI_COMPLETIONS_ENDPOINT` + - `read -p "Enter AOAI_DEPLOYMENT_NAME: " AOAI_DEPLOYMENT_NAME && export AOAI_DEPLOYMENT_NAME=$AOAI_DEPLOYMENT_NAME` + - Windows: + - `$env:AOAI_COMPLETIONS_ENDPOINT = Read-Host 'Enter AOAI_COMPLETIONS_ENDPOINT'` + - `$env:AOAI_DEPLOYMENT_NAME = Read-Host 'Enter AOAI_DEPLOYMENT_NAME'` +- `go run main.go` diff --git a/ai/go.mod b/ai/go.mod new file mode 100644 index 0000000000..e8637780d5 --- /dev/null +++ b/ai/go.mod @@ -0,0 +1,61 @@ +module github.com/microsoft/retina/ai + +go 1.22.3 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 + github.com/cilium/cilium v1.15.7 + github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529 + github.com/sirupsen/logrus v1.9.3 + google.golang.org/grpc v1.65.0 + k8s.io/client-go v0.30.3 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.21.0 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gofuzz v1.2.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/imdario/mergo v0.3.16 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect + github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect + golang.org/x/crypto v0.25.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/oauth2 v0.20.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/term v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/protobuf v1.34.1 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.30.3 // indirect + k8s.io/apimachinery v0.30.3 // indirect + k8s.io/klog/v2 v2.120.1 // indirect + k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a // indirect + k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect +) diff --git a/ai/go.sum b/ai/go.sum new file mode 100644 index 0000000000..0565622d6b --- /dev/null +++ b/ai/go.sum @@ -0,0 +1,173 @@ +github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0 h1:FQOmDxJj1If0D0khZR00MDa2Eb+k9BBsSaK7cEbLwkk= +github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai v0.6.0/go.mod h1:X0+PSrHOZdTjkiEhgv53HS5gplbzVVl2jd6hQRYSS3c= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0 h1:GJHeeA2N7xrG3q30L2UXDyuWRzDM900/65j70wcM4Ww= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/cilium/cilium v1.15.7 h1:7LwGfAW/fR/VFcm6zlESjE2Ut5vJWe+kdWq3RNJrNRc= +github.com/cilium/cilium v1.15.7/go.mod h1:6Ml8eeyWjMJKDeadutWhn5NibMps0H+yLOgfKBoHTUs= +github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529 h1:Ilb4HpEErh/A48torRF6lnYVRD18hF6yt4aKuH/DZFo= +github.com/cilium/hubble-ui/backend v0.0.0-20240603143312-a06e19ba6529/go.mod h1:rOV4LLOdYuQK/XPYZyUuDeUgeadVLx/LqmGvYmPWDiY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= +github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= +github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= +github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg= +github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= +github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo/v2 v2.17.2 h1:7eMhcy3GimbsA3hEnVKdw/PQM9XN9krpKVXsZdph0/g= +github.com/onsi/ginkgo/v2 v2.17.2/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= +github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace h1:9PNP1jnUjRhfmGMlkXHjYPishpcw4jpSt/V/xYY3FMA= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= +golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= +golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ= +k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04= +k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc= +k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k= +k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U= +k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= +k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a h1:zD1uj3Jf+mD4zmA7W+goE5TxDkI7OGJjBNBzq5fJtLA= +k8s.io/kube-openapi v0.0.0-20240521193020-835d969ad83a/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0 h1:jgGTlFYnhF1PM1Ax/lAlxUPE+KfCIXHaathvJg1C3ak= +k8s.io/utils v0.0.0-20240502163921-fe8a2dddb1d0/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/ai/main.go b/ai/main.go new file mode 100644 index 0000000000..dec87bedd5 --- /dev/null +++ b/ai/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "fmt" + "os/user" + + "github.com/microsoft/retina/ai/pkg/chat" + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/microsoft/retina/ai/pkg/scenarios" + "github.com/microsoft/retina/ai/pkg/scenarios/drops" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +// TODO incorporate this code into a CLI tool someday + +type config struct { + // currently supports "echo" or "AOAI" + model string + + // optional. defaults to ~/.kube/config + kubeconfigPath string + + // retrieved flows are currently written to ./flows.json + useFlowsFromFile bool + + // eventually, the below should be optional once user input is implemented + question string + history lm.ChatHistory + + // eventually, the below should be optional once scenario selection is implemented + scenario *scenarios.Definition + parameters map[string]string +} + +var defaultConfig = &config{ + model: "echo", // echo or AOAI + useFlowsFromFile: false, + question: "What's wrong with my app?", + history: nil, + scenario: drops.Definition, // drops.Definition or dns.Definition + parameters: map[string]string{ + scenarios.Namespace1.Name: "default", + // scenarios.PodPrefix1.Name: "toolbox-pod", + // scenarios.Namespace2.Name: "default", + // scenarios.PodPrefix2.Name: "toolbox-pod", + // dns.DNSQuery.Name: "google.com", + // scenarios.Nodes.Name: "[node1,node2]", + }, +} + +func main() { + run(defaultConfig) +} + +func run(cfg *config) { + log := logrus.New() + // log.SetLevel(logrus.DebugLevel) + + log.Info("starting app...") + + // retrieve configs + if cfg.kubeconfigPath == "" { + usr, err := user.Current() + if err != nil { + log.WithError(err).Fatal("failed to get current user") + } + cfg.kubeconfigPath = usr.HomeDir + "/.kube/config" + } + + kconfig, err := clientcmd.BuildConfigFromFlags("", cfg.kubeconfigPath) + if err != nil { + log.WithError(err).Fatal("failed to get kubeconfig") + } + + clientset, err := kubernetes.NewForConfig(kconfig) + if err != nil { + log.WithError(err).Fatal("failed to create clientset") + } + log.Info("retrieved kubeconfig and clientset") + + // configure LM (language model) + var model lm.Model + switch cfg.model { + case "echo": + model = lm.NewEchoModel() + log.Info("initialized echo model") + case "AOAI": + model, err = lm.NewAzureOpenAI() + if err != nil { + log.WithError(err).Fatal("failed to create Azure OpenAI model") + } + log.Info("initialized Azure OpenAI model") + default: + log.Fatalf("unsupported model: %s", cfg.model) + } + + bot := chat.NewBot(log, kconfig, clientset, model, cfg.useFlowsFromFile) + newHistory, err := bot.HandleScenario(cfg.question, cfg.history, cfg.scenario, cfg.parameters) + if err != nil { + log.WithError(err).Fatal("error handling scenario") + } + + log.Info("handled scenario") + fmt.Println(newHistory[len(newHistory)-1].Assistant) +} diff --git a/ai/pkg/chat/chat.go b/ai/pkg/chat/chat.go new file mode 100644 index 0000000000..6ab39480e7 --- /dev/null +++ b/ai/pkg/chat/chat.go @@ -0,0 +1,104 @@ +package chat + +import ( + "context" + "fmt" + + "github.com/microsoft/retina/ai/pkg/lm" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/scenarios" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Bot struct { + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + model lm.Model + flowRetriever *flowretrieval.Retriever +} + +// input log, config, clientset, model +func NewBot(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset, model lm.Model, useFlowsFromFile bool) *Bot { + b := &Bot{ + log: log.WithField("component", "chat"), + config: config, + clientset: clientset, + model: model, + flowRetriever: flowretrieval.NewRetriever(log, config, clientset), + } + + if useFlowsFromFile { + b.flowRetriever.UseFile() + } + + return b +} + +func (b *Bot) HandleScenario(question string, history lm.ChatHistory, definition *scenarios.Definition, parameters map[string]string) (lm.ChatHistory, error) { + if definition == nil { + return history, fmt.Errorf("no scenario selected") + } + + cfg := &scenarios.Config{ + Log: b.log, + Config: b.config, + Clientset: b.clientset, + Model: b.model, + FlowRetriever: b.flowRetriever, + } + + ctx := context.TODO() + response, err := definition.Handle(ctx, cfg, parameters, question, history) + if err != nil { + return history, fmt.Errorf("error handling scenario: %w", err) + } + + history = append(history, lm.MessagePair{ + User: question, + Assistant: response, + }) + + return history, nil +} + +// FIXME get user input and implement scenario selection +func (b *Bot) Loop() error { + var history lm.ChatHistory + + for { + // TODO get user input + question := "what's wrong with my app?" + + // select scenario and get parameters + definition, params, err := b.selectScenario(question, history) + if err != nil { + return fmt.Errorf("error selecting scenario: %w", err) + } + + newHistory, err := b.HandleScenario(question, history, definition, params) + if err != nil { + return fmt.Errorf("error handling scenario: %w", err) + } + + fmt.Println(newHistory[len(newHistory)-1].Assistant) + + history = newHistory + } +} + +// FIXME fix prompts +func (b *Bot) selectScenario(question string, history lm.ChatHistory) (*scenarios.Definition, map[string]string, error) { + ctx := context.TODO() + response, err := b.model.Generate(ctx, selectionSystemPrompt, nil, selectionPrompt(question, history)) + if err != nil { + return nil, nil, fmt.Errorf("error generating response: %w", err) + } + + // TODO parse response and return scenario definition and parameters + _ = response + return nil, nil, nil +} diff --git a/ai/pkg/chat/prompt.go b/ai/pkg/chat/prompt.go new file mode 100644 index 0000000000..527a31a81e --- /dev/null +++ b/ai/pkg/chat/prompt.go @@ -0,0 +1,30 @@ +package chat + +import ( + "fmt" + "strings" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/microsoft/retina/ai/pkg/scenarios" + "github.com/microsoft/retina/ai/pkg/scenarios/dns" + "github.com/microsoft/retina/ai/pkg/scenarios/drops" +) + +const selectionSystemPrompt = "Select a scenario" + +var ( + definitions = []*scenarios.Definition{ + drops.Definition, + dns.Definition, + } +) + +func selectionPrompt(question string, history lm.ChatHistory) string { + // TODO include parameters etc. and reference the user chat as context + var sb strings.Builder + sb.WriteString("Select a scenario:\n") + for i, d := range definitions { + sb.WriteString(fmt.Sprintf("%d. %s\n", i+1, d.Name)) + } + return sb.String() +} diff --git a/ai/pkg/lm/azure-openai.go b/ai/pkg/lm/azure-openai.go new file mode 100644 index 0000000000..0fd846c205 --- /dev/null +++ b/ai/pkg/lm/azure-openai.go @@ -0,0 +1,102 @@ +package lm + +import ( + "context" + "fmt" + "os" + "regexp" + + "github.com/Azure/azure-sdk-for-go/sdk/ai/azopenai" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" +) + +const ( + endpointPattern = `^https://[a-zA-Z0-9-]+\.openai\.azure\.com/?$` + deploymentPattern = `^[a-zA-Z0-9-]+$` +) + +var ( + endpointRegex = regexp.MustCompile(endpointPattern) + deploymentRegex = regexp.MustCompile(deploymentPattern) + ErrNoCompletions = fmt.Errorf("no completions returned") + ErrNoMessage = fmt.Errorf("no message included in completion") +) + +type AzureOpenAI struct { + modelDeployment string + client *azopenai.Client +} + +func NewAzureOpenAI() (*AzureOpenAI, error) { + aoai := &AzureOpenAI{} + + // Ex: "https://.openai.azure.com" + azureOpenAIEndpoint := os.Getenv("AOAI_COMPLETIONS_ENDPOINT") + if azureOpenAIEndpoint == "" { + return nil, fmt.Errorf("set endpoint with environment variable AOAI_COMPLETIONS_ENDPOINT") + } + if !endpointRegex.MatchString(azureOpenAIEndpoint) { + return nil, fmt.Errorf("invalid Azure OpenAI endpoint. must follow pattern: %s", endpointPattern) + } + + modelDeployment := os.Getenv("AOAI_DEPLOYMENT_NAME") + if modelDeployment == "" { + return nil, fmt.Errorf("set model deployment name with environment variable AOAI_DEPLOYMENT_NAME") + } + if !deploymentRegex.MatchString(modelDeployment) { + return nil, fmt.Errorf("invalid Azure OpenAI deployment name. must follow pattern: %s", deploymentPattern) + } + aoai.modelDeployment = modelDeployment + + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, fmt.Errorf("failed to get Azure credentials: %w", err) + } + + // NOTE: this constructor creates a client that connects to an Azure OpenAI endpoint. + // To connect to the public OpenAI endpoint, use azopenai.NewClientForOpenAI (requires an OpenAI API key). + client, err := azopenai.NewClient(azureOpenAIEndpoint, cred, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure OpenAI client: %w", err) + } + aoai.client = client + + return aoai, nil +} + +func (m *AzureOpenAI) Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) { + messages := []azopenai.ChatRequestMessageClassification{ + &azopenai.ChatRequestSystemMessage{Content: to.Ptr(systemPrompt)}, + } + for _, pair := range history { + messages = append(messages, &azopenai.ChatRequestUserMessage{Content: azopenai.NewChatRequestUserMessageContent(pair.User)}) + messages = append(messages, &azopenai.ChatRequestAssistantMessage{Content: to.Ptr(pair.Assistant)}) + } + messages = append(messages, &azopenai.ChatRequestUserMessage{Content: azopenai.NewChatRequestUserMessageContent(message)}) + + chatOptions := azopenai.ChatCompletionsOptions{ + Messages: messages, + MaxTokens: to.Ptr(int32(2048)), + N: to.Ptr(int32(1)), + Temperature: to.Ptr(float32(0.0)), + DeploymentName: &m.modelDeployment, + } + resp, err := m.client.GetChatCompletions(ctx, chatOptions, nil) + + if err != nil { + return "", fmt.Errorf("failed to get completions: %w", err) + } + + if len(resp.Choices) == 0 { + return "", ErrNoCompletions + } + + choice := resp.Choices[0] + if choice.Message == nil || choice.Message.Content == nil { + return "", ErrNoMessage + } + + // TODO check ContentFilterResultsForChoice. And CompletionsFinishReason? + return *choice.Message.Content, nil +} diff --git a/ai/pkg/lm/echo.go b/ai/pkg/lm/echo.go new file mode 100644 index 0000000000..e4c1143212 --- /dev/null +++ b/ai/pkg/lm/echo.go @@ -0,0 +1,23 @@ +package lm + +import ( + "context" + "fmt" + "strings" +) + +// EchoModel is a mock model that echoes the prompt back +type EchoModel struct{} + +func NewEchoModel() *EchoModel { + return &EchoModel{} +} + +func (m *EchoModel) Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) { + chatStrings := make([]string, 0, len(history)) + for _, pair := range history { + chatStrings = append(chatStrings, fmt.Sprintf("USER: %s\nASSISTANT: %s\n", pair.User, pair.Assistant)) + } + resp := fmt.Sprintf("systemPrompt: %s\nhistory: %s\nmessage: %s", systemPrompt, strings.Join(chatStrings, "\n"), message) + return resp, nil +} diff --git a/ai/pkg/lm/model.go b/ai/pkg/lm/model.go new file mode 100644 index 0000000000..7e9eaca3d9 --- /dev/null +++ b/ai/pkg/lm/model.go @@ -0,0 +1,14 @@ +package lm + +import "context" + +type MessagePair struct { + User string + Assistant string +} + +type ChatHistory []MessagePair + +type Model interface { + Generate(ctx context.Context, systemPrompt string, history ChatHistory, message string) (string, error) +} diff --git a/ai/pkg/parse/flows/parser.go b/ai/pkg/parse/flows/parser.go new file mode 100644 index 0000000000..8b885a9a31 --- /dev/null +++ b/ai/pkg/parse/flows/parser.go @@ -0,0 +1,94 @@ +package flows + +import ( + "fmt" + "sort" + + flowpb "github.com/cilium/cilium/api/v1/flow" + "github.com/sirupsen/logrus" +) + +type Parser struct { + log logrus.FieldLogger + connections Connections +} + +func NewParser(log logrus.FieldLogger) *Parser { + return &Parser{ + log: log.WithField("component", "flow-parser"), + connections: make(map[string]*Connection), + } +} + +func (p *Parser) Connections() Connections { + return p.connections +} + +func (p *Parser) Parse(flows []*flowpb.Flow) { + for _, flow := range flows { + err := p.addFlow(flow) + if err != nil { + p.log.WithError(err).WithField("flow", flow).Error("failed to add flow") + } + } +} + +func (p *Parser) addFlow(f *flowpb.Flow) error { + if f == nil { + return nil + } + + src := f.GetSource() + dst := f.GetDestination() + if src == nil || dst == nil { + // empty flow. ignore + return nil + } + + srcName, err := endpointName(src) + if err != nil { + return fmt.Errorf("error getting source name: %w", err) + } + + dstName, err := endpointName(dst) + if err != nil { + return fmt.Errorf("error getting destination name: %w", err) + } + + // Ensure pod1 is alphabetically before pod2 + pods := []string{srcName, dstName} + sort.Strings(pods) + pod1, pod2 := pods[0], pods[1] + key := pod1 + "#" + pod2 + + conn, exists := p.connections[key] + if !exists { + conn = &Connection{ + Pod1: pod1, + Pod2: pod2, + Key: key, + Flows: []*flowpb.Flow{}, + } + p.connections[key] = conn + } + + conn.Flows = append(conn.Flows, f) + return nil +} + +func endpointName(ep *flowpb.Endpoint) (string, error) { + name := ep.GetPodName() + if name != "" { + return name, nil + } + + lbls := ep.GetLabels() + if len(lbls) == 0 { + return "", ErrNoEndpointName + } + // should be a reserved label like: + // reserved:world + // reserved:host + // reserved:kube-apiserver + return lbls[0], nil +} diff --git a/ai/pkg/parse/flows/types.go b/ai/pkg/parse/flows/types.go new file mode 100644 index 0000000000..02f6a8e833 --- /dev/null +++ b/ai/pkg/parse/flows/types.go @@ -0,0 +1,149 @@ +package flows + +import ( + "errors" + + flowpb "github.com/cilium/cilium/api/v1/flow" +) + +var ( + ErrNoEndpointName = errors.New("no endpoint name") + ErrNilEndpoint = errors.New("nil endpoint") +) + +type Connection struct { + Pod1 string + Pod2 string + Key string + + // UDP *UdpSummary + // TCP *TcpSummary + Flows []*flowpb.Flow +} + +type Connections map[string]*Connection + +// func + +// type UdpSummary struct { +// MinLatency time.Duration +// MaxLatency time.Duration +// AvgLatency time.Duration +// TotalPackets int +// TotalBytes int +// } + +// type TcpSummary struct { +// MinLatency time.Duration +// MaxLatency time.Duration +// AvgLatency time.Duration +// TotalPackets int +// TotalBytes int +// *TcpFlagSummary +// } + +// type TcpFlagSummary struct { +// SynCount int +// AckCount int +// SynAckCount int +// FinCount int +// RstCount int +// } + +// type FlowSummary map[string]*Connection + +// func (fs FlowSummary) Aggregate() { +// for _, conn := range fs { +// udpTimestamps := make(map[string][]*timestamppb.Timestamp) +// tcpTimestamps := make(map[string][]*timestamppb.Timestamp) +// for _, f := range conn.Flows { +// l4 := f.GetL4() +// if l4 == nil { +// continue +// } + +// udp := l4.GetUDP() +// if udp != nil { +// if conn.UDP == nil { +// conn.UDP = &UdpSummary{} +// } + +// conn.UDP.TotalPackets += 1 + +// src, err := endpointName(f.GetSource()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad src endpoint while aggregating: %w", err) +// } +// dst, err := endpointName(f.GetDestination()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad dst endpoint while aggregating: %w", err) +// } + +// tuple := fmt.Sprintf("%s:%d -> %s:%d", src, udp.GetSourcePort(), dst, udp.GetDestinationPort()) + +// time := f.GetTime() +// if time == nil { +// // FIXME warn and continue +// log.Fatalf("nil time while aggregating") +// } + +// udpTimestamps[tuple] = append(udpTimestamps[tuple], f.GetTime()) +// } + +// tcp := l4.GetTCP() +// if tcp != nil { +// if conn.TCP == nil { +// conn.TCP = &TcpSummary{} +// } + +// conn.TCP.TotalPackets += 1 + +// if conn.TCP.TcpFlagSummary == nil { +// conn.TCP.TcpFlagSummary = &TcpFlagSummary{} +// } + +// flags := tcp.GetFlags() +// if flags == nil { +// // FIXME warn and continue +// log.Fatalf("nil flags while aggregating") +// } + +// switch { +// case flags.SYN && flags.ACK: +// conn.TCP.TcpFlagSummary.SynAckCount += 1 +// case flags.SYN: +// conn.TCP.TcpFlagSummary.SynCount += 1 +// case flags.ACK: +// conn.TCP.TcpFlagSummary.AckCount += 1 +// case flags.FIN: +// conn.TCP.TcpFlagSummary.FinCount += 1 +// case flags.RST: +// conn.TCP.TcpFlagSummary.RstCount += 1 +// } + +// src, err := endpointName(f.GetSource()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad src endpoint while aggregating: %w", err) +// } +// dst, err := endpointName(f.GetDestination()) +// if err != nil { +// // FIXME warn and continue +// log.Fatalf("bad dst endpoint while aggregating: %w", err) +// } + +// tuple := fmt.Sprintf("%s:%d -> %s:%d", src, udp.GetSourcePort(), dst, udp.GetDestinationPort()) + +// time := f.GetTime() +// if time == nil { +// // FIXME warn and continue +// log.Fatalf("nil time while aggregating") +// } + +// tcpTimestamps[tuple] = append(tcpTimestamps[tuple], f.GetTime()) +// } +// } +// } +// } diff --git a/ai/pkg/retrieval/flows/client/client.go b/ai/pkg/retrieval/flows/client/client.go new file mode 100644 index 0000000000..eb8b2d14c7 --- /dev/null +++ b/ai/pkg/retrieval/flows/client/client.go @@ -0,0 +1,44 @@ +package client + +import ( + "fmt" + "time" + + observerv1 "github.com/cilium/cilium/api/v1/observer" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials/insecure" +) + +type Client struct { + observerv1.ObserverClient +} + +func New() (*Client, error) { + // TODO rethink the dial opts + // starting with opts seen at https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/relay_client/connection_props.go#L34-L70 + connectParams := grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 1.0 * time.Second, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: 7 * time.Second, + }, + MinConnectTimeout: 5 * time.Second, + } + connectDialOption := grpc.WithConnectParams(connectParams) + + tlsDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + + // FIXME make address part of a config + addr := ":5557" + connection, err := grpc.NewClient(addr, tlsDialOption, connectDialOption) + if err != nil { + return nil, fmt.Errorf("failed to dial %s: %w", addr, err) + } + + client := &Client{ + ObserverClient: observerv1.NewObserverClient(connection), + } + return client, nil +} diff --git a/ai/pkg/retrieval/flows/retriever.go b/ai/pkg/retrieval/flows/retriever.go new file mode 100644 index 0000000000..4340185a5f --- /dev/null +++ b/ai/pkg/retrieval/flows/retriever.go @@ -0,0 +1,225 @@ +package flows + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "time" + + "github.com/microsoft/retina/ai/pkg/retrieval/flows/client" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" + "github.com/cilium/hubble-ui/backend/domain/labels" + "github.com/cilium/hubble-ui/backend/domain/service" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const MaxFlowsFromHubbleRelay = 30000 + +type Retriever struct { + log logrus.FieldLogger + config *rest.Config + clientset *kubernetes.Clientset + initialized bool + client *client.Client + readFromFile bool + flows []*flowpb.Flow +} + +func NewRetriever(log logrus.FieldLogger, config *rest.Config, clientset *kubernetes.Clientset) *Retriever { + return &Retriever{ + log: log.WithField("component", "flow-retriever"), + config: config, + clientset: clientset, + } +} + +func (r *Retriever) UseFile() { + r.readFromFile = true +} + +func (r *Retriever) Init() error { + if r.readFromFile { + r.log.Info("using flows from file") + return nil + } + + client, err := client.New() + if err != nil { + return fmt.Errorf("failed to create grpc client. %v", err) + } + + r.log.Info("initialized grpc client") + + r.client = client + r.initialized = true + return nil +} + +func (r *Retriever) Observe(ctx context.Context, req *observerpb.GetFlowsRequest) ([]*flowpb.Flow, error) { + if r.readFromFile { + flows, err := readFlowsFromFile("flows.json") + if err != nil { + return nil, fmt.Errorf("failed to read flows from file. %v", err) + } + + return flows, nil + } + + if !r.initialized { + if err := r.Init(); err != nil { + return nil, fmt.Errorf("failed to initialize. %v", err) + } + } + + // port-forward to hubble-relay + portForwardCtx, portForwardCancel := context.WithCancel(ctx) + defer portForwardCancel() + + // FIXME make ports part of a config + cmd := exec.CommandContext(portForwardCtx, "kubectl", "port-forward", "-n", "kube-system", "svc/hubble-relay", "5557:80") + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start port-forward. %v", err) + } + + // observe flows + observeCtx, observeCancel := context.WithTimeout(ctx, 15*time.Second) + defer observeCancel() + + maxFlows := req.Number + flows, err := r.observeFlowsGRPC(observeCtx, req, int(maxFlows)) + if err != nil { + return nil, fmt.Errorf("failed to observe flows over grpc. %v", err) + } + + // stop the port-forward + portForwardCancel() + // will error with "exit status 1" because of context cancellation + _ = cmd.Wait() + r.log.Info("stopped port-forward") + + r.log.Info("saving flows to JSON") + if err := saveFlowsToJSON(flows, "flows.json"); err != nil { + r.log.WithError(err).Error("failed to save flows to JSON") + return nil, err + } + + return flows, nil +} + +func (r *Retriever) observeFlowsGRPC(ctx context.Context, req *observerpb.GetFlowsRequest, maxFlows int) ([]*flowpb.Flow, error) { + stream, err := r.client.GetFlows(ctx, req, grpc.WaitForReady(true)) + if err != nil { + return nil, fmt.Errorf("failed to get flow stream. %v", err) + } + + r.flows = make([]*flowpb.Flow, 0) + var errReceiving error + for { + select { + case <-ctx.Done(): + r.log.Info("context cancelled") + return r.flows, nil + default: + if errReceiving != nil { + // error receiving and context not done + // TODO handle error instead of returning error + return nil, fmt.Errorf("failed to receive flow. %v", err) + } + + r.log.WithField("flowCount", len(r.flows)).Debug("processing flow") + + getFlowResponse, err := stream.Recv() + if err != nil { + errReceiving = err + continue + } + + f := getFlowResponse.GetFlow() + if f == nil { + continue + } + + r.handleFlow(f) + if len(r.flows) >= maxFlows { + return r.flows, nil + } + } + } +} + +// handleFlow logic is inspired by a snippet from Hubble UI +// https://github.com/cilium/hubble-ui/blob/a06e19ba65299c63a58034a360aeedde9266ec01/backend/internal/flow_stream/flow_stream.go#L360-L395 +func (r *Retriever) handleFlow(f *flowpb.Flow) { + if (f.GetL7() == nil && f.GetL4() == nil) || f.GetSource() == nil || f.GetDestination() == nil { + return + } + + sourceId, destId := service.IdsFromFlowProto(f) + if sourceId == "0" || destId == "0" { + r.log.Warn("invalid (zero) identity in source / dest services") + // TODO print offending flow? + return + } + + // TODO: workaround to hide flows/services which are showing as "World", + // but actually they are k8s services without initialized pods. + // Appropriate fix is to construct and show special service map cards + // and show these flows in special way inside flows table. + if f.GetDestination() != nil { + destService := f.GetDestinationService() + destLabelsProps := labels.Props(f.GetDestination().GetLabels()) + destNames := f.GetDestinationNames() + isDestOutside := destLabelsProps.IsWorld || len(destNames) > 0 + + if destService != nil && isDestOutside { + return + } + } + + r.flows = append(r.flows, f) +} + +func saveFlowsToJSON(flows []*flowpb.Flow, filename string) error { + for _, f := range flows { + // to avoid getting an error: + // failed to encode JSON: json: error calling MarshalJSON for type *flow.Flow: proto:\u00a0google.protobuf.Any: unable to resolve \"type.googleapis.com/utils.RetinaMetadata\": not found + f.Extensions = nil + } + + file, err := os.Create(filename) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") // optional: to make the JSON output pretty + if err := encoder.Encode(flows); err != nil { + return fmt.Errorf("failed to encode JSON: %w", err) + } + + return nil +} + +func readFlowsFromFile(filename string) ([]*flowpb.Flow, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var flows []*flowpb.Flow + decoder := json.NewDecoder(file) + if err := decoder.Decode(&flows); err != nil { + return nil, fmt.Errorf("failed to decode JSON: %w", err) + } + + return flows, nil +} diff --git a/ai/pkg/scenarios/common.go b/ai/pkg/scenarios/common.go new file mode 100644 index 0000000000..6f458a7132 --- /dev/null +++ b/ai/pkg/scenarios/common.go @@ -0,0 +1,49 @@ +package scenarios + +import "regexp" + +// common parameters +var ( + k8sNameRegex = regexp.MustCompile(`^[a-zA-Z][-a-zA-Z0-9]*$`) + nodesRegex = regexp.MustCompile(`^\[[a-zA-Z][-a-zA-Z0-9_,]*\]$`) + + Namespace1 = &ParameterSpec{ + Name: "namespace1", + DataType: "string", + Description: "Namespace 1", + Optional: false, + Regex: k8sNameRegex, + } + + PodPrefix1 = &ParameterSpec{ + Name: "podPrefix1", + DataType: "string", + Description: "Pod prefix 1", + Optional: true, + Regex: k8sNameRegex, + } + + Namespace2 = &ParameterSpec{ + Name: "namespace2", + DataType: "string", + Description: "Namespace 2", + Optional: true, + Regex: k8sNameRegex, + } + + PodPrefix2 = &ParameterSpec{ + Name: "podPrefix2", + DataType: "string", + Description: "Pod prefix 2", + Optional: true, + Regex: k8sNameRegex, + } + + Nodes = &ParameterSpec{ + Name: "nodes", + DataType: "[]string", + Description: "Nodes", + Optional: true, + Regex: nodesRegex, + } +) diff --git a/ai/pkg/scenarios/dns/dns.go b/ai/pkg/scenarios/dns/dns.go new file mode 100644 index 0000000000..02efeca0ea --- /dev/null +++ b/ai/pkg/scenarios/dns/dns.go @@ -0,0 +1,248 @@ +package dns + +import ( + "context" + "fmt" + "time" + + "github.com/microsoft/retina/ai/pkg/lm" + flowparsing "github.com/microsoft/retina/ai/pkg/parse/flows" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/scenarios" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" +) + +var ( + Definition = scenarios.NewDefinition("DNS", "DNS", parameterSpecs, &handler{}) + + DNSQuery = &scenarios.ParameterSpec{ + Name: "dnsQuery", + DataType: "string", + Description: "DNS query", + Optional: true, + } + + parameterSpecs = []*scenarios.ParameterSpec{ + scenarios.Namespace1, + scenarios.PodPrefix1, + scenarios.Namespace2, + scenarios.PodPrefix2, + scenarios.Nodes, + DNSQuery, + } +) + +// mirrored with parameterSpecs +type params struct { + Namespace1 string + PodPrefix1 string + Namespace2 string + PodPrefix2 string + Nodes []string + DNSQuery string +} + +type handler struct{} + +func (h *handler) Handle(ctx context.Context, cfg *scenarios.Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) { + l := cfg.Log.WithField("scenario", "drops") + l.Info("handling drops scenario...") + + if err := cfg.FlowRetriever.Init(); err != nil { + return "", fmt.Errorf("error initializing flow retriever: %w", err) + } + + params := ¶ms{ + Namespace1: anyToString(typedParams[scenarios.Namespace1.Name]), + PodPrefix1: anyToString(typedParams[scenarios.PodPrefix1.Name]), + Namespace2: anyToString(typedParams[scenarios.Namespace2.Name]), + PodPrefix2: anyToString(typedParams[scenarios.PodPrefix2.Name]), + Nodes: anyToStringSlice(typedParams[scenarios.Nodes.Name]), + } + + req := flowsRequest(params) + flows, err := cfg.FlowRetriever.Observe(ctx, req) + if err != nil { + return "", fmt.Errorf("error observing flows: %w", err) + } + l.Info("observed flows") + + // analyze flows + p := flowparsing.NewParser(l) + p.Parse(flows) + connections := p.Connections() + + formattedFlowLogs := formatFlowLogs(connections) + + message := fmt.Sprintf(messagePromptTemplate, question, formattedFlowLogs) + analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + resp, err := cfg.Model.Generate(analyzeCtx, systemPrompt, history, message) + if err != nil { + return "", fmt.Errorf("error analyzing flows: %w", err) + } + l.Info("analyzed flows") + + return resp, nil +} + +// cast to string without nil panics +func anyToString(a any) string { + if a == nil { + return "" + } + return a.(string) +} + +// cast to []string without nil panics +func anyToStringSlice(a any) []string { + if a == nil { + return nil + } + return a.([]string) +} + +// TODO handle dnsQuery param +func flowsRequest(params *params) *observerpb.GetFlowsRequest { + req := &observerpb.GetFlowsRequest{ + Number: flowretrieval.MaxFlowsFromHubbleRelay, + Follow: true, + } + + protocol := []string{"DNS"} + + if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { + req.Whitelist = []*flowpb.FlowFilter{ + { + NodeName: params.Nodes, + Protocol: protocol, + }, + } + + return req + } + + var prefix1 []string + if params.Namespace1 != "" || params.PodPrefix1 != "" { + prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) + } + + var prefix2 []string + if params.Namespace2 != "" || params.PodPrefix2 != "" { + prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) + } + + filterDirection1 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix1, + DestinationPod: prefix2, + Protocol: protocol, + } + + filterDirection2 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix2, + DestinationPod: prefix1, + Protocol: protocol, + } + + req.Whitelist = []*flowpb.FlowFilter{ + filterDirection1, + filterDirection2, + } + + return req +} + +func formatFlowLogs(connections flowparsing.Connections) string { + requestsWithoutResponse := make([]string, 0) + successfulResponses := make([]string, 0) + failedResponses := make([]string, 0) + for _, conn := range connections { + requests := make(map[string]struct{}) + responses := make(map[string]uint32) + for _, f := range conn.Flows { + if f.GetL7().GetDns() == nil { + continue + } + + dnsType := f.GetL7().Type.String() + + query := f.GetL7().GetDns().GetQuery() + switch dnsType { + case "REQUEST": + requests[query] = struct{}{} + case "RESPONSE": + responses[query] = f.GetL7().GetDns().GetRcode() + } + } + + for q := range requests { + if _, ok := responses[q]; !ok { + line := fmt.Sprintf("Pods: %s. query: %s", conn.Key, q) + requestsWithoutResponse = append(requestsWithoutResponse, line) + } + } + + for q, rcode := range responses { + if rcode == 0 { + line := fmt.Sprintf("Pods: %s. query: %s", conn.Key, q) + successfulResponses = append(successfulResponses, line) + } else { + line := fmt.Sprintf("Pods: %s. query: %s. error: %s", conn.Key, q, rcodeToErrorName(rcode)) + failedResponses = append(failedResponses, line) + } + } + } + + return fmt.Sprintf("SUCCESSFUL RESPONSES:\n%v\n\nRESPONSES WITH ERRORS:\n%v\n\nREQUESTS WITHOUT RESPONSES:\n%v", successfulResponses, failedResponses, requestsWithoutResponse) +} + +func rcodeToErrorName(rcode uint32) string { + switch rcode { + case 0: + return "NoError" + case 1: + return "FormErr" + case 2: + return "ServFail" + case 3: + return "NXDomain" + case 4: + return "NotImp" + case 5: + return "Refused" + case 6: + return "YXDomain" + case 7: + return "YXRRSet" + case 8: + return "NXRRSet" + case 9: + return "NotAuth" + case 10: + return "NotZone" + case 11: + return "DSOTYPENI" + case 16: + return "BADVERS/BADSIG" + case 17: + return "BADKEY" + case 18: + return "BADTIME" + case 19: + return "BADMODE" + case 20: + return "BADNAME" + case 21: + return "BADALG" + case 22: + return "BADTRUNC" + case 23: + return "BADCOOKIE" + default: + return "Unknown" + } +} diff --git a/ai/pkg/scenarios/dns/prompt.go b/ai/pkg/scenarios/dns/prompt.go new file mode 100644 index 0000000000..c915cac6a2 --- /dev/null +++ b/ai/pkg/scenarios/dns/prompt.go @@ -0,0 +1,31 @@ +package dns + +// TODO implement below analysis logic in code and/or LM prompt + +/* + DNS ANALYSIS LOGIC + + Primary questions: + - Do any queries have failing responses? Which? + - Do any queries have no responses? Which? + - Which Pods are impacted by above? Which are not? + - Which core-dns Pods are impacted by above? Which are not? + - Is "reserved:world" responding with errors or responding at all? + + More questions: + - What kind of queries do we see (qualitatively)? + - Do we see any issue by DNS record type? + - Does number of IPs matter?? +*/ + +const ( + systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.` + + // first parameter is the user's question + // second parameter is the user's network flow logs + messagePromptTemplate = `%s + +"summary of network flow logs": +%s +` +) diff --git a/ai/pkg/scenarios/drops/drops.go b/ai/pkg/scenarios/drops/drops.go new file mode 100644 index 0000000000..9fc303ef09 --- /dev/null +++ b/ai/pkg/scenarios/drops/drops.go @@ -0,0 +1,240 @@ +package drops + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/microsoft/retina/ai/pkg/lm" + flowparsing "github.com/microsoft/retina/ai/pkg/parse/flows" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + "github.com/microsoft/retina/ai/pkg/scenarios" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" +) + +var ( + Definition = scenarios.NewDefinition("DROPS", "DROPS", parameterSpecs, &handler{}) + + parameterSpecs = []*scenarios.ParameterSpec{ + scenarios.Namespace1, + scenarios.PodPrefix1, + scenarios.Namespace2, + scenarios.PodPrefix2, + scenarios.Nodes, + } +) + +// mirrored with parameterSpecs +type params struct { + Namespace1 string + PodPrefix1 string + Namespace2 string + PodPrefix2 string + Nodes []string +} + +type handler struct{} + +func (h *handler) Handle(ctx context.Context, cfg *scenarios.Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) { + l := cfg.Log.WithField("scenario", "drops") + l.Info("handling drops scenario...") + + if err := cfg.FlowRetriever.Init(); err != nil { + return "", fmt.Errorf("error initializing flow retriever: %w", err) + } + + params := ¶ms{ + Namespace1: anyToString(typedParams[scenarios.Namespace1.Name]), + PodPrefix1: anyToString(typedParams[scenarios.PodPrefix1.Name]), + Namespace2: anyToString(typedParams[scenarios.Namespace2.Name]), + PodPrefix2: anyToString(typedParams[scenarios.PodPrefix2.Name]), + Nodes: anyToStringSlice(typedParams[scenarios.Nodes.Name]), + } + + req := flowsRequest(params) + flows, err := cfg.FlowRetriever.Observe(ctx, req) + if err != nil { + return "", fmt.Errorf("error observing flows: %w", err) + } + l.Info("observed flows") + + // analyze flows + p := flowparsing.NewParser(l) + p.Parse(flows) + connections := p.Connections() + + formattedFlowLogs := formatFlowLogs(connections) + + message := fmt.Sprintf(messagePromptTemplate, question, formattedFlowLogs) + analyzeCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + resp, err := cfg.Model.Generate(analyzeCtx, systemPrompt, history, message) + if err != nil { + return "", fmt.Errorf("error analyzing flows: %w", err) + } + l.Info("analyzed flows") + + return resp, nil +} + +// cast to string without nil panics +func anyToString(a any) string { + if a == nil { + return "" + } + return a.(string) +} + +// cast to []string without nil panics +func anyToStringSlice(a any) []string { + if a == nil { + return nil + } + return a.([]string) +} + +func flowsRequest(params *params) *observerpb.GetFlowsRequest { + req := &observerpb.GetFlowsRequest{ + Number: flowretrieval.MaxFlowsFromHubbleRelay, + Follow: true, + } + + protocol := []string{"TCP", "UDP"} + + if params.Namespace1 == "" && params.PodPrefix1 == "" && params.Namespace2 == "" && params.PodPrefix2 == "" { + req.Whitelist = []*flowpb.FlowFilter{ + { + NodeName: params.Nodes, + Protocol: protocol, + }, + } + + return req + } + + var prefix1 []string + if params.Namespace1 != "" || params.PodPrefix1 != "" { + prefix1 = append(prefix1, fmt.Sprintf("%s/%s", params.Namespace1, params.PodPrefix1)) + } + + var prefix2 []string + if params.Namespace2 != "" || params.PodPrefix2 != "" { + prefix2 = append(prefix2, fmt.Sprintf("%s/%s", params.Namespace2, params.PodPrefix2)) + } + + filterDirection1 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix1, + DestinationPod: prefix2, + Protocol: protocol, + } + + filterDirection2 := &flowpb.FlowFilter{ + NodeName: params.Nodes, + SourcePod: prefix2, + DestinationPod: prefix1, + Protocol: protocol, + } + + // filterPod1ToIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationIp: []string{""}, + // Protocol: protocol, + // } + + // filterPod1FromIP := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceIp: []string{""}, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + // includes services + // world := []string{"reserved:world"} + + // filterPod1ToWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourcePod: prefix1, + // DestinationLabel: world, + // Protocol: protocol, + // } + + // filterPod1FromWorld := &flowpb.FlowFilter{ + // NodeName: params.Nodes, + // SourceLabel: world, + // DestinationPod: prefix1, + // Protocol: protocol, + // } + + req.Whitelist = []*flowpb.FlowFilter{ + filterDirection1, + filterDirection2, + // filterPod1FromIP, + // filterPod1ToIP, + } + + return req +} + +func formatFlowLogs(connections flowparsing.Connections) string { + // FIXME hacky right now + forwards := connStrings(connections, flowpb.Verdict_FORWARDED) + + drops := connStrings(connections, flowpb.Verdict_DROPPED) + other := connStrings(connections, flowpb.Verdict_VERDICT_UNKNOWN) + + return fmt.Sprintf("SUCCESSFUL CONNECTIONS:\n%s\n\nDROPPED CONNECTIONS:\n%s\n\nOTHER CONNECTIONS:\n%s", forwards, drops, other) +} + +func connStrings(connections flowparsing.Connections, verdict flowpb.Verdict) string { + connStrings := make([]string, 0, len(connections)) + for _, conn := range connections { + match := false + for _, f := range conn.Flows { + // FIXME hacky right now + if f.GetVerdict() == verdict || (verdict == flowpb.Verdict_VERDICT_UNKNOWN && f.GetVerdict() != flowpb.Verdict_FORWARDED && f.GetVerdict() != flowpb.Verdict_DROPPED) { + match = true + break + } + } + + if !match { + continue + } + + connString := "" + if verdict == flowpb.Verdict_FORWARDED && conn.Flows[0].L4.GetTCP() != nil { + successful := false + rst := false + for _, f := range conn.Flows { + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetSYN() && f.L4.GetTCP().GetFlags().GetACK() { + successful = true + continue + } + + if f.GetVerdict() == flowpb.Verdict_FORWARDED && f.L4.GetTCP().GetFlags().GetRST() { + rst = true + continue + } + } + _ = successful + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d. Was Reset: %v", conn.Pod1, conn.Pod2, len(conn.Flows), rst) + + } else { + + connString = fmt.Sprintf("Connection: %s -> %s, Number of Flows: %d", conn.Pod1, conn.Pod2, len(conn.Flows)) + } + + connStrings = append(connStrings, connString) + } + + if len(connStrings) == 0 { + return "none" + } + + return strings.Join(connStrings, "\n") +} diff --git a/ai/pkg/scenarios/drops/prompt.go b/ai/pkg/scenarios/drops/prompt.go new file mode 100644 index 0000000000..3ab4fe3b8d --- /dev/null +++ b/ai/pkg/scenarios/drops/prompt.go @@ -0,0 +1,26 @@ +package drops + +// TODO implement below analysis logic in code and/or LM prompt + +/* + DROPS ANALYSIS LOGIC + + Primary questions: + - Is there dropped traffic? What are the source, destination, protocol, and port? + - Are there TCP SYNs missing SYN-ACKs? What are the source, destination, and port? + - Same for TCP resets. + - Which connections are successful? Compare this to above. + - Are Nodes experiencing NIC issues? +*/ + +const ( + systemPrompt = `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.` + + // first parameter is the user's question + // second parameter is the user's network flow logs + messagePromptTemplate = `%s + +"summary of network flow logs": +%s +` +) diff --git a/ai/pkg/scenarios/types.go b/ai/pkg/scenarios/types.go new file mode 100644 index 0000000000..0ae48fe17c --- /dev/null +++ b/ai/pkg/scenarios/types.go @@ -0,0 +1,95 @@ +package scenarios + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + + "github.com/microsoft/retina/ai/pkg/lm" + flowretrieval "github.com/microsoft/retina/ai/pkg/retrieval/flows" + + "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Definition struct { + Name string + Description string + Specs []*ParameterSpec + handler +} + +func NewDefinition(name, description string, specs []*ParameterSpec, handler handler) *Definition { + return &Definition{ + Name: name, + Description: description, + Specs: specs, + handler: handler, + } +} + +func (d *Definition) Handle(ctx context.Context, cfg *Config, rawParams map[string]string, question string, history lm.ChatHistory) (string, error) { + typedParams := make(map[string]any) + + // validate params + for _, p := range d.Specs { + raw, ok := rawParams[p.Name] + if !ok { + if !p.Optional { + return "", fmt.Errorf("missing required parameter %s", p.Name) + } + + continue + } + + if p.Regex != nil && !p.Regex.MatchString(raw) { + return "", fmt.Errorf("parameter %s does not match regex format", p.Name) + } + + switch p.DataType { + case "string": + typedParams[p.Name] = raw + case "int": + i, err := strconv.Atoi(raw) + if err != nil { + return "", fmt.Errorf("parameter %s is not an integer", p.Name) + } + typedParams[p.Name] = i + case "[]string": + // make sure the format is like [a,b,c] + if raw == "" || raw[0] != '[' || raw[len(raw)-1] != ']' || strings.Count(raw, "[") != 1 || strings.Count(raw, "]") != 1 { + return "", fmt.Errorf("invalid array format for parameter %s", p.Name) + } + // remove brackets + raw = raw[1 : len(raw)-1] + typedParams[p.Name] = strings.Split(raw, ",") + default: + return "", fmt.Errorf("unsupported data type %s", p.DataType) + } + } + + return d.handler.Handle(ctx, cfg, typedParams, question, history) +} + +type ParameterSpec struct { + Name string + DataType string + Description string + Optional bool + Regex *regexp.Regexp +} + +type handler interface { + Handle(ctx context.Context, cfg *Config, typedParams map[string]any, question string, history lm.ChatHistory) (string, error) +} + +type Config struct { + Log logrus.FieldLogger + Config *rest.Config + Clientset *kubernetes.Clientset + Model lm.Model + FlowRetriever *flowretrieval.Retriever +} diff --git a/ai/test/integration/lm/azure-openai_test.go b/ai/test/integration/lm/azure-openai_test.go new file mode 100644 index 0000000000..746a3526f3 --- /dev/null +++ b/ai/test/integration/lm/azure-openai_test.go @@ -0,0 +1,30 @@ +package lmtest + +import ( + "context" + "fmt" + "testing" + + "github.com/microsoft/retina/ai/pkg/lm" + "github.com/sirupsen/logrus" +) + +func TestAzureOpenAICompletion(t *testing.T) { + log := logrus.New() + + // configure LM (language model) + // model := lm.NewEchoModel() + model, err := lm.NewAzureOpenAI() + if err != nil { + log.WithError(err).Fatal("failed to create Azure OpenAI model") + } + log.Info("initialized Azure OpenAI model") + + resp, err := model.Generate(context.TODO(), `You are an assistant with expertise in Kubernetes Networking. The user is debugging networking issues on their Pods and/or Nodes. Provide a succinct summary identifying any issues in the "summary of network flow logs" provided by the user.`, nil, `summary of network flow logs: + abc`) + if err != nil { + log.WithError(err).Fatal("error calling llm") + } + log.Info("called llm") + fmt.Println(resp) +}