Skip to content

Commit 0429185

Browse files
chore: support optional file format for metafiles
1 parent 1a3ed58 commit 0429185

File tree

3 files changed

+66
-20
lines changed

3 files changed

+66
-20
lines changed

deltacat/constants.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,14 @@
3838
False,
3939
)
4040

41-
DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack")
41+
# CLI Args
42+
METAFILE_FORMAT_KEY = "metafile-format"
43+
METAFILE_FORMAT = env_string(METAFILE_FORMAT_KEY, "msgpack")
44+
print(f"METAFILE_FORMAT: {METAFILE_FORMAT}")
45+
METAFILE_EXT = {
46+
"json": ".json",
47+
"msgpack": ".mpk",
48+
}[METAFILE_FORMAT]
4249

4350

4451
# Byte Units
@@ -76,7 +83,6 @@
7683

7784
# Metastore Constants
7885
REVISION_DIR_NAME: str = "rev"
79-
METAFILE_EXT = ".json" if DELTACAT_METAFILE_FORMAT == "json" else ".mpk"
8086
TXN_DIR_NAME: str = "txn"
8187
RUNNING_TXN_DIR_NAME: str = "running"
8288
FAILED_TXN_DIR_NAME: str = "failed"

deltacat/storage/model/metafile.py

+33-14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from typing import Optional, Tuple, List
77

8+
import base64
89
import json
910
import msgpack
1011
import pyarrow.fs
@@ -13,6 +14,7 @@
1314
import deltacat
1415

1516
from deltacat.constants import (
17+
METAFILE_FORMAT,
1618
REVISION_DIR_NAME,
1719
METAFILE_EXT,
1820
TXN_DIR_NAME,
@@ -22,15 +24,12 @@
2224
from deltacat.storage.model.list_result import ListResult
2325
from deltacat.storage.model.locator import Locator
2426
from deltacat.storage.model.types import TransactionOperationType
25-
from deltacat.utils.common import env_string
2627
from deltacat.utils.filesystem import (
2728
resolve_path_and_filesystem,
2829
list_directory,
2930
get_file_info,
3031
)
3132

32-
DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack")
33-
3433

3534
class MetafileRevisionInfo(dict):
3635
"""
@@ -500,7 +499,7 @@ def read(
500499
cls,
501500
path: str,
502501
filesystem: Optional[pyarrow.fs.FileSystem] = None,
503-
format: str = DELTACAT_METAFILE_FORMAT,
502+
format: Optional[str] = METAFILE_FORMAT,
504503
) -> Metafile:
505504
"""
506505
Read a metadata file and return the deserialized object.
@@ -513,11 +512,19 @@ def read(
513512
path, filesystem = resolve_path_and_filesystem(path, filesystem)
514513
with filesystem.open_input_stream(path) as file:
515514
binary = file.readall()
516-
loader = {
517-
"json": lambda b: json.loads(b.decode("utf-8")),
515+
reader = {
516+
"json": lambda b: json.loads(
517+
b.decode("utf-8"),
518+
object_hook=lambda obj: {
519+
k: base64.b64decode(v)
520+
if isinstance(v, str) and v.startswith("b64:")
521+
else v
522+
for k, v in obj.items()
523+
},
524+
),
518525
"msgpack": msgpack.loads,
519526
}[format]
520-
obj = cls(**loader(binary)).from_serializable(path, filesystem)
527+
obj = cls(**reader(binary)).from_serializable(path, filesystem)
521528
return obj
522529

523530
def write_txn(
@@ -560,7 +567,7 @@ def write(
560567
self,
561568
path: str,
562569
filesystem: Optional[pyarrow.fs.FileSystem] = None,
563-
format: str = DELTACAT_METAFILE_FORMAT,
570+
format: Optional[str] = METAFILE_FORMAT,
564571
) -> None:
565572
"""
566573
Serialize and write this object to a metadata file.
@@ -570,17 +577,29 @@ def write(
570577
the catalog root path.
571578
param: format: Format to use for serializing the metadata file.
572579
"""
580+
if format not in {"json", "msgpack"}:
581+
raise ValueError(
582+
f"Unsupported format '{format}'. Must be 'json' or 'msgpack'."
583+
)
584+
573585
if not filesystem:
574586
path, filesystem = resolve_path_and_filesystem(path, filesystem)
575587
revision_dir_path = posixpath.dirname(path)
576588
filesystem.create_dir(revision_dir_path, recursive=True)
589+
590+
writer = {
591+
"json": lambda data: json.dumps(
592+
data,
593+
indent=4,
594+
default=lambda b: base64.b64encode(b).decode("utf-8")
595+
if isinstance(b, bytes)
596+
else b,
597+
).encode("utf-8"),
598+
"msgpack": msgpack.dumps,
599+
}[format]
600+
577601
with filesystem.open_output_stream(path) as file:
578-
file.write(
579-
{
580-
"json": lambda data: json.dumps(data, indent=4).encode("utf-8"),
581-
"msgpack": msgpack.dumps,
582-
}[format](self.to_serializable())
583-
)
602+
file.write(writer(self.to_serializable()))
584603

585604
def equivalent_to(self, other: Metafile) -> bool:
586605
"""

deltacat/utils/common.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,46 @@
1+
import argparse
12
import hashlib
23
import os
34
import time
5+
import sys
46
from typing import Any, Dict
57

68

7-
def env_bool(key: str, default: bool) -> int:
9+
def get_cli_arg(key: str) -> Any:
    """
    Return the value passed for the ``--<key>`` option on the process
    command line, or None if the option is absent.

    Only ``sys.argv[1:]`` is inspected. Every argument other than
    ``--<key>`` is ignored, so this can run safely inside programs that
    define their own command line interface.

    param: key: Option name without the leading ``--``. Hyphens are allowed.
    returns: The option's string value, or None if it was not given.
    """
    # Hyphens are legal in option names but not in attribute names, so the
    # parsed value is stored under an underscore-normalized destination.
    normalized_key = key.replace("-", "_")

    # add_help=False: never claim -h/--help. Otherwise a host program's
    # --help flag would make this lookup print its own usage and exit.
    # allow_abbrev=False: only an exact --<key> matches. Otherwise an
    # unrelated flag that is a prefix of the key would be captured.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument(
        f"--{key}",
        dest=normalized_key,
        metavar=normalized_key,
        type=str,
    )
    args, _ = parser.parse_known_args(sys.argv[1:])  # Allow unknown args
    return getattr(args, normalized_key, None)
18+
19+
20+
def env_bool(key: str, default: bool) -> bool:
    """
    Resolve a boolean setting from the command line, then the environment,
    then the given default.

    The raw value is a string. It is treated as False when, ignoring case
    and surrounding whitespace, it is empty or one of "0", "false", "f",
    "no", "n", "off". Any other value is treated as True.

    param: key: Name of the CLI option (without ``--``) and of the
    environment variable.
    param: default: Value returned when neither source provides the key.
    returns: The resolved boolean.
    """
    # bool() on a non-empty string is always True, so "false" or "0" would
    # enable the setting. Compare against explicit "off" spellings instead.
    falsy_spellings = {"", "0", "false", "f", "no", "n", "off"}

    raw = get_cli_arg(key)
    if raw is None:
        # The command line takes precedence; fall back to the environment.
        raw = os.environ.get(key)
    if raw is None:
        return default
    return raw.strip().lower() not in falsy_spellings
1129

1230

1331
def env_integer(key: str, default: int) -> int:
    """
    Resolve an integer setting from the command line, then the environment,
    then the given default.

    param: key: Name of the CLI option (without ``--``) and of the
    environment variable.
    param: default: Value returned when neither source provides the key.
    returns: The resolved integer.
    raises: ValueError if the value found is not a valid integer literal.
    """
    raw = get_cli_arg(key)
    if raw is None:
        # Nothing on the command line; consult the environment next.
        raw = os.environ.get(key)
    return default if raw is None else int(raw)
1740

1841

1942
def env_string(key: str, default: str) -> str:
    """
    Resolve a string setting from the command line, then the environment,
    then the given default.

    An empty command line value counts as not provided, so lookup falls
    through to the environment.

    param: key: Name of the CLI option (without ``--``) and of the
    environment variable.
    param: default: Value returned when neither source provides the key.
    returns: The resolved string.
    """
    cli_value = get_cli_arg(key)
    if cli_value:
        return cli_value
    return os.getenv(key, default)
2344

2445

2546
def current_time_ms() -> int:

0 commit comments

Comments
 (0)