Restructure deltacat metrics to allow for and use decorators
Kevin Yan committed Oct 25, 2023
1 parent 634bfcb commit a8f3b61
Showing 22 changed files with 345 additions and 202 deletions.
36 changes: 27 additions & 9 deletions deltacat/compute/compactor/compaction_session.py
@@ -53,6 +53,7 @@
from deltacat.compute.compactor.model.compactor_version import CompactorVersion
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.metrics import MetricsConfigSingleton


if importlib.util.find_spec("memray"):
@@ -139,6 +140,10 @@ def compact_partition(
if s3_client_kwargs is None:
s3_client_kwargs = {}

if metrics_config:
# Initialize MetricsConfigSingleton
MetricsConfigSingleton.instance(metrics_config)

# memray official documentation link:
# https://bloomberg.github.io/memray/getting_started.html
with memray.Tracker(
@@ -166,7 +171,6 @@
rebase_source_partition_locator,
rebase_source_partition_high_watermark,
enable_profiler,
metrics_config,
list_deltas_kwargs,
read_kwargs_provider,
s3_table_writer_kwargs,
@@ -217,7 +221,6 @@ def _execute_compaction_round(
rebase_source_partition_locator: Optional[PartitionLocator],
rebase_source_partition_high_watermark: Optional[int],
enable_profiler: Optional[bool],
metrics_config: Optional[MetricsConfig],
list_deltas_kwargs: Optional[Dict[str, Any]],
read_kwargs_provider: Optional[ReadKwargsProvider],
s3_table_writer_kwargs: Optional[Dict[str, Any]],
@@ -234,6 +237,13 @@
if rebase_source_partition_locator
else source_partition_locator
)

# We need to pass in metrics_config to each step, as they are run on separate processes
try:
metrics_config = MetricsConfigSingleton.instance().metrics_config
except Exception:
metrics_config = None

base_audit_url = rcf_source_partition_locator.path(
f"s3://{compaction_artifact_s3_bucket}/compaction-audit"
)
@@ -438,11 +448,11 @@ def _execute_compaction_round(
num_buckets=hash_bucket_count,
num_groups=max_parallelism,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
object_store=object_store,
deltacat_storage=deltacat_storage,
deltacat_storage_kwargs=deltacat_storage_kwargs,
metrics_config=metrics_config,
**kwargs,
)
hb_invoke_end = time.monotonic()
@@ -453,7 +463,8 @@
hb_end = time.monotonic()
hb_results_retrieved_at = time.time()

telemetry_time_hb = compaction_audit.save_step_stats(
cluster_util_after_task_latency = 0
cluster_util_after_task_latency += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.HASH_BUCKET_STEP_NAME,
hb_results,
hb_results_retrieved_at,
@@ -528,8 +539,8 @@ def _execute_compaction_round(
sort_keys=sort_keys,
num_materialize_buckets=num_materialize_buckets,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
object_store=object_store,
metrics_config=metrics_config,
)

dedupe_invoke_end = time.monotonic()
@@ -546,7 +557,7 @@
total_dd_record_count = sum([ddr.deduped_record_count for ddr in dd_results])
logger.info(f"Deduped {total_dd_record_count} records...")

telemetry_time_dd = compaction_audit.save_step_stats(
cluster_util_after_task_latency += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.DEDUPE_STEP_NAME,
dd_results,
dedupe_results_retrieved_at,
@@ -605,12 +616,12 @@ def _execute_compaction_round(
max_records_per_output_file=records_per_compacted_file,
compacted_file_content_type=compacted_file_content_type,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
s3_table_writer_kwargs=s3_table_writer_kwargs,
object_store=object_store,
deltacat_storage=deltacat_storage,
deltacat_storage_kwargs=deltacat_storage_kwargs,
metrics_config=metrics_config,
)

materialize_invoke_end = time.monotonic()
@@ -623,7 +634,7 @@
materialize_end = time.monotonic()
materialize_results_retrieved_at = time.time()

telemetry_time_materialize = compaction_audit.save_step_stats(
cluster_util_after_task_latency += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.MATERIALIZE_STEP_NAME,
mat_results,
materialize_results_retrieved_at,
@@ -685,8 +696,15 @@ def _execute_compaction_round(
session_peak_memory
)

metrics_telemetry_time = 0
try:
metrics_telemetry_time = MetricsConfigSingleton.instance().total_telemetry_time
except Exception as e:
logger.warn(
f"Skipping calculating metrics telemetry time due to exception: {e}"
)
compaction_audit.save_round_completion_stats(
mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize
mat_results, cluster_util_after_task_latency + metrics_telemetry_time
)

s3_utils.upload(
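For context on the new `MetricsConfigSingleton` calls above, here is a minimal sketch of the kind of process-local singleton the change relies on: `instance()` caches a `MetricsConfig` the first time it is called with one, later calls return the cached object, and a running `total_telemetry_time` lets the driver report metrics overhead at round completion. The class body and the `increment_telemetry_time` helper are assumptions for illustration, not deltacat's actual implementation.

```python
import threading


class MetricsConfigSingleton:
    """Sketch: process-local holder for a metrics config plus accumulated telemetry time."""

    _instance = None
    _lock = threading.Lock()

    def __init__(self, metrics_config):
        self._metrics_config = metrics_config
        self._total_telemetry_time = 0.0

    @classmethod
    def instance(cls, metrics_config=None):
        # The first call must supply a config; subsequent calls return the cached instance.
        with cls._lock:
            if cls._instance is None:
                if metrics_config is None:
                    raise RuntimeError("MetricsConfigSingleton has not been initialized")
                cls._instance = cls(metrics_config)
        return cls._instance

    @property
    def metrics_config(self):
        return self._metrics_config

    @property
    def total_telemetry_time(self):
        return self._total_telemetry_time

    def increment_telemetry_time(self, seconds):
        # Assumed helper: decorators add the latency of each metrics emission here.
        self._total_telemetry_time += seconds
```

Because the singleton raises (or is simply absent) when metrics were never configured, `_execute_compaction_round` wraps its lookups in try/except and falls back to `metrics_config = None` and a telemetry time of zero.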
@@ -832,7 +832,6 @@ def save_step_stats(
f"{step_name}PostObjectStoreMemoryUsedBytes"
] = cluster_utilization_after_task.used_object_store_memory_bytes

telemetry_time = 0
if task_results:
last_task_completed_at = max(
result.task_completed_at for result in task_results
@@ -846,13 +845,9 @@
result.peak_memory_usage_bytes for result in task_results
)

telemetry_time = sum(
result.telemetry_time_in_seconds for result in task_results
)

self[f"{step_name}TaskPeakMemoryUsedBytes"] = peak_task_memory.item()

return cluster_util_after_task_latency + telemetry_time
return cluster_util_after_task_latency

def save_round_completion_stats(
self, mat_results: List[MaterializeResult], total_telemetry_time: float
1 change: 0 additions & 1 deletion deltacat/compute/compactor/model/dedupe_result.py
@@ -7,5 +7,4 @@ class DedupeResult(NamedTuple):
mat_bucket_idx_to_obj_id: Dict[int, Tuple]
deduped_record_count: np.int64
peak_memory_usage_bytes: np.double
telemetry_time_in_seconds: np.double
task_completed_at: np.double
1 change: 0 additions & 1 deletion deltacat/compute/compactor/model/hash_bucket_result.py
@@ -7,5 +7,4 @@ class HashBucketResult(NamedTuple):
hash_bucket_group_to_obj_id: np.ndarray
hb_record_count: np.int64
peak_memory_usage_bytes: np.double
telemetry_time_in_seconds: np.double
task_completed_at: np.double
6 changes: 0 additions & 6 deletions deltacat/compute/compactor/model/materialize_result.py
@@ -16,7 +16,6 @@ def of(
pyarrow_write_result: PyArrowWriteResult,
referenced_pyarrow_write_result: Optional[PyArrowWriteResult] = None,
peak_memory_usage_bytes: Optional[np.double] = None,
telemetry_time_in_seconds: Optional[np.double] = None,
task_completed_at: Optional[np.double] = None,
) -> MaterializeResult:
materialize_result = MaterializeResult()
@@ -25,7 +24,6 @@
materialize_result["paWriteResult"] = pyarrow_write_result
materialize_result["referencedPaWriteResult"] = referenced_pyarrow_write_result
materialize_result["peakMemoryUsageBytes"] = peak_memory_usage_bytes
materialize_result["telemetryTimeInSeconds"] = telemetry_time_in_seconds
materialize_result["taskCompletedAt"] = task_completed_at
return materialize_result

@@ -44,10 +42,6 @@ def task_index(self) -> int:
def peak_memory_usage_bytes(self) -> Optional[np.double]:
return self["peakMemoryUsageBytes"]

@property
def telemetry_time_in_seconds(self) -> Optional[np.double]:
return self["telemetryTimeInSeconds"]

@property
def pyarrow_write_result(self) -> PyArrowWriteResult:
val: Dict[str, Any] = self.get("paWriteResult")
6 changes: 4 additions & 2 deletions deltacat/compute/compactor/repartition_session.py
@@ -30,7 +30,7 @@
PartitionLocator,
interface as unimplemented_deltacat_storage,
)
from deltacat.utils.metrics import MetricsConfig
from deltacat.utils.metrics import MetricsConfig, MetricsConfigSingleton
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -59,6 +59,9 @@ def repartition(
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:
# Initialize MetricsConfigSingleton
if metrics_config:
MetricsConfigSingleton.instance(metrics_config)

node_resource_keys = None
if pg_config: # use resource in each placement group
@@ -130,7 +133,6 @@ def repartition(
max_records_per_output_file=records_per_repartitioned_file,
destination_partition=partition,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
s3_table_writer_kwargs=s3_table_writer_kwargs,
repartitioned_file_content_type=repartitioned_file_content_type,
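Both `compact_partition` and `repartition` initialize the singleton on the driver, yet each compaction step still receives `metrics_config` as an argument and re-initializes the singleton itself (see the dedupe diff below). The comment in `compaction_session.py` explains why: the steps run in separate processes, and module-level state set on the driver does not carry over to Ray workers. The standalone snippet below illustrates that behavior with only the standard library; `_CONFIG` is a hypothetical stand-in for the singleton's cached instance.

```python
import multiprocessing as mp

_CONFIG = None  # stands in for MetricsConfigSingleton's cached instance


def init_config(cfg):
    global _CONFIG
    _CONFIG = cfg


def worker(passed_cfg, queue):
    # In a freshly spawned process, _CONFIG is None again unless re-initialized here.
    queue.put(_CONFIG)
    init_config(passed_cfg)
    queue.put(_CONFIG)


if __name__ == "__main__":
    init_config({"metrics": "enabled"})  # "driver-side" initialization
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    proc = ctx.Process(target=worker, args=({"metrics": "enabled"}, queue))
    proc.start()
    print(queue.get())  # None: the parent's state did not propagate
    print(queue.get())  # {'metrics': 'enabled'}: the worker re-initialized from its argument
    proc.join()
```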
41 changes: 15 additions & 26 deletions deltacat/compute/compactor/steps/dedupe.py
@@ -23,7 +23,11 @@
get_current_ray_worker_id,
)
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.utils.metrics import (
emit_timer_metrics,
MetricsConfig,
MetricsConfigSingleton,
)
from deltacat.io.object_store import IObjectStore
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes

@@ -100,6 +104,7 @@ def delta_file_locator_to_mat_bucket_index(
return int.from_bytes(digest, "big") % materialize_bucket_count


@emit_timer_metrics(metrics_name="dedupe")
def _timed_dedupe(
object_ids: List[Any],
sort_keys: List[SortKey],
@@ -108,7 +113,8 @@ def _timed_dedupe(
enable_profiler: bool,
object_store: Optional[IObjectStore],
**kwargs,
):
) -> DedupeResult:
logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
task_id = get_current_ray_task_id()
worker_id = get_current_ray_worker_id()
with memray.Tracker(
@@ -229,11 +235,11 @@ def _timed_dedupe(
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")
return DedupeResult(
mat_bucket_to_dd_idx_obj_id,
np.int64(total_deduped_records),
np.double(peak_memory_usage_bytes),
np.double(0.0),
np.double(time.time()),
)

@@ -245,13 +251,15 @@ def dedupe(
num_materialize_buckets: int,
dedupe_task_index: int,
enable_profiler: bool,
metrics_config: MetricsConfig,
object_store: Optional[IObjectStore],
metrics_config: Optional[MetricsConfig] = None,
**kwargs,
) -> DedupeResult:
logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
dedupe_result, duration = timed_invocation(
func=_timed_dedupe,
# initialize singleton on new process
if metrics_config:
MetricsConfigSingleton.instance(metrics_config)

return _timed_dedupe(
object_ids=object_ids,
sort_keys=sort_keys,
num_materialize_buckets=num_materialize_buckets,
Expand All @@ -260,22 +268,3 @@ def dedupe(
object_store=object_store,
**kwargs,
)

emit_metrics_time = 0.0
if metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="dedupe",
value=duration,
metrics_config=metrics_config,
)
emit_metrics_time = latency

logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")
return DedupeResult(
dedupe_result[0],
dedupe_result[1],
dedupe_result[2],
np.double(emit_metrics_time),
dedupe_result[4],
)
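The removed block above shows the old pattern: run `_timed_dedupe` through `timed_invocation`, call `emit_timer_metrics` manually, and thread the emission latency back through `DedupeResult.telemetry_time_in_seconds`. The commit replaces all of that with the `@emit_timer_metrics(metrics_name="dedupe")` decorator applied directly to `_timed_dedupe`. Below is a sketch of how such a decorator factory could work, reusing the `MetricsConfigSingleton` sketch shown earlier; `publish_timer` and `increment_telemetry_time` are hypothetical stand-ins, not the real deltacat API.

```python
import functools
import time


def publish_timer(name, seconds, metrics_config):
    # Hypothetical emitter stand-in (the real code would publish to e.g. CloudWatch).
    print(f"timer {name}: {seconds:.3f}s -> {metrics_config}")


def emit_timer_metrics(metrics_name):
    """Sketch of a timer-metrics decorator factory, not deltacat's actual implementation."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            result = func(*args, **kwargs)
            duration = time.monotonic() - start

            emit_start = time.monotonic()
            try:
                singleton = MetricsConfigSingleton.instance()
            except Exception:
                return result  # metrics were never configured in this process; skip emission
            publish_timer(metrics_name, duration, singleton.metrics_config)
            # Record the emission overhead so the driver can report total telemetry time.
            singleton.increment_telemetry_time(time.monotonic() - emit_start)
            return result

        return wrapper

    return decorator


@emit_timer_metrics(metrics_name="dedupe")
def _timed_dedupe_example():
    time.sleep(0.01)  # placeholder for the real dedupe work
```

Moving the timing and emission into a decorator keeps that logic out of each step body, which is what allows the `telemetry_time_in_seconds` field to disappear from `DedupeResult`, `HashBucketResult`, and `MaterializeResult`.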