forked from cloud-custodian/cloud-custodian
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_quotas.py
130 lines (112 loc) · 4.99 KB
/
test_quotas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
#
from datetime import timedelta
from c7n.executor import MainThreadExecutor
from c7n.resources.quotas import ServiceQuota, UsageFilter
from c7n.utils import local_session
from .common import BaseTest
class TestQuotas(BaseTest):
    """Tests for the ``aws.service-quota`` resource, its filters and actions.

    The policy-driven tests all replay the recorded ``test_service_quota``
    flight data; the ``UsageFilter`` helper tests call the helper methods
    directly and need no session.
    """

    def setUp(self):
        """Replace ``ServiceQuota.executor_factory`` with ``MainThreadExecutor``."""
        super().setUp()
        # NOTE(review): presumably keeps the resource's work on the main
        # thread so recorded responses replay deterministically -- confirm.
        self.patch(ServiceQuota, "executor_factory", MainThreadExecutor)

    def test_service_quota_request_history_filter(self):
        """The ``request-history`` filter yields at least one resource."""
        session_factory = self.replay_flight_data('test_service_quota')
        policy = self.load_policy(
            {
                "name": "service-quota-history-filter",
                "resource": "aws.service-quota",
                "filters": [
                    {
                        "type": "request-history",
                        "key": "[].Status",
                        "value": "CASE_CLOSED",
                        "op": "in",
                        "value_type": "swap",
                    },
                ],
            },
            session_factory=session_factory,
        )
        resources = policy.run()
        self.assertTrue(resources)

    def test_service_quota_request_increase(self):
        """The ``request-increase`` action leaves a change request on the quota."""
        session_factory = self.replay_flight_data('test_service_quota')
        policy = self.load_policy(
            {
                "name": "service-quota-request-increase",
                "resource": "aws.service-quota",
                "filters": [{"QuotaCode": "L-355B2B67"}],
                "actions": [{"type": "request-increase", "multiplier": 1.2}],
            },
            session_factory=session_factory,
        )
        resources = policy.run()
        self.assertEqual(len(resources), 1)

        # Look up the change history of the single matched quota and check
        # that it is not empty.
        quota = resources[0]
        client = local_session(session_factory).client('service-quotas')
        response = client.list_requested_service_quota_change_history_by_quota(
            ServiceCode=quota['ServiceCode'],
            QuotaCode=quota['QuotaCode'],
        )
        self.assertTrue(response['RequestedQuotas'])

    # ServiceQuota.augment.get_quotas is a nested function, so it cannot be
    # patched; this test case is the best we can do at the moment.
    def test_service_quota_metadata_incl_filter(self):
        """Querying with ``include_service_codes`` returns the expected count."""
        session_factory = self.replay_flight_data('test_service_quota')
        policy = self.load_policy(
            {
                "name": "service-quota-metaddata-filter",
                "resource": "aws.service-quota",
                "query": [{"include_service_codes": ["ec2"]}],
            },
            session_factory=session_factory,
        )
        resources = policy.run()
        # called ListAWSDefaultServiceQuotas once, 6 quotas returned
        # called servicequotas.ListServiceQuotas once, 2 quotas returned
        self.assertEqual(len(resources), 8)

    def test_service_quota_metadata_excl_filter(self):
        """Querying with ``exclude_service_codes`` returns the expected count."""
        session_factory = self.replay_flight_data('test_service_quota')
        policy = self.load_policy(
            {
                "name": "service-quota-metaddata-filter",
                "resource": "aws.service-quota",
                "query": [{"exclude_service_codes": ["logs"]}],
            },
            session_factory=session_factory,
        )
        resources = policy.run()
        # called ListAWSDefaultServiceQuotas twice, 6x2 quotas returned
        # called servicequotas.ListServiceQuotas twice, 2+1 quotas returned
        self.assertEqual(len(resources), 15)

    def test_usage_metric_filter(self):
        """The ``usage-metric`` filter matches two quotas that have a UsageMetric."""
        session_factory = self.replay_flight_data('test_service_quota')
        policy = self.load_policy(
            {
                "name": "service-quota-usage-metric",
                "resource": "aws.service-quota",
                "filters": [
                    {"UsageMetric": "present"},
                    {"type": "usage-metric", "min_period": 60, "limit": 20},
                ],
            },
            session_factory=session_factory,
        )
        resources = policy.run()
        self.assertEqual(len(resources), 2)

    def test_usage_filter_round_up(self):
        """``UsageFilter.round_up`` rounds up to the next multiple of the step."""
        usage_filter = UsageFilter({})
        self.assertEqual(usage_filter.round_up(1, 60), 60)
        self.assertEqual(usage_filter.round_up(59, 60), 60)
        self.assertEqual(usage_filter.round_up(60, 60), 60)
        self.assertEqual(usage_filter.round_up(61, 60), 120)

    def test_usage_filter_scale_period(self):
        """``UsageFilter.scale_period`` returns the expected (period, scale) pairs."""
        usage_filter = UsageFilter({})
        one_day = timedelta(1).total_seconds()
        one_hour = timedelta(hours=1).total_seconds()

        # The provided period is smaller than the minimum one
        scaled_period, scale = usage_filter.scale_period(one_day, 1, 60)
        self.assertEqual(scaled_period, 60)
        self.assertEqual(scale, 60)

        # The provided period generates too many data points
        scaled_period, scale = usage_filter.scale_period(one_day, 1, 1)
        self.assertEqual(scaled_period, 120)
        self.assertEqual(scale, 120)

        # The provided period is not aligned with AWS pre-defined periods
        scaled_period, scale = usage_filter.scale_period(one_hour, 30, 1)
        self.assertEqual(scaled_period, 60)
        self.assertEqual(scale, 2)

        # The provided period is accepted as-is, not generating too many
        # data points
        scaled_period, scale = usage_filter.scale_period(one_day, 300, 60)
        self.assertEqual(scaled_period, 300)
        self.assertEqual(scale, 1)