This repository was archived by the owner on Aug 31, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathparser.py
140 lines (106 loc) · 4.33 KB
/
parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Helper to parse JSON/YAML SAM template and dump YAML files."""
import copy
import json
from collections import OrderedDict
import six
import yaml
from yaml.resolver import ScalarNode, SequenceNode
from .application_metadata import ApplicationMetadata
from .exceptions import ApplicationMetadataNotFoundError
METADATA = "Metadata"
SERVERLESS_REPO_APPLICATION = "AWS::ServerlessRepo::Application"
def intrinsics_multi_constructor(loader, tag_prefix, node):
"""
YAML constructor to parse CloudFormation intrinsics.
:return: a dictionary with key being the instrinsic name
"""
# Get the actual tag name excluding the first exclamation
tag = node.tag[1:]
# Some intrinsic functions doesn't support prefix "Fn::"
prefix = "Fn::"
if tag in ["Ref", "Condition"]:
prefix = ""
cfntag = prefix + tag
if tag == "GetAtt" and isinstance(node.value, six.string_types):
# ShortHand notation for !GetAtt accepts Resource.Attribute format
# while the standard notation is to use an array
# [Resource, Attribute]. Convert shorthand to standard format
value = node.value.split(".", 1)
elif isinstance(node, ScalarNode):
# Value of this node is scalar
value = loader.construct_scalar(node)
elif isinstance(node, SequenceNode):
# Value of this node is an array (Ex: [1,2])
value = loader.construct_sequence(node)
else:
# Value of this node is an mapping (ex: {foo: bar})
value = loader.construct_mapping(node)
return {cfntag: value}
def _dict_representer(dumper, data):
return dumper.represent_dict(data.items())
def yaml_dump(dict_to_dump):
"""
Dump the dictionary as a YAML document.
:param dict_to_dump: Data to be serialized as YAML
:type dict_to_dump: dict
:return: YAML document
:rtype: str
"""
yaml.SafeDumper.add_representer(OrderedDict, _dict_representer)
return yaml.safe_dump(dict_to_dump, default_flow_style=False)
def _dict_constructor(loader, node):
return OrderedDict(loader.construct_pairs(node))
def parse_template(template_str):
"""
Parse the SAM template.
:param template_str: A packaged YAML or json CloudFormation template
:type template_str: str
:return: Dictionary with keys defined in the template
:rtype: dict
"""
try:
# PyYAML doesn't support json as well as it should, so if the input
# is actually just json it is better to parse it with the standard
# json parser.
return json.loads(template_str, object_pairs_hook=OrderedDict)
except ValueError:
yaml.SafeLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _dict_constructor
)
yaml.SafeLoader.add_multi_constructor("!", intrinsics_multi_constructor)
return yaml.safe_load(template_str)
def get_app_metadata(template_dict):
"""
Get the application metadata from a SAM template.
:param template_dict: SAM template as a dictionary
:type template_dict: dict
:return: Application metadata as defined in the template
:rtype: ApplicationMetadata
:raises ApplicationMetadataNotFoundError
"""
if SERVERLESS_REPO_APPLICATION in template_dict.get(METADATA, {}):
app_metadata_dict = template_dict.get(METADATA).get(SERVERLESS_REPO_APPLICATION)
return ApplicationMetadata(app_metadata_dict)
raise ApplicationMetadataNotFoundError(
error_message="missing {} section in template Metadata".format(
SERVERLESS_REPO_APPLICATION
)
)
def strip_app_metadata(template_dict):
"""
Strip the "AWS::ServerlessRepo::Application" metadata section from template.
:param template_dict: SAM template as a dictionary
:type template_dict: dict
:return: stripped template content
:rtype: str
"""
if SERVERLESS_REPO_APPLICATION not in template_dict.get(METADATA, {}):
return template_dict
template_dict_copy = copy.deepcopy(template_dict)
# strip the whole metadata section if SERVERLESS_REPO_APPLICATION is the only key in it
metadata = template_dict_copy.get(METADATA)
if not any(k for k in metadata if k != SERVERLESS_REPO_APPLICATION):
template_dict_copy.pop(METADATA, None)
else:
template_dict_copy.get(METADATA).pop(SERVERLESS_REPO_APPLICATION, None)
return template_dict_copy