-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathgeometry.py
962 lines (803 loc) · 33.7 KB
/
geometry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
from __future__ import annotations
import copy
from collections import ChainMap
from collections.abc import Hashable, Sequence
from dataclasses import dataclass
import numpy as np
import pandas as pd
import xarray as xr
from numpy.typing import ArrayLike
GEOMETRY_CONTAINER_NAME = "geometry_container"
FEATURES_DIM_NAME = "features"
__all__ = [
"decode_geometries",
"encode_geometries",
"cf_to_shapely",
"shapely_to_cf",
]
# Useful convention language:
# 1. Whether linked to normal CF space-time coordinates with a nodes attribute or not, inclusion of such coordinates is
# recommended to maintain backward compatibility with software that has not implemented geometry capabilities.
# 2. The geometry node coordinate variables must each have an axis attribute whose allowable values are X, Y, and Z.
# 3. If a coordinates attribute is carried by the geometry container variable or its parent data variable, then those coordinate variables
# that have a meaningful correspondence with node coordinates are indicated as such by a nodes attribute that names the corresponding node
# coordinates, but only if the grid_mapping associated the geometry node variables is the same as that of the coordinate variables.
# If a different grid mapping is used, then the provided coordinates must not have the nodes attribute.
#
# Interpretation:
# 1. node coordinates are exact; the 'normal' coordinates are a reasonable value to use, if you do not know how to interpret the nodes.
@dataclass
class GeometryNames:
"""Helper class to ease handling of all the variable names needed for CF geometries."""
def __init__(
self,
suffix: str = "",
grid_mapping_name: str | None = None,
grid_mapping: str | None = None,
):
self.container_name: str = GEOMETRY_CONTAINER_NAME + suffix
self.node_dim: str = "node" + suffix
self.node_count: str = "node_count" + suffix
self.node_coordinates_x: str = "x" + suffix
self.node_coordinates_y: str = "y" + suffix
self.coordinates_x: str = "crd_x" + suffix
self.coordinates_y: str = "crd_y" + suffix
self.part_node_count: str = "part_node_count" + suffix
self.part_dim: str = "part" + suffix
self.interior_ring: str = "interior_ring" + suffix
self.attrs_x: dict[str, str] = {}
self.attrs_y: dict[str, str] = {}
self.grid_mapping_attr = {"grid_mapping": grid_mapping} if grid_mapping else {}
# Special treatment of selected grid mappings
if grid_mapping_name in ["latitude_longitude", "rotated_latitude_longitude"]:
# Special case for longitude_latitude type grid mappings
self.coordinates_x = "lon"
self.coordinates_y = "lat"
if grid_mapping_name == "latitude_longitude":
self.attrs_x = dict(units="degrees_east", standard_name="longitude")
self.attrs_y = dict(units="degrees_north", standard_name="latitude")
elif grid_mapping_name == "rotated_latitude_longitude":
self.attrs_x = dict(
units="degrees_east", standard_name="grid_longitude"
)
self.attrs_y = dict(
units="degrees_north", standard_name="grid_latitude"
)
elif grid_mapping_name is not None:
self.attrs_x = dict(standard_name="projection_x_coordinate")
self.attrs_y = dict(standard_name="projection_y_coordinate")
self.attrs_x.update(self.grid_mapping_attr)
self.attrs_y.update(self.grid_mapping_attr)
@property
def geometry_container_attrs(self) -> dict[str, str]:
return {
"node_count": self.node_count,
"node_coordinates": f"{self.node_coordinates_x} {self.node_coordinates_y}",
"coordinates": f"{self.coordinates_x} {self.coordinates_y}",
**self.grid_mapping_attr,
}
def coords(
self,
*,
dim: Hashable,
x: ArrayLike,
y: ArrayLike,
crdX: ArrayLike | None = None,
crdY: ArrayLike | None = None,
) -> dict[str, xr.DataArray]:
"""
Construct coordinate DataArrays for the numpy data (x, y, crdX, crdY)
Parameters
----------
x: array
Node coordinates for X coordinate
y: array
Node coordinates for Y coordinate
crdX: array, optional
Nominal X coordinate
crdY: array, optional
Nominal X coordinate
"""
mapping = {
self.node_coordinates_x: xr.DataArray(
x, dims=self.node_dim, attrs={"axis": "X", **self.attrs_x}
),
self.node_coordinates_y: xr.DataArray(
y, dims=self.node_dim, attrs={"axis": "Y", **self.attrs_y}
),
}
if crdX is not None:
mapping[self.coordinates_x] = xr.DataArray(
crdX,
dims=(dim,),
attrs={"nodes": self.node_coordinates_x, **self.attrs_x},
)
if crdY is not None:
mapping[self.coordinates_y] = xr.DataArray(
crdY,
dims=(dim,),
attrs={"nodes": self.node_coordinates_y, **self.attrs_y},
)
return mapping
def _assert_single_geometry_container(ds: xr.Dataset) -> Hashable:
container_names = _get_geometry_containers(ds)
if len(container_names) > 1:
raise ValueError(
"Only one geometry container is supported by cf_to_points. "
"To handle multiple geometries use `decode_geometries` instead."
)
(container_name,) = container_names
return container_name
def _get_geometry_containers(obj: xr.DataArray | xr.Dataset) -> list[Hashable]:
"""
Translate from key (either CF key or variable name) to its bounds' variable names.
This function interprets the ``geometry`` attribute on DataArrays.
Parameters
----------
obj : DataArray, Dataset
DataArray belonging to the coordinate to be checked
Returns
-------
List[str]
Variable name(s) in parent xarray object that are bounds of `key`
"""
if isinstance(obj, xr.DataArray):
obj = obj._to_temp_dataset()
variables = obj._variables
results = set()
for name, var in variables.items():
attrs_or_encoding = ChainMap(var.attrs, var.encoding)
if "geometry_type" in attrs_or_encoding:
results.update([name])
return list(results)
def decode_geometries(encoded: xr.Dataset) -> xr.Dataset:
"""
Decode CF encoded geometries to numpy object arrays containing shapely geometries.
Parameters
----------
encoded : Dataset
A Xarray Dataset containing encoded geometries.
Returns
-------
Dataset
A Xarray Dataset containing decoded geometries.
See Also
--------
shapely_to_cf
cf_to_shapely
encode_geometries
"""
containers = _get_geometry_containers(encoded)
if not containers:
raise NotImplementedError(
"No geometry container variables detected, none of the provided variables "
"have a `geometry_type` attribute."
)
todrop: list[Hashable] = []
decoded = xr.Dataset()
for container_name in containers:
enc_geom_var = encoded[container_name]
geom_attrs = enc_geom_var.attrs
# Grab the coordinates attribute
geom_attrs.update(enc_geom_var.encoding)
geom_var = cf_to_shapely(encoded, container=container_name).variable
todrop.extend(
(container_name,)
+ tuple(
s
for s in " ".join(
geom_attrs.get(attr, "")
for attr in [
"interior_ring",
"node_coordinates",
"node_count",
"part_node_count",
"coordinates",
]
).split(" ")
if s
)
)
name = geom_attrs.get("variable_name", None)
if name in encoded.dims:
decoded = decoded.assign_coords(
xr.Coordinates(coords={name: geom_var}, indexes={})
)
else:
decoded[name] = geom_var
decoded.update(encoded.drop_vars(todrop))
# Is this a good idea? We are deleting information.
# OTOH we have decoded it to a useful in-memory representation
for var in decoded._variables.values():
if var.attrs.get("geometry") in containers:
var.attrs.pop("geometry")
return decoded
def encode_geometries(ds: xr.Dataset):
"""
Encode any discovered geometry variables using the CF conventions.
Practically speaking, geometry variables are numpy object arrays where the first
element is a shapely geometry.
Parameters
----------
ds : Dataset
Dataset containing at least one geometry variable.
Returns
-------
Dataset
Where all geometry variables are encoded. The information in a single geometry
variable in the input is split across multiple variables in the returned Dataset
following the CF conventions.
See Also
--------
shapely_to_cf
cf_to_shapely
"""
from shapely import (
LineString,
MultiLineString,
MultiPoint,
MultiPolygon,
Point,
Polygon,
)
SHAPELY_TYPES = (
Point,
LineString,
Polygon,
MultiPoint,
MultiLineString,
MultiPolygon,
)
geom_var_names = [
name
for name, var in ds._variables.items()
if var.dtype == "O" and isinstance(var.data.flat[0], SHAPELY_TYPES)
]
if not geom_var_names:
return ds
if to_drop := set(geom_var_names) & set(ds._indexes):
# e.g. xvec GeometryIndex
ds = ds.drop_indexes(to_drop)
variables = {}
for name in geom_var_names:
# TODO: do we prefer this choice be invariant to number of geometry variables
suffix = "_" + str(name) if len(geom_var_names) > 1 else ""
container_name = GEOMETRY_CONTAINER_NAME + suffix
# If `name` is a dimension name, then we need to drop it. Otherwise we don't
# So set errors="ignore"
variables.update(
shapely_to_cf(ds[name], suffix=suffix)
.drop_vars(name, errors="ignore")
._variables
)
geom_var = ds[name]
more_updates = {}
for varname, var in ds._variables.items():
if varname == name:
continue
# TODO: this is incomplete. It works for vector data cubes where one of the geometry vars
# is a dimension coordinate.
if name in var.dims:
var = var.copy()
var._attrs = copy.deepcopy(var._attrs)
var.attrs["geometry"] = container_name
# The grid_mapping and coordinates attributes can be carried by the geometry container
# variable provided they are also carried by the data variables associated with the container.
if to_add := geom_var.attrs.get("coordinates", ""):
var.attrs["coordinates"] = var.attrs.get("coordinates", "") + to_add
more_updates[varname] = var
variables.update(more_updates)
# WARNING: cf-xarray specific convention.
# For vector data cubes, `name` is a dimension name.
# By encoding to CF, we have
# encoded the information in that variable across many different
# variables (e.g. node_count) with `name` as a dimension.
# We have to record `name` somewhere so that we reconstruct
# a geometry variable of the right name at decode-time.
variables[container_name].attrs["variable_name"] = name
encoded = xr.Dataset(variables).set_coords(
set(ds._coord_names) - set(geom_var_names)
)
return encoded
def reshape_unique_geometries(
ds: xr.Dataset,
geom_var: str = "geometry",
new_dim: str = FEATURES_DIM_NAME,
) -> xr.Dataset:
"""Reshape a dataset containing a geometry variable so that all unique features are
identified along a new dimension.
This function only makes sense if the dimension of the geometry variable has no coordinate,
or if that coordinate has repeated values for each geometry.
Parameters
----------
ds : xr.Dataset
A Dataset.
geom_var : string
Name of the variable in `ds` that contains the geometry objects of type shapely.geometry.
The variable must be 1D.
new_dim : string
Name of the new dimension in the returned object.
Returns
-------
Dataset
All variables sharing the dimension of `ds[geom_var]` are reshaped so that `new_dim`
as a length equal to the number of unique geometries.
"""
if ds[geom_var].ndim > 1:
raise ValueError(
f"The geometry variable must be 1D. Got ds[{geom_var}] with dims {ds[geom_var].dims}."
)
# Shapely objects are not hashable, thus np.unique cannot be used directly.
# This trick is stolen from geopandas.
_, unique_indexes, inv_indexes = np.unique(
[g.wkb for g in ds[geom_var].values], return_index=True, return_inverse=True
)
old_name = ds[geom_var].dims[0]
if old_name in ds.coords:
old_values = ds[old_name].values
else:
# A dummy coord, a kind of counter, independent for each unique geometries
old_values = np.array(
[(inv_indexes[:i] == ind).sum() for i, ind in enumerate(inv_indexes)]
)
multi_index = pd.MultiIndex.from_arrays(
(inv_indexes, old_values), names=(new_dim, old_name)
)
temp_name = "__temp_multi_index__"
out = ds.rename({old_name: temp_name})
out[temp_name] = multi_index
out = out.unstack(temp_name)
# geom_var was reshaped also, reconstruct it from the unique values.
unique_indexes = xr.DataArray(unique_indexes, dims=(new_dim,))
out[geom_var] = ds[geom_var].isel({old_name: unique_indexes})
if old_name not in ds.coords:
# If there was no coord before, drop the dummy one we made.
out = out.drop_vars(old_name) # type: ignore[arg-type,unused-ignore] # Hashable/str stuff
return out
def shapely_to_cf(
geometries: xr.DataArray | Sequence,
grid_mapping: str | None = None,
*,
suffix: str = "",
):
"""
Convert a DataArray with shapely geometry objects into a CF-compliant dataset.
Parameters
----------
geometries : sequence of shapely geometries or xarray.DataArray
A sequence of geometry objects or a Dataset with a "geometry" variable storing such geometries.
All geometries must be of the same base type : Point, Line or Polygon, but multipart geometries are accepted.
grid_mapping : str, optional
A CF grid mapping name. When given, coordinates and attributes are named and set accordingly.
Defaults to None, in which case the coordinates are simply names "crd_x" and "crd_y".
container_name: str, optional
Name for the "geometry container" scalar variable in the encoded Dataset
Returns
-------
xr.Dataset
A dataset with shapely geometry objects translated into CF-compliant variables :
- 'x', 'y' : the node coordinates
- 'crd_x', 'crd_y' : the feature coordinates (might have different names if `grid_mapping` is available).
- 'node_count' : The number of nodes per feature. Always present for Lines and Polygons. For Points: only present if there are multipart geometries.
- 'part_node_count' : The number of nodes per individual geometry. Only for Lines with multipart geometries and for Polygons with multipart geometries or holes.
- 'interior_ring' : Integer boolean indicating whether rings are interior or exterior. Only for Polygons with holes.
- container_name : Empty variable with attributes describing the geometry type.
References
----------
Please refer to the CF conventions document: http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#geometries
"""
if isinstance(geometries, xr.DataArray) and grid_mapping is not None:
raise DeprecationWarning(
"Explicitly passing `grid_mapping` with DataArray of geometries is deprecated. "
"Please set a `grid_mapping` attribute on `geometries`, ",
"and set the grid mapping variable as a coordinate",
)
as_data = geometries.data if isinstance(geometries, xr.DataArray) else geometries
type_ = as_data[0].geom_type
grid_mapping_varname = None
if (
grid_mapping is None
and isinstance(geometries, xr.DataArray)
and (grid_mapping_varname := geometries.attrs.get("grid_mapping"))
):
if grid_mapping_varname in geometries.coords:
# Not all CRS can be encoded in CF
grid_mapping = geometries.coords[grid_mapping_varname].attrs.get(
"grid_mapping_name", None
)
# TODO: consider accepting a GeometryNames instance from the user instead
names = GeometryNames(
suffix=suffix, grid_mapping_name=grid_mapping, grid_mapping=grid_mapping_varname
)
try:
if type_ in ["Point", "MultiPoint"]:
ds = points_to_cf(geometries, names=names)
elif type_ in ["LineString", "MultiLineString"]:
ds = lines_to_cf(geometries, names=names)
elif type_ in ["Polygon", "MultiPolygon"]:
ds = polygons_to_cf(geometries, names=names)
else:
raise ValueError(
f"This geometry type is not supported in CF-compliant datasets. Got {type_}"
)
except NotImplementedError as e:
raise ValueError(
"Error converting geometries. Possibly you have provided mixed geometry types."
) from e
return ds
def cf_to_shapely(ds: xr.Dataset, *, container: Hashable = GEOMETRY_CONTAINER_NAME):
"""
Convert geometries stored in a CF-compliant way to shapely objects stored in a single variable.
Parameters
----------
ds : xr.Dataset
Must contain a ``geometry_container`` variable with attributes giving the geometry specifications.
Must contain all variables needed to reconstruct the geometries listed in these specifications.
Returns
-------
da: xr.DataArray
A 1D DataArray of shapely objects.
It has the same dimension as the ``node_count`` or the coordinates variables, or
``features`` if those were not present in ``ds``.
References
----------
Please refer to the CF conventions document: http://cfconventions.org/Data/cf-conventions/cf-conventions-1.8/cf-conventions.html#geometries
"""
if container not in ds._variables:
raise ValueError(
f"{container!r} is not the name of a variable in the provided Dataset."
)
if not (geom_type := ds[container].attrs.get("geometry_type", None)):
raise ValueError(
f"{container!r} is not the name of a valid geometry variable. "
"It does not have a `geometry_type` attribute."
)
# Extract all necessary geometry variables
subds = ds.cf[[container]]
if geom_type == "point":
geometries = cf_to_points(subds)
elif geom_type == "line":
geometries = cf_to_lines(subds)
elif geom_type == "polygon":
geometries = cf_to_polygons(subds)
else:
raise ValueError(
f"Valid CF geometry types are 'point', 'line' and 'polygon'. Got {geom_type}"
)
if gm := ds[container].attrs.get("grid_mapping"):
geometries.attrs["grid_mapping"] = gm
return geometries.rename("geometry")
def points_to_cf(pts: xr.DataArray | Sequence, *, names: GeometryNames | None = None):
"""Get a list of points (shapely.geometry.[Multi]Point) and return a CF-compliant geometry dataset.
Parameters
----------
pts : sequence of shapely.geometry.Point or MultiPoint
The sequence of [multi]points to translate to a CF dataset.
Returns
-------
xr.Dataset
A Dataset with variables 'x', 'y', 'crd_x', 'crd_y', 'node_count' and 'geometry_container'.
The coordinates of MultiPoint instances are their first point.
"""
from shapely.geometry import MultiPoint
if isinstance(pts, xr.DataArray):
# TODO: Fix this hardcoding
if pts.ndim != 1:
raise ValueError("Only 1D DataArrays are supported.")
dim = pts.dims[0]
coord = pts[dim] if dim in pts.coords else None
pts_ = pts.values.tolist()
else:
dim = FEATURES_DIM_NAME
coord = None
pts_ = pts
x, y, node_count, crdX, crdY = [], [], [], [], []
for pt in pts_:
if isinstance(pt, MultiPoint):
xy = np.concatenate([p.coords for p in pt.geoms])
else:
xy = np.atleast_2d(pt.coords)
x.extend(xy[:, 0])
y.extend(xy[:, 1])
node_count.append(xy.shape[0])
crdX.append(xy[0, 0])
crdY.append(xy[0, 1])
if names is None:
names = GeometryNames()
ds = xr.Dataset(
data_vars={
names.node_count: xr.DataArray(node_count, dims=(dim,)),
names.container_name: xr.DataArray(
data=np.nan,
attrs={"geometry_type": "point", **names.geometry_container_attrs},
),
},
coords=names.coords(x=x, y=y, crdX=crdX, crdY=crdY, dim=dim),
)
if coord is not None:
ds = ds.assign_coords({dim: coord})
# Special case when we have no MultiPoints
if (ds[names.node_count] == 1).data.all():
ds = ds.drop_vars(names.node_count)
del ds[names.container_name].attrs["node_count"]
return ds
def cf_to_points(ds: xr.Dataset):
"""Convert point geometries stored in a CF-compliant way to shapely points stored in a single variable.
Parameters
----------
ds : xr.Dataset
A dataset with CF-compliant point geometries.
Must have a *single* "geometry container" variable with at least a 'node_coordinates' attribute.
Must also have the two 1D variables listed by this attribute.
Returns
-------
geometry : xr.DataArray
A 1D array of shapely.geometry.[Multi]Point objects.
It has the same dimension as the ``node_count`` or the coordinates variables, or
``'features'`` if those were not present in ``ds``.
"""
from shapely.geometry import MultiPoint, Point
container_name = _assert_single_geometry_container(ds)
# Shorthand for convenience
geo = ds[container_name].attrs
# The features dimension name, defaults to the one of 'node_count' or the dimension of the coordinates, if present.
feat_dim = None
if "coordinates" in geo and feat_dim is None:
xcoord_name, _ = geo["coordinates"].split(" ")
(feat_dim,) = ds[xcoord_name].dims
x_name, y_name = ds[container_name].attrs["node_coordinates"].split(" ")
xy = np.stack([ds[x_name].values, ds[y_name].values], axis=-1)
node_count_name = ds[container_name].attrs.get("node_count")
if node_count_name is None:
# No node_count means all geometries are single points (node_count = 1)
# And if we had no coordinates, then the dimension defaults to FEATURES_DIM_NAME
feat_dim = feat_dim or FEATURES_DIM_NAME
node_count = xr.DataArray([1] * xy.shape[0], dims=(feat_dim,))
if feat_dim in ds.coords:
node_count = node_count.assign_coords({feat_dim: ds[feat_dim]})
else:
node_count = ds[node_count_name]
j = 0 # The index of the first node.
geoms = np.empty(node_count.shape, dtype=object)
# i is the feature index, n its number of nodes
for i, n in enumerate(node_count.values):
if n == 1:
geoms[i] = Point(xy[j, :])
else:
geoms[i] = MultiPoint(xy[j : j + n, :])
j += n
da = xr.DataArray(geoms, dims=node_count.dims, coords=node_count.coords)
if node_count_name:
del da[node_count_name]
return da
def lines_to_cf(lines: xr.DataArray | Sequence, *, names: GeometryNames | None = None):
"""Convert an iterable of lines (shapely.geometry.[Multi]Line) into a CF-compliant geometry dataset.
Parameters
----------
lines : sequence of shapely.geometry.Line or MultiLine
The sequence of [multi]lines to translate to a CF dataset.
Returns
-------
xr.Dataset
A Dataset with variables 'x', 'y', 'crd_x', 'crd_y', 'node_count' and 'geometry_container'
and optionally 'part_node_count'.
"""
from shapely import to_ragged_array
if isinstance(lines, xr.DataArray):
dim = lines.dims[0]
coord = lines[dim] if dim in lines.coords else None
lines_ = lines.values
else:
dim = "index"
coord = None
lines_ = np.array(lines)
if names is None:
names = GeometryNames()
_, arr, offsets = to_ragged_array(lines_)
x = arr[:, 0]
y = arr[:, 1]
part_node_count = np.diff(offsets[0])
if len(offsets) == 1:
indices = offsets[0]
node_count = part_node_count
else:
indices = np.take(offsets[0], offsets[1])
node_count = np.diff(indices)
geom_coords = arr.take(indices[:-1], 0)
crdX = geom_coords[:, 0]
crdY = geom_coords[:, 1]
ds = xr.Dataset(
data_vars={
names.node_count: xr.DataArray(node_count, dims=(dim,)),
names.container_name: xr.DataArray(
data=np.nan,
attrs={"geometry_type": "line", **names.geometry_container_attrs},
),
},
coords=names.coords(x=x, y=y, crdX=crdX, crdY=crdY, dim=dim),
)
if coord is not None:
ds = ds.assign_coords({dim: coord})
# Special case when we have no MultiLines
if len(part_node_count) != len(node_count):
ds[names.part_node_count] = xr.DataArray(part_node_count, dims=names.part_dim)
ds[names.container_name].attrs["part_node_count"] = names.part_node_count
return ds
def cf_to_lines(ds: xr.Dataset):
"""Convert line geometries stored in a CF-compliant way to shapely lines stored in a single variable.
Parameters
----------
ds : xr.Dataset
A dataset with CF-compliant line geometries.
Must have a "geometry_container" variable with at least a 'node_coordinates' attribute.
Must also have the two 1D variables listed by this attribute.
Returns
-------
geometry : xr.DataArray
A 1D array of shapely.geometry.[Multi]Line objects.
It has the same dimension as the ``part_node_count`` or the coordinates variables, or
``'features'`` if those were not present in ``ds``.
"""
from shapely import GeometryType, from_ragged_array
container_name = _assert_single_geometry_container(ds)
# Shorthand for convenience
geo = ds[container_name].attrs
# The features dimension name, defaults to the one of 'node_count'
# or the dimension of the coordinates, if present.
feat_dim = None
if "coordinates" in geo:
xcoord_name, _ = geo["coordinates"].split(" ")
(feat_dim,) = ds[xcoord_name].dims
x_name, y_name = geo["node_coordinates"].split(" ")
xy = np.stack([ds[x_name].values, ds[y_name].values], axis=-1)
node_count_name = geo.get("node_count")
part_node_count_name = geo.get("part_node_count", node_count_name)
if node_count_name is None:
raise ValueError("'node_count' must be provided for line geometries")
else:
node_count = ds[node_count_name]
feat_dim = feat_dim or "index"
if feat_dim in ds.coords:
node_count = node_count.assign_coords({feat_dim: ds[feat_dim]})
# first get geometries for all the parts
part_node_count = ds[part_node_count_name]
offset1 = np.insert(np.cumsum(part_node_count.values), 0, 0)
lines = from_ragged_array(GeometryType.LINESTRING, xy, offsets=(offset1,))
# get index of offset2 values that are edges for part_node_count
offset2 = np.nonzero(np.isin(offset1, np.insert(np.cumsum(node_count), 0, 0)))[0]
multilines = from_ragged_array(
GeometryType.MULTILINESTRING, xy, offsets=(offset1, offset2)
)
# get items from lines or multilines depending on number of parts
geoms = np.where(np.diff(offset2) == 1, lines[offset2[:-1]], multilines)
return xr.DataArray(
geoms, dims=node_count.dims, coords=node_count.coords
).drop_vars(node_count_name)
def polygons_to_cf(
polygons: xr.DataArray | Sequence, *, names: GeometryNames | None = None
):
"""Convert an iterable of polygons (shapely.geometry.[Multi]Polygon) into a CF-compliant geometry dataset.
Parameters
----------
polygons : sequence of shapely.geometry.Polygon or MultiPolygon
The sequence of [multi]polygons to translate to a CF dataset.
names: GeometryNames, optional
Structure that helps manipulate geometry attrs.
Returns
-------
xr.Dataset
A Dataset with variables 'x', 'y', 'crd_x', 'crd_y', 'node_count' and 'geometry_container'
and optionally 'part_node_count'.
"""
from shapely import to_ragged_array
if isinstance(polygons, xr.DataArray):
dim = polygons.dims[0]
coord = polygons[dim] if dim in polygons.coords else None
polygons_ = polygons.values
else:
dim = "index"
coord = None
polygons_ = np.array(polygons)
if names is None:
names = GeometryNames()
_, arr, offsets = to_ragged_array(polygons_)
x = arr[:, 0]
y = arr[:, 1]
part_node_count = np.diff(offsets[0])
if len(offsets) == 1:
indices = offsets[0]
node_count = part_node_count
elif len(offsets) >= 2:
indices = np.take(offsets[0], offsets[1])
interior_ring = np.isin(offsets[0], indices, invert=True)[:-1]
if len(offsets) == 3:
indices = np.take(indices, offsets[2])
node_count = np.diff(indices)
geom_coords = arr.take(indices[:-1], 0)
crdX = geom_coords[:, 0]
crdY = geom_coords[:, 1]
data_vars = {names.node_count: (dim, node_count)}
geometry_attrs = names.geometry_container_attrs
# Special case when we have no MultiPolygons and no holes
if len(part_node_count) != len(node_count):
data_vars[names.part_node_count] = (names.part_dim, part_node_count)
geometry_attrs["part_node_count"] = names.part_node_count
# Special case when we have no holes
if interior_ring.any():
data_vars[names.interior_ring] = (names.part_dim, interior_ring)
geometry_attrs["interior_ring"] = names.interior_ring
data_vars[names.container_name] = ( # type: ignore[assignment]
(),
np.nan,
{"geometry_type": "polygon", **geometry_attrs},
)
ds = xr.Dataset(
data_vars=data_vars,
coords=names.coords(x=x, y=y, crdX=crdX, crdY=crdY, dim=dim),
)
if coord is not None:
ds = ds.assign_coords({dim: coord})
return ds
def cf_to_polygons(ds: xr.Dataset):
"""Convert polygon geometries stored in a CF-compliant way to shapely polygons stored in a single variable.
Parameters
----------
ds : xr.Dataset
A dataset with CF-compliant polygon geometries.
Must have a "geometry_container" variable with at least a 'node_coordinates' attribute.
Must also have the two 1D variables listed by this attribute.
Returns
-------
geometry : xr.DataArray
A 1D array of shapely.geometry.[Multi]Polygon objects.
It has the same dimension as the ``part_node_count`` or the coordinates variables, or
``'features'`` if those were not present in ``ds``.
"""
from shapely import GeometryType, from_ragged_array
container_name = _assert_single_geometry_container(ds)
# Shorthand for convenience
geo = ds[container_name].attrs
# The features dimension name, defaults to the one of 'part_node_count'
# or the dimension of the coordinates, if present.
feat_dim = None
if "coordinates" in geo:
xcoord_name, _ = geo["coordinates"].split(" ")
(feat_dim,) = ds[xcoord_name].dims
x_name, y_name = geo["node_coordinates"].split(" ")
xy = np.stack([ds[x_name].values, ds[y_name].values], axis=-1)
node_count_name = geo.get("node_count")
part_node_count_name = geo.get("part_node_count", node_count_name)
interior_ring_name = geo.get("interior_ring")
if node_count_name is None:
raise ValueError("'node_count' must be provided for polygon geometries")
else:
node_count = ds[node_count_name]
feat_dim = feat_dim or "index"
if feat_dim in ds.coords:
node_count = node_count.assign_coords({feat_dim: ds[feat_dim]})
# first get geometries for all the rings
part_node_count = ds[part_node_count_name]
offset1 = np.insert(np.cumsum(part_node_count.values), 0, 0)
if interior_ring_name is None:
offset2 = np.array(list(range(len(offset1))))
else:
interior_ring = ds[interior_ring_name]
if not interior_ring[0] == 0:
raise ValueError("coordinate array must start with an exterior ring")
offset2 = np.append(np.where(interior_ring == 0)[0], [len(part_node_count)])
polygons = from_ragged_array(GeometryType.POLYGON, xy, offsets=(offset1, offset2))
# get index of offset2 values that are edges for node_count
offset3 = np.nonzero(
np.isin(
offset2,
np.nonzero(np.isin(offset1, np.insert(np.cumsum(node_count), 0, 0)))[0],
)
)[0]
multipolygons = from_ragged_array(
GeometryType.MULTIPOLYGON, xy, offsets=(offset1, offset2, offset3)
)
# get items from polygons or multipolygons depending on number of parts
geoms = np.where(np.diff(offset3) == 1, polygons[offset3[:-1]], multipolygons)
return xr.DataArray(
geoms, dims=node_count.dims, coords=node_count.coords
).drop_vars(node_count_name)