From 1a3ed580687b75c0e18897d1d3d46d15094e40d7 Mon Sep 17 00:00:00 2001 From: Anshuman Komawar Date: Mon, 3 Feb 2025 10:07:48 -0800 Subject: [PATCH 1/3] chore: support json transaction files for debugging --- deltacat/constants.py | 6 +++++- deltacat/storage/model/metafile.py | 22 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/deltacat/constants.py b/deltacat/constants.py index 8959b315..a015d517 100644 --- a/deltacat/constants.py +++ b/deltacat/constants.py @@ -1,5 +1,6 @@ from __future__ import annotations + from deltacat.utils.common import env_string, env_bool # Environment variables @@ -37,6 +38,9 @@ False, ) +DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack") + + # Byte Units BYTES_PER_KIBIBYTE = 2**10 BYTES_PER_MEBIBYTE = 2**20 @@ -72,7 +76,7 @@ # Metastore Constants REVISION_DIR_NAME: str = "rev" -METAFILE_EXT = ".mpk" +METAFILE_EXT = ".json" if DELTACAT_METAFILE_FORMAT == "json" else ".mpk" TXN_DIR_NAME: str = "txn" RUNNING_TXN_DIR_NAME: str = "running" FAILED_TXN_DIR_NAME: str = "failed" diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py index 7625ebc7..145c648e 100644 --- a/deltacat/storage/model/metafile.py +++ b/deltacat/storage/model/metafile.py @@ -5,6 +5,7 @@ from typing import Optional, Tuple, List +import json import msgpack import pyarrow.fs import posixpath @@ -21,12 +22,15 @@ from deltacat.storage.model.list_result import ListResult from deltacat.storage.model.locator import Locator from deltacat.storage.model.types import TransactionOperationType +from deltacat.utils.common import env_string from deltacat.utils.filesystem import ( resolve_path_and_filesystem, list_directory, get_file_info, ) +DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack") + class MetafileRevisionInfo(dict): """ @@ -496,18 +500,24 @@ def read( cls, path: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, + format: str = DELTACAT_METAFILE_FORMAT, 
) -> Metafile: """ Read a metadata file and return the deserialized object. :param path: Metadata file path to read. :param filesystem: File system to use for reading the metadata file. + :param format: Format to use for deserializing the metadata file. :return: Deserialized object from the metadata file. """ if not filesystem: path, filesystem = resolve_path_and_filesystem(path, filesystem) with filesystem.open_input_stream(path) as file: binary = file.readall() - obj = cls(**msgpack.loads(binary)).from_serializable(path, filesystem) + loader = { + "json": lambda b: json.loads(b.decode("utf-8")), + "msgpack": msgpack.loads, + }[format] + obj = cls(**loader(binary)).from_serializable(path, filesystem) return obj def write_txn( @@ -550,6 +560,7 @@ def write( self, path: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, + format: str = DELTACAT_METAFILE_FORMAT, ) -> None: """ Serialize and write this object to a metadata file. @@ -557,14 +568,19 @@ def write( :param filesystem: File system to use for writing the metadata file. If not given, a default filesystem will be automatically selected based on the catalog root path. + param: format: Format to use for serializing the metadata file. 
""" if not filesystem: path, filesystem = resolve_path_and_filesystem(path, filesystem) revision_dir_path = posixpath.dirname(path) filesystem.create_dir(revision_dir_path, recursive=True) with filesystem.open_output_stream(path) as file: - packed = msgpack.dumps(self.to_serializable()) - file.write(packed) + file.write( + { + "json": lambda data: json.dumps(data, indent=4).encode("utf-8"), + "msgpack": msgpack.dumps, + }[format](self.to_serializable()) + ) def equivalent_to(self, other: Metafile) -> bool: """ From 346b197b846608f074b22f476025eabfc09a2cca Mon Sep 17 00:00:00 2001 From: Anshuman Komawar Date: Tue, 4 Feb 2025 14:56:56 -0800 Subject: [PATCH 2/3] chore: support optional file format for metafiles --- deltacat/constants.py | 10 ++++-- deltacat/storage/model/metafile.py | 52 ++++++++++++++++++++++-------- deltacat/utils/common.py | 29 ++++++++++++++--- 3 files changed, 71 insertions(+), 20 deletions(-) diff --git a/deltacat/constants.py b/deltacat/constants.py index a015d517..a3c2c32f 100644 --- a/deltacat/constants.py +++ b/deltacat/constants.py @@ -38,7 +38,14 @@ False, ) -DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack") +# CLI Args +METAFILE_FORMAT_KEY = "metafile-format" +METAFILE_FORMAT = env_string(METAFILE_FORMAT_KEY, "msgpack") +print(f"METAFILE_FORMAT: {METAFILE_FORMAT}") +METAFILE_EXT = { + "json": ".json", + "msgpack": ".mpk", +}[METAFILE_FORMAT] # Byte Units @@ -76,7 +83,6 @@ # Metastore Constants REVISION_DIR_NAME: str = "rev" -METAFILE_EXT = ".json" if DELTACAT_METAFILE_FORMAT == "json" else ".mpk" TXN_DIR_NAME: str = "txn" RUNNING_TXN_DIR_NAME: str = "running" FAILED_TXN_DIR_NAME: str = "failed" diff --git a/deltacat/storage/model/metafile.py b/deltacat/storage/model/metafile.py index 145c648e..a2ddad0f 100644 --- a/deltacat/storage/model/metafile.py +++ b/deltacat/storage/model/metafile.py @@ -5,6 +5,7 @@ from typing import Optional, Tuple, List +import base64 import json import msgpack import pyarrow.fs @@ 
-13,6 +14,7 @@ import deltacat from deltacat.constants import ( + METAFILE_FORMAT, REVISION_DIR_NAME, METAFILE_EXT, TXN_DIR_NAME, @@ -22,15 +24,12 @@ from deltacat.storage.model.list_result import ListResult from deltacat.storage.model.locator import Locator from deltacat.storage.model.types import TransactionOperationType -from deltacat.utils.common import env_string from deltacat.utils.filesystem import ( resolve_path_and_filesystem, list_directory, get_file_info, ) -DELTACAT_METAFILE_FORMAT = env_string("DELTACAT_METAFILE_FORMAT", "msgpack") - class MetafileRevisionInfo(dict): """ @@ -500,7 +499,7 @@ def read( cls, path: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, - format: str = DELTACAT_METAFILE_FORMAT, + format: Optional[str] = METAFILE_FORMAT, ) -> Metafile: """ Read a metadata file and return the deserialized object. @@ -509,15 +508,28 @@ def read( :param format: Format to use for deserializing the metadata file. :return: Deserialized object from the metadata file. """ + if format not in {"json", "msgpack"}: + raise ValueError( + f"Unsupported format '{format}'. Must be 'json' or 'msgpack'." 
+ ) + if not filesystem: path, filesystem = resolve_path_and_filesystem(path, filesystem) with filesystem.open_input_stream(path) as file: binary = file.readall() - loader = { - "json": lambda b: json.loads(b.decode("utf-8")), + reader = { + "json": lambda b: json.loads( + b.decode("utf-8"), + object_hook=lambda obj: { + k: base64.b64decode(v) + if isinstance(v, str) and v.startswith("b64:") + else v + for k, v in obj.items() + }, + ), "msgpack": msgpack.loads, }[format] - obj = cls(**loader(binary)).from_serializable(path, filesystem) + obj = cls(**reader(binary)).from_serializable(path, filesystem) return obj def write_txn( @@ -560,7 +572,7 @@ def write( self, path: str, filesystem: Optional[pyarrow.fs.FileSystem] = None, - format: str = DELTACAT_METAFILE_FORMAT, + format: Optional[str] = METAFILE_FORMAT, ) -> None: """ Serialize and write this object to a metadata file. @@ -570,17 +582,29 @@ def write( the catalog root path. param: format: Format to use for serializing the metadata file. """ + if format not in {"json", "msgpack"}: + raise ValueError( + f"Unsupported format '{format}'. Must be 'json' or 'msgpack'." 
+ ) + if not filesystem: path, filesystem = resolve_path_and_filesystem(path, filesystem) revision_dir_path = posixpath.dirname(path) filesystem.create_dir(revision_dir_path, recursive=True) + + writer = { + "json": lambda data: json.dumps( + data, + indent=4, + default=lambda b: base64.b64encode(b).decode("utf-8") + if isinstance(b, bytes) + else b, + ).encode("utf-8"), + "msgpack": msgpack.dumps, + }[format] + with filesystem.open_output_stream(path) as file: - file.write( - { - "json": lambda data: json.dumps(data, indent=4).encode("utf-8"), - "msgpack": msgpack.dumps, - }[format](self.to_serializable()) - ) + file.write(writer(self.to_serializable())) def equivalent_to(self, other: Metafile) -> bool: """ diff --git a/deltacat/utils/common.py b/deltacat/utils/common.py index 3a79278c..e4ad1003 100644 --- a/deltacat/utils/common.py +++ b/deltacat/utils/common.py @@ -1,25 +1,46 @@ +import argparse import hashlib import os import time +import sys from typing import Any, Dict -def env_bool(key: str, default: bool) -> int: +def get_cli_arg(key: str) -> Any: + parser = argparse.ArgumentParser() + + # allow hyphens in keys, normalize as underscores + normalized_key = key.replace("-", "_") + + parser.add_argument(f"--{key}", metavar=normalized_key, type=str) + args, _ = parser.parse_known_args(sys.argv[1:]) # Allow unknown args + return getattr(args, normalized_key, None) + + +def env_bool(key: str, default: bool) -> bool: + cli_value = get_cli_arg(key) + if cli_value is not None: + return bool(cli_value) + if key in os.environ: return bool(os.environ[key]) + return default def env_integer(key: str, default: int) -> int: + cli_value = get_cli_arg(key) + if cli_value is not None: + return int(cli_value) + if key in os.environ: return int(os.environ[key]) + return default def env_string(key: str, default: str) -> str: - if key in os.environ: - return os.environ[key] - return default + return get_cli_arg(key) or os.getenv(key, default) def current_time_ms() -> int: From 
9550e35d9c1aed31225bfc9510d328747fd37d26 Mon Sep 17 00:00:00 2001 From: Anshuman Komawar Date: Wed, 5 Feb 2025 13:34:07 -0800 Subject: [PATCH 3/3] refactor: remove unnecessary print --- deltacat/constants.py | 1 - 1 file changed, 1 deletion(-) diff --git a/deltacat/constants.py b/deltacat/constants.py index a3c2c32f..6078aff0 100644 --- a/deltacat/constants.py +++ b/deltacat/constants.py @@ -41,7 +41,6 @@ # CLI Args METAFILE_FORMAT_KEY = "metafile-format" METAFILE_FORMAT = env_string(METAFILE_FORMAT_KEY, "msgpack") -print(f"METAFILE_FORMAT: {METAFILE_FORMAT}") METAFILE_EXT = { "json": ".json", "msgpack": ".mpk",