Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Mar 5, 2024
1 parent c8e7ffa commit c880097
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 35 deletions.
42 changes: 19 additions & 23 deletions af_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import glob
import pandas as pd

from profiling.timing import time_profiler as tp
from data_access.loader import get_file_list
from processor.uproot_processor import UprootProcessor
from time_profiler import time_profiler as tp
from data_loader import get_file_list
from uproot_processor import UprootProcessor

from executor.sequential import SequentialExecutor
from executor.futures import FuturesExecutor
from executor.dask import DaskLocalExecutor, DaskGatewayExecutor
from executors.sequential import SequentialExecutor
from executors.futures import FuturesExecutor
from executors.dask import DaskLocalExecutor, DaskGatewayExecutor
executors = {
'sequential': SequentialExecutor,
'futures': FuturesExecutor,
Expand All @@ -23,14 +23,12 @@
def read_yaml(file_path):
    """Read a YAML config file and return it wrapped in a ``scalpl.Cut``.

    Wrapping in ``scalpl.Cut`` allows dotted-key access while parsing the
    config, e.g. ``config.get('executor.n_workers')``.

    Parameters
    ----------
    file_path : str
        Path to the YAML configuration file.

    Returns
    -------
    scalpl.Cut
        The parsed configuration with dotted-notation access.

    Raises
    ------
    FileNotFoundError
        If no file exists at ``file_path`` (original error chained).
    ValueError
        If the file contents are not valid YAML (original error chained).
    """
    try:
        with open(file_path, 'r') as file:
            # safe_load avoids executing arbitrary YAML tags from the file
            config = scalpl.Cut(yaml.safe_load(file))
        return config
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Config file not found at path: {file_path}") from e
    except yaml.YAMLError as e:
        raise ValueError(f"YAML error: {e}") from e


class Benchmark:
Expand All @@ -39,7 +37,10 @@ def __init__(self, config_path=None):
self.processor = None
self.report_df = pd.DataFrame()
self.col_stats = pd.DataFrame()
self.label = None # arbitrary label

# arbitrary label for interpreting outputs
self.label = None

if config_path:
self.reload_config(config_path)

Expand Down Expand Up @@ -68,8 +69,7 @@ def reset_executor(self, **kwargs):
n_workers = self.config.get('executor.n_workers', 1)

if keep_cluster and hasattr(self.executor, "cluster"):
if reset_workers:
self.executor.wait_for_workers(0)
self.executor.wait_for_workers(0) if reset_workers else None
self.executor.wait_for_workers(n_workers)
else:
self.executor = executors[self.backend](n_workers=n_workers)
Expand All @@ -91,16 +91,11 @@ def run(self):
load_into_memory=True
)


def update_report(self):
n_cols_read = self.config.get('processor.columns', [])
if isinstance(n_cols_read, list):
n_cols_read = len(n_cols_read)

def update_report(self):
report = {
"label": self.label,
"n_files": self.n_files,
"n_columns_read": n_cols_read,
"n_columns_read": len(self.processor.columns),
"n_events": self.col_stats.nevents.sum(),
"operation": self.config.get('processor.operation', 'nothing'),
"executor": self.backend,
Expand All @@ -116,14 +111,15 @@ def update_report(self):
)
)

# Add measurements to common DataFrame
self.report_df = pd.concat([
self.report_df,
pd.DataFrame([report])
]).reset_index(drop=True)


def run_benchmark(config_path):

# Read config from YAML or a directory with multiple YAMLs
if config_path.endswith(".yaml") or config_path.endswith(".yml"):
configs = [config_path]
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

def get_file_list(cls):
mode = cls.config.get('data-access.mode', 'local')

if mode == 'explicit-files':
file_list = cls.config.get('data-access.files', [])

elif mode == 'explicit-dirs':
dirs = cls.config.get('data-access.directories', [])
file_list = []
for dir in dirs:
file_list.extend(glob.glob(dir+"/**/*.root", recursive = True))

elif mode == 'dbs-datasets':
dbsdatasets = cls.config.get('data-access.datasets', [])
xrootdserver = cls.config.get('data-access.xrootdserver', 'eos.cms.rcac.purdue.edu:1094')
Expand All @@ -20,6 +23,7 @@ def get_file_list(cls):
for dataset in dbsdatasets
for file in dbs.listFiles(dataset=dataset)
]

elif mode == 'dbs-blocks':
dbsblocks = cls.config.get('data-access.blocks', [])
xrootdserver = cls.config.get('data-access.xrootdserver', 'eos.cms.rcac.purdue.edu:1094')
Expand All @@ -29,11 +33,13 @@ def get_file_list(cls):
for block in dbsblocks
for file in dbs.listFiles(block_name=block)
]

elif mode == 'dbs-files':
dbsfiles = cls.config.get('data-access.files', [])
xrootdserver = cls.config.get('data-access.xrootdserver', 'cms-xcache.rcac.purdue.edu:1094')
dbs = DbsApi('https://cmsweb.cern.ch/dbs/prod/global/DBSReader')
file_list = ["root://"+xrootdserver+"/"+file for file in dbsfiles]

else:
raise NotImplementedError(
f"Data access mode {mode} not implemented"
Expand Down
Empty file removed af_benchmark/executor/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from executor.base import BaseExecutor
from executors.base import BaseExecutor
import dask
from dask.distributed import LocalCluster, Client
from dask_gateway import Gateway
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from executor.base import BaseExecutor
from executors.base import BaseExecutor
from concurrent import futures

class FuturesExecutor(BaseExecutor):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from executor.base import BaseExecutor
from executors.base import BaseExecutor


class SequentialExecutor(BaseExecutor):
Expand Down
Empty file removed af_benchmark/processor/__init__.py
Empty file.
Empty file removed af_benchmark/profiling/__init__.py
Empty file.
20 changes: 12 additions & 8 deletions af_benchmark/profiling/timing.py → af_benchmark/time_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ def wrapper(*args, **kwargs):
func_file_short = "/".join(func_file.split("/")[-3:])

# If by accident there are multiple results - save all
for ft in func_time:
df = pd.DataFrame([{
'func_file': func_file_short,
'func_ln': func_ln,
'func_name': func_name,
'func_time': ft,
}])
self.report_df = pd.concat([self.report_df, df]).reset_index(drop=True)
self.report_df = pd.concat([
self.report_df,
pd.DataFrame([
{
'func_file': func_file_short,
'func_ln': func_ln,
'func_name': func_name,
'func_time': ft
} for ft in func_time
])
]).reset_index(drop=True)

return result
return wrapper

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from profiling.timing import time_profiler as tp
from time_profiler import time_profiler as tp
import pandas as pd
import numpy as np
import uproot
Expand Down

0 comments on commit c880097

Please sign in to comment.