diff --git a/.circleci/config.yml b/.circleci/config.yml index 03feaee4a..d0d8dabad 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -34,12 +34,20 @@ jobs: - checkout - restore_cache: key: kafka-go-mod-{{ checksum "go.sum" }}-1 - - run: go mod download + - run: + name: Download dependencies + command: go mod download - save_cache: key: kafka-go-mod-{{ checksum "go.sum" }}-1 paths: - /go/pkg/mod - - run: go test -race -cover ./... + - run: + name: Test kafka-go + command: go test -race -cover ./... + - run: + name: Test kafka-go/sasl/aws_msk_iam + working_directory: ./sasl/aws_msk_iam + command: go test -race -cover ./... # Starting at version 0.11, the kafka features and configuration remained # mostly stable, so we can use this CI job configuration as template for other @@ -219,7 +227,7 @@ jobs: - 9093:9093 environment: *environment steps: *steps - + workflows: version: 2 run: diff --git a/sasl/aws_msk_iam/go.mod b/sasl/aws_msk_iam/go.mod new file mode 100644 index 000000000..c27b50dd4 --- /dev/null +++ b/sasl/aws_msk_iam/go.mod @@ -0,0 +1,8 @@ +module github.com/segmentio/kafka-go/sasl/aws_msk_iam + +go 1.15 + +require ( + github.com/aws/aws-sdk-go v1.41.3 + github.com/segmentio/kafka-go v0.4.24 +) diff --git a/sasl/aws_msk_iam/go.sum b/sasl/aws_msk_iam/go.sum new file mode 100644 index 000000000..952e675db --- /dev/null +++ b/sasl/aws_msk_iam/go.sum @@ -0,0 +1,60 @@ +github.com/aws/aws-sdk-go v1.41.3 h1:deglLZ1jjHdhkd6Rbad1MZM4gL+1pfnTfjuFk6CGJFM= +github.com/aws/aws-sdk-go v1.41.3/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.24 h1:R3tYSYxyLK3SknDIU15LtpDdq59gRg2/J0GKhDFXrBQ= +github.com/segmentio/kafka-go v0.4.24/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sasl/aws_msk_iam/msk_iam.go b/sasl/aws_msk_iam/msk_iam.go new file mode 100644 index 000000000..4e341d398 --- /dev/null +++ b/sasl/aws_msk_iam/msk_iam.go @@ -0,0 +1,124 @@ +package aws_msk_iam + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "runtime" + "strings" + "time" + + sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/segmentio/kafka-go/sasl" +) + +const ( + // These constants come from https://github.com/aws/aws-msk-iam-auth#details and + // https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java. + signVersion = "2020_10_22" + signService = "kafka-cluster" + signAction = "kafka-cluster:Connect" + signVersionKey = "version" + signHostKey = "host" + signUserAgentKey = "user-agent" + signActionKey = "action" + queryActionKey = "Action" +) + +var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version()) + +// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation: +// https://github.com/aws/aws-msk-iam-auth +type Mechanism struct { + // The sigv4.Signer to use when signing the request. Required. + Signer *sigv4.Signer + // The region where the msk cluster is hosted, e.g. "us-east-1". Required. + Region string + // The time the request is planned for. Optional, defaults to time.Now() at time of authentication. + SignTime time.Time + // The duration for which the presigned request is active. Optional, defaults to 5 minutes. + Expiry time.Duration +} + +func (m *Mechanism) Name() string { + return "AWS_MSK_IAM" +} + +// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array, +// making use of the aws-sdk to produce the signed output. +// { +// "version" : "2020_10_22", +// "host" : "", +// "user-agent": "", +// "action": "kafka-cluster:Connect", +// "x-amz-algorithm" : "", +// "x-amz-credential" : "///kafka-cluster/aws4_request", +// "x-amz-date" : "", +// "x-amz-security-token" : "", +// "x-amz-signedheaders" : "host", +// "x-amz-expires" : "", +// "x-amz-signature" : "" +// } +func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) { + saslMeta := sasl.MetadataFromContext(ctx) + if saslMeta == nil { + return nil, nil, errors.New("missing sasl metadata") + } + + query := url.Values{ + queryActionKey: {signAction}, + } + + signUrl := url.URL{ + Scheme: "kafka", + Host: saslMeta.Host, + Path: "/", + RawQuery: query.Encode(), + } + + req, err := http.NewRequest("GET", signUrl.String(), nil) + if err != nil { + return nil, nil, err + } + + signTime := m.SignTime + if signTime.IsZero() { + signTime = time.Now() + } + + expiry := m.Expiry + if expiry == 0 { + expiry = 5 * time.Minute + } + + header, err := m.Signer.Presign(req, nil, signService, m.Region, expiry, signTime) + if err != nil { + return nil, nil, err + } + signedMap := map[string]string{ + signVersionKey: signVersion, + signHostKey: signUrl.Host, + signUserAgentKey: signUserAgent, + signActionKey: signAction, + } + // The protocol requires lowercase keys. + for key, vals := range header { + signedMap[strings.ToLower(key)] = vals[0] + } + for key, vals := range req.URL.Query() { + signedMap[strings.ToLower(key)] = vals[0] + } + + signedJson, err := json.Marshal(signedMap) + return m, signedJson, err +} + +func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) { + // After the initial step, the authentication is complete + // kafka will return error if it rejected the credentials, so we'll only + // arrive here on success. + return true, nil, nil +} diff --git a/sasl/aws_msk_iam/msk_iam_test.go b/sasl/aws_msk_iam/msk_iam_test.go new file mode 100644 index 000000000..1b3cc7a04 --- /dev/null +++ b/sasl/aws_msk_iam/msk_iam_test.go @@ -0,0 +1,105 @@ +package aws_msk_iam + +import ( + "bytes" + "context" + "encoding/json" + "testing" + "time" + + "github.com/segmentio/kafka-go/sasl" + + "github.com/aws/aws-sdk-go/aws/credentials" + sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" +) + +const ( + accessKeyId = "ACCESS_KEY" + secretAccessKey = "SECRET_KEY" +) + +// using a fixed time allows the signature to be verifiable in a test +var signTime = time.Date(2021, 10, 14, 13, 5, 0, 0, time.UTC) + +func TestAwsMskIamMechanism(t *testing.T) { + tests := []struct { + description string + ctx func() context.Context + shouldFail bool + }{ + { + description: "with metadata", + ctx: func() context.Context { + return sasl.WithMetadata(context.Background(), &sasl.Metadata{ + Host: "localhost", + Port: 9092, + }) + }, + }, + { + description: "without metadata", + ctx: func() context.Context { + return context.Background() + }, + shouldFail: true, + }, + } + + for _, tt := range tests { + t.Run(tt.description, func(t *testing.T) { + ctx := tt.ctx() + + creds := credentials.NewStaticCredentials(accessKeyId, secretAccessKey, "") + mskMechanism := &Mechanism{ + Signer: sigv4.NewSigner(creds), + Region: "us-east-1", + SignTime: signTime, + } + + sess, auth, err := mskMechanism.Start(ctx) + if tt.shouldFail { // if error is expected + if err == nil { // but we don't find one + t.Fatal("error expected") + } else { // but we do find one + return // return early since the remaining assertions are irrelevant + } + } else { // if error is not expected (typical) + if err != nil { // but we do find one + t.Fatal(err) + } + } + + if sess != mskMechanism { + t.Error( + "Unexpected session", + "expected", mskMechanism, + "got", sess, + ) + } + + expectedMap := map[string]string{ + "version": "2020_10_22", + "action": "kafka-cluster:Connect", + "host": "localhost", + "user-agent": signUserAgent, + "x-amz-algorithm": "AWS4-HMAC-SHA256", + "x-amz-credential": "ACCESS_KEY/20211014/us-east-1/kafka-cluster/aws4_request", + "x-amz-date": "20211014T130500Z", + "x-amz-expires": "300", + "x-amz-signedheaders": "host", + "x-amz-signature": "6b8d25f9b45b9c7db9da855a49112d80379224153a27fd279c305a5b7940d1a7", + } + expectedAuth, err := json.Marshal(expectedMap) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expectedAuth, auth) { + t.Error("Unexpected authentication", + "expected", expectedAuth, + "got", auth, + ) + } + }) + } +}