Restructure deltacat metrics to allow for decorators #241

Closed · wants to merge 3 commits
36 changes: 27 additions & 9 deletions deltacat/compute/compactor/compaction_session.py
@@ -53,6 +53,7 @@
from deltacat.compute.compactor.model.compactor_version import CompactorVersion
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes
from deltacat.utils.metrics import MetricsConfigSingleton


if importlib.util.find_spec("memray"):
@@ -139,6 +140,10 @@ def compact_partition(
if s3_client_kwargs is None:
s3_client_kwargs = {}

if metrics_config:
# Initialize MetricsConfigSingleton
MetricsConfigSingleton.instance(metrics_config)

# memray official documentation link:
# https://bloomberg.github.io/memray/getting_started.html
with memray.Tracker(
@@ -166,7 +171,6 @@
rebase_source_partition_locator,
rebase_source_partition_high_watermark,
enable_profiler,
metrics_config,
list_deltas_kwargs,
read_kwargs_provider,
s3_table_writer_kwargs,
@@ -217,7 +221,6 @@ def _execute_compaction_round(
rebase_source_partition_locator: Optional[PartitionLocator],
rebase_source_partition_high_watermark: Optional[int],
enable_profiler: Optional[bool],
metrics_config: Optional[MetricsConfig],
list_deltas_kwargs: Optional[Dict[str, Any]],
read_kwargs_provider: Optional[ReadKwargsProvider],
s3_table_writer_kwargs: Optional[Dict[str, Any]],
@@ -234,6 +237,13 @@
if rebase_source_partition_locator
else source_partition_locator
)

# We need to pass in metrics_config to each step, as they are run on separate processes
try:
metrics_config = MetricsConfigSingleton.instance().metrics_config
except Exception:
metrics_config = None

base_audit_url = rcf_source_partition_locator.path(
f"s3://{compaction_artifact_s3_bucket}/compaction-audit"
)
@@ -437,11 +447,11 @@ def _execute_compaction_round(
num_buckets=hash_bucket_count,
num_groups=max_parallelism,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
object_store=object_store,
deltacat_storage=deltacat_storage,
deltacat_storage_kwargs=deltacat_storage_kwargs,
metrics_config=metrics_config,
**kwargs,
)
hb_invoke_end = time.monotonic()
@@ -452,7 +462,8 @@
hb_end = time.monotonic()
hb_results_retrieved_at = time.time()

telemetry_time_hb = compaction_audit.save_step_stats(
worker_telemetry_time = 0
worker_telemetry_time += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.HASH_BUCKET_STEP_NAME,
hb_results,
hb_results_retrieved_at,
@@ -527,8 +538,8 @@
sort_keys=sort_keys,
num_materialize_buckets=num_materialize_buckets,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
object_store=object_store,
metrics_config=metrics_config,
)

dedupe_invoke_end = time.monotonic()
@@ -545,7 +556,7 @@
total_dd_record_count = sum([ddr.deduped_record_count for ddr in dd_results])
logger.info(f"Deduped {total_dd_record_count} records...")

telemetry_time_dd = compaction_audit.save_step_stats(
worker_telemetry_time += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.DEDUPE_STEP_NAME,
dd_results,
dedupe_results_retrieved_at,
@@ -604,12 +615,12 @@
max_records_per_output_file=records_per_compacted_file,
compacted_file_content_type=compacted_file_content_type,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
s3_table_writer_kwargs=s3_table_writer_kwargs,
object_store=object_store,
deltacat_storage=deltacat_storage,
deltacat_storage_kwargs=deltacat_storage_kwargs,
metrics_config=metrics_config,
)

materialize_invoke_end = time.monotonic()
@@ -622,7 +633,7 @@
materialize_end = time.monotonic()
materialize_results_retrieved_at = time.time()

telemetry_time_materialize = compaction_audit.save_step_stats(
worker_telemetry_time += compaction_audit.save_step_stats(
CompactionSessionAuditInfo.MATERIALIZE_STEP_NAME,
mat_results,
materialize_results_retrieved_at,
@@ -684,8 +695,15 @@ def _execute_compaction_round(
session_peak_memory
)

metrics_telemetry_time = 0
try:
metrics_telemetry_time = MetricsConfigSingleton.instance().total_telemetry_time
except Exception as e:
logger.warning(
f"Skipping metrics telemetry time calculation due to exception: {e}"
)
compaction_audit.save_round_completion_stats(
mat_results, telemetry_time_hb + telemetry_time_dd + telemetry_time_materialize
mat_results, worker_telemetry_time + metrics_telemetry_time
)

s3_utils.upload(
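Note: this section shows only the call sites of MetricsConfigSingleton; the class itself lives in deltacat/utils/metrics.py and is not part of the hunks above. The sketch below is a minimal interface inferred from those call sites, not the actual implementation; increment_telemetry_time is a hypothetical helper name.

```python
# Assumed minimal interface for MetricsConfigSingleton, inferred from the call
# sites in this PR; the real class in deltacat/utils/metrics.py may differ.
import threading
from typing import Optional


class MetricsConfigSingleton:
    _instance: Optional["MetricsConfigSingleton"] = None
    _lock = threading.Lock()

    def __init__(self, metrics_config):
        self._metrics_config = metrics_config
        self._total_telemetry_time = 0.0

    @classmethod
    def instance(cls, metrics_config=None) -> "MetricsConfigSingleton":
        # First call on a process must supply a config; later calls return the
        # cached instance. Calling instance() before initialization raises, which
        # is why the callers above wrap it in try/except and fall back to None or 0.
        if cls._instance is None:
            if metrics_config is None:
                raise RuntimeError("MetricsConfigSingleton not initialized")
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls(metrics_config)
        return cls._instance

    @property
    def metrics_config(self):
        return self._metrics_config

    @property
    def total_telemetry_time(self) -> float:
        # Cumulative seconds spent emitting metrics on this process; the session
        # adds this to the per-step telemetry time in save_round_completion_stats.
        return self._total_telemetry_time

    def increment_telemetry_time(self, seconds: float) -> None:  # hypothetical helper
        self._total_telemetry_time += seconds
```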
6 changes: 4 additions & 2 deletions deltacat/compute/compactor/repartition_session.py
@@ -30,7 +30,7 @@
PartitionLocator,
interface as unimplemented_deltacat_storage,
)
from deltacat.utils.metrics import MetricsConfig
from deltacat.utils.metrics import MetricsConfig, MetricsConfigSingleton
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -59,6 +59,9 @@ def repartition(
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:
# Initialize MetricsConfigSingleton
if metrics_config:
MetricsConfigSingleton.instance(metrics_config)

node_resource_keys = None
if pg_config: # use resource in each placement group
@@ -130,7 +133,6 @@ def repartition(
max_records_per_output_file=records_per_repartitioned_file,
destination_partition=partition,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
s3_table_writer_kwargs=s3_table_writer_kwargs,
repartitioned_file_content_type=repartitioned_file_content_type,
47 changes: 21 additions & 26 deletions deltacat/compute/compactor/steps/dedupe.py
@@ -23,7 +23,11 @@
get_current_ray_worker_id,
)
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.utils.metrics import (
emit_timer_metrics,
MetricsConfig,
MetricsConfigSingleton,
)
from deltacat.io.object_store import IObjectStore
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes

@@ -100,6 +104,7 @@ def delta_file_locator_to_mat_bucket_index(
return int.from_bytes(digest, "big") % materialize_bucket_count


@emit_timer_metrics(metrics_name="dedupe")
def _timed_dedupe(
object_ids: List[Any],
sort_keys: List[SortKey],
@@ -108,7 +113,8 @@ def _timed_dedupe(
enable_profiler: bool,
object_store: Optional[IObjectStore],
**kwargs,
):
) -> DedupeResult:
logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
task_id = get_current_ray_task_id()
worker_id = get_current_ray_worker_id()
with memray.Tracker(
@@ -229,11 +235,17 @@ def _timed_dedupe(
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")

try:
telemetry_time = MetricsConfigSingleton.instance().total_telemetry_time
except Exception:
telemetry_time = 0
return DedupeResult(
mat_bucket_to_dd_idx_obj_id,
np.int64(total_deduped_records),
np.double(peak_memory_usage_bytes),
np.double(0.0),
np.double(telemetry_time),
np.double(time.time()),
)

@@ -245,13 +257,15 @@ def dedupe(
num_materialize_buckets: int,
dedupe_task_index: int,
enable_profiler: bool,
metrics_config: MetricsConfig,
object_store: Optional[IObjectStore],
metrics_config: Optional[MetricsConfig] = None,
**kwargs,
) -> DedupeResult:
logger.info(f"[Dedupe task {dedupe_task_index}] Starting dedupe task...")
dedupe_result, duration = timed_invocation(
func=_timed_dedupe,
# initialize singleton on new process
if metrics_config:
MetricsConfigSingleton.instance(metrics_config)

return _timed_dedupe(
object_ids=object_ids,
sort_keys=sort_keys,
num_materialize_buckets=num_materialize_buckets,
@@ -260,22 +274,3 @@ def dedupe(
object_store=object_store,
**kwargs,
)

emit_metrics_time = 0.0
if metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="dedupe",
value=duration,
metrics_config=metrics_config,
)
emit_metrics_time = latency

logger.info(f"[Dedupe task index {dedupe_task_index}] Finished dedupe task...")
return DedupeResult(
dedupe_result[0],
dedupe_result[1],
dedupe_result[2],
np.double(emit_metrics_time),
dedupe_result[4],
)
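Before this change, dedupe wrapped _timed_dedupe in timed_invocation and called emit_timer_metrics directly; now timing and emission are handled by the @emit_timer_metrics decorator. Its implementation is not shown in this section, so the sketch below is only one plausible shape, built on the singleton interface assumed above; _emit_metrics is a hypothetical stand-in for the real emitter.

```python
# Sketch only: one plausible shape for the emit_timer_metrics decorator factory.
# In the real library this would live next to MetricsConfigSingleton in
# deltacat/utils/metrics.py; _emit_metrics below is a hypothetical placeholder.
import functools
import time

from deltacat.utils.metrics import MetricsConfigSingleton


def _emit_metrics(metrics_name, value, metrics_config):
    """Hypothetical stand-in for the library's actual metric emission call."""


def emit_timer_metrics(metrics_name: str):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Time the wrapped step itself.
            start = time.monotonic()
            result = func(*args, **kwargs)
            duration = time.monotonic() - start

            emit_start = time.monotonic()
            try:
                singleton = MetricsConfigSingleton.instance()
            except Exception:
                # No metrics config registered on this process: skip emission
                # instead of failing the compaction step.
                return result
            _emit_metrics(
                metrics_name=metrics_name,
                value=duration,
                metrics_config=singleton.metrics_config,
            )
            # Track emission latency so the audit can report telemetry overhead.
            singleton.increment_telemetry_time(time.monotonic() - emit_start)
            return result

        return wrapper

    return decorator
```

Under this assumption, a step needs only the decorator plus a per-process MetricsConfigSingleton.instance(metrics_config) call, which is exactly what dedupe and hash_bucket now do.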
54 changes: 23 additions & 31 deletions deltacat/compute/compactor/steps/hash_bucket.py
@@ -29,8 +29,11 @@
get_current_ray_worker_id,
)
from deltacat.utils.common import ReadKwargsProvider
from deltacat.utils.performance import timed_invocation
from deltacat.utils.metrics import emit_timer_metrics, MetricsConfig
from deltacat.utils.metrics import (
emit_timer_metrics,
MetricsConfig,
MetricsConfigSingleton,
)
from deltacat.io.object_store import IObjectStore
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes

@@ -181,6 +184,7 @@ def _read_delta_file_envelopes(
return delta_file_envelopes, total_record_count


@emit_timer_metrics("hash_bucket")
def _timed_hash_bucket(
annotated_delta: DeltaAnnotated,
round_completion_info: Optional[RoundCompletionInfo],
@@ -189,12 +193,13 @@
num_buckets: int,
num_groups: int,
enable_profiler: bool,
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
object_store: Optional[IObjectStore] = None,
read_kwargs_provider: Optional[ReadKwargsProvider],
object_store: Optional[IObjectStore],
deltacat_storage=unimplemented_deltacat_storage,
deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
):
) -> HashBucketResult:
logger.info(f"Starting hash bucket task...")
if deltacat_storage_kwargs is None:
deltacat_storage_kwargs = {}
task_id = get_current_ray_task_id()
@@ -229,11 +234,17 @@ def _timed_hash_bucket(
)

peak_memory_usage_bytes = get_current_node_peak_memory_usage_in_bytes()
logger.info(f"Finished hash bucket task...")

try:
telemetry_time = MetricsConfigSingleton.instance().total_telemetry_time
except Exception:
telemetry_time = 0
return HashBucketResult(
hash_bucket_group_to_obj_id,
np.int64(total_record_count),
np.double(peak_memory_usage_bytes),
np.double(0.0),
np.double(telemetry_time),
np.double(time.time()),
)

@@ -247,18 +258,18 @@ def hash_bucket(
num_buckets: int,
num_groups: int,
enable_profiler: bool,
metrics_config: MetricsConfig,
read_kwargs_provider: Optional[ReadKwargsProvider],
object_store: Optional[IObjectStore],
deltacat_storage=unimplemented_deltacat_storage,
metrics_config: Optional[MetricsConfig] = None,
deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> HashBucketResult:
if deltacat_storage_kwargs is None:
deltacat_storage_kwargs = {}
logger.info(f"Starting hash bucket task...")
hash_bucket_result, duration = timed_invocation(
func=_timed_hash_bucket,
# initialize singleton on new process
if metrics_config:
MetricsConfigSingleton.instance(metrics_config)

return _timed_hash_bucket(
annotated_delta=annotated_delta,
round_completion_info=round_completion_info,
primary_keys=primary_keys,
@@ -272,22 +283,3 @@ def hash_bucket(
deltacat_storage_kwargs=deltacat_storage_kwargs,
**kwargs,
)

emit_metrics_time = 0.0
if metrics_config:
emit_result, latency = timed_invocation(
func=emit_timer_metrics,
metrics_name="hash_bucket",
value=duration,
metrics_config=metrics_config,
)
emit_metrics_time = latency

logger.info(f"Finished hash bucket task...")
return HashBucketResult(
hash_bucket_result[0],
hash_bucket_result[1],
hash_bucket_result[2],
np.double(emit_metrics_time),
hash_bucket_result[4],
)
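The "initialize singleton on new process" comments exist because each step runs in a separate Ray worker process, where the driver's singleton does not exist; the config therefore travels as a plain argument and is re-registered at the top of every step. A toy illustration of that pattern, assuming the interface sketched above (example_step is hypothetical and not part of this PR):

```python
# Toy illustration of the per-process initialization pattern used by the
# hash_bucket, dedupe, and materialize steps; example_step is hypothetical.
from typing import Optional

import ray

from deltacat.utils.metrics import MetricsConfig, MetricsConfigSingleton


@ray.remote
def example_step(metrics_config: Optional[MetricsConfig] = None, **kwargs) -> float:
    # This body runs in a Ray worker process, so the singleton initialized on the
    # driver is not visible here; re-register the config before doing any work.
    if metrics_config:
        MetricsConfigSingleton.instance(metrics_config)

    # ... actual step work would go here ...

    # Report telemetry overhead, defaulting to 0 when no metrics config was
    # supplied (mirrors the try/except used in the real steps).
    try:
        return MetricsConfigSingleton.instance().total_telemetry_time
    except Exception:
        return 0.0
```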