Make transaction time provider thread safe and unit-agnostic.
pdames committed Jan 29, 2025
1 parent 0e779ef commit 9455b4d
Showing 3 changed files with 119 additions and 63 deletions.
5 changes: 5 additions & 0 deletions deltacat/constants.py
@@ -47,6 +47,11 @@
SIGNED_INT64_MIN_VALUE = -(2**63)
SIGNED_INT64_MAX_VALUE = 2**63 - 1

# Time Units
NANOS_PER_SEC = 1_000_000_000
MICROS_PER_SEC = 1_000_000
MILLIS_PER_SEC = 1000

# Inflation multiplier from snappy-compressed parquet to pyarrow.
# This should be kept larger than actual average inflation multipliers.
# Note that this is a very rough guess since actual observed pyarrow
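As a minimal usage sketch (not part of this commit), the new time-unit constants can convert the nanosecond timestamps adopted in transaction.py below into coarser units; the conversion factors are simple derivations from the constants, not APIs introduced by the diff:

import time

from deltacat.constants import MICROS_PER_SEC, MILLIS_PER_SEC, NANOS_PER_SEC

now_ns = time.time_ns()
now_millis = now_ns // (NANOS_PER_SEC // MILLIS_PER_SEC)  # ns -> ms (divide by 1,000,000)
now_micros = now_ns // (NANOS_PER_SEC // MICROS_PER_SEC)  # ns -> us (divide by 1,000)
now_secs = now_ns / NANOS_PER_SEC  # ns -> fractional seconds
print(now_millis, now_micros, now_secs)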
167 changes: 110 additions & 57 deletions deltacat/storage/model/transaction.py
@@ -1,9 +1,12 @@
from __future__ import annotations

import os
import copy
import time
import uuid
import posixpath
import threading
from collections import defaultdict

from itertools import chain
from typing import Optional, List, Union, Tuple
@@ -17,6 +20,7 @@
RUNNING_TXN_DIR_NAME,
FAILED_TXN_DIR_NAME,
SUCCESS_TXN_DIR_NAME,
NANOS_PER_SEC,
)
from deltacat.storage.model.list_result import ListResult
from deltacat.storage.model.types import (
@@ -56,35 +60,86 @@ def end_time(self) -> int:

class TransactionSystemTimeProvider(TransactionTimeProvider):
"""
A simple transaction time provider that returns the current system clock
epoch time in milliseconds (i.e., provides no external consistency
guarantees due to clock skew within and between nodes).
This is typically suitable for use with a local catalog, but is not
recommended when writing to cloud catalogs, as transactions using this
provider may display anomalies due to misalignment of distributed system
clocks (e.g., a previously completed transaction is not visible to
a subsequent transaction, or an in-progress transaction only sees partial
results from a completed transaction).
A local transaction time provider that returns the current system clock
epoch time in nanoseconds. Ensures that all local transaction start
times are greater than all last known end times, and that all local
transaction end times are no less than all last known start times, across
all threads using this time provider.
Note that this time provider gives no external consistency guarantees due
to potential clock skew between distributed nodes writing to the same
catalog, and is only recommended for use with local catalogs.
"""

last_known_start_times = defaultdict(int)
last_known_end_times = defaultdict(int)

# don't wait more than 60 seconds for the system clock to catch up
# between transactions (assumed to be indicative of a larger system
# clock change made between transactions)
max_sync_wait_time = 60 * NANOS_PER_SEC

def start_time(self) -> int:
"""
Gets the current system time in milliseconds since the epoch.
:return: Current epoch time in milliseconds.
"""
# hack to ensure serial transactions in a single process have unique IDs
time.sleep(0.0005)
return time.time_ns() // 1_000_000
Gets the current system time in nanoseconds since the epoch.
:return: Current epoch time in nanoseconds.
"""
# ensure serial transactions in a single process have start times after
# the last known end time
last_known_end_times = self.last_known_end_times.values() or [0]
max_known_end_time = max(last_known_end_times)

elapsed_start_time = time.monotonic_ns()
current_time = time.time_ns()
while current_time <= max_known_end_time:
elapsed_time = time.monotonic_ns() - elapsed_start_time
if elapsed_time > self.max_sync_wait_time:
raise TimeoutError(
f"Failed to sync cross-transaction system clock time after "
f"{self.max_sync_wait_time / NANOS_PER_SEC} seconds, "
f"aborting."
)
time.sleep(0.000001)
current_time = time.time_ns()

# update the current thread's last known start time
pid = os.getpid()
tid = threading.current_thread().ident
current_thread_time_key = (pid, tid)
self.last_known_start_times[current_thread_time_key] = current_time

return current_time

def end_time(self) -> int:
"""
Gets the current system time in milliseconds since the epoch.
:return: Current epoch time in milliseconds.
"""
# hack to ensure serial transactions in a single process have unique IDs
time.sleep(0.0005)
return time.time_ns() // 1_000_000
Gets the current system time in nanoseconds since the epoch.
:return: Current epoch time in nanoseconds.
"""
# ensure serial transactions in a single process have end times no less
# than the last known start time
last_known_start_times = self.last_known_start_times.values() or [0]
last_start_time = max(last_known_start_times)

elapsed_start_time = time.monotonic_ns()
current_time = time.time_ns()
while current_time < last_start_time:
elapsed_time = time.monotonic_ns() - elapsed_start_time
if elapsed_time > self.max_sync_wait_time:
raise TimeoutError(
f"Failed to sync cross-transaction system clock time after "
f"{self.max_sync_wait_time / NANOS_PER_SEC} seconds, "
f"aborting."
)
time.sleep(0.000001)
current_time = time.time_ns()

# update the current thread's last known end time
pid = os.getpid()
tid = threading.current_thread().ident
current_thread_time_key = (pid, tid)
self.last_known_end_times[current_thread_time_key] = current_time

return current_time
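
As a usage sketch for the provider above (assumed usage, not part of this diff), concurrent threads sharing one TransactionSystemTimeProvider should each observe an end time no earlier than their own start time, because end_time() waits until the system clock passes every recorded start time:

import threading

from deltacat.storage.model.transaction import TransactionSystemTimeProvider

provider = TransactionSystemTimeProvider()
issued = []
lock = threading.Lock()

def run_txn() -> None:
    # start_time() exceeds all previously recorded end times
    start = provider.start_time()
    # end_time() is no less than all previously recorded start times
    end = provider.end_time()
    with lock:
        issued.append((start, end))

threads = [threading.Thread(target=run_txn) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert all(start <= end for start, end in issued)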


class TransactionOperation(dict):
@@ -258,9 +313,8 @@ def read_end_time(
filesystem: Optional[pyarrow.fs.FileSystem] = None,
) -> Optional[int]:
"""
Returns the end time of the transaction in milliseconds since the
epoch based on the transaction log file's modified timestamp, or None
if the transaction log file does not exist.
Returns the end time of the transaction, or None if the transaction
log file does not exist.
:param path: Transaction log path to read.
:param filesystem: File system to use for reading the Transaction file.
:return: End time of the transaction, or None if unavailable.
@@ -355,9 +409,9 @@ def end_time(self) -> Optional[int]:

def _mark_start_time(self, time_provider: TransactionTimeProvider) -> int:
"""
Sets the start time of the transaction as current milliseconds since
the epoch. Raises a runtime error if the transaction start time has
already been set by a previous commit.
Sets the start time of the transaction using the given
TransactionTimeProvider. Raises a runtime error if the transaction
start time has already been set by a previous commit.
"""
if self.get("start_time"):
raise RuntimeError("Cannot restart a previously started transaction.")
@@ -366,15 +420,15 @@ def _mark_start_time(self, time_provider: TransactionTimeProvider) -> int:

def _mark_end_time(self, time_provider: TransactionTimeProvider) -> int:
"""
Sets the end time of the transaction as current milliseconds since
the epoch. Raises a runtime error if the transaction end time has
already been set by a previous commit, or if the transaction start
time has not been set.
Sets the end time of the transaction using the given
TransactionTimeProvider. Raises a runtime error if the transaction end
time has already been set by a previous commit, or if the transaction
start time has not been set.
"""
if not self.get("start_time"):
raise RuntimeError("Cannot end a transaction before it's started.")
raise RuntimeError("Cannot end an unstarted transaction.")
if self.get("end_time"):
raise RuntimeError("Cannot end a previously ended transaction.")
raise RuntimeError("Cannot end a completed transaction.")
end_time = self["end_time"] = time_provider.end_time()
return end_time

@@ -417,16 +471,6 @@ def _validate_txn_log_file(success_txn_log_file: str) -> None:
f"Transaction log file `{success_txn_log_file}` does not "
f"contain a valid start time."
) from e
if start_time < 0:
raise ValueError(
f"Transaction log file `{success_txn_log_file}` does not "
f"contain a valid start time ({start_time} is pre-epoch)."
)
if start_time > time.time_ns() // 1_000_000:
raise ValueError(
f"Transaction log file `{success_txn_log_file}` does not "
f"contain a valid start time ({start_time} is in the future)."
)
# ensure that the txn uuid is valid
txn_uuid_str = txn_log_parts[1]
try:
@@ -444,24 +488,19 @@ def _validate_txn_log_file(success_txn_log_file: str) -> None:
f"Transaction log file `{success_txn_log_file}` does not "
f"contain a valid end time."
) from e
# ensure that the transaction end time was recorded after the start time
# ensure transaction end time was not recorded before start time
if end_time < start_time:
raise OSError(
f"Transaction end time {end_time} is earlier than start "
f"time {start_time}! This may indicate a problem "
f"with either the system clock on the host serving the "
f"transaction, or a loss of precision in the filesystem "
f"recording the completed transaction file timestamp (at "
f"least millisecond precision is required). To preserve "
f"catalog integrity, the corresponding completed "
f"transaction log at `{success_txn_log_file}` has been removed."
f"time {start_time}! To preserve catalog integrity, the "
f"corresponding completed transaction log at "
f"`{success_txn_log_file}` has been removed."
)

def commit(
self,
catalog_root_dir: str,
filesystem: Optional[pyarrow.fs.FileSystem] = None,
time_provider: Optional[TransactionTimeProvider] = None,
) -> Union[List[ListResult[Metafile]], Tuple[List[str], str]]:
# TODO(pdames): allow transactions to be durably staged and resumed
# across multiple sessions prior to commit
@@ -484,8 +523,9 @@
success_txn_log_dir = posixpath.join(txn_log_dir, SUCCESS_TXN_DIR_NAME)
filesystem.create_dir(success_txn_log_dir, recursive=False)

if not time_provider:
time_provider = TransactionSystemTimeProvider()
# TODO(pdames): Support injection of other time providers, but ensure
# that ALL transactions in a catalog use the same time provider.
time_provider = TransactionSystemTimeProvider()

# record the transaction start time
txn._mark_start_time(time_provider)
@@ -563,6 +603,12 @@ def _commit_write(
packed = msgpack.dumps(self.to_serializable())
file.write(packed)

###################################################################
###################################################################
# failure past here telegraphs a failed transaction cleanup attempt
###################################################################
###################################################################

# delete all files written during the failed transaction
known_write_paths = chain.from_iterable(
[
@@ -577,7 +623,7 @@ def _commit_write(

# delete the in-progress transaction log file entry
filesystem.delete_file(running_txn_log_file_path)

# failed transaction cleanup is now complete
raise

# record the completed transaction
@@ -604,7 +650,6 @@ def _commit_write(
except Exception as e1:
try:
# move the txn log from success dir to failed dir
# keep parent success txn log dir to telegraph validation fail
failed_txn_log_file_path = posixpath.join(
failed_txn_log_dir,
self.id,
@@ -613,6 +658,14 @@
src=success_txn_log_file_path,
dest=failed_txn_log_file_path,
)
# keep parent success txn log dir to telegraph failed validation

###############################################################
###############################################################
# failure past here telegraphs a failed transaction validation
# cleanup attempt
###############################################################
###############################################################
except Exception as e2:
raise OSError(
f"Failed to cleanup bad transaction log file at "
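A hypothetical sketch (not part of this commit) of inspecting transaction state via the per-state log directories that commit() manages above. The layout under the catalog root is an assumption inferred from the TXN_DIR_NAME, RUNNING_TXN_DIR_NAME, FAILED_TXN_DIR_NAME, and SUCCESS_TXN_DIR_NAME constants, and list_txn_ids_by_state is an illustrative helper, not a deltacat API:

import os
import posixpath
from typing import Dict, List

from deltacat.constants import (
    TXN_DIR_NAME,
    RUNNING_TXN_DIR_NAME,
    FAILED_TXN_DIR_NAME,
    SUCCESS_TXN_DIR_NAME,
)

def list_txn_ids_by_state(catalog_root: str) -> Dict[str, List[str]]:
    # assumes txn logs live at <catalog_root>/<TXN_DIR_NAME>/<state dir>/<txn id>
    txn_log_dir = posixpath.join(catalog_root, TXN_DIR_NAME)
    state_dirs = {
        "running": RUNNING_TXN_DIR_NAME,
        "failed": FAILED_TXN_DIR_NAME,
        "success": SUCCESS_TXN_DIR_NAME,
    }
    results: Dict[str, List[str]] = {}
    for state, dir_name in state_dirs.items():
        state_dir = posixpath.join(txn_log_dir, dir_name)
        results[state] = sorted(os.listdir(state_dir)) if os.path.isdir(state_dir) else []
    return results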
10 changes: 4 additions & 6 deletions deltacat/tests/storage/model/test_metafile_io.py
@@ -56,7 +56,7 @@
Metafile,
MetafileRevisionInfo,
)
from deltacat.constants import TXN_DIR_NAME, SUCCESS_TXN_DIR_NAME
from deltacat.constants import TXN_DIR_NAME, SUCCESS_TXN_DIR_NAME, NANOS_PER_SEC
from deltacat.utils.filesystem import resolve_path_and_filesystem


@@ -392,7 +392,7 @@ def test_txn_bad_end_time_fails(self, temp_dir, mocker):
for expected, actual, _ in commit_results:
assert expected.equivalent_to(actual)
# given a transaction with an ending timestamp set in the past
past_timestamp = time.time_ns() // 1_000_000 - 1000
past_timestamp = time.time_ns() - NANOS_PER_SEC
mocker.patch(
"deltacat.storage.model.transaction.Transaction._parse_end_time",
return_value=past_timestamp,
@@ -442,15 +442,15 @@ def test_txn_conflict_concurrent_complete(self, temp_dir, mocker):
filesystem.create_dir(txn_log_file_dir, recursive=True)
txn_log_file_path = os.path.join(
txn_log_file_dir,
str(time.time_ns() // 1_000_000),
str(time.time_ns()),
)
with filesystem.open_output_stream(txn_log_file_path):
pass # Just create an empty log to mark the txn as complete

# and a concurrent transaction that started before that transaction
# completed, writes the same delta metafile revision, then sees the
# conflict
past_timestamp = time.time_ns() // 1_000_000 - 1000
past_timestamp = time.time_ns() - NANOS_PER_SEC
future_timestamp = 9999999999999 * 1_000_000  # scale legacy ms sentinel to ns
end_time_mock = mocker.patch(
"deltacat.storage.model.transaction.Transaction._parse_end_time",
@@ -1909,8 +1909,6 @@ def test_table_rename_bad_order_txn_op_chaining(self, temp_dir):
write_paths, txn_log_path = transaction.commit(temp_dir)
assert len(write_paths) == 2

# TODO(pdames): Test isolation of creating a duplicate namespace/table/etc.
# between multiple concurrent transactions.
def test_create_duplicate_namespace(self, temp_dir):
namespace_locator = NamespaceLocator.of(namespace="test_namespace")
namespace = Namespace.of(locator=namespace_locator)
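A condensed sketch of the patching pattern these tests rely on, assuming pytest-mock's mocker fixture; the patch target string is taken verbatim from the diff above, while the surrounding test body is illustrative:

import time

from deltacat.constants import NANOS_PER_SEC

def test_stale_end_time_is_rejected(mocker):
    # force the parsed end time one second into the past so commit-time
    # validation sees an end time earlier than the transaction start time
    past_timestamp = time.time_ns() - NANOS_PER_SEC
    mocker.patch(
        "deltacat.storage.model.transaction.Transaction._parse_end_time",
        return_value=past_timestamp,
    )
    # ... commit a transaction here and assert that validation fails,
    # as test_txn_bad_end_time_fails does above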
