forked from cloud-custodian/cloud-custodian
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_appflow.py
73 lines (67 loc) · 2.34 KB
/
test_appflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from .common import BaseTest
class AppFlowTests(BaseTest):
    """Policy tests for the ``app-flow`` resource: tag, remove-tag and delete."""

    def test_appflow_tag(self):
        """Run a ``tag`` action and check ``describe_flow`` reports the new tag."""
        new_tag = {'lob': 'overhead'}
        factory = self.replay_flight_data('test_appflow_tag')
        policy_data = {
            'name': 'app-flow',
            'resource': 'app-flow',
            'filters': [{'tag:lob': 'absent'}],
            'actions': [{'type': 'tag', 'tags': new_tag}],
        }
        policy = self.load_policy(policy_data, session_factory=factory)

        matched = policy.run()
        self.assertEqual(1, len(matched))

        # Read the flow back through the appflow client to inspect its tags.
        name = matched[0].get('flowName')
        client = factory().client('appflow')
        described = client.describe_flow(flowName=name)
        self.assertEqual(new_tag, described.get('tags'))

    def test_appflow_untag(self):
        """Run a ``remove-tag`` action and check ``describe_flow`` shows no tags."""
        factory = self.replay_flight_data('test_appflow_untag')
        policy_data = {
            'name': 'app-flow',
            'resource': 'app-flow',
            'filters': [{'tag:lob': 'overhead'}],
            'actions': [{'type': 'remove-tag', 'tags': ['lob']}],
        }
        policy = self.load_policy(policy_data, session_factory=factory)

        matched = policy.run()
        self.assertEqual(1, len(matched))

        # Read the flow back through the appflow client to inspect its tags.
        name = matched[0].get('flowName')
        client = factory().client('appflow')
        described = client.describe_flow(flowName=name)
        self.assertEqual({}, described.get('tags'))

    def test_appflow_delete(self):
        """Run a forced ``delete`` action and check ``list_flows`` comes back empty."""
        factory = self.replay_flight_data('test_appflow_delete')
        policy_data = {
            'name': 'app-flow',
            'resource': 'app-flow',
            'actions': [{'type': 'delete', 'force': True}],
        }
        policy = self.load_policy(policy_data, session_factory=factory)

        matched = policy.run()
        self.assertEqual(1, len(matched))

        # List flows afterwards; the test expects none to be reported.
        client = factory().client('appflow')
        listing = client.list_flows(maxResults=1)
        self.assertEqual([], listing.get('flows'))