forked from cloud-custodian/cloud-custodian
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_put_metrics.py
109 lines (96 loc) · 3.72 KB
/
test_put_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from c7n.utils import yaml_load
from .common import BaseTest
import logging
from pprint import pformat
logger = logging.getLogger(name="c7n.tests")
class PutMetricsTest(BaseTest):
record = False
EXAMPLE_EC2_POLICY = """
policies:
- name: track-attached-ebs
resource: ec2
comment: |
Put the count of the number of EBS attached disks to an instance
#filters:
# - Name: tracked-ec2-instance
actions:
- type: put-metric
key: BlockDeviceMappings[].DeviceName
namespace: Usage Metrics
metric_name: Attached Disks
dimensions:
- { a: b }
op: distinct_count
"""
EXAMPLE_S3_POLICY = """
policies:
- name: bucket-count
resource: s3
comment: |
Count all the buckets!
#filters:
# - Name: passthru
# type: value
# key: Name
# value: 0
actions:
- type: put-metric
key: Name
namespace: Usage Metrics
metric_name: S3 Buckets
op: count
"""
def _get_test_policy(self, name, yaml_doc, record=False):
if record:
logger.warn("TestPutMetrics is RECORDING")
session_factory = self.record_flight_data("test_cw_put_metrics_" + name)
else:
logger.debug("TestPutMetrics is replaying")
session_factory = self.replay_flight_data("test_cw_put_metrics_" + name)
policy = self.load_policy(
yaml_load(yaml_doc)["policies"][0], session_factory=session_factory
)
return policy
def _test_putmetrics_s3(self):
""" This test fails when replaying flight data due to an issue with placebo.
"""
policy = self._get_test_policy(
name="s3test", yaml_doc=self.EXAMPLE_S3_POLICY, record=self.record
)
resources = policy.run()
logger.debug(
"these are the results from the policy, assumed to be resources that were processed"
)
logger.debug(pformat(resources))
self.assertGreaterEqual(
len(resources), 1, "PutMetricsTest appears to have processed 0 resources."
)
def test_putmetrics_ec2(self):
policy = self._get_test_policy(
name="ec2test", yaml_doc=self.EXAMPLE_EC2_POLICY, record=self.record
)
resources = policy.run()
logger.debug(
"these are the results from the policy, assumed to be resources that were processed"
)
logger.debug(pformat(resources))
self.assertGreaterEqual(
len(resources),
1,
"PutMetricsTest appears to have processed 0 resources. "
"Are there any running ec2 instances?",
)
def test_putmetrics_permissions(self):
from c7n.actions import PutMetric
self.assertTrue("cloudwatch:PutMetricData" in PutMetric.permissions)
pma = PutMetric()
self.assertTrue("cloudwatch:PutMetricData" in pma.get_permissions())
def test_putmetrics_schema(self):
import jsonschema
from c7n.actions import PutMetric
data = yaml_load(self.EXAMPLE_EC2_POLICY)
action_schema = PutMetric.schema
res = jsonschema.validate(data["policies"][0]["actions"][0], action_schema)
self.assertIsNone(res, "PutMetric.schema failed to validate.")