-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmonth_agg_table.py
97 lines (79 loc) · 2.4 KB
/
month_agg_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from abc import ABC
from os.path import exists
import pandas as pd
from click import argument, option
from utz import err
from utz.ym import Monthy, YM
from ctbk.cli.base import ctbk
from ctbk.has_root_cli import yms_opt
from ctbk.util.constants import DEFAULT_ROOT
from ctbk.util.df import DataFrame
class MonthAggTable(ABC):
    """Base class for a map/reduce aggregation over monthly Parquet files.

    For each month in ``yms`` the file ``{root}/{SRC}/{ym}.parquet`` is read,
    passed through :meth:`map`, the mapped frames are combined with
    :meth:`reduce`, and the result is handed to :meth:`write` (by default a
    JSON "records" dump to ``out``).

    Subclasses must set ``SRC``; ``NAME`` is the CLI command name used by
    :meth:`init_cli`, and ``OUT`` is the default output path. The hooks
    :meth:`read`, :meth:`map`, :meth:`reduce` and :meth:`write` may be
    overridden.
    """

    # Default root directory/URL prefix, used when no ``root`` is passed.
    ROOT = DEFAULT_ROOT
    # Command name passed to ``ctbk.command`` by ``init_cli``.
    NAME = None
    # Subdirectory of ``root`` containing the ``{ym}.parquet`` inputs (required).
    SRC = None
    # Default output path, used when no ``out`` is passed.
    OUT = None

    def __init__(
        self,
        yms: list[YM],
        root: str | None = None,
        overwrite: bool = False,
        out: str | None = None,
    ):
        """Configure the aggregation task.

        Args:
            yms: Months to load, in the order they are mapped and reduced.
            root: Prefix under which ``SRC`` lives; falls back to ``ROOT``
                when empty or ``None`` (e.g. when ``-r`` is omitted on the CLI).
            overwrite: Whether :meth:`run` may replace an existing ``out``.
            out: Output path; falls back to ``OUT`` when empty or ``None``.

        Raises:
            RuntimeError: If the subclass did not set ``SRC``.
        """
        self.root = root or self.ROOT
        if not self.SRC:
            raise RuntimeError(f"Set {self.__class__.__name__}.SRC")
        self.src = self.SRC
        self.dir = f'{self.root}/{self.src}'
        self.overwrite = overwrite
        self.out = out or self.OUT
        self.yms = yms

    def url(self, ym: Monthy) -> str:
        """Return the Parquet input path for month ``ym``."""
        return f'{self.dir}/{ym}.parquet'

    @staticmethod
    def read(url: str):
        """Read one Parquet file; override to customize how inputs are loaded."""
        return pd.read_parquet(url)

    def load(self, ym: Monthy) -> DataFrame:
        """Load month ``ym``'s input via :meth:`read`.

        Raises:
            FileNotFoundError: If the input is missing; the message is its URL.
        """
        url = self.url(ym)
        try:
            # Go through the ``read`` hook so subclass overrides take effect.
            return self.read(url)
        except FileNotFoundError as e:
            # Surface just the missing URL, but keep the original cause chained.
            raise FileNotFoundError(url) from e

    @property
    def dfs(self) -> list[DataFrame]:
        """Load every month in ``yms`` (re-read on each access)."""
        return [self.load(ym) for ym in self.yms]

    def mapped_dfs(self) -> list[DataFrame]:
        """Load every month and apply :meth:`map` to each frame."""
        return [self.map(df) for df in self.dfs]

    def map(self, df):
        """Per-month transform hook; the default returns ``df`` unchanged."""
        return df

    def reduce(self, mapped_dfs) -> DataFrame:
        """Combine the mapped frames; the default concatenates them.

        ``pd.concat`` raises ``ValueError`` when given no frames (empty ``yms``).
        """
        return pd.concat(mapped_dfs)

    def write(self, df: DataFrame):
        """Output hook; the default delegates to :meth:`write_df`."""
        self.write_df(df)

    def write_df(self, df: pd.DataFrame):
        """Write ``df`` to ``self.out`` as a JSON array of records."""
        err(f"Writing {self.out}")
        df.to_json(self.out, orient='records')

    def run(self):
        """Map/reduce all months and write the result.

        If ``out`` already exists the task is skipped (with a message on
        stderr) unless ``overwrite`` is set.

        Raises:
            RuntimeError: If neither an ``out`` argument nor ``OUT`` was given.
        """
        out = self.out
        if not out:
            # Otherwise ``exists(None)`` fails with an opaque TypeError.
            raise RuntimeError(f"Pass `out` or set {self.__class__.__name__}.OUT")
        # NOTE: ``os.path.exists`` only inspects the local filesystem.
        if exists(out):
            if self.overwrite:
                err(f'Overwriting {out}')
            else:
                err(f'{out} exists')
                return
        else:
            err(f'Writing {out}')
        mapped_dfs = self.mapped_dfs()
        df = self.reduce(mapped_dfs)
        self.write(df)

    @classmethod
    def init_cli(cls, help: str | None = None):
        """Define a ``ctbk`` CLI command named ``cls.NAME`` that runs this task.

        The command takes the option(s) added by ``yms_opt`` (presumably
        supplying ``yms`` — confirm), ``-r/--root``, ``-f/--overwrite`` and an
        optional positional ``out``; click passes them to the constructor as
        keyword arguments, and the resulting task is run immediately.
        """
        @ctbk.command(cls.NAME, help=help)
        @yms_opt
        @option('-r', '--root')
        @option('-f', '--overwrite', is_flag=True)
        @argument('out', required=False)
        def _main(*args, **kwargs):
            task = cls(*args, **kwargs)
            task.run()