From 774a5b25d6f88ec29ee557be980049657e32ae3e Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 31 Jan 2024 18:04:13 -0500 Subject: [PATCH 01/12] add in hooks for delayed calculation of task graph --- src/coffea/dataset_tools/apply_processor.py | 80 ++++++++++++++++++--- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index 489b8b6f0..d5aff5bc6 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -1,10 +1,12 @@ from __future__ import annotations import copy +from functools import partial from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union import awkward import dask.base +import dask.delayed import dask_awkward from coffea.dataset_tools.preprocess import ( @@ -31,12 +33,66 @@ GenericHEPAnalysis = Callable[[dask_awkward.Array], DaskOutputType] +def _pack_meta_to_wire(*collections): + unpacked, repacker = dask.base.unpack_collections(*collections) + + output = [] + for i in range(len(unpacked)): + output.append(unpacked[i]) + if isinstance( + unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar) + ): + output[-1]._meta = awkward.Array( + unpacked[i]._meta.layout.form.length_zero_array(), + behavior=unpacked[i]._meta.behavior, + attrs=unpacked[i]._meta.attrs, + ) + packed_out = repacker(output) + if len(packed_out) == 1: + return packed_out[0] + return packed_out + + +def _unpack_meta_from_wire(*collections): + unpacked, repacker = dask.base.unpack_collections(*collections) + + output = [] + for i in range(len(unpacked)): + output.append(unpacked[i]) + if isinstance( + unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar) + ): + output[-1]._meta = awkward.Array( + unpacked[i]._meta.layout.to_typetracer(forget_length=True), + behavior=unpacked[i]._meta.behavior, + attrs=unpacked[i]._meta.attrs, + ) + packed_out = repacker(output) + if len(packed_out) == 1: + return packed_out[0] + return packed_out + + +def _apply_analysis(analysis, events_and_maybe_report): + events = events_and_maybe_report + report = None + if isinstance(events_and_maybe_report, tuple): + events, report = events_and_maybe_report + + out = analysis(events) + + if report is not None: + return out, report + return out + + def apply_to_dataset( data_manipulation: ProcessorABC | GenericHEPAnalysis, dataset: DatasetSpec | DatasetSpecOptional, schemaclass: BaseSchema = NanoAODSchema, metadata: dict[Hashable, Any] = {}, uproot_options: dict[str, Any] = {}, + parallelize_with_dask: bool = False, ) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]: """ Apply the supplied function or processor to the supplied dataset. @@ -52,6 +108,8 @@ def apply_to_dataset( Metadata for the dataset that is accessible by the input analysis. Should also be dask-serializable. uproot_options: dict[str, Any], default {} Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports. + parallelize_with_dask: bool, default False + Create dask.delayed objects that will return the the computable dask collections for the analysis when computed. 
Returns ------- @@ -64,7 +122,7 @@ def apply_to_dataset( if maybe_base_form is not None: maybe_base_form = awkward.forms.from_json(decompress_form(maybe_base_form)) files = dataset["files"] - events = NanoEventsFactory.from_root( + events_and_maybe_report = NanoEventsFactory.from_root( files, metadata=metadata, schemaclass=schemaclass, @@ -72,18 +130,24 @@ def apply_to_dataset( uproot_options=uproot_options, ).events() - report = None - if isinstance(events, tuple): - events, report = events - - out = None + analysis = None if isinstance(data_manipulation, ProcessorABC): - out = data_manipulation.process(events) + analysis = data_manipulation.process elif isinstance(data_manipulation, Callable): - out = data_manipulation(events) + out = data_manipulation else: raise ValueError("data_manipulation must either be a ProcessorABC or Callable") + out = None + if parallelize_with_dask: + out = dask.delayed(partial(_apply_analysis, analysis, events_and_maybe_report)) + else: + out = _apply_analysis(analysis, events_and_maybe_report) + + report = None + if isinstance(out, tuple): + out, report = out + if report is not None: return out, report return (out,) From c80634e1c13766da6224c140419f9d8bfaf58b83 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 1 Feb 2024 17:23:53 -0500 Subject: [PATCH 02/12] first working implementation --- src/coffea/dataset_tools/apply_processor.py | 42 ++++++++++++++++++--- tests/test_dataset_tools.py | 9 ++++- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index d5aff5bc6..fbb443767 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -86,6 +86,12 @@ def _apply_analysis(analysis, events_and_maybe_report): return out +def _apply_analysis_wire(analysis, events_and_maybe_report_wire): + events_and_maybe_report = _unpack_meta_from_wire(events_and_maybe_report_wire) + out = _apply_analysis(analysis, events_and_maybe_report) + return _pack_meta_to_wire(out) + + def apply_to_dataset( data_manipulation: ProcessorABC | GenericHEPAnalysis, dataset: DatasetSpec | DatasetSpecOptional, @@ -134,18 +140,19 @@ def apply_to_dataset( if isinstance(data_manipulation, ProcessorABC): analysis = data_manipulation.process elif isinstance(data_manipulation, Callable): - out = data_manipulation + analysis = data_manipulation else: raise ValueError("data_manipulation must either be a ProcessorABC or Callable") out = None if parallelize_with_dask: - out = dask.delayed(partial(_apply_analysis, analysis, events_and_maybe_report)) + wired_events = _pack_meta_to_wire(events_and_maybe_report) + out = dask.delayed(partial(_apply_analysis_wire, analysis, wired_events))() else: out = _apply_analysis(analysis, events_and_maybe_report) report = None - if isinstance(out, tuple): + if isinstance(out, tuple) and not parallelize_with_dask: out, report = out if report is not None: @@ -158,6 +165,8 @@ def apply_to_fileset( fileset: FilesetSpec | FilesetSpecOptional, schemaclass: BaseSchema = NanoAODSchema, uproot_options: dict[str, Any] = {}, + parallelize_with_dask: bool = False, + scheduler: Callable | str | None = None, ) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]: """ Apply the supplied function or processor to the supplied fileset (set of datasets). @@ -171,6 +180,10 @@ def apply_to_fileset( The nanoevents schema to interpret the input dataset with. 
uproot_options: dict[str, Any], default {} Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports. + parallelize_with_dask: bool, default False + Create dask.delayed objects that will return the the computable dask collections for the analysis when computed. + scheduler: Callable | str | None, default None + If parallelize_with_dask is True, this specifies the dask scheduler used to calculate task graphs in parallel. Returns ------- @@ -180,6 +193,7 @@ def apply_to_fileset( The file access report for running the analysis on the input dataset. Needs to be computed in simultaneously with the analysis to be accurate. """ out = {} + analyses_to_compute = {} report = {} for name, dataset in fileset.items(): metadata = copy.deepcopy(dataset.get("metadata", {})) @@ -187,12 +201,30 @@ def apply_to_fileset( metadata = {} metadata.setdefault("dataset", name) dataset_out = apply_to_dataset( - data_manipulation, dataset, schemaclass, metadata, uproot_options + data_manipulation, + dataset, + schemaclass, + metadata, + uproot_options, + parallelize_with_dask, ) - if isinstance(dataset_out, tuple) and len(dataset_out) > 1: + + if parallelize_with_dask: + analyses_to_compute[name] = dataset_out + elif isinstance(dataset_out, tuple): out[name], report[name] = dataset_out else: out[name] = dataset_out[0] + + if parallelize_with_dask: + (calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler) + for name, dataset_out_wire in calculated_graphs.items(): + dataset_out = _unpack_meta_from_wire(dataset_out_wire) + if isinstance(dataset_out, tuple): + out[name], report[name] = dataset_out + else: + out[name] = dataset_out[0] + if len(report) > 0: return out, report return out diff --git a/tests/test_dataset_tools.py b/tests/test_dataset_tools.py index 0a8ec3b43..fbf652f5d 100644 --- a/tests/test_dataset_tools.py +++ b/tests/test_dataset_tools.py @@ -285,7 +285,8 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): "proc_and_schema", [(NanoTestProcessor, BaseSchema), (NanoEventsProcessor, NanoAODSchema)], ) -def test_apply_to_fileset(proc_and_schema): +@pytest.mark.parametrize("delayed_taskgraph_calc", [True, False]) +def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc): proc, schemaclass = proc_and_schema with Client() as _: @@ -293,6 +294,7 @@ def test_apply_to_fileset(proc_and_schema): proc(), _runnable_result, schemaclass=schemaclass, + parallelize_with_dask=delayed_taskgraph_calc, ) out = dask.compute(to_compute)[0] @@ -305,6 +307,7 @@ def test_apply_to_fileset(proc_and_schema): proc(), max_chunks(_runnable_result, 1), schemaclass=schemaclass, + parallelize_with_dask=delayed_taskgraph_calc, ) out = dask.compute(to_compute)[0] @@ -536,13 +539,15 @@ def test_slice_chunks(): } -def test_recover_failed_chunks(): +@pytest.mark.parametrize("delayed_taskgraph_calc", [True, False]) +def test_recover_failed_chunks(delayed_taskgraph_calc): with Client() as _: to_compute = apply_to_fileset( NanoEventsProcessor(), _starting_fileset_with_steps, schemaclass=NanoAODSchema, uproot_options={"allow_read_errors_with_report": True}, + parallelize_with_dask=delayed_taskgraph_calc, ) out, reports = dask.compute(*to_compute) From bcbfb40c504eebda26b47b65538a87a99dd54120 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 1 Feb 2024 20:45:30 -0500 Subject: [PATCH 03/12] ensure that we can still do GenParticle recursions and such when we do remote task graph calculation --- src/coffea/dataset_tools/apply_processor.py | 
29 ++++++++----------- .../test_items/NanoEventsProcessor.py | 3 ++ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index fbb443767..a3ed2d759 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -73,22 +73,16 @@ def _unpack_meta_from_wire(*collections): return packed_out -def _apply_analysis(analysis, events_and_maybe_report): - events = events_and_maybe_report +def _apply_analysis_wire(analysis, events_and_maybe_report_wire): + events = _unpack_meta_from_wire(events_and_maybe_report_wire) report = None - if isinstance(events_and_maybe_report, tuple): - events, report = events_and_maybe_report + if isinstance(events, tuple): + events, report = events + events._meta.attrs["@original_array"] = events out = analysis(events) - if report is not None: - return out, report - return out - - -def _apply_analysis_wire(analysis, events_and_maybe_report_wire): - events_and_maybe_report = _unpack_meta_from_wire(events_and_maybe_report_wire) - out = _apply_analysis(analysis, events_and_maybe_report) + return _pack_meta_to_wire(out, report) return _pack_meta_to_wire(out) @@ -136,6 +130,11 @@ def apply_to_dataset( uproot_options=uproot_options, ).events() + events = events_and_maybe_report + report = None + if isinstance(events, tuple): + events, report = events + analysis = None if isinstance(data_manipulation, ProcessorABC): analysis = data_manipulation.process @@ -149,11 +148,7 @@ def apply_to_dataset( wired_events = _pack_meta_to_wire(events_and_maybe_report) out = dask.delayed(partial(_apply_analysis_wire, analysis, wired_events))() else: - out = _apply_analysis(analysis, events_and_maybe_report) - - report = None - if isinstance(out, tuple) and not parallelize_with_dask: - out, report = out + out = analysis(events) if report is not None: return out, report diff --git a/src/coffea/processor/test_items/NanoEventsProcessor.py b/src/coffea/processor/test_items/NanoEventsProcessor.py index 9da016b7d..53c074f3f 100644 --- a/src/coffea/processor/test_items/NanoEventsProcessor.py +++ b/src/coffea/processor/test_items/NanoEventsProcessor.py @@ -69,6 +69,9 @@ def process(self, events): # print(dimuon["0"].behavior) dimuon = dimuon["0"] + dimuon["1"] + if "GenPart" in events.fields: + _ = events.GenPart.parent.pt + output["pt"].fill(dataset=dataset, pt=dak.flatten(muon_pt)) output["mass"].fill(dataset=dataset, mass=dak.flatten(dimuon.mass)) output["cutflow"]["%s_pt" % dataset] = dak.sum(dak.num(events.Muon, axis=1)) From 99441924ad6cf747607b5ef7ffb6987d27e836fa Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 1 Feb 2024 20:59:06 -0500 Subject: [PATCH 04/12] more proper handling of things that might be tuples --- src/coffea/dataset_tools/apply_processor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index a3ed2d759..61e48dbdc 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -145,7 +145,9 @@ def apply_to_dataset( out = None if parallelize_with_dask: - wired_events = _pack_meta_to_wire(events_and_maybe_report) + if not isinstance(events_and_maybe_report, tuple): + events_and_maybe_report = (events_and_maybe_report,) + wired_events = _pack_meta_to_wire(*events_and_maybe_report) out = dask.delayed(partial(_apply_analysis_wire, analysis, wired_events))() else: out = 
analysis(events) @@ -214,7 +216,10 @@ def apply_to_fileset( if parallelize_with_dask: (calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler) for name, dataset_out_wire in calculated_graphs.items(): - dataset_out = _unpack_meta_from_wire(dataset_out_wire) + to_unwire = dataset_out_wire + if not isinstance(dataset_out_wire, tuple): + to_unwire = (dataset_out_wire,) + dataset_out = _unpack_meta_from_wire(*to_unwire) if isinstance(dataset_out, tuple): out[name], report[name] = dataset_out else: From 208c81e02cc21eca5e989cfaed50bdfd7461b8cf Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Tue, 6 Feb 2024 16:07:12 +0100 Subject: [PATCH 05/12] provide interface for serializing taskgraphs to/from disk --- src/coffea/dataset_tools/__init__.py | 9 +- src/coffea/dataset_tools/apply_processor.py | 124 +++++++++++++++----- src/coffea/util.py | 11 +- tests/test_dataset_tools.py | 59 +++++++++- 4 files changed, 161 insertions(+), 42 deletions(-) diff --git a/src/coffea/dataset_tools/__init__.py b/src/coffea/dataset_tools/__init__.py index 647fd336c..cc9488672 100644 --- a/src/coffea/dataset_tools/__init__.py +++ b/src/coffea/dataset_tools/__init__.py @@ -1,4 +1,9 @@ -from coffea.dataset_tools.apply_processor import apply_to_dataset, apply_to_fileset +from coffea.dataset_tools.apply_processor import ( + apply_to_dataset, + apply_to_fileset, + load_taskgraph, + save_taskgraph, +) from coffea.dataset_tools.manipulations import ( filter_files, get_failed_steps_for_dataset, @@ -14,6 +19,8 @@ "preprocess", "apply_to_dataset", "apply_to_fileset", + "save_taskgraph", + "load_taskgraph", "max_chunks", "slice_chunks", "filter_files", diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index 61e48dbdc..6b48416b2 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -17,7 +17,7 @@ ) from coffea.nanoevents import BaseSchema, NanoAODSchema, NanoEventsFactory from coffea.processor import ProcessorABC -from coffea.util import decompress_form +from coffea.util import decompress_form, load, save DaskOutputBaseType = Union[ dask.base.DaskMethodsMixin, @@ -48,8 +48,6 @@ def _pack_meta_to_wire(*collections): attrs=unpacked[i]._meta.attrs, ) packed_out = repacker(output) - if len(packed_out) == 1: - return packed_out[0] return packed_out @@ -68,21 +66,13 @@ def _unpack_meta_from_wire(*collections): attrs=unpacked[i]._meta.attrs, ) packed_out = repacker(output) - if len(packed_out) == 1: - return packed_out[0] return packed_out -def _apply_analysis_wire(analysis, events_and_maybe_report_wire): - events = _unpack_meta_from_wire(events_and_maybe_report_wire) - report = None - if isinstance(events, tuple): - events, report = events +def _apply_analysis_wire(analysis, events_wire): + (events,) = _unpack_meta_from_wire(events_wire) events._meta.attrs["@original_array"] = events - out = analysis(events) - if report is not None: - return _pack_meta_to_wire(out, report) return _pack_meta_to_wire(out) @@ -145,16 +135,19 @@ def apply_to_dataset( out = None if parallelize_with_dask: - if not isinstance(events_and_maybe_report, tuple): - events_and_maybe_report = (events_and_maybe_report,) - wired_events = _pack_meta_to_wire(*events_and_maybe_report) + (wired_events,) = _pack_meta_to_wire(events) out = dask.delayed(partial(_apply_analysis_wire, analysis, wired_events))() else: out = analysis(events) if report is not None: +<<<<<<< HEAD return out, report return (out,) +======= + return events, out, report + 
return events, out +>>>>>>> aae802b3 (provide interface for serializing taskgraphs to/from disk) def apply_to_fileset( @@ -184,11 +177,14 @@ def apply_to_fileset( Returns ------- + events: dict[str, dask_awkward.Array] + The NanoEvents objects the analysis function was applied to. out : dict[str, DaskOutputType] The output of the analysis workflow applied to the datasets, keyed by dataset name. report : dask_awkward.Array, optional The file access report for running the analysis on the input dataset. Needs to be computed in simultaneously with the analysis to be accurate. """ + events = {} out = {} analyses_to_compute = {} report = {} @@ -207,24 +203,92 @@ def apply_to_fileset( ) if parallelize_with_dask: - analyses_to_compute[name] = dataset_out - elif isinstance(dataset_out, tuple): - out[name], report[name] = dataset_out + if len(dataset_out) == 3: + events[name], analyses_to_compute[name], report[name] = dataset_out + elif len(dataset_out) == 2: + events[name], analyses_to_compute[name] = dataset_out + else: + raise ValueError( + "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)" + ) + elif isinstance(dataset_out, tuple) and len(dataset_out) == 3: + events[name], out[name], report[name] = dataset_out + elif isinstance(dataset_out, tuple) and len(dataset_out) == 2: + events[name], out[name] = dataset_out[0] else: - out[name] = dataset_out[0] + raise ValueError( + "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)" + ) if parallelize_with_dask: (calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler) for name, dataset_out_wire in calculated_graphs.items(): - to_unwire = dataset_out_wire - if not isinstance(dataset_out_wire, tuple): - to_unwire = (dataset_out_wire,) - dataset_out = _unpack_meta_from_wire(*to_unwire) - if isinstance(dataset_out, tuple): - out[name], report[name] = dataset_out - else: - out[name] = dataset_out[0] + (out[name],) = _unpack_meta_from_wire(*dataset_out_wire) if len(report) > 0: - return out, report - return out + return events, out, report + return events, out + + +def save_taskgraph(filename, events, *data_products, optimize_graph=False): + """ + Save a task graph and its originating nanoevents to a file + Parameters + ---------- + filename: str + Where to save the resulting serialized taskgraph and nanoevents. + Suggested postfix ".hlg", after dask's HighLevelGraph object. + events: dict[str, dask_awkward.Array] + A dictionary of nanoevents objects. + data_products: dict[str, DaskOutputBaseType] + The data products resulting from applying an analysis to + a NanoEvents object. This may include report objects. + optimize_graph: bool, default False + Whether or not to save the task graph in its optimized form. + + Returns + ------- + """ + (events_wire,) = _pack_meta_to_wire(events) + + if len(data_products) == 0: + raise ValueError( + "You must supply at least one analysis data product to save a task graph!" + ) + + data_products_out = data_products + if optimize_graph: + data_products_out = dask.optimize(data_products) + + data_products_wire = _pack_meta_to_wire(*data_products_out) + + save( + { + "events": events_wire, + "data_products": data_products_wire, + "optimized": optimize_graph, + }, + filename, + ) + + +def load_taskgraph(filename): + """ + Load a task graph and its originating nanoevents from a file. + Parameters + ---------- + filename: str + The file from which to load the task graph. 
+ Returns + _______ + """ + graph_information_wire = load(filename) + + (events,) = _unpack_meta_from_wire(graph_information_wire["events"]) + (data_products,) = _unpack_meta_from_wire(*graph_information_wire["data_products"]) + optimized = graph_information_wire["optimized"] + + for dataset_name in events: + events[dataset_name]._meta.attrs["@original_array"] = events[dataset_name] + + return events, data_products, optimized diff --git a/src/coffea/util.py b/src/coffea/util.py index bfb0b5119..55a99232a 100644 --- a/src/coffea/util.py +++ b/src/coffea/util.py @@ -36,21 +36,20 @@ import lz4.frame -def load(filename): +def load(filename, mode="rb"): """Load a coffea file from disk""" - with lz4.frame.open(filename) as fin: + with lz4.frame.open(filename, mode) as fin: output = cloudpickle.load(fin) return output -def save(output, filename): +def save(output, filename, mode="wb"): """Save a coffea object or collection thereof to disk This function can accept any picklable object. Suggested suffix: ``.coffea`` """ - with lz4.frame.open(filename, "wb") as fout: - thepickle = cloudpickle.dumps(output) - fout.write(thepickle) + with lz4.frame.open(filename, mode) as fout: + cloudpickle.dump(output, fout) def _hex(string): diff --git a/tests/test_dataset_tools.py b/tests/test_dataset_tools.py index fbf652f5d..324c287a6 100644 --- a/tests/test_dataset_tools.py +++ b/tests/test_dataset_tools.py @@ -7,9 +7,11 @@ apply_to_fileset, filter_files, get_failed_steps_for_fileset, + load_taskgraph, max_chunks, max_files, preprocess, + save_taskgraph, slice_chunks, slice_files, ) @@ -290,7 +292,7 @@ def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc): proc, schemaclass = proc_and_schema with Client() as _: - to_compute = apply_to_fileset( + _, to_compute = apply_to_fileset( proc(), _runnable_result, schemaclass=schemaclass, @@ -303,7 +305,7 @@ def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc): assert out["Data"]["cutflow"]["Data_pt"] == 84 assert out["Data"]["cutflow"]["Data_mass"] == 66 - to_compute = apply_to_fileset( + _, to_compute = apply_to_fileset( proc(), max_chunks(_runnable_result, 1), schemaclass=schemaclass, @@ -328,7 +330,7 @@ def test_apply_to_fileset_hinted_form(): save_form=True, ) - to_compute = apply_to_fileset( + _, to_compute = apply_to_fileset( NanoEventsProcessor(), dataset_runnable, schemaclass=NanoAODSchema, @@ -542,14 +544,14 @@ def test_slice_chunks(): @pytest.mark.parametrize("delayed_taskgraph_calc", [True, False]) def test_recover_failed_chunks(delayed_taskgraph_calc): with Client() as _: - to_compute = apply_to_fileset( + _, to_compute, reports = apply_to_fileset( NanoEventsProcessor(), _starting_fileset_with_steps, schemaclass=NanoAODSchema, uproot_options={"allow_read_errors_with_report": True}, parallelize_with_dask=delayed_taskgraph_calc, ) - out, reports = dask.compute(*to_compute) + out, reports = dask.compute(to_compute, reports) failed_fset = get_failed_steps_for_fileset(_starting_fileset_with_steps, reports) assert failed_fset == { @@ -571,3 +573,50 @@ def test_recover_failed_chunks(delayed_taskgraph_calc): } } } + + +@pytest.mark.parametrize( + "proc_and_schema", + [(NanoTestProcessor, BaseSchema), (NanoEventsProcessor, NanoAODSchema)], +) +@pytest.mark.parametrize( + "with_report", + [True, False], +) +def test_task_graph_serialization(proc_and_schema, with_report): + proc, schemaclass = proc_and_schema + + with Client() as _: + output = apply_to_fileset( + proc(), + _runnable_result, + schemaclass=schemaclass, + 
parallelize_with_dask=False, + uproot_options={"allow_read_errors_with_report": with_report}, + ) + + events = output[0] + to_compute = output[1:] + + save_taskgraph( + "./test_task_graph_serialization.hlg", + events, + to_compute, + optimize_graph=False, + ) + + _, to_compute_serdes, is_optimized = load_taskgraph( + "./test_task_graph_serialization.hlg" + ) + + print(to_compute_serdes) + + if len(to_compute_serdes) > 1: + (out, _) = dask.compute(*to_compute_serdes) + else: + (out,) = dask.compute(*to_compute_serdes) + + assert out["ZJets"]["cutflow"]["ZJets_pt"] == 18 + assert out["ZJets"]["cutflow"]["ZJets_mass"] == 6 + assert out["Data"]["cutflow"]["Data_pt"] == 84 + assert out["Data"]["cutflow"]["Data_mass"] == 66 From a5b5df97aabb73e3c593c4e1b402438bbbaf0f4e Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sun, 11 Feb 2024 06:55:17 -0800 Subject: [PATCH 06/12] pickle and compress computed taskgraphs --- src/coffea/dataset_tools/apply_processor.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index 6b48416b2..c4a2d2089 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -5,9 +5,11 @@ from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union import awkward +import cloudpickle import dask.base import dask.delayed import dask_awkward +import lz4.frame from coffea.dataset_tools.preprocess import ( DatasetSpec, @@ -73,7 +75,8 @@ def _apply_analysis_wire(analysis, events_wire): (events,) = _unpack_meta_from_wire(events_wire) events._meta.attrs["@original_array"] = events out = analysis(events) - return _pack_meta_to_wire(out) + out_wire = _pack_meta_to_wire(out) + return out_wire def apply_to_dataset( @@ -136,7 +139,15 @@ def apply_to_dataset( out = None if parallelize_with_dask: (wired_events,) = _pack_meta_to_wire(events) - out = dask.delayed(partial(_apply_analysis_wire, analysis, wired_events))() + out = dask.delayed( + lambda: lz4.frame.compress( + cloudpickle.dumps( + partial(_apply_analysis_wire, analysis, wired_events)() + ), + compression_level=6, + ) + )() + dask.base.function_cache.clear() else: out = analysis(events) @@ -222,7 +233,10 @@ def apply_to_fileset( if parallelize_with_dask: (calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler) - for name, dataset_out_wire in calculated_graphs.items(): + for name, compressed_taskgraph in calculated_graphs.items(): + dataset_out_wire = cloudpickle.loads( + lz4.frame.decompress(compressed_taskgraph) + ) (out[name],) = _unpack_meta_from_wire(*dataset_out_wire) if len(report) > 0: From 15abc3973eca9e3df59e0d6edb4e8361412552e2 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Sun, 11 Feb 2024 06:56:25 -0800 Subject: [PATCH 07/12] old strategy of lazily instantiating models is no longer efficient, better to embed in taskgraph --- src/coffea/ml_tools/xgboost_wrapper.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/coffea/ml_tools/xgboost_wrapper.py b/src/coffea/ml_tools/xgboost_wrapper.py index 6c8271914..f549059b2 100644 --- a/src/coffea/ml_tools/xgboost_wrapper.py +++ b/src/coffea/ml_tools/xgboost_wrapper.py @@ -29,8 +29,10 @@ def __init__(self, fname): ) raise _xgboost_import_error - nonserializable_attribute.__init__(self, ["xgbooster"]) + nonserializable_attribute.__init__(self, []) + self.xgboost_file = fname + self.xgbooster = self._create_xgbooster() def _create_xgbooster(self) -> 
xgboost.Booster: # Automatic detection of compressed model file From 710dee10de4e44e2ac8d6959376159d4c1e783b7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 15 Feb 2024 17:49:06 +0000 Subject: [PATCH 08/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/coffea/dataset_tools/apply_processor.py | 25 ++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index c4a2d2089..9293db42c 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -139,26 +139,25 @@ def apply_to_dataset( out = None if parallelize_with_dask: (wired_events,) = _pack_meta_to_wire(events) - out = dask.delayed( - lambda: lz4.frame.compress( - cloudpickle.dumps( - partial(_apply_analysis_wire, analysis, wired_events)() - ), - compression_level=6, - ) - )() + out = ( + dask.delayed( + lambda: lz4.frame.compress( + cloudpickle.dumps( + partial(_apply_analysis_wire, analysis, wired_events)() + ), + compression_level=6, + ) + )(), + ) dask.base.function_cache.clear() else: out = analysis(events) + if not isinstance(out, tuple): + out = (out,) if report is not None: -<<<<<<< HEAD - return out, report - return (out,) -======= return events, out, report return events, out ->>>>>>> aae802b3 (provide interface for serializing taskgraphs to/from disk) def apply_to_fileset( From 730932d5a076cc18eb31ac63ff41ca0481a16c94 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Thu, 15 Feb 2024 16:57:42 -0600 Subject: [PATCH 09/12] ressurect tests after tuple-out fix --- src/coffea/dataset_tools/apply_processor.py | 25 ++++++++++++--------- tests/test_dataset_tools.py | 16 ++++++++----- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index 9293db42c..b93b4c6fd 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -139,16 +139,14 @@ def apply_to_dataset( out = None if parallelize_with_dask: (wired_events,) = _pack_meta_to_wire(events) - out = ( - dask.delayed( - lambda: lz4.frame.compress( - cloudpickle.dumps( - partial(_apply_analysis_wire, analysis, wired_events)() - ), - compression_level=6, - ) - )(), - ) + out = dask.delayed( + lambda: lz4.frame.compress( + cloudpickle.dumps( + partial(_apply_analysis_wire, analysis, wired_events)() + ), + compression_level=6, + ) + )() dask.base.function_cache.clear() else: out = analysis(events) @@ -217,6 +215,7 @@ def apply_to_fileset( events[name], analyses_to_compute[name], report[name] = dataset_out elif len(dataset_out) == 2: events[name], analyses_to_compute[name] = dataset_out + print(dataset_out) else: raise ValueError( "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)" @@ -224,7 +223,7 @@ def apply_to_fileset( elif isinstance(dataset_out, tuple) and len(dataset_out) == 3: events[name], out[name], report[name] = dataset_out elif isinstance(dataset_out, tuple) and len(dataset_out) == 2: - events[name], out[name] = dataset_out[0] + events[name], out[name] = dataset_out else: raise ValueError( "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)" @@ -238,6 +237,10 @@ def apply_to_fileset( ) (out[name],) = _unpack_meta_from_wire(*dataset_out_wire) + for name in out: + if 
isinstance(out[name], tuple) and len(out[name]) == 1: + out[name] = out[name][0] + if len(report) > 0: return events, out, report return events, out diff --git a/tests/test_dataset_tools.py b/tests/test_dataset_tools.py index 324c287a6..8fd9235c1 100644 --- a/tests/test_dataset_tools.py +++ b/tests/test_dataset_tools.py @@ -220,8 +220,8 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): if allow_read_errors_with_report: assert isinstance(out, tuple) - assert len(out) == 2 - out, report = out + assert len(out) == 3 + _, out, report = out assert isinstance(out, dict) assert isinstance(report, dict) assert out.keys() == {"ZJets", "Data"} @@ -236,8 +236,10 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): assert isinstance(report["ZJets"], dask_awkward.Array) assert isinstance(report["Data"], dask_awkward.Array) else: - assert isinstance(out, dict) + assert isinstance(out, tuple) assert len(out) == 2 + _, out = out + assert isinstance(out, dict) assert out.keys() == {"ZJets", "Data"} assert isinstance(out["ZJets"], tuple) assert isinstance(out["Data"], tuple) @@ -255,8 +257,8 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): if allow_read_errors_with_report: assert isinstance(out, tuple) - assert len(out) == 2 - out, report = out + assert len(out) == 3 + _, out, report = out assert isinstance(out, dict) assert isinstance(report, dict) assert out.keys() == {"ZJets", "Data"} @@ -271,8 +273,10 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): assert isinstance(report["ZJets"], dask_awkward.Array) assert isinstance(report["Data"], dask_awkward.Array) else: - assert isinstance(out, dict) + assert isinstance(out, tuple) assert len(out) == 2 + _, out = out + assert isinstance(out, dict) assert out.keys() == {"ZJets", "Data"} assert isinstance(out["ZJets"], tuple) assert isinstance(out["Data"], tuple) From f589b93c712aafcfc9b57d906de65e389d250352 Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 21 Feb 2024 15:47:10 -0600 Subject: [PATCH 10/12] make interface changes backwards compatible --- src/coffea/dataset_tools/apply_processor.py | 23 +++++++++++++++++---- tests/test_dataset_tools.py | 7 +++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index b93b4c6fd..362f47961 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -86,7 +86,11 @@ def apply_to_dataset( metadata: dict[Hashable, Any] = {}, uproot_options: dict[str, Any] = {}, parallelize_with_dask: bool = False, -) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]: +) -> ( + DaskOutputType + | tuple[DaskOutputType, dask_awkward.Array] + | tuple[dask_awkward.Array, DaskOutputType, dask_awkward.Array] +): """ Apply the supplied function or processor to the supplied dataset. Parameters @@ -103,6 +107,8 @@ def apply_to_dataset( Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports. parallelize_with_dask: bool, default False Create dask.delayed objects that will return the the computable dask collections for the analysis when computed. + return_events: bool, default True + Return the created events object, or not. 
Returns ------- @@ -165,7 +171,16 @@ def apply_to_fileset( uproot_options: dict[str, Any] = {}, parallelize_with_dask: bool = False, scheduler: Callable | str | None = None, -) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]: + return_events: bool = False, +) -> ( + dict[str, DaskOutputType] + | tuple[dict[str, DaskOutputType], dict[str, dask_awkward.Array]] + | tuple[ + dict[str, dask_awkward.Array], + dict[str, DaskOutputType], + dict[str, dask_awkward.Array], + ] +): """ Apply the supplied function or processor to the supplied fileset (set of datasets). Parameters @@ -242,8 +257,8 @@ def apply_to_fileset( out[name] = out[name][0] if len(report) > 0: - return events, out, report - return events, out + return (events, out, report) if return_events else (out, report) + return (events, out) if return_events else out def save_taskgraph(filename, events, *data_products, optimize_graph=False): diff --git a/tests/test_dataset_tools.py b/tests/test_dataset_tools.py index 8fd9235c1..12566869d 100644 --- a/tests/test_dataset_tools.py +++ b/tests/test_dataset_tools.py @@ -216,6 +216,7 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): _my_analysis_output_2, _runnable_result, uproot_options={"allow_read_errors_with_report": allow_read_errors_with_report}, + return_events=True, ) if allow_read_errors_with_report: @@ -253,6 +254,7 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report): _my_analysis_output_3, _runnable_result, uproot_options={"allow_read_errors_with_report": allow_read_errors_with_report}, + return_events=True, ) if allow_read_errors_with_report: @@ -301,6 +303,7 @@ def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc): _runnable_result, schemaclass=schemaclass, parallelize_with_dask=delayed_taskgraph_calc, + return_events=True, ) out = dask.compute(to_compute)[0] @@ -314,6 +317,7 @@ def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc): max_chunks(_runnable_result, 1), schemaclass=schemaclass, parallelize_with_dask=delayed_taskgraph_calc, + return_events=True, ) out = dask.compute(to_compute)[0] @@ -338,6 +342,7 @@ def test_apply_to_fileset_hinted_form(): NanoEventsProcessor(), dataset_runnable, schemaclass=NanoAODSchema, + return_events=True, ) out = dask.compute(to_compute)[0] @@ -554,6 +559,7 @@ def test_recover_failed_chunks(delayed_taskgraph_calc): schemaclass=NanoAODSchema, uproot_options={"allow_read_errors_with_report": True}, parallelize_with_dask=delayed_taskgraph_calc, + return_events=True, ) out, reports = dask.compute(to_compute, reports) @@ -597,6 +603,7 @@ def test_task_graph_serialization(proc_and_schema, with_report): schemaclass=schemaclass, parallelize_with_dask=False, uproot_options={"allow_read_errors_with_report": with_report}, + return_events=True, ) events = output[0] From 25ffa723ae5d870c07b767061aa6a208c693ea2f Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 21 Feb 2024 16:04:39 -0600 Subject: [PATCH 11/12] performance optimization for lumimask taskgraphs --- src/coffea/lumi_tools/lumi_tools.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/coffea/lumi_tools/lumi_tools.py b/src/coffea/lumi_tools/lumi_tools.py index edc5db34c..2ef89ad06 100644 --- a/src/coffea/lumi_tools/lumi_tools.py +++ b/src/coffea/lumi_tools/lumi_tools.py @@ -170,10 +170,12 @@ def __call__(self, runs, lumis): """ def apply(runs, lumis): + backend = awkward.backend(runs) # fill numba typed dict _masks = Dict.empty(key_type=types.uint32, 
value_type=types.uint32[:]) - for k, v in self._masks.items(): - _masks[k] = v + if backend != "typetracer": + for k, v in self._masks.items(): + _masks[k] = v runs_orig = runs if isinstance(runs, awkward.highlevel.Array): @@ -185,10 +187,11 @@ def apply(runs, lumis): awkward.typetracer.length_zero_if_typetracer(lumis) ) mask_out = numpy.zeros(dtype="bool", shape=runs.shape) - LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out) + if backend != "typetracer": + LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out) if isinstance(runs_orig, awkward.Array): mask_out = awkward.Array(mask_out) - if awkward.backend(runs_orig) == "typetracer": + if backend == "typetracer": mask_out = awkward.Array( mask_out.layout.to_typetracer(forget_length=True) ) From e0ff08339b63b7a0cc6dc16b080490a5e5f6b3ab Mon Sep 17 00:00:00 2001 From: Lindsey Gray Date: Wed, 6 Mar 2024 13:44:21 -0600 Subject: [PATCH 12/12] guard function_cache --- src/coffea/dataset_tools/apply_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py index 362f47961..e5736bf2f 100644 --- a/src/coffea/dataset_tools/apply_processor.py +++ b/src/coffea/dataset_tools/apply_processor.py @@ -153,7 +153,8 @@ def apply_to_dataset( compression_level=6, ) )() - dask.base.function_cache.clear() + if hasattr(dask.base, "function_cache"): + dask.base.function_cache.clear() else: out = analysis(events) if not isinstance(out, tuple):
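
Example: building each dataset's task graph in parallel and getting the events objects back, as exercised by the updated tests above. This is a sketch rather than part of the patches; `MyProcessor`, `fileset`, and the preprocessing arguments are placeholders.

import dask
from distributed import Client

from coffea.dataset_tools import apply_to_fileset, preprocess
from coffea.nanoevents import NanoAODSchema

with Client() as _:
    # turn the raw fileset into a runnable, fully stepped fileset
    dataset_runnable, dataset_updated = preprocess(
        fileset, step_size=100_000, skip_bad_files=True
    )

    # each dataset's task graph is assembled inside a dask.delayed call;
    # return_events=True also hands back the NanoEvents objects
    events, to_compute, reports = apply_to_fileset(
        MyProcessor(),  # a ProcessorABC instance or a plain callable
        dataset_runnable,
        schemaclass=NanoAODSchema,
        uproot_options={"allow_read_errors_with_report": True},
        parallelize_with_dask=True,
        return_events=True,
    )

    out, reports = dask.compute(to_compute, reports)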
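
Example: serializing the assembled task graphs to disk with save_taskgraph and reloading them with load_taskgraph, following the new test_task_graph_serialization test. A sketch; the filename and processor are placeholders, and the data products are passed as a single tuple, matching how the test drives save_taskgraph.

import dask

from coffea.dataset_tools import apply_to_fileset, load_taskgraph, save_taskgraph
from coffea.nanoevents import NanoAODSchema

events, out, reports = apply_to_fileset(
    MyProcessor(),
    dataset_runnable,
    schemaclass=NanoAODSchema,
    uproot_options={"allow_read_errors_with_report": True},
    return_events=True,
)

# ".hlg" is the suggested suffix, after dask's HighLevelGraph
save_taskgraph("analysis.hlg", events, (out, reports), optimize_graph=False)

# later, possibly in a different session: the wire-format metadata is
# re-inflated to typetracers, then the stored graphs compute as usual
events, data_products, is_optimized = load_taskgraph("analysis.hlg")
out, reports = dask.compute(*data_products)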
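
Note on the lumi_tools change: when awkward reports the "typetracer" backend, LumiMask now skips filling the numba typed dict and running the mask kernel, so applying a golden-JSON mask while the task graph is being built stays cheap. A usage sketch with dask_awkward events (the JSON path is a placeholder):

from coffea.lumi_tools import LumiMask

lumimask = LumiMask("Cert_GoldenJSON.txt")

# during graph construction only type information flows through this call;
# the per-event mask kernel runs when the graph is actually computed
good_lumi = lumimask(events.run, events.luminosityBlock)
selected = events[good_lumi]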