-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy patharrayconnection.py
271 lines (223 loc) · 8.17 KB
/
arrayconnection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
from ..utils import base64, unbase64, is_str
from .connectiontypes import Connection, PageInfo, Edge
def connection_from_list(data, args=None, **kwargs):
    """
    Build a connection object for GraphQL from a fully materialized list.

    The whole of `data` is forwarded to `connection_from_list_slice` as a
    slice that starts at offset 0 and whose length equals the length of the
    full list. Pagination is based on list offsets, so it only behaves
    consistently while the list stays the same between requests.

    Any extra keyword arguments are passed through unchanged.
    """
    total = len(data)
    return connection_from_list_slice(
        data,
        args,
        list_length=total,
        list_slice_length=total,
        slice_start=0,
        **kwargs
    )
def connection_from_promised_list(data_promise, args=None, **kwargs):
    """
    Promise-based variant of `connection_from_list`.

    Registers a callback on `data_promise` through its `then` method; the
    callback turns the resolved list into a connection using `args` and any
    extra keyword arguments. The value returned by `then` is returned as is.
    """
    def build_connection(resolved_data):
        return connection_from_list(resolved_data, args, **kwargs)

    return data_promise.then(build_connection)
def connection_from_list_slice(list_slice, args=None, connection_type=None,
                               edge_type=None, pageinfo_type=None,
                               slice_start=0, list_length=0, list_slice_length=None):
    """
    Given a slice (subset) of a list, return a connection object for use in
    GraphQL.

    This function is similar to `connection_from_list`, but is intended for
    use cases where you know the cardinality of the connection, consider it
    too large to materialize the entire list, and instead wish to pass in a
    slice of the total result large enough to cover the range specified in
    `args`.

    Parameters:
        list_slice: the items that are available, supporting slicing.
        args: optional mapping read for the keys 'before', 'after', 'first'
            and 'last'.
        connection_type, edge_type, pageinfo_type: callables used to build
            the result; they default to Connection, Edge and PageInfo.
        slice_start: offset of `list_slice[0]` within the full list.
        list_length: number of items in the full list. The end offset is
            capped by this value, so with the default of 0 no edges are
            produced.
        list_slice_length: number of items in `list_slice`; computed with
            `len(list_slice)` when omitted.

    Returns:
        `connection_type(edges=..., page_info=...)`, where every edge is
        `edge_type(node=..., cursor=...)` and the cursor encodes the item's
        offset in the full list.
    """
    connection_type = connection_type or Connection
    edge_type = edge_type or Edge
    pageinfo_type = pageinfo_type or PageInfo

    args = args or {}
    before = args.get('before')
    after = args.get('after')
    first = args.get('first')
    last = args.get('last')

    if list_slice_length is None:
        list_slice_length = len(list_slice)
    slice_end = slice_start + list_slice_length

    # Cursors that are missing or cannot be decoded fall back to the
    # outermost bounds of the full list.
    before_offset = get_offset_with_default(before, list_length)
    after_offset = get_offset_with_default(after, -1)

    # Requested window in full-list offsets: [start_offset, end_offset).
    start_offset = max(slice_start - 1, after_offset, -1) + 1
    end_offset = min(slice_end, before_offset, list_length)
    if isinstance(first, int):
        end_offset = min(end_offset, start_offset + first)
    if isinstance(last, int):
        start_offset = max(start_offset, end_offset - last)

    # If supplied slice is too large, trim it down before mapping over it.
    # Both bounds are clamped at 0: a negative stop (window ending before the
    # slice begins, or a negative `first`) would otherwise be read by Python
    # as an index counted from the end and return unrelated items.
    trim_start = max(start_offset - slice_start, 0)
    trim_stop = max(end_offset - slice_start, 0)
    _slice = list_slice[trim_start:trim_stop]

    edges = [
        edge_type(
            node=node,
            cursor=offset_to_cursor(start_offset + i)
        )
        for i, node in enumerate(_slice)
    ]

    first_edge_cursor = edges[0].cursor if edges else None
    last_edge_cursor = edges[-1].cursor if edges else None

    # Bounds the window is compared against when reporting further pages.
    lower_bound = after_offset + 1 if after else 0
    upper_bound = before_offset if before else list_length

    return connection_type(
        edges=edges,
        page_info=pageinfo_type(
            start_cursor=first_edge_cursor,
            end_cursor=last_edge_cursor,
            # Only reported when paging backwards (`last`) / forwards (`first`).
            has_previous_page=isinstance(last, int) and start_offset > lower_bound,
            has_next_page=isinstance(first, int) and end_offset < upper_bound
        )
    )
def connection_from_list_slice_lazy(list_slice, args=None, connection_type=None,
                                    edge_type=None, pageinfo_type=None,
                                    slice_start=0, list_length=0, list_slice_length=None):
    """
    Build a connection from a sliceable collection without needing its total
    length up front.

    Only the items needed for the requested page are read, plus one extra
    item: if that extra item exists there are more results to be consumed.
    This spares the caller passing the total count of results, which usually
    means making an extra query just to find out that number.

    Parameters:
        list_slice: collection supporting slicing. When `last` is given
            without `before`, its size is taken from `list_slice.count()`
            and, if that call is unavailable or fails with TypeError, from
            `len(list_slice)`.
        args: optional mapping read for the keys 'before', 'after', 'first'
            and 'last'.
        connection_type, edge_type, pageinfo_type: callables used to build
            the result; they default to Connection, Edge and PageInfo.
        slice_start: offset to start from when neither `first` nor `last`
            is an integer.
        list_length, list_slice_length: not read by this function.

    Returns:
        `connection_type(edges=..., page_info=...)`. When neither `first`
        nor `last` is an integer, every item from `slice_start` onwards is
        returned and `has_next_page` is False.

    Raises:
        ValueError: if `first` or `last` is negative, if both are given, if
            `before` is given without `last`, if `after` is given without
            `first`, or if `before` and `after` are combined.
    """
    connection_type = connection_type or Connection
    edge_type = edge_type or Edge
    pageinfo_type = pageinfo_type or PageInfo

    args = args or {}
    before = args.get('before')
    after = args.get('after')
    first = args.get('first')
    last = args.get('last')

    # Booleans are ints in Python but are not meaningful page sizes.
    has_first = isinstance(first, int) and not isinstance(first, bool)
    has_last = isinstance(last, int) and not isinstance(last, bool)

    # Validations
    if (has_first and first < 0) or (has_last and last < 0):
        raise ValueError("first and last can't be negative values")

    if first and last:
        raise ValueError(
            "Including a value for both first and last is strongly discouraged,"
            " as it is likely to lead to confusing queries and results"
        )

    if before and last is None:
        raise ValueError(
            "before without last is not supported"
        )

    if after and first is None:
        raise ValueError(
            "after without first is not supported"
        )

    if before and after:
        raise ValueError(
            "Mixing before and after is not supported"
        )

    start_offset = slice_start
    end_offset = None

    if has_first:
        # Offsets decoded from cursors are clamped at 0 so that a crafted
        # negative offset is never used as an index from the end.
        start_offset = max(get_offset_with_default(after, -1) + 1, 0)
        end_offset = start_offset + first
    elif has_last:
        if before:
            before_offset = get_offset_with_default(before)
        else:
            try:
                before_offset = int(list_slice.count())
            except (TypeError, AttributeError):
                before_offset = len(list_slice)
        end_offset = max(before_offset, 0)
        start_offset = max(end_offset - last, 0)

    if end_offset is None:
        # No usable `first`/`last`: nothing bounds the page, so return
        # everything from the start offset and report no next page.
        page = list(list_slice[start_offset:])
        has_next_page = False
    else:
        page_size = end_offset - start_offset
        # Read one item past the page; its presence is the only evidence of
        # a next page.
        window = list(list_slice[start_offset:end_offset + 1])
        lookahead = window[page_size:page_size + 1]
        # A lookahead item that is None is treated as "no more results",
        # matching the previous probe of list_slice[end_offset].
        has_next_page = bool(lookahead) and lookahead[0] is not None
        page = window[:page_size]

    edges = [
        edge_type(
            node=node,
            cursor=offset_to_cursor(start_offset + index)
        )
        for index, node in enumerate(page)
    ]

    has_previous_page = start_offset > 0
    first_edge_cursor = edges[0].cursor if edges else None
    last_edge_cursor = edges[-1].cursor if edges else None

    return connection_type(
        edges=edges,
        page_info=pageinfo_type(
            start_cursor=first_edge_cursor,
            end_cursor=last_edge_cursor,
            has_previous_page=has_previous_page,
            has_next_page=has_next_page
        )
    )
# Tag placed in front of the offset before it is base64-encoded into a cursor
# (see offset_to_cursor); cursor_to_offset skips this many characters again.
PREFIX = 'arrayconnection:'
def connection_from_promised_list_slice(data_promise, args=None, **kwargs):
    """
    Promise-based variant of `connection_from_list_slice`.

    Registers a callback on `data_promise` through its `then` method; the
    callback turns the resolved slice into a connection using `args` and any
    extra keyword arguments. The value returned by `then` is returned as is.
    """
    def build_connection(resolved_slice):
        return connection_from_list_slice(resolved_slice, args, **kwargs)

    return data_promise.then(build_connection)
def offset_to_cursor(offset):
    """
    Encode an offset as a cursor string.

    The offset is converted with `str`, tagged with PREFIX and passed
    through the `base64` helper.
    """
    tagged_offset = PREFIX + str(offset)
    return base64(tagged_offset)
def cursor_to_offset(cursor):
    """
    Recover the integer offset stored in a cursor string.

    The cursor is decoded with `unbase64`, the first `len(PREFIX)`
    characters are skipped (they are not compared against PREFIX) and the
    remainder is parsed as an int. Any exception raised along the way is
    swallowed and None is returned instead.
    """
    try:
        decoded = unbase64(cursor)
        offset_text = decoded[len(PREFIX):]
        return int(offset_text)
    except Exception:
        return None
def cursor_for_object_in_connection(data, _object):
    """
    Look up `_object` in `data` and return the cursor for its position.

    Returns None when `_object` is not contained in `data`; otherwise the
    cursor encodes the index reported by `data.index(_object)`.
    """
    if _object in data:
        position = data.index(_object)
        return offset_to_cursor(position)
    return None
def get_offset_with_default(cursor=None, default_offset=0):
    """
    Resolve an optional cursor to an offset, falling back to a default.

    `default_offset` is returned when `cursor` is not a string (as judged by
    `is_str`) or when the value decoded from it cannot be converted to an
    int; otherwise the decoded offset is returned.
    """
    if is_str(cursor):
        decoded = cursor_to_offset(cursor)
        try:
            return int(decoded)
        except Exception:
            # Undecodable cursor (e.g. decoded value is None): use the default.
            pass
    return default_offset