Skip to content

Commit

Permalink
fix msk nuking (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
autero1 committed Mar 18, 2024
1 parent 24f6c4a commit 152f823
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 24 deletions.
7 changes: 7 additions & 0 deletions .circleci/nuke_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,10 @@ RdsParameterGroup:
exclude:
names_regex:
- "gruntwork-test-.*"

MSKCluster:
include:
names_regex:
- "^msk-cluster-[a-zA-Z0-9]{6}$"
- "^msk-cluster-[a-zA-Z0-9]{6}-.*"
- "^msk-tiered-storage-test.*"
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,10 @@ of the file that are supported are listed here.
| ec2-dedicated-hosts | EC2DedicatedHosts | ✅ (EC2 Name Tag) | ✅ (Allocation Time) | ❌ |
| ec2-dhcp-option | EC2DhcpOption | ❌ | ❌ | ❌ |
| ec2-keypairs | EC2KeyPairs | ✅ (Key Pair Name) | ✅ (Creation Time) | ✅ |
| ec2-ipam | EC2IPAM | ✅ (IPAM name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-pool | EC2IPAMPool | ✅ (IPAM Pool name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-resource-discovery | EC2IPAMResourceDiscovery | ✅ (IPAM Discovery Name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-scope | EC2IPAMScope | ✅ (IPAM Scope Name) | ✅ (Creation Time) | ✅ |
| ec2-ipam | EC2IPAM | ✅ (IPAM name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-pool | EC2IPAMPool | ✅ (IPAM Pool name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-resource-discovery | EC2IPAMResourceDiscovery | ✅ (IPAM Discovery Name) | ✅ (Creation Time) | ✅ |
| ec2-ipam-scope | EC2IPAMScope | ✅ (IPAM Scope Name) | ✅ (Creation Time) | ✅ |
| ecr | ECRRepository | ✅ (Repository Name) | ✅ (Creation Time) | ❌ |
| ecscluster | ECSCluster | ✅ (Cluster Name) | ❌ | ❌ |
| ecsserv | ECSService | ✅ (Service Name) | ✅ (Creation Time) | ❌ |
Expand All @@ -546,7 +546,7 @@ of the file that are supported are listed here.
| lc | LaunchConfiguration | ✅ (Launch Configuration Name) | ✅ (Created Time) | ❌ |
| lt | LaunchTemplate | ✅ (Launch Template Name) | ✅ (Created Time) | ❌ |
| macie-member | MacieMember | ❌ | ✅ (Creation Time) | ❌ |
| msk-cluster | MskCluster | ✅ (Cluster Name) | ✅ (Creation Time) | ❌ |
| msk-cluster | MSKCluster | ✅ (Cluster Name) | ✅ (Creation Time) | ❌ |
| nat-gateway | NatGateway | ✅ (EC2 Name Tag) | ✅ (Creation Time) | ✅ |
| oidcprovider | OIDCProvider | ✅ (Provider URL) | ✅ (Creation Time) | ❌ |
| opensearchdomain | OpenSearchDomain | ✅ (Domain Name) | ✅ (First Seen Tag Time) | ❌ |
Expand Down
1 change: 1 addition & 0 deletions aws/resource_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func getRegisteredRegionalResources() []AwsResource {
&resources.LaunchConfigs{},
&resources.LaunchTemplates{},
&resources.MacieMember{},
&resources.MSKCluster{},
&resources.NatGateways{},
&resources.OpenSearchDomains{},
&resources.DBInstances{},
Expand Down
22 changes: 17 additions & 5 deletions aws/resources/msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package resources

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/gruntwork-io/cloud-nuke/config"
"github.com/gruntwork-io/cloud-nuke/logging"
"github.com/gruntwork-io/cloud-nuke/report"
)

func (m MSKCluster) getAll(c context.Context, configObj config.Config) ([]*string, error) {
func (m *MSKCluster) getAll(c context.Context, configObj config.Config) ([]*string, error) {
var clusterIDs []*string

err := m.Client.ListClustersV2Pages(&kafka.ListClustersV2Input{}, func(page *kafka.ListClustersV2Output, lastPage bool) bool {
Expand All @@ -26,7 +28,7 @@ func (m MSKCluster) getAll(c context.Context, configObj config.Config) ([]*strin
return clusterIDs, nil
}

func (m MSKCluster) shouldInclude(cluster *kafka.Cluster, configObj config.Config) bool {
func (m *MSKCluster) shouldInclude(cluster *kafka.Cluster, configObj config.Config) bool {
if *cluster.State == kafka.ClusterStateDeleting {
return false
}
Expand All @@ -37,24 +39,34 @@ func (m MSKCluster) shouldInclude(cluster *kafka.Cluster, configObj config.Confi
return false
}

// if cluster is in maintenance, skip it as it will only throw an error when attempting to delete it
// BadRequestException: You can't delete cluster in MAINTENANCE state.
if *cluster.State == kafka.ClusterStateMaintenance {
return false
}

return configObj.MSKCluster.ShouldInclude(config.ResourceValue{
Name: cluster.ClusterName,
Time: cluster.CreationTime,
})
}

func (m MSKCluster) nukeAll(identifiers []string) error {
func (m *MSKCluster) nukeAll(identifiers []*string) error {
if len(identifiers) == 0 {
return nil
}

for _, clusterArn := range identifiers {
_, err := m.Client.DeleteCluster(&kafka.DeleteClusterInput{
ClusterArn: &clusterArn,
ClusterArn: clusterArn,
})
if err != nil {
logging.Errorf("[Failed] %s", err)
}

// Record status of this resource
e := report.Entry{
Identifier: clusterArn,
Identifier: aws.StringValue(clusterArn),
ResourceType: "MSKCluster",
Error: err,
}
Expand Down
4 changes: 2 additions & 2 deletions aws/resources/msk_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestShouldIncludeMSKCluster(t *testing.T) {
IncludeRule: config.FilterRule{
NamesRegExp: []config.Expression{
{
RE: *regexp.MustCompile("test-cluster"),
RE: *regexp.MustCompile("^test-cluster"),
},
},
},
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestNukeMSKCluster(t *testing.T) {
Client: &mockMskClient,
}

err := msk.Nuke(nil, []string{})
err := msk.Nuke([]string{})
if err != nil {
t.Fatalf("Unable to nuke MSK Clusters: %v", err)
}
Expand Down
24 changes: 12 additions & 12 deletions aws/resources/msk_cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,40 @@ type MSKCluster struct {
ClusterArns []string
}

func (msk MSKCluster) Init(session *session.Session) {
msk.Client = kafka.New(session)
func (m *MSKCluster) Init(session *session.Session) {
m.Client = kafka.New(session)
}

// ResourceName - the simple name of the aws resource
func (msk MSKCluster) ResourceName() string {
func (m *MSKCluster) ResourceName() string {
return "msk-cluster"
}

// ResourceIdentifiers - The instance ids of the AWS Managed Streaming for Kafka clusters
func (msk MSKCluster) ResourceIdentifiers() []string {
return msk.ClusterArns
func (m *MSKCluster) ResourceIdentifiers() []string {
return m.ClusterArns
}

func (msk MSKCluster) MaxBatchSize() int {
func (m *MSKCluster) MaxBatchSize() int {
// Tentative batch size to ensure AWS doesn't throttle. Note that nat gateway does not support bulk delete, so
// we will be deleting this many in parallel using go routines. We conservatively pick 10 here, both to limit
// overloading the runtime and to avoid AWS throttling with many API calls.
return 10
}

func (msk MSKCluster) GetAndSetIdentifiers(c context.Context, configObj config.Config) ([]string, error) {
identifiers, err := msk.getAll(c, configObj)
func (m *MSKCluster) GetAndSetIdentifiers(c context.Context, configObj config.Config) ([]string, error) {
identifiers, err := m.getAll(c, configObj)
if err != nil {
return nil, err
}

msk.ClusterArns = awsgo.StringValueSlice(identifiers)
return msk.ClusterArns, nil
m.ClusterArns = awsgo.StringValueSlice(identifiers)
return m.ClusterArns, nil
}

// Nuke - nuke 'em all!!!
func (msk MSKCluster) Nuke(_ *session.Session, identifiers []string) error {
if err := msk.nukeAll(identifiers); err != nil {
func (m *MSKCluster) Nuke(identifiers []string) error {
if err := m.nukeAll(awsgo.StringSlice(identifiers)); err != nil {
return errors.WithStackTrace(err)
}

Expand Down

0 comments on commit 152f823

Please sign in to comment.