Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Mar 4, 2024
1 parent a03079a commit 671b3a1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 21 deletions.
23 changes: 5 additions & 18 deletions af_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ def reset_executor(self, **kwargs):
keep_cluster = kwargs.get("keep_cluster", False)
reset_workers = kwargs.get("reset_workers", True)

self.backend = self.config.get('executor.backend')
self.backend = self.config.get('executor.backend', 'sequential')
if self.backend not in executors:
raise NotImplementedError(
f"Invalid backend: {self.backend}. Allowed values are: {executors.keys()}"
)

n_workers = self.config.get('executor.n_workers')
if n_workers is None:
n_workers = 1
n_workers = self.config.get('executor.n_workers', 1)

if keep_cluster and hasattr(self.executor, "cluster"):
if reset_workers:
Expand All @@ -88,32 +86,21 @@ def run(self):
self.col_stats = self.processor.process_columns(
files,
self.executor,
parallelize_over=self.config.get('processor.parallelize_over'),
parallelize_over=self.config.get('processor.parallelize_over', 'files'),
load_into_memory=True
)

# outputs_ = self.processor.run_operation(
# column_data,
# self.executor
# )

# if outputs_:
# self.col_stats = pd.concat([o[1] for o in outputs_]).reset_index(drop=True)

# outputs = [o[0] for o in outputs_]

# return outputs_

def update_report(self):
n_cols_read = self.config.get('processor.columns')
n_cols_read = self.config.get('processor.columns', [])
if isinstance(n_cols_read, list):
n_cols_read = len(n_cols_read)

report = {
"n_files": self.n_files,
"n_columns_read": n_cols_read,
"n_events": self.col_stats.nevents.sum(),
"operation": self.config.get('processor.operation'),
"operation": self.config.get('processor.operation', 'nothing'),
"executor": self.backend,
"n_workers": self.executor.get_n_workers(),
"compressed_bytes": self.col_stats.compressed_bytes.sum(),
Expand Down
6 changes: 3 additions & 3 deletions af_benchmark/processor/uproot_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def open_nanoaod(self, file_path, **kwargs):
return tree

def get_column_list(self, file):
columns_to_read = self.config.get('processor.columns')
columns_to_read = self.config.get('processor.columns', [])
tree = self.open_nanoaod(file)
if isinstance(columns_to_read, list):
if any(c not in tree.keys() for c in columns_to_read):
Expand All @@ -28,7 +28,7 @@ def get_column_list(self, file):

@tp.enable
def process_columns(self, files, executor, **kwargs):
parallelize_over = kwargs.get("parallelize_over")
parallelize_over = kwargs.get("parallelize_over", 'files')
arg_dict = {
"files": files,
"columns": self.columns
Expand Down Expand Up @@ -72,7 +72,7 @@ def process_column(self, file, column, **kwargs):


def run_operation(self, column_data, **kwargs):
operation = self.config.get('processor.operation', None)
operation = self.config.get('processor.operation', 'nothing')

if (not operation) or (operation=='nothing'):
return
Expand Down

0 comments on commit 671b3a1

Please sign in to comment.