-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhas_root_cli.py
133 lines (110 loc) · 3.47 KB
/
has_root_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from abc import ABC
from functools import wraps
from typing import Optional
import click
import utz
from click import option, pass_context, Group, argument
from utz import decos, YM, DefaultDict
from utz.case import dash_case
from ctbk.cli.base import ctbk, StableCommandOrder
from ctbk.has_root import HasRoot
from ctbk.task import Task
from ctbk.tasks import Tasks
from ctbk.util import GENESIS
from ctbk.util.ym import parse_ym_ranges_str
_default_end = None


def default_end(roots: Optional[DefaultDict[str]]):
    """Infer the last available month of data by checking which Tripdata Zips are present.

    The answer is memoized in the module-level ``_default_end``: once a truthy
    value has been stored, later calls return it as-is and ``roots`` is not
    consulted again.

    :param roots: forwarded to ``TripdataZips`` as its ``roots`` argument.
    :return: the ``end`` attribute of the ``TripdataZips`` built over every
        month from ``GENESIS`` until ``YM()``.
    """
    global _default_end
    if _default_end:
        return _default_end

    # Imported lazily, inside the function (presumably to avoid an import
    # cycle with ``ctbk.zips`` -- TODO confirm).
    from ctbk.zips import TripdataZips

    candidate_months = list(GENESIS.until(YM()))
    _default_end = TripdataZips(yms=candidate_months, roots=roots).end
    return _default_end
def yms_param(deco):
def wrapper(fn):
@deco
@wraps(fn)
def _fn(*args, ym_ranges_str: str | None, **kwargs):
yms = parse_ym_ranges_str(
ym_ranges_str,
default_start=GENESIS,
default_end=lambda: default_end(roots=kwargs.get('roots') or kwargs.get('root')),
)
return fn(*args, yms=yms, **kwargs)
return _fn
return wrapper
# Option form: `-d` / `--dates`, delivered to the command as `ym_ranges_str`
# and converted to `yms` by `yms_param`.
yms_opt = yms_param(
    option(
        '-d', '--dates', 'ym_ranges_str',
        help="Start and end dates in the format 'YYYY-MM'",
    )
)
# Positional form of the same parameter; it may be omitted (`required=False`).
yms_arg = yms_param(
    argument('ym_ranges_str', required=False)
)
class HasRootCLI(Tasks, HasRoot, ABC):
    """Base class that exposes a ``Tasks`` collection as a ``click`` command group.

    Subclasses set ``CHILD_CLS`` to a ``Task`` type; its ``NAMES`` provide the
    group's name (first entry) and the aliases of the generated command class.
    """

    # Not referenced inside this class; available for subclasses/callers.
    ROOT_DECOS = []
    # Task type whose ``NAMES`` drive `names()` / `name()`.
    CHILD_CLS: type[Task] = None

    @classmethod
    def names(cls):
        """Return ``CHILD_CLS.NAMES``."""
        return cls.CHILD_CLS.NAMES

    @classmethod
    def name(cls):
        """Return the first entry of `names()`."""
        all_names = cls.names()
        return all_names[0]

    @classmethod
    def init_cli(
        cls,
        group: click.Group,
        cmd_decos: list = None,
        create_decos: list = None,
        group_cls: type[click.Group] = None,
        urls=True,
        create=True,
    ):
        """Register the ``urls`` and/or ``create`` subcommands on ``group``.

        :param group: click group that receives the subcommands.
        :param cmd_decos: extra decorators applied to every subcommand.
        :param create_decos: extra decorators applied to ``create`` only.
        :param group_cls: passed as ``cls=`` to ``group.command``.
        :param urls: when truthy, add the ``urls`` subcommand.
        :param create: when truthy, add the ``create`` subcommand.
        """
        shared_decos = cmd_decos or []

        def subcommand(help_text):
            # Decorator stack common to both subcommands.
            return decos(
                group.command(cls=group_cls, help=help_text),
                pass_context,
                *shared_decos
            )

        # The callbacks below must keep the names `urls` / `create`: no explicit
        # name is passed to `group.command`, so click takes it from the function.
        if urls:
            @subcommand("Print URLs for selected datasets")
            def urls(ctx, **kwargs):
                for child in cls(**ctx.obj, **kwargs).children:
                    print(child.url)

        if create:
            @subcommand("Create selected datasets")
            @decos(create_decos or [])
            def create(ctx, **kwargs):
                cls(**ctx.obj, **kwargs).create(read=None)

    @classmethod
    def cli(
        cls,
        help: str,
        decos: Optional[list] = None,
        cmd_decos: Optional[list] = None,
        create_decos: Optional[list] = None,
        **kwargs
    ) -> Group:
        """Create this class's command group under ``ctbk`` and populate it.

        :param help: help text for the group.
        :param decos: extra decorators applied to the group callback.
        :param cmd_decos: forwarded to `init_cli`.
        :param create_decos: forwarded to `init_cli`.
        :param kwargs: remaining keyword arguments, forwarded to `init_cli`.
        :return: the group, after `init_cli` has been called on it.
        """
        command_cls = cls.command_cls()
        group_decos = decos or []

        # `decos` the parameter shadows the imported `decos` helper here, hence
        # the qualified `utz.decos`.
        as_group = utz.decos(
            ctbk.group(dash_case(cls.name()), cls=command_cls, help=help),
            pass_context,
            *group_decos
        )

        @as_group
        def group(ctx, **params):
            # Merge this group's parameters into the context object that the
            # subcommands later splat into `cls(...)`.
            ctx.obj = dict(**ctx.obj, **params)

        cls.init_cli(
            group,
            cmd_decos=cmd_decos,
            create_decos=create_decos,
            **kwargs,
        )
        return group

    @classmethod
    def command_cls(cls):
        """Return a ``StableCommandOrder`` subclass whose ``ALIASES`` is `names()`."""
        aliases = cls.names()

        class Command(StableCommandOrder):
            ALIASES = aliases

        return Command