-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathresolver.py
195 lines (148 loc) · 5.66 KB
/
resolver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import ast
from fields import Field, Fields
class Resolver(object):
    """
    Wrapper class for static methods to resolve various ast nodes and more
    complex datatypes to their respective data.

    ``resolve`` dispatches on the class name of the object it is given, so
    the methods named after ``ast`` node classes (``Call``, ``Name``,
    ``Str``, ...) are the per-node handlers. The lower-case methods are
    higher-level helpers built on top of those handlers.
    """

    @staticmethod
    def resolve(node):
        """
        Resolve ``node`` using the handler named after its class.

        A plain Python list (such as the ``args`` of a call node) is
        resolved element by element and returned as a list.

        Raises AttributeError when no handler exists for the node's class.
        """
        if isinstance(node, list):
            # A plain list's type name is 'list', which does not match the
            # 'List' handler, so resolve its members directly.
            return [Resolver.resolve(item) for item in node]
        node_type = type(node).__name__
        method = getattr(Resolver, node_type, None)
        if method is None:
            raise AttributeError(
                'Resolver has no handler for %s nodes' % node_type)
        return method(node)

    @staticmethod
    def Call(field_node):
        """
        Return the name of the callable in a call node.

        ``serializers.CharField(...)`` gives ``'serializers.CharField'`` and
        ``CharField(...)`` gives ``'CharField'``.

        Raises TypeError when ``field_node`` has no ``func`` attribute.
        """
        try:
            func = field_node.func
        except AttributeError:
            raise TypeError('expected a call node, got %r' % (field_node,))
        # e.g. serializers.CharField
        if isinstance(func, ast.Attribute):
            # Attribute has value: Name
            #               attr : str
            return Resolver.Attribute(func)
        return func.id

    @staticmethod
    def Attribute(node):
        """Return the dotted path of an attribute access, e.g. ``'a.b'``."""
        rhs = node.attr
        lhs = Resolver.resolve(node.value)
        return '.'.join([lhs, rhs])

    @staticmethod
    def Assign(node):
        """
        Return ``(lhs, rhs)`` for an assignment node.

        ``lhs`` is the target's ``id`` when it has one; otherwise the target
        is treated as a subscript and ``lhs`` is its resolved slice (so
        ``x.fields['name'] = ...`` gives ``'name'``). ``rhs`` is the value
        node, left unresolved. Only the first target is considered.
        """
        # TODO: We don't usually do multi assignments
        target = node.targets[0]
        if hasattr(target, 'id'):
            lhs = target.id
        else:
            # This is a Subscript node and should probably be represented
            # in a better way to demonstrate that.
            lhs = Resolver.resolve(target.slice)
        rhs = node.value
        return (lhs, rhs)

    @staticmethod
    def Name(node):
        """Return the identifier of a name node."""
        return node.id

    @staticmethod
    def Str(node):
        """Return the value of a string literal node."""
        return node.s

    @staticmethod
    def Num(node):
        """Return the value of a numeric literal node."""
        return node.n

    @staticmethod
    def Constant(node):
        """Return the value of a ``Constant`` literal node (Python 3.8+)."""
        return node.value

    @staticmethod
    def NameConstant(node):
        """Return the value of a ``NameConstant`` node (True/False/None)."""
        return node.value

    @staticmethod
    def List(node):
        """Return a list with every element of a list literal resolved."""
        return [Resolver.resolve(x) for x in node.elts]

    @staticmethod
    def Index(node):
        """Resolve the value wrapped by an ``Index`` subscript node."""
        return Resolver.resolve(node.value)

    @staticmethod
    def Tuple(node):
        """Return a tuple with every element of a tuple literal resolved."""
        return tuple(Resolver.resolve(x) for x in node.elts)

    @staticmethod
    def keywords(keywords):
        """Return ``{name: resolved value}`` for a list of keyword nodes."""
        # each keyword is a `keyword` with arg: str and value: node
        return {
            keyword.arg: Resolver.resolve(keyword.value)
            for keyword in keywords
        }

    @staticmethod
    def func_params(field_node):
        """
        Collect the arguments of a call node into a ``Field``.

        Positional arguments, when present, are stored resolved under the
        ``'args'`` key; keyword arguments are merged in by name.
        """
        field = Field()
        if field_node.args:
            field['args'] = Resolver.resolve(field_node.args)
        field.update(**Resolver.keywords(field_node.keywords))
        return field

    @staticmethod
    def drf_field_assignment(node):
        """
        Build a ``Field`` from an assignment whose value is a call.

        Returns None when the assigned value is not a call node.
        """
        field_name, rhs = Resolver.resolve(node)
        # TODO: we don't care about other class variables.
        if not isinstance(rhs, ast.Call):
            return None
        return Resolver.parse_drf_field_node(field_name, rhs)

    @staticmethod
    def drf_meta_fields(meta_node):
        """
        Collect the fields named by the ``fields`` and ``read_only_fields``
        assignments found in the body of ``meta_node``.

        Returns a ``Fields`` collection; entries that come from
        ``read_only_fields`` are created with ``read_only=True``.
        """
        def resolve_fields(fields_node, read_only=False):
            # Build one Field per element of a tuple/list/set literal,
            # following `+` concatenations of such literals.
            if isinstance(fields_node, ast.BinOp):
                if not isinstance(fields_node.op, ast.Add):
                    # Raised explicitly so the check survives `python -O`.
                    raise AssertionError(
                        'only + is supported when combining field '
                        'collections')
                # if either is Attribute, resolve_fields returns [] which is
                # fine since it will be handled by the logic in bases.
                # read_only is passed down so both operands keep the flag.
                return (resolve_fields(fields_node.left, read_only) +
                        resolve_fields(fields_node.right, read_only))
            fields = []
            if isinstance(fields_node, (ast.Tuple, ast.List, ast.Set)):
                for field_node in fields_node.elts:
                    field = Field(field_name=Resolver.resolve(field_node),
                                  read_only=read_only)
                    fields.append(field)
            return fields

        fields = Fields()
        for node in meta_node.body:
            if not isinstance(node, ast.Assign):
                continue
            lhs, rhs = Resolver.resolve(node)
            if lhs == 'fields':
                fields.extend(resolve_fields(rhs))
            elif lhs == 'read_only_fields':
                fields.extend(resolve_fields(rhs, read_only=True))
        return fields

    @staticmethod
    def parse_drf_field_node(field_name, field_node):
        """
        Build a ``Field`` for ``field_name`` from the call node
        ``field_node``: its callable name goes in ``func_name`` and its
        arguments are added as collected by ``func_params``.
        """
        return Field(field_name=field_name,
                     func_name=Resolver.resolve(field_node),
                     **Resolver.func_params(field_node))

    @staticmethod
    def is_filter_conditional(node):
        """Return True for an ``if`` whose test is a name or a unary op."""
        return (
            isinstance(node, ast.If) and
            # TODO: Might need to support more test types. This isn't
            # flexible. Currently it only returns true when testing a
            # variable name or an unary operation like `if some_var` or
            # `if not some_var`.
            isinstance(node.test, (ast.Name, ast.UnaryOp))
        )

    @staticmethod
    def is_assignment_to_field(node):
        """
        Return True for an assignment whose first target is a subscript of
        an attribute named ``fields`` (e.g. ``x.fields['name'] = ...``).
        """
        return (
            isinstance(node, ast.Assign) and
            isinstance(node.targets[0], ast.Subscript) and
            isinstance(node.targets[0].value, ast.Attribute) and
            node.targets[0].value.attr == 'fields'
        )

    @staticmethod
    def init_method(init_node):
        """
        Collect the field assignments made inside filter conditionals that
        sit directly in the body of ``init_node``.

        Each field found is registered on the returned ``Fields`` through
        ``add_representation`` under the resolved test of its ``if``.
        """
        fields = Fields()
        # TODO: This method currently only detects field assignments
        # It needs to also support field deletions eventually.
        for init_body_node in init_node.body:
            if not Resolver.is_filter_conditional(init_body_node):
                continue
            for if_body_node in init_body_node.body:
                if not Resolver.is_assignment_to_field(if_body_node):
                    continue
                # NOTE(review): is_filter_conditional also accepts UnaryOp
                # tests, but this class has no UnaryOp handler, so
                # `if not some_var:` raises AttributeError here. Confirm
                # the intended filter name before adding one.
                filter_name = Resolver.resolve(init_body_node.test)
                field = Resolver.drf_field_assignment(if_body_node)
                # TODO: Sometimes fields are assigned to temporary variables
                # before being assigned to the actual field.
                if not field:
                    continue
                fields.add_representation(
                    field['field_name'], filter_name, field)
        return fields