-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathpolygon_utils.py
328 lines (280 loc) · 12.3 KB
/
polygon_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import logging
import sys
from functools import wraps
from typing import Dict, List, Tuple, TypeVar
import numpy as np
from scipy.optimize import linear_sum_assignment
from shapely.geometry import Polygon
from nucleus.annotation import BoxAnnotation, PolygonAnnotation
from nucleus.prediction import BoxPrediction, PolygonPrediction
from .base import ScalarResult
from .errors import PolygonAnnotationTypeError
# Type variables used to write generics over the box/polygon annotation and
# prediction classes. Constraining (rather than bounding) them keeps each
# call site homogeneous in the concrete type it passes.
BoxOrPolygonPrediction = TypeVar(
    "BoxOrPolygonPrediction", BoxPrediction, PolygonPrediction
)
BoxOrPolygonAnnotation = TypeVar(
    "BoxOrPolygonAnnotation", BoxAnnotation, PolygonAnnotation
)
# Accepts any of the four shapes above — for helpers that treat annotations
# and predictions uniformly.
BoxOrPolygonAnnoOrPred = TypeVar(
    "BoxOrPolygonAnnoOrPred",
    BoxAnnotation,
    PolygonAnnotation,
    BoxPrediction,
    PolygonPrediction,
)
def polygon_annotation_to_shape(
    annotation: BoxOrPolygonAnnotation,
) -> Polygon:
    """Convert a box or polygon annotation into a shapely ``Polygon``.

    A ``BoxAnnotation`` is interpreted as a center point ``(x, y)`` plus a
    width and height, and becomes its four-corner rectangle; a
    ``PolygonAnnotation`` contributes its vertices directly.

    Raises:
        PolygonAnnotationTypeError: if the input is neither annotation type.
    """
    if isinstance(annotation, BoxAnnotation):
        half_w = annotation.width / 2
        half_h = annotation.height / 2
        left, right = annotation.x - half_w, annotation.x + half_w
        bottom, top = annotation.y - half_h, annotation.y + half_h
        corners = [(left, bottom), (right, bottom), (right, top), (left, top)]
        return Polygon(corners)
    if isinstance(annotation, PolygonAnnotation):
        return Polygon([(vertex.x, vertex.y) for vertex in annotation.vertices])
    raise PolygonAnnotationTypeError()
def _iou(annotation: Polygon, prediction: Polygon) -> float:
    """Return the intersection-over-union of two shapely polygons.

    The denominator is clamped to machine epsilon so two zero-area shapes
    yield 0.0 instead of dividing by zero.
    """
    overlap = annotation.intersection(prediction).area
    combined = annotation.area + prediction.area - overlap
    return overlap / max(combined, sys.float_info.epsilon)
def _iou_matrix(
    annotations: List[Polygon], predictions: List[Polygon]
) -> np.ndarray:
    """Return the pairwise IoU matrix.

    Shape is ``(len(predictions), len(annotations))``: entry ``[i, j]`` is
    the IoU of prediction ``i`` against annotation ``j``.
    """
    matrix = np.empty((len(predictions), len(annotations)))
    for row, prediction in enumerate(predictions):
        matrix[row, :] = [
            _iou(annotation, prediction) for annotation in annotations
        ]
    return matrix
def _iou_assignments_for_same_reference_id(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
    iou_threshold: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Matches annotations and predictions of the same reference ID.

    Args:
        annotations: box/polygon annotations, all with one reference_id.
        predictions: box/polygon predictions with that same reference_id.
        iou_threshold: minimum IoU for a pair to count as a match.

    Returns:
        Tuple of:
        - 1D array of the IoU values of all valid assignments.
        - per-annotation array of the matched prediction index (-1 if
          unmatched), indexed against the ORIGINAL ``annotations`` list.
        - per-prediction array of the matched annotation index (-1 if
          unmatched), indexed against the ORIGINAL ``predictions`` list.
    """
    # Check that all annotations and predictions have same reference ID.
    reference_ids = set(annotation.reference_id for annotation in annotations)
    reference_ids |= set(prediction.reference_id for prediction in predictions)
    assert (
        len(reference_ids) <= 1
    ), "Expected annotations and predictions to have same reference ID."

    # Convert annotation and predictions to shapely.geometry.Polygon objects
    polygon_annotations = list(map(polygon_annotation_to_shape, annotations))
    polygon_predictions = list(map(polygon_annotation_to_shape, predictions))

    # Remember the ORIGINAL index of every valid polygon so the assignment
    # indices (computed on the filtered lists) can be mapped back afterwards.
    valid_ann_idxs = [
        i for i, poly in enumerate(polygon_annotations) if poly.is_valid
    ]
    valid_pred_idxs = [
        i for i, poly in enumerate(polygon_predictions) if poly.is_valid
    ]
    if len(valid_ann_idxs) < len(annotations) or len(valid_pred_idxs) < len(
        predictions
    ):
        invalid_anns = [
            ann
            for ann, poly in zip(annotations, polygon_annotations)
            if not poly.is_valid
        ]
        invalid_preds = [
            pred
            for pred, poly in zip(predictions, polygon_predictions)
            if not poly.is_valid
        ]
        invalid_dataset_ids = set(
            ann.reference_id for ann in invalid_anns
        ).union(set(pred.reference_id for pred in invalid_preds))
        # TODO(gunnar): change to .id once field is surfaced)
        logging.warning(
            "Invalid polygons for dataset items: %s Annotations:%s, predictions: %s",
            invalid_dataset_ids,
            [a.id for a in invalid_anns],
            [p.id for p in invalid_preds],
        )
        # Filter out invalid polys
        polygon_annotations = [polygon_annotations[i] for i in valid_ann_idxs]
        polygon_predictions = [polygon_predictions[i] for i in valid_pred_idxs]

    # Compute IoU matrix and set IoU values below the threshold to 0.
    iou_matrix = _iou_matrix(polygon_annotations, polygon_predictions)
    iou_matrix[iou_matrix < iou_threshold] = 0

    # Match annotations and predictions using linear sum assignment and
    # filter out values below the threshold.
    matched_0, matched_1 = linear_sum_assignment(-iou_matrix)
    iou_assigns = iou_matrix[matched_0, matched_1]
    valid_idxes = iou_assigns >= iou_threshold
    iou_assigns = iou_assigns[valid_idxes]
    matched_0 = matched_0[valid_idxes]
    matched_1 = matched_1[valid_idxes]

    # BUGFIX: matched_0/matched_1 index into the FILTERED polygon lists.
    # Translate them back to original-list indices before writing the
    # per-annotation / per-prediction assignment arrays (previously, when
    # any polygon was invalid, matches were attributed to the wrong items).
    pred_index = np.asarray(valid_pred_idxs, dtype=int)
    ann_index = np.asarray(valid_ann_idxs, dtype=int)
    matched_pred_orig = pred_index[matched_0]
    matched_anno_orig = ann_index[matched_1]

    anno_to_pred = -np.ones(len(annotations))
    pred_to_anno = -np.ones(len(predictions))
    anno_to_pred[matched_anno_orig] = matched_pred_orig
    pred_to_anno[matched_pred_orig] = matched_anno_orig
    return iou_assigns, anno_to_pred, pred_to_anno
def group_boxes_or_polygons_by_reference_id(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
) -> Dict[
    str, Tuple[List[BoxOrPolygonAnnotation], List[BoxOrPolygonPrediction]]
]:
    """Groups input annotations and predictions by reference_id.
    Args:
        annotations: list of input annotations
        predictions: list of input predictions
    Returns:
        Mapping from each reference_id to (annotations, predictions) tuple.
    """
    grouped: Dict[
        str, Tuple[List[BoxOrPolygonAnnotation], List[BoxOrPolygonPrediction]]
    ] = {}
    # Every reference_id seen on either side gets an (annotations,
    # predictions) pair; one side may remain an empty list.
    for annotation in annotations:
        bucket = grouped.setdefault(annotation.reference_id, ([], []))
        bucket[0].append(annotation)
    for prediction in predictions:
        bucket = grouped.setdefault(prediction.reference_id, ([], []))
        bucket[1].append(prediction)
    return grouped
def group_boxes_or_polygons_by_label(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
) -> Dict[
    str, Tuple[List[BoxOrPolygonAnnotation], List[BoxOrPolygonPrediction]]
]:
    """Groups input annotations and predictions by label.
    Args:
        annotations: list of input box or polygon annotations
        predictions: list of input box or polygon predictions
    Returns:
        Mapping from each label to (annotations, predictions) tuple
    """
    grouped: Dict[
        str, Tuple[List[BoxOrPolygonAnnotation], List[BoxOrPolygonPrediction]]
    ] = {}
    # Every label seen on either side gets an (annotations, predictions)
    # pair; one side may remain an empty list.
    for annotation in annotations:
        bucket = grouped.setdefault(annotation.label, ([], []))
        bucket[0].append(annotation)
    for prediction in predictions:
        bucket = grouped.setdefault(prediction.label, ([], []))
        bucket[1].append(prediction)
    return grouped
def iou_assignments(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
    iou_threshold: float,
) -> np.ndarray:
    """Matches annotations and predictions based on linear sum cost and returns the
    intersection-over-union values of the matched annotation-prediction pairs, subject
    to the specified IoU threshold. Note that annotations and predictions from
    different reference_ids will not be matched with one another.
    See https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
    Args:
        annotations: list of box or polygon annotations
        predictions: list of box or polygon predictions
        iou_threshold: the intersection-over-union threshold for an
            annotation-prediction pair to be considered a match.
    Returns:
        1D numpy array that contains the IoU values of the matched pairs.
    """
    grouped_inputs = group_boxes_or_polygons_by_reference_id(
        annotations, predictions
    )
    iou_assigns = []
    for grouped_annotations, grouped_predictions in grouped_inputs.values():
        result_per_reference_id, _, _ = _iou_assignments_for_same_reference_id(
            grouped_annotations, grouped_predictions, iou_threshold
        )
        iou_assigns.append(result_per_reference_id)
    # np.concatenate raises ValueError on an empty sequence; with no
    # annotations and no predictions there are simply no matches.
    if not iou_assigns:
        return np.array([])
    return np.concatenate(iou_assigns)
def get_true_false_positives_confidences(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
    iou_threshold: float,
) -> Tuple[np.ndarray, np.ndarray]:
    """Matches annotations and predictions based on linear sum cost and returns the
    intersection-over-union values of the matched annotation-prediction pairs, subject
    to the specified IoU threshold. Note that annotations and predictions from
    different reference_ids will not be matched with one another.
    See https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
    Args:
        annotations: list of box or polygon annotations
        predictions: list of box or polygon predictions
        iou_threshold: the intersection-over-union threshold for an
            annotation-prediction pair to be considered a match.
    Returns:
        1D numpy array that contains the 1 if true positive and 0 if false positive
        for each prediction.
        1D numpy array of confidence values.
    """
    grouped_inputs = group_boxes_or_polygons_by_reference_id(
        annotations, predictions
    )
    true_false_positives = []
    confidences = []
    for grouped_annotations, grouped_predictions in grouped_inputs.values():
        _, _, pred_to_anno = _iou_assignments_for_same_reference_id(
            grouped_annotations, grouped_predictions, iou_threshold
        )
        true_false_positives.append(pred_to_anno > -1)
        confidences.extend([pred.confidence for pred in grouped_predictions])
    # np.concatenate raises ValueError on an empty sequence; with no inputs
    # at all there are no predictions to classify.
    if not true_false_positives:
        return np.array([], dtype=bool), np.array(confidences)
    return np.concatenate(true_false_positives), np.array(confidences)
def num_true_positives(
    annotations: List[BoxOrPolygonAnnotation],
    predictions: List[BoxOrPolygonPrediction],
    iou_threshold: float,
) -> int:
    """Counts the number of annotations with a matching prediction.
    A prediction is considered a match for an annotation if it has not yet been
    matched to another annotation, its reference_id is the same as the
    annotation, and its IoU with the annotation is at least the iou_threshold.
    Args:
        annotations: list of box or polygon annotations
        predictions: list of box or polygon predictions
        iou_threshold: the intersection-over-union threshold for an
            annotation-prediction pair to be considered a match.
    Returns:
        The number of true positives (predictions that are matched to annotations).
    """
    # Each entry in the assignment array is one matched pair, so the count
    # of true positives is simply its length.
    matched_ious = iou_assignments(annotations, predictions, iou_threshold)
    return len(matched_ious)
def label_match_wrapper(metric_fn):
    """Decorator to add the ability to only apply metric to annotations and
    predictions with matching labels.
    Args:
        metric_fn: Metric function that takes a list of annotations, a list
            of predictions, and optional args and kwargs.
    Returns:
        Metric function which can optionally enforce matching labels.
    """

    @wraps(metric_fn)
    def wrapper(
        annotations: List[BoxOrPolygonAnnotation],
        predictions: List[BoxOrPolygonPrediction],
        *args,
        enforce_label_match: bool = False,
        **kwargs,
    ) -> ScalarResult:
        # Without label enforcement, delegate to the metric unchanged.
        if not enforce_label_match:
            return metric_fn(annotations, predictions, *args, **kwargs)
        # Otherwise evaluate the metric per label bin, then aggregate the
        # per-bin results into a single scalar.
        by_label = group_boxes_or_polygons_by_label(annotations, predictions)
        metric_results = [
            metric_fn(label_annotations, label_predictions, *args, **kwargs)
            for label_annotations, label_predictions in by_label.values()
        ]
        assert all(
            isinstance(result, ScalarResult) for result in metric_results
        ), "Expected every result to be a ScalarResult"
        return ScalarResult.aggregate(metric_results)

    return wrapper