
Commit 0b83015

Restructure deltacat metrics to allow for decorators
1 parent 634bfcb commit 0b83015

24 files changed  (+233, -304 lines)

deltacat/compute/compactor/compaction_session.py

+17 -9

@@ -53,6 +53,7 @@
 from deltacat.compute.compactor.model.compactor_version import CompactorVersion
 from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
 from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
+from deltacat.utils.metrics import MetricsConfigSingleton


 if importlib.util.find_spec("memray"):
@@ -139,6 +140,10 @@ def compact_partition(
     if s3_client_kwargs is None:
         s3_client_kwargs = {}

+    if metrics_config:
+        # Initialize MetricsConfigSingleton
+        MetricsConfigSingleton.instance(metrics_config)
+
     # memray official documentation link:
     # https://bloomberg.github.io/memray/getting_started.html
     with memray.Tracker(
@@ -166,7 +171,6 @@ def compact_partition(
             rebase_source_partition_locator,
             rebase_source_partition_high_watermark,
             enable_profiler,
-            metrics_config,
             list_deltas_kwargs,
             read_kwargs_provider,
             s3_table_writer_kwargs,
@@ -217,7 +221,6 @@ def _execute_compaction_round(
     rebase_source_partition_locator: Optional[PartitionLocator],
     rebase_source_partition_high_watermark: Optional[int],
     enable_profiler: Optional[bool],
-    metrics_config: Optional[MetricsConfig],
     list_deltas_kwargs: Optional[Dict[str, Any]],
     read_kwargs_provider: Optional[ReadKwargsProvider],
     s3_table_writer_kwargs: Optional[Dict[str, Any]],
@@ -438,7 +441,6 @@ def _execute_compaction_round(
         num_buckets=hash_bucket_count,
         num_groups=max_parallelism,
         enable_profiler=enable_profiler,
-        metrics_config=metrics_config,
         read_kwargs_provider=read_kwargs_provider,
         object_store=object_store,
         deltacat_storage=deltacat_storage,
@@ -453,7 +455,8 @@ def _execute_compaction_round(
     hb_end = time.monotonic()
     hb_results_retrieved_at = time.time()

-    telemetry_time_hb = compaction_audit.save_step_stats(
+    cluster_util_after_task_latency = 0
+    cluster_util_after_task_latency += compaction_audit.save_step_stats(
         CompactionSessionAuditInfo.HASH_BUCKET_STEP_NAME,
         hb_results,
         hb_results_retrieved_at,
@@ -528,7 +531,6 @@ def _execute_compaction_round(
         sort_keys=sort_keys,
         num_materialize_buckets=num_materialize_buckets,
         enable_profiler=enable_profiler,
-        metrics_config=metrics_config,
         object_store=object_store,
     )

@@ -546,7 +548,7 @@ def _execute_compaction_round(
     total_dd_record_count = sum([ddr.deduped_record_count for ddr in dd_results])
     logger.info(f"Deduped {total_dd_record_count} records...")

-    telemetry_time_dd = compaction_audit.save_step_stats(
+    cluster_util_after_task_latency += compaction_audit.save_step_stats(
         CompactionSessionAuditInfo.DEDUPE_STEP_NAME,
         dd_results,
         dedupe_results_retrieved_at,
@@ -605,7 +607,6 @@ def _execute_compaction_round(
         max_records_per_output_file=records_per_compacted_file,
         compacted_file_content_type=compacted_file_content_type,
         enable_profiler=enable_profiler,
-        metrics_config=metrics_config,
         read_kwargs_provider=read_kwargs_provider,
         s3_table_writer_kwargs=s3_table_writer_kwargs,
         object_store=object_store,
@@ -623,7 +624,7 @@ def _execute_compaction_round(
     materialize_end = time.monotonic()
     materialize_results_retrieved_at = time.time()

-    telemetry_time_materialize = compaction_audit.save_step_stats(
+    cluster_util_after_task_latency += compaction_audit.save_step_stats(
         CompactionSessionAuditInfo.MATERIALIZE_STEP_NAME,
         mat_results,
         materialize_results_retrieved_at,
@@ -685,8 +686,15 @@ def _execute_compaction_round(
         session_peak_memory
     )

+    metrics_telemetry_time = 0
+    try:
+        metrics_telemetry_time = MetricsConfigSingleton.instance().total_telemetry_time
+    except Exception as e:
+        logger.warn(
+            f"Skipping calculating metrics telemetry time due to exception: {e}"
+        )
     compaction_audit.save_round_completion_stats(
-        mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize
+        mat_results, cluster_util_after_task_latency + metrics_telemetry_time
     )

     s3_utils.upload(
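
The MetricsConfigSingleton implementation lives in deltacat/utils/metrics.py and is not shown in this diff. A minimal sketch that is consistent with the call sites above (instance(metrics_config) to initialize, instance() to retrieve, and a total_telemetry_time property) could look like the following; the constructor shape and the update_telemetry_time helper are assumptions, not the actual API.

# Sketch only: a minimal singleton consistent with the call sites in this commit.
# The real deltacat/utils/metrics.py implementation may differ.
from typing import Optional


class MetricsConfigSingleton:
    _instance: Optional["MetricsConfigSingleton"] = None

    def __init__(self, metrics_config):
        self._metrics_config = metrics_config
        self._total_telemetry_time = 0.0

    @classmethod
    def instance(cls, metrics_config=None) -> "MetricsConfigSingleton":
        # The first call must supply a MetricsConfig; later calls reuse the stored one.
        if cls._instance is None:
            if metrics_config is None:
                raise ValueError("MetricsConfigSingleton has not been initialized")
            cls._instance = cls(metrics_config)
        return cls._instance

    @property
    def metrics_config(self):
        return self._metrics_config

    @property
    def total_telemetry_time(self) -> float:
        # Accumulated seconds spent emitting metrics, added to the audit's telemetry time.
        return self._total_telemetry_time

    def update_telemetry_time(self, seconds: float) -> None:
        self._total_telemetry_time += seconds

Such a shape matches how _execute_compaction_round reads total_telemetry_time inside a try/except: if compact_partition was called without a metrics_config, instance() raises and the reported telemetry time simply stays 0.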

deltacat/compute/compactor/model/compact_partition_params.py

-11

@@ -23,7 +23,6 @@
 )
 from deltacat.constants import PYARROW_INFLATION_MULTIPLIER
 from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
-from deltacat.utils.metrics import MetricsConfig


 class CompactPartitionParams(dict):
@@ -90,8 +89,6 @@ def of(params: Optional[Dict]) -> CompactPartitionParams:
         result.drop_duplicates = params.get("drop_duplicates", DROP_DUPLICATES)
         result.ray_custom_resources = params.get("ray_custom_resources")

-        result.metrics_config = params.get("metrics_config")
-
         if not importlib.util.find_spec("memray"):
             result.enable_profiler = False

@@ -354,14 +351,6 @@ def sort_keys(self) -> Optional[List[SortKey]]:
     def sort_keys(self, keys: List[SortKey]) -> None:
         self["sort_keys"] = keys

-    @property
-    def metrics_config(self) -> Optional[MetricsConfig]:
-        return self.get("metrics_config")
-
-    @metrics_config.setter
-    def metrics_config(self, config: MetricsConfig) -> None:
-        self["metrics_config"] = config
-
     @staticmethod
     def json_handler_for_compact_partition_params(obj):
         """

deltacat/compute/compactor/model/compaction_session_audit_info.py

+1 -6

@@ -832,7 +832,6 @@ def save_step_stats(
             f"{step_name}PostObjectStoreMemoryUsedBytes"
         ] = cluster_utilization_after_task.used_object_store_memory_bytes

-        telemetry_time = 0
         if task_results:
             last_task_completed_at = max(
                 result.task_completed_at for result in task_results
@@ -846,13 +845,9 @@ def save_step_stats(
                 result.peak_memory_usage_bytes for result in task_results
             )

-            telemetry_time = sum(
-                result.telemetry_time_in_seconds for result in task_results
-            )
-
             self[f"{step_name}TaskPeakMemoryUsedBytes"] = peak_task_memory.item()

-        return cluster_util_after_task_latency + telemetry_time
+        return cluster_util_after_task_latency

     def save_round_completion_stats(
         self, mat_results: List[MaterializeResult], total_telemetry_time: float

deltacat/compute/compactor/model/dedupe_result.py

-1

@@ -7,5 +7,4 @@ class DedupeResult(NamedTuple):
     mat_bucket_idx_to_obj_id: Dict[int, Tuple]
     deduped_record_count: np.int64
     peak_memory_usage_bytes: np.double
-    telemetry_time_in_seconds: np.double
     task_completed_at: np.double

deltacat/compute/compactor/model/hash_bucket_result.py

-1

@@ -7,5 +7,4 @@ class HashBucketResult(NamedTuple):
     hash_bucket_group_to_obj_id: np.ndarray
     hb_record_count: np.int64
     peak_memory_usage_bytes: np.double
-    telemetry_time_in_seconds: np.double
     task_completed_at: np.double

deltacat/compute/compactor/model/materialize_result.py

-6

@@ -16,7 +16,6 @@ def of(
         pyarrow_write_result: PyArrowWriteResult,
         referenced_pyarrow_write_result: Optional[PyArrowWriteResult] = None,
         peak_memory_usage_bytes: Optional[np.double] = None,
-        telemetry_time_in_seconds: Optional[np.double] = None,
         task_completed_at: Optional[np.double] = None,
     ) -> MaterializeResult:
         materialize_result = MaterializeResult()
@@ -25,7 +24,6 @@ def of(
         materialize_result["paWriteResult"] = pyarrow_write_result
         materialize_result["referencedPaWriteResult"] = referenced_pyarrow_write_result
         materialize_result["peakMemoryUsageBytes"] = peak_memory_usage_bytes
-        materialize_result["telemetryTimeInSeconds"] = telemetry_time_in_seconds
         materialize_result["taskCompletedAt"] = task_completed_at
         return materialize_result

@@ -44,10 +42,6 @@ def task_index(self) -> int:
     def peak_memory_usage_bytes(self) -> Optional[np.double]:
         return self["peakMemoryUsageBytes"]

-    @property
-    def telemetry_time_in_seconds(self) -> Optional[np.double]:
-        return self["telemetryTimeInSeconds"]
-
     @property
     def pyarrow_write_result(self) -> PyArrowWriteResult:
         val: Dict[str, Any] = self.get("paWriteResult")

deltacat/compute/compactor/repartition_session.py

+3 -2

@@ -30,7 +30,7 @@
     PartitionLocator,
     interface as unimplemented_deltacat_storage,
 )
-from deltacat.utils.metrics import MetricsConfig
+from deltacat.utils.metrics import MetricsConfig, MetricsConfigSingleton
 from deltacat.compute.compactor.utils.sort_key import validate_sort_keys

 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -59,6 +59,8 @@ def repartition(
     deltacat_storage=unimplemented_deltacat_storage,
     **kwargs,
 ) -> Optional[str]:
+    # Initialize MetricsConfigSingleton
+    MetricsConfigSingleton.instance(metrics_config)

     node_resource_keys = None
     if pg_config:  # use resource in each placement group
@@ -130,7 +132,6 @@ def repartition(
         max_records_per_output_file=records_per_repartitioned_file,
         destination_partition=partition,
         enable_profiler=enable_profiler,
-        metrics_config=metrics_config,
         read_kwargs_provider=read_kwargs_provider,
         s3_table_writer_kwargs=s3_table_writer_kwargs,
         repartitioned_file_content_type=repartitioned_file_content_type,

deltacat/compute/compactor/steps/dedupe.py

+7 -47

@@ -23,7 +23,7 @@
     get_current_ray_worker_id,
 )
 from deltacat.utils.performance import timed_invocation
-from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
+from deltacat.utils.metrics import emit_timer_metrics
 from deltacat.io.object_store import IObjectStore
 from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes

@@ -100,15 +100,18 @@ def delta_file_locator_to_mat_bucket_index(
     return int.from_bytes(digest, "big") % materialize_bucket_count


-def _timed_dedupe(
+@ray.remote
+@emit_timer_metrics(metrics_name="dedupe")
+def dedupe(
     object_ids: List[Any],
     sort_keys: List[SortKey],
     num_materialize_buckets: int,
     dedupe_task_index: int,
     enable_profiler: bool,
     object_store: Optional[IObjectStore],
     **kwargs,
-):
+) -> DedupeResult:
+    logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
     task_id = get_current_ray_task_id()
     worker_id = get_current_ray_worker_id()
     with memray.Tracker(
@@ -229,53 +232,10 @@ def _timed_dedupe(
         )

         peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
+        logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")
         return DedupeResult(
             mat_bucket_to_dd_idx_obj_id,
             np.int64(total_deduped_records),
             np.double(peak_memory_usage_bytes),
-            np.double(0.0),
             np.double(time.time()),
         )
-
-
-@ray.remote
-def dedupe(
-    object_ids: List[Any],
-    sort_keys: List[SortKey],
-    num_materialize_buckets: int,
-    dedupe_task_index: int,
-    enable_profiler: bool,
-    metrics_config: MetricsConfig,
-    object_store: Optional[IObjectStore],
-    **kwargs,
-) -> DedupeResult:
-    logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
-    dedupe_result, duration = timed_invocation(
-        func=_timed_dedupe,
-        object_ids=object_ids,
-        sort_keys=sort_keys,
-        num_materialize_buckets=num_materialize_buckets,
-        dedupe_task_index=dedupe_task_index,
-        enable_profiler=enable_profiler,
-        object_store=object_store,
-        **kwargs,
-    )
-
-    emit_metrics_time = 0.0
-    if metrics_config:
-        emit_result, latency = timed_invocation(
-            func=emit_timer_metrics,
-            metrics_name="dedupe",
-            value=duration,
-            metrics_config=metrics_config,
-        )
-        emit_metrics_time = latency
-
-    logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")
-    return DedupeResult(
-        dedupe_result[0],
-        dedupe_result[1],
-        dedupe_result[2],
-        np.double(emit_metrics_time),
-        dedupe_result[4],
-    )
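
With this change the per-task emit_timer_metrics call and its timed_invocation wrapper are replaced by a decorator applied directly to the Ray task. The decorator's implementation is not part of this diff; a rough sketch of how such a decorator could behave, reusing the MetricsConfigSingleton sketched earlier, is shown below. The emitter call is a placeholder and update_telemetry_time is an assumed helper, not the actual deltacat API.

# Sketch only: one plausible shape for a decorator-based emit_timer_metrics,
# inferred from @emit_timer_metrics(metrics_name="dedupe") above.
import functools
import time


def emit_timer_metrics(metrics_name: str):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Time the wrapped task body.
            start = time.monotonic()
            result = func(*args, **kwargs)
            duration = time.monotonic() - start

            # Only emit when a MetricsConfig was registered; otherwise return untouched.
            try:
                singleton = MetricsConfigSingleton.instance()
            except Exception:
                return result

            emit_start = time.monotonic()
            # Placeholder for the real emitter built from singleton.metrics_config
            # (e.g. a CloudWatch timer metric); the actual call is not shown in this diff.
            print(f"{metrics_name} took {duration:.3f}s")
            # Track emission overhead so the session can report it as telemetry time.
            singleton.update_telemetry_time(time.monotonic() - emit_start)
            return result

        return wrapper

    return decorator

In the new dedupe definition the decorator sits below @ray.remote, so the timing wraps the body of the task itself; per-task metric emission overhead is then tracked centrally rather than returned through the result tuple, which is why the telemetry_time_in_seconds field disappears from DedupeResult, HashBucketResult, and MaterializeResult.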
