Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: support json transaction files for debugging #483

Open
wants to merge 3 commits into
base: 2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion deltacat/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations


from deltacat.utils.common import env_string, env_bool

# Environment variables
Expand Down Expand Up @@ -37,6 +38,15 @@
False,
)

# CLI Args
# Serialization format for metafiles. It is resolved once, when this module is
# imported, through env_string() under the key "metafile-format".
METAFILE_FORMAT_KEY = "metafile-format"
METAFILE_FORMAT = env_string(METAFILE_FORMAT_KEY, "msgpack")

# File extension used for each supported metafile format.
_METAFILE_EXT_BY_FORMAT = {
    "json": ".json",
    "msgpack": ".mpk",
}

# Fail with an actionable message instead of a bare KeyError when an
# unsupported format name was supplied.
if METAFILE_FORMAT not in _METAFILE_EXT_BY_FORMAT:
    raise ValueError(
        f"Unsupported {METAFILE_FORMAT_KEY} '{METAFILE_FORMAT}'. "
        f"Must be one of: {', '.join(sorted(_METAFILE_EXT_BY_FORMAT))}."
    )
METAFILE_EXT = _METAFILE_EXT_BY_FORMAT[METAFILE_FORMAT]


# Byte Units
BYTES_PER_KIBIBYTE = 2**10
BYTES_PER_MEBIBYTE = 2**20
Expand Down Expand Up @@ -72,7 +82,6 @@

# Metastore Constants
REVISION_DIR_NAME: str = "rev"
METAFILE_EXT = ".mpk"
TXN_DIR_NAME: str = "txn"
RUNNING_TXN_DIR_NAME: str = "running"
FAILED_TXN_DIR_NAME: str = "failed"
Expand Down
46 changes: 43 additions & 3 deletions deltacat/storage/model/metafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

from typing import Optional, Tuple, List

import base64
import json
import msgpack
import pyarrow.fs
import posixpath
import uuid
import deltacat

from deltacat.constants import (
METAFILE_FORMAT,
REVISION_DIR_NAME,
METAFILE_EXT,
TXN_DIR_NAME,
Expand Down Expand Up @@ -496,18 +499,37 @@ def read(
cls,
path: str,
filesystem: Optional[pyarrow.fs.FileSystem] = None,
format: Optional[str] = METAFILE_FORMAT,
) -> Metafile:
"""
Read a metadata file and return the deserialized object.
:param path: Metadata file path to read.
:param filesystem: File system to use for reading the metadata file.
:param format: Format to use for deserializing the metadata file.
:return: Deserialized object from the metadata file.
"""
if format not in {"json", "msgpack"}:
raise ValueError(
f"Unsupported format '{format}'. Must be 'json' or 'msgpack'."
)

if not filesystem:
path, filesystem = resolve_path_and_filesystem(path, filesystem)
with filesystem.open_input_stream(path) as file:
binary = file.readall()
obj = cls(**msgpack.loads(binary)).from_serializable(path, filesystem)
reader = {
"json": lambda b: json.loads(
b.decode("utf-8"),
object_hook=lambda obj: {
k: base64.b64decode(v)
if isinstance(v, str) and v.startswith("b64:")
else v
for k, v in obj.items()
},
),
"msgpack": msgpack.loads,
}[format]
obj = cls(**reader(binary)).from_serializable(path, filesystem)
return obj

def write_txn(
def write(
    self,
    path: str,
    filesystem: Optional[pyarrow.fs.FileSystem] = None,
    format: Optional[str] = METAFILE_FORMAT,
) -> None:
    """
    Serialize and write this object to a metadata file.

    For the "json" format, bytes values are written as strings made of the
    marker "b64:" followed by their base64 encoding, so that they can be
    told apart from regular strings when the file is read back.

    :param path: Metadata file path to write to.
    :param filesystem: File system to use for writing the metadata file. If
        not given, a default filesystem will be automatically selected based
        on the catalog root path.
    :param format: Format to use for serializing the metadata file.
        Must be "json" or "msgpack".
    :raises ValueError: If the format is not "json" or "msgpack".
    :raises TypeError: If the "json" format is used and the serializable
        form contains a value that is neither JSON-native nor bytes.
    """
    if format not in {"json", "msgpack"}:
        raise ValueError(
            f"Unsupported format '{format}'. Must be 'json' or 'msgpack'."
        )

    if not filesystem:
        path, filesystem = resolve_path_and_filesystem(path, filesystem)
    revision_dir_path = posixpath.dirname(path)
    filesystem.create_dir(revision_dir_path, recursive=True)

    serializable = self.to_serializable()
    if format == "json":

        def _encode_bytes(value):
            if isinstance(value, bytes):
                return "b64:" + base64.b64encode(value).decode("utf-8")
            # Returning the object unchanged would make json.dumps retry it
            # and fail with a misleading "Circular reference detected".
            raise TypeError(
                f"Object of type {type(value).__name__} "
                f"is not JSON serializable"
            )

        packed = json.dumps(
            serializable,
            indent=4,
            default=_encode_bytes,
        ).encode("utf-8")
    else:
        packed = msgpack.dumps(serializable)

    # Serialize before opening the stream so that a serialization error
    # cannot leave a partially written metadata file behind.
    with filesystem.open_output_stream(path) as file:
        file.write(packed)

def equivalent_to(self, other: Metafile) -> bool:
"""
Expand Down
29 changes: 25 additions & 4 deletions deltacat/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
import argparse
import hashlib
import os
import time
import sys
from typing import Any, Dict


def env_bool(key: str, default: bool) -> int:
def get_cli_arg(key: str) -> Any:
    """
    Look up the value passed for the ``--<key>`` option on the command line.

    Only ``sys.argv[1:]`` is inspected. All other arguments are ignored so
    that the options of the hosting program are left untouched.

    :param key: Option name without the leading dashes. Hyphens are allowed.
    :return: The option value as a string, or None if the option is absent.
    """
    # add_help=False: the default -h/--help handler would print this
    # throwaway parser's usage and exit the process whenever the hosting
    # program is invoked with --help.
    # allow_abbrev=False: only the exact option name may match, so that an
    # unrelated option sharing a prefix (e.g. "--meta") is not captured.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)

    # allow hyphens in keys, normalize as underscores
    normalized_key = key.replace("-", "_")

    parser.add_argument(
        f"--{key}",
        dest=normalized_key,
        metavar=normalized_key,
        type=str,
    )
    args, _ = parser.parse_known_args(sys.argv[1:])  # Allow unknown args
    return getattr(args, normalized_key, None)


def env_bool(key: str, default: bool) -> bool:
    """
    Resolve a boolean setting from the command line or the environment.

    The command-line option ``--<key>`` takes precedence over the
    environment variable ``<key>``. The raw text is interpreted
    case-insensitively after trimming surrounding whitespace: an empty
    value, "0", "false", "no" and "off" mean False, and any other value
    means True.

    :param key: Name of the command-line option and environment variable.
    :param default: Value returned when the setting is not provided.
    :return: The resolved boolean value.
    """
    raw_value = get_cli_arg(key)
    if raw_value is None:
        raw_value = os.environ.get(key)
    if raw_value is None:
        return default

    # bool() on a string is True for any non-empty text, which would turn
    # "false" or "0" into True; compare against explicit false spellings.
    return raw_value.strip().lower() not in {"", "0", "false", "no", "off"}


def env_integer(key: str, default: int) -> int:
    """
    Resolve an integer setting from the command line or the environment.

    The command-line option ``--<key>`` takes precedence over the
    environment variable ``<key>``.

    :param key: Name of the command-line option and environment variable.
    :param default: Value returned when the setting is not provided.
    :return: The resolved integer value.
    :raises ValueError: If the provided value is not a valid integer.
    """
    cli_value = get_cli_arg(key)
    if cli_value is not None:
        source, raw_value = f"command-line option --{key}", cli_value
    elif key in os.environ:
        source, raw_value = f"environment variable {key}", os.environ[key]
    else:
        return default

    try:
        return int(raw_value)
    except ValueError as err:
        # Name the offending setting; int()'s own message does not say
        # where the bad value came from.
        raise ValueError(
            f"Invalid integer value {raw_value!r} for {source}."
        ) from err


def env_string(key: str, default: str) -> str:
    """
    Resolve a string setting from the command line or the environment.

    A non-empty value of the command-line option ``--<key>`` takes
    precedence over the environment variable ``<key>``.

    :param key: Name of the command-line option and environment variable.
    :param default: Value returned when the setting is not provided.
    :return: The resolved string value.
    """
    cli_value = get_cli_arg(key)
    # An absent or empty command-line value falls through to the environment.
    if cli_value:
        return cli_value
    return os.environ.get(key, default)


def current_time_ms() -> int:
Expand Down