-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathzips.py
99 lines (82 loc) · 2.73 KB
/
zips.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from typing import Optional
from utz import cached_property, DefaultDict
from utz.ym import YM
from ctbk.has_root_cli import HasRootCLI, yms_arg
from ctbk.task import Task
from ctbk.util.region import REGIONS, Region, get_regions, region
# Value assigned to the `DIR` class attribute of both `TripdataZip` and `TripdataZips` below.
DIR = 'tripdata'
class TripdataZip(Task):
    """Task for one Citi Bike tripdata `.zip` archive, identified by a month and a region."""

    DIR = DIR
    NAMES = ['zip']

    def __init__(
        self,
        ym: YM,
        region: Region,
        roots: Optional[DefaultDict[str]] = None,
    ):
        """Validate `region`, record the archive's identity, and initialize the `Task` base.

        Args:
            ym: the month this archive is requested for.
            region: must be a member of `REGIONS`.
            roots: forwarded unchanged to `Task.__init__`.

        Raises:
            ValueError: if `region` is not in `REGIONS`.
        """
        if region not in REGIONS:
            raise ValueError(f"Unrecognized region: {region}")

        self.ym = ym
        # `yym` is what `url` embeds in the file name: the bare year (`ym.y`) for NYC
        # before 2024, and the full `ym` in every other case.
        if region == 'NYC' and ym.y < 2024:
            self.yym = ym.y
        else:
            self.yym = ym
        self.region = region
        Task.__init__(self, roots=roots)

    @cached_property
    def url(self):
        """Path of the archive under `self.dir`; "JC" archives get a `JC-` file-name prefix."""
        stamp = int(self.yym)
        prefix = "JC-" if self.region == "JC" else ""
        return f'{self.dir}/{prefix}{stamp}-citibike-tripdata.zip'
class TripdataZips(HasRootCLI):
    """Group of `TripdataZip` tasks covering a list of months and a set of regions.

    One `TripdataZip` is built per (month, region) pair, for each requested region that
    `get_regions(ym)` contains for that month. If the final entry of `yms` has regions
    whose zip does not exist, that entry is dropped, provided the entry before it has
    no missing regions.

    Attributes set by `__init__`:
        yms: the months as passed in (not trimmed, even when the last month is dropped).
        regions: the requested regions, or `REGIONS` when none were given.
        m2r2u: month -> region -> `TripdataZip` mapping for the retained months.
        end: `max(yms) + 1`, or the second-to-last month `+ 1` when the last is dropped.
    """

    DIR = DIR
    CHILD_CLS = TripdataZip

    def __init__(
        self,
        yms: list[YM],
        regions: Optional[list[str]] = None,
        roots: Optional[DefaultDict[str]] = None,
        **kwargs,
    ):
        """Build the per-month, per-region zip tasks and settle on the final month.

        Args:
            yms: months to cover; must be non-empty.
                NOTE(review): the "last month" checks below use list order (`[-1]`, `[-2]`)
                while the default `end` uses `max(yms)`; this presumably assumes `yms` is
                in ascending order. Confirm against callers.
            regions: regions to include; falsy means all of `REGIONS`.
            roots: forwarded to each `TripdataZip` and to `HasRootCLI.__init__`.
            **kwargs: forwarded to `HasRootCLI.__init__`.

        Raises:
            ValueError: if `yms` is empty.
            RuntimeError: if the last month has missing regions and there is no earlier
                month to fall back to, or the earlier month also has missing regions.
        """
        # Fail with a clear message instead of the opaque "max() arg is an empty sequence".
        if not yms:
            raise ValueError("`yms` must contain at least one month")

        self.yms = yms
        self.regions = regions or REGIONS

        # "month" to "region" to "url" map
        months = []
        for ym in self.yms:
            # Look up the month's regions once rather than once per requested region.
            ym_regions = get_regions(ym)
            months.append((
                ym,
                {
                    region: TripdataZip(ym=ym, region=region, roots=roots)
                    for region in self.regions
                    if region in ym_regions
                },
            ))

        # Default "end": current or previous calendar month
        end = max(yms) + 1
        end1, last1 = months[-1]
        missing1 = self._missing_regions(last1)
        if missing1:
            # The last month is incomplete: fall back to the month before it, if complete.
            if len(months) < 2:
                raise RuntimeError(f"Missing regions from {end1} ({', '.join(missing1)})")
            end2, last2 = months[-2]
            missing2 = self._missing_regions(last2)
            if missing2:
                raise RuntimeError(f"Missing regions from {end1} ({', '.join(missing1)}) and {end2} ({', '.join(missing2)})")
            end = end2 + 1
            months = months[:-1]

        self.m2r2u: dict[YM, dict[Region, TripdataZip]] = dict(months)
        self.end: YM = end
        super().__init__(**kwargs, roots=roots)

    @staticmethod
    def _missing_regions(r2u: dict) -> list:
        """Return the regions in `r2u` whose task reports `exists()` as falsy."""
        return [region for region, month in r2u.items() if not month.exists()]

    @cached_property
    def children(self) -> list[TripdataZip]:
        """All `TripdataZip` tasks in `m2r2u`, flattened in month-then-region order."""
        return [
            u
            for r2u in self.m2r2u.values()
            for u in r2u.values()
        ]
# Module-level call to `TripdataZips.cli`, passing the help text and the `yms_arg` and
# `region` command decorators.
# NOTE(review): `create=False` presumably means these zips are only read, never created
# by this tool; confirm against `HasRootCLI.cli`.
TripdataZips.cli(
help="Read .csv.zip files from s3://tripdata",
cmd_decos=[yms_arg, region],
create=False,
)