diff --git a/src/coffea/dataset_tools/__init__.py b/src/coffea/dataset_tools/__init__.py
index 647fd336c..cc9488672 100644
--- a/src/coffea/dataset_tools/__init__.py
+++ b/src/coffea/dataset_tools/__init__.py
@@ -1,4 +1,9 @@
-from coffea.dataset_tools.apply_processor import apply_to_dataset, apply_to_fileset
+from coffea.dataset_tools.apply_processor import (
+    apply_to_dataset,
+    apply_to_fileset,
+    load_taskgraph,
+    save_taskgraph,
+)
 from coffea.dataset_tools.manipulations import (
     filter_files,
     get_failed_steps_for_dataset,
@@ -14,6 +19,8 @@
     "preprocess",
     "apply_to_dataset",
     "apply_to_fileset",
+    "save_taskgraph",
+    "load_taskgraph",
     "max_chunks",
     "slice_chunks",
     "filter_files",
diff --git a/src/coffea/dataset_tools/apply_processor.py b/src/coffea/dataset_tools/apply_processor.py
index 489b8b6f0..e5736bf2f 100644
--- a/src/coffea/dataset_tools/apply_processor.py
+++ b/src/coffea/dataset_tools/apply_processor.py
@@ -1,11 +1,15 @@
 from __future__ import annotations

 import copy
+from functools import partial
 from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union

 import awkward
+import cloudpickle
 import dask.base
+import dask.delayed
 import dask_awkward
+import lz4.frame

 from coffea.dataset_tools.preprocess import (
     DatasetSpec,
@@ -15,7 +19,7 @@
 )
 from coffea.nanoevents import BaseSchema, NanoAODSchema, NanoEventsFactory
 from coffea.processor import ProcessorABC
-from coffea.util import decompress_form
+from coffea.util import decompress_form, load, save

 DaskOutputBaseType = Union[
     dask.base.DaskMethodsMixin,
@@ -31,13 +35,62 @@
 GenericHEPAnalysis = Callable[[dask_awkward.Array], DaskOutputType]


+def _pack_meta_to_wire(*collections):
+    unpacked, repacker = dask.base.unpack_collections(*collections)
+
+    output = []
+    for i in range(len(unpacked)):
+        output.append(unpacked[i])
+        if isinstance(
+            unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar)
+        ):
+            output[-1]._meta = awkward.Array(
+                unpacked[i]._meta.layout.form.length_zero_array(),
+                behavior=unpacked[i]._meta.behavior,
+                attrs=unpacked[i]._meta.attrs,
+            )
+    packed_out = repacker(output)
+    return packed_out
+
+
+def _unpack_meta_from_wire(*collections):
+    unpacked, repacker = dask.base.unpack_collections(*collections)
+
+    output = []
+    for i in range(len(unpacked)):
+        output.append(unpacked[i])
+        if isinstance(
+            unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar)
+        ):
+            output[-1]._meta = awkward.Array(
+                unpacked[i]._meta.layout.to_typetracer(forget_length=True),
+                behavior=unpacked[i]._meta.behavior,
+                attrs=unpacked[i]._meta.attrs,
+            )
+    packed_out = repacker(output)
+    return packed_out
+
+
+def _apply_analysis_wire(analysis, events_wire):
+    (events,) = _unpack_meta_from_wire(events_wire)
+    events._meta.attrs["@original_array"] = events
+    out = analysis(events)
+    out_wire = _pack_meta_to_wire(out)
+    return out_wire
+
+
 def apply_to_dataset(
     data_manipulation: ProcessorABC | GenericHEPAnalysis,
     dataset: DatasetSpec | DatasetSpecOptional,
     schemaclass: BaseSchema = NanoAODSchema,
     metadata: dict[Hashable, Any] = {},
     uproot_options: dict[str, Any] = {},
-) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]:
+    parallelize_with_dask: bool = False,
+) -> (
+    DaskOutputType
+    | tuple[DaskOutputType, dask_awkward.Array]
+    | tuple[dask_awkward.Array, DaskOutputType, dask_awkward.Array]
+):
     """
     Apply the supplied function or processor to the supplied dataset.
     Parameters
@@ -52,6 +105,8 @@
         Metadata for the dataset that is accessible by the input analysis. Should also be dask-serializable.
     uproot_options: dict[str, Any], default {}
         Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
+    parallelize_with_dask: bool, default False
+        Create dask.delayed objects that will return the computable dask collections for the analysis when computed.

     Returns
     -------
@@ -64,7 +119,7 @@
     if maybe_base_form is not None:
         maybe_base_form = awkward.forms.from_json(decompress_form(maybe_base_form))
     files = dataset["files"]
-    events = NanoEventsFactory.from_root(
+    events_and_maybe_report = NanoEventsFactory.from_root(
         files,
         metadata=metadata,
         schemaclass=schemaclass,
@@ -72,21 +127,40 @@
         uproot_options=uproot_options,
     ).events()

+    events = events_and_maybe_report
     report = None
     if isinstance(events, tuple):
         events, report = events

-    out = None
+    analysis = None
     if isinstance(data_manipulation, ProcessorABC):
-        out = data_manipulation.process(events)
+        analysis = data_manipulation.process
     elif isinstance(data_manipulation, Callable):
-        out = data_manipulation(events)
+        analysis = data_manipulation
     else:
         raise ValueError("data_manipulation must either be a ProcessorABC or Callable")

+    out = None
+    if parallelize_with_dask:
+        (wired_events,) = _pack_meta_to_wire(events)
+        out = dask.delayed(
+            lambda: lz4.frame.compress(
+                cloudpickle.dumps(
+                    partial(_apply_analysis_wire, analysis, wired_events)()
+                ),
+                compression_level=6,
+            )
+        )()
+        if hasattr(dask.base, "function_cache"):
+            dask.base.function_cache.clear()
+    else:
+        out = analysis(events)
+        if not isinstance(out, tuple):
+            out = (out,)
+
     if report is not None:
-        return out, report
-    return (out,)
+        return events, out, report
+    return events, out


 def apply_to_fileset(
@@ -94,7 +168,18 @@
     fileset: FilesetSpec | FilesetSpecOptional,
     schemaclass: BaseSchema = NanoAODSchema,
     uproot_options: dict[str, Any] = {},
-) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]:
+    parallelize_with_dask: bool = False,
+    scheduler: Callable | str | None = None,
+    return_events: bool = False,
+) -> (
+    dict[str, DaskOutputType]
+    | tuple[dict[str, DaskOutputType], dict[str, dask_awkward.Array]]
+    | tuple[
+        dict[str, dask_awkward.Array],
+        dict[str, DaskOutputType],
+        dict[str, dask_awkward.Array],
+    ]
+):
     """
     Apply the supplied function or processor to the supplied fileset (set of datasets).
     Parameters
@@ -107,15 +192,25 @@
         The nanoevents schema to interpret the input dataset with.
     uproot_options: dict[str, Any], default {}
         Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
+    parallelize_with_dask: bool, default False
+        Create dask.delayed objects that will return the computable dask collections for the analysis when computed.
+    scheduler: Callable | str | None, default None
+        If parallelize_with_dask is True, this specifies the dask scheduler used to calculate task graphs in parallel.
+    return_events: bool, default False
+        Return the created NanoEvents objects (keyed by dataset name) in addition to the analysis outputs.

     Returns
     -------
+    events: dict[str, dask_awkward.Array]
+        The NanoEvents objects the analysis function was applied to.
     out : dict[str, DaskOutputType]
         The output of the analysis workflow applied to the datasets, keyed by dataset name.
     report : dask_awkward.Array, optional
         The file access report for running the analysis on the input dataset. Needs to be computed in simultaneously with the analysis to be accurate.
     """
+    events = {}
     out = {}
+    analyses_to_compute = {}
     report = {}
     for name, dataset in fileset.items():
         metadata = copy.deepcopy(dataset.get("metadata", {}))
@@ -123,12 +218,108 @@
             metadata = {}
         metadata.setdefault("dataset", name)
         dataset_out = apply_to_dataset(
-            data_manipulation, dataset, schemaclass, metadata, uproot_options
+            data_manipulation,
+            dataset,
+            schemaclass,
+            metadata,
+            uproot_options,
+            parallelize_with_dask,
         )
-        if isinstance(dataset_out, tuple) and len(dataset_out) > 1:
-            out[name], report[name] = dataset_out
+
+        if parallelize_with_dask:
+            if len(dataset_out) == 3:
+                events[name], analyses_to_compute[name], report[name] = dataset_out
+            elif len(dataset_out) == 2:
+                events[name], analyses_to_compute[name] = dataset_out
+            else:
+                raise ValueError(
+                    "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)"
+                )
+        elif isinstance(dataset_out, tuple) and len(dataset_out) == 3:
+            events[name], out[name], report[name] = dataset_out
+        elif isinstance(dataset_out, tuple) and len(dataset_out) == 2:
+            events[name], out[name] = dataset_out
         else:
-            out[name] = dataset_out[0]
+            raise ValueError(
+                "apply_to_dataset only returns (events, outputs) or (events, outputs, reports)"
+            )
+
+    if parallelize_with_dask:
+        (calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler)
+        for name, compressed_taskgraph in calculated_graphs.items():
+            dataset_out_wire = cloudpickle.loads(
+                lz4.frame.decompress(compressed_taskgraph)
+            )
+            (out[name],) = _unpack_meta_from_wire(*dataset_out_wire)
+
+    for name in out:
+        if isinstance(out[name], tuple) and len(out[name]) == 1:
+            out[name] = out[name][0]
+
     if len(report) > 0:
-        return out, report
-    return out
+        return (events, out, report) if return_events else (out, report)
+    return (events, out) if return_events else out
+
+
+def save_taskgraph(filename, events, *data_products, optimize_graph=False):
+    """
+    Save a task graph and its originating nanoevents to a file.
+    Parameters
+    ----------
+    filename: str
+        Where to save the resulting serialized taskgraph and nanoevents.
+        Suggested suffix ".hlg", after dask's HighLevelGraph object.
+    events: dict[str, dask_awkward.Array]
+        A dictionary of nanoevents objects.
+    data_products: dict[str, DaskOutputBaseType]
+        The data products resulting from applying an analysis to
+        a NanoEvents object. This may include report objects.
+    optimize_graph: bool, default False
+        Whether or not to save the task graph in its optimized form.
+
+    Returns
+    -------
+    """
+    (events_wire,) = _pack_meta_to_wire(events)
+
+    if len(data_products) == 0:
+        raise ValueError(
+            "You must supply at least one analysis data product to save a task graph!"
+        )
+
+    data_products_out = data_products
+    if optimize_graph:
+        data_products_out = dask.optimize(data_products)
+
+    data_products_wire = _pack_meta_to_wire(*data_products_out)
+
+    save(
+        {
+            "events": events_wire,
+            "data_products": data_products_wire,
+            "optimized": optimize_graph,
+        },
+        filename,
+    )
+
+
+def load_taskgraph(filename):
+    """
+    Load a task graph and its originating nanoevents from a file.
+    Parameters
+    ----------
+    filename: str
+        The file from which to load the task graph.
+    Returns
+    -------
+    """
+    graph_information_wire = load(filename)
+
+    (events,) = _unpack_meta_from_wire(graph_information_wire["events"])
+    (data_products,) = _unpack_meta_from_wire(*graph_information_wire["data_products"])
+    optimized = graph_information_wire["optimized"]
+
+    for dataset_name in events:
+        events[dataset_name]._meta.attrs["@original_array"] = events[dataset_name]
+
+    return events, data_products, optimized
diff --git a/src/coffea/lumi_tools/lumi_tools.py b/src/coffea/lumi_tools/lumi_tools.py
index edc5db34c..2ef89ad06 100644
--- a/src/coffea/lumi_tools/lumi_tools.py
+++ b/src/coffea/lumi_tools/lumi_tools.py
@@ -170,10 +170,12 @@
         """

         def apply(runs, lumis):
+            backend = awkward.backend(runs)
             # fill numba typed dict
             _masks = Dict.empty(key_type=types.uint32, value_type=types.uint32[:])
-            for k, v in self._masks.items():
-                _masks[k] = v
+            if backend != "typetracer":
+                for k, v in self._masks.items():
+                    _masks[k] = v

             runs_orig = runs
             if isinstance(runs, awkward.highlevel.Array):
@@ -185,10 +187,11 @@
                 awkward.typetracer.length_zero_if_typetracer(lumis)
             )
             mask_out = numpy.zeros(dtype="bool", shape=runs.shape)
-            LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out)
+            if backend != "typetracer":
+                LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out)
             if isinstance(runs_orig, awkward.Array):
                 mask_out = awkward.Array(mask_out)
-            if awkward.backend(runs_orig) == "typetracer":
+            if backend == "typetracer":
                 mask_out = awkward.Array(
                     mask_out.layout.to_typetracer(forget_length=True)
                 )
diff --git a/src/coffea/ml_tools/xgboost_wrapper.py b/src/coffea/ml_tools/xgboost_wrapper.py
index 6c8271914..f549059b2 100644
--- a/src/coffea/ml_tools/xgboost_wrapper.py
+++ b/src/coffea/ml_tools/xgboost_wrapper.py
@@ -29,8 +29,10 @@
             )
             raise _xgboost_import_error

-        nonserializable_attribute.__init__(self, ["xgbooster"])
+        nonserializable_attribute.__init__(self, [])
+
         self.xgboost_file = fname
+        self.xgbooster = self._create_xgbooster()

     def _create_xgbooster(self) -> xgboost.Booster:
         # Automatic detection of compressed model file
diff --git a/src/coffea/processor/test_items/NanoEventsProcessor.py b/src/coffea/processor/test_items/NanoEventsProcessor.py
index 9da016b7d..53c074f3f 100644
--- a/src/coffea/processor/test_items/NanoEventsProcessor.py
+++ b/src/coffea/processor/test_items/NanoEventsProcessor.py
@@ -69,6 +69,9 @@
         # print(dimuon["0"].behavior)
         dimuon = dimuon["0"] + dimuon["1"]

+        if "GenPart" in events.fields:
+            _ = events.GenPart.parent.pt
+
         output["pt"].fill(dataset=dataset, pt=dak.flatten(muon_pt))
         output["mass"].fill(dataset=dataset, mass=dak.flatten(dimuon.mass))
         output["cutflow"]["%s_pt" % dataset] = dak.sum(dak.num(events.Muon, axis=1))
diff --git a/src/coffea/util.py b/src/coffea/util.py
index bfb0b5119..55a99232a 100644
--- a/src/coffea/util.py
+++ b/src/coffea/util.py
@@ -36,21 +36,20 @@
 import lz4.frame


-def load(filename):
+def load(filename, mode="rb"):
     """Load a coffea file from disk"""
-    with lz4.frame.open(filename) as fin:
+    with lz4.frame.open(filename, mode) as fin:
         output = cloudpickle.load(fin)
     return output


-def save(output, filename):
+def save(output, filename, mode="wb"):
     """Save a coffea object or collection thereof to disk

     This function can accept any picklable object.
     Suggested suffix: ``.coffea``
     """
-    with lz4.frame.open(filename, "wb") as fout:
-        thepickle = cloudpickle.dumps(output)
-        fout.write(thepickle)
+    with lz4.frame.open(filename, mode) as fout:
+        cloudpickle.dump(output, fout)


 def _hex(string):
diff --git a/tests/test_dataset_tools.py b/tests/test_dataset_tools.py
index 0a8ec3b43..12566869d 100644
--- a/tests/test_dataset_tools.py
+++ b/tests/test_dataset_tools.py
@@ -7,9 +7,11 @@
     apply_to_fileset,
     filter_files,
     get_failed_steps_for_fileset,
+    load_taskgraph,
     max_chunks,
     max_files,
     preprocess,
+    save_taskgraph,
     slice_chunks,
     slice_files,
 )
@@ -214,12 +216,13 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report):
             _my_analysis_output_2,
             _runnable_result,
             uproot_options={"allow_read_errors_with_report": allow_read_errors_with_report},
+            return_events=True,
         )

     if allow_read_errors_with_report:
         assert isinstance(out, tuple)
-        assert len(out) == 2
-        out, report = out
+        assert len(out) == 3
+        _, out, report = out
         assert isinstance(out, dict)
         assert isinstance(report, dict)
         assert out.keys() == {"ZJets", "Data"}
@@ -234,8 +237,10 @@
         assert isinstance(report["ZJets"], dask_awkward.Array)
         assert isinstance(report["Data"], dask_awkward.Array)
     else:
-        assert isinstance(out, dict)
+        assert isinstance(out, tuple)
         assert len(out) == 2
+        _, out = out
+        assert isinstance(out, dict)
         assert out.keys() == {"ZJets", "Data"}
         assert isinstance(out["ZJets"], tuple)
         assert isinstance(out["Data"], tuple)
@@ -249,12 +254,13 @@ def test_tuple_data_manipulation_output(allow_read_errors_with_report):
             _my_analysis_output_3,
             _runnable_result,
             uproot_options={"allow_read_errors_with_report": allow_read_errors_with_report},
+            return_events=True,
         )

     if allow_read_errors_with_report:
         assert isinstance(out, tuple)
-        assert len(out) == 2
-        out, report = out
+        assert len(out) == 3
+        _, out, report = out
         assert isinstance(out, dict)
         assert isinstance(report, dict)
         assert out.keys() == {"ZJets", "Data"}
@@ -269,8 +275,10 @@
         assert isinstance(report["ZJets"], dask_awkward.Array)
         assert isinstance(report["Data"], dask_awkward.Array)
     else:
-        assert isinstance(out, dict)
+        assert isinstance(out, tuple)
         assert len(out) == 2
+        _, out = out
+        assert isinstance(out, dict)
         assert out.keys() == {"ZJets", "Data"}
         assert isinstance(out["ZJets"], tuple)
         assert isinstance(out["Data"], tuple)
@@ -285,14 +293,17 @@
     "proc_and_schema",
     [(NanoTestProcessor, BaseSchema), (NanoEventsProcessor, NanoAODSchema)],
 )
-def test_apply_to_fileset(proc_and_schema):
+@pytest.mark.parametrize("delayed_taskgraph_calc", [True, False])
+def test_apply_to_fileset(proc_and_schema, delayed_taskgraph_calc):
     proc, schemaclass = proc_and_schema

     with Client() as _:
-        to_compute = apply_to_fileset(
+        _, to_compute = apply_to_fileset(
             proc(),
             _runnable_result,
             schemaclass=schemaclass,
+            parallelize_with_dask=delayed_taskgraph_calc,
+            return_events=True,
         )
         out = dask.compute(to_compute)[0]

         assert out["ZJets"]["cutflow"]["ZJets_pt"] == 18
         assert out["ZJets"]["cutflow"]["ZJets_mass"] == 6
         assert out["Data"]["cutflow"]["Data_pt"] == 84
         assert out["Data"]["cutflow"]["Data_mass"] == 66

-        to_compute = apply_to_fileset(
+        _, to_compute = apply_to_fileset(
             proc(),
             max_chunks(_runnable_result, 1),
             schemaclass=schemaclass,
+            parallelize_with_dask=delayed_taskgraph_calc,
+            return_events=True,
         )
         out = dask.compute(to_compute)[0]
+338,11 @@ def test_apply_to_fileset_hinted_form(): save_form=True, ) - to_compute = apply_to_fileset( + _, to_compute = apply_to_fileset( NanoEventsProcessor(), dataset_runnable, schemaclass=NanoAODSchema, + return_events=True, ) out = dask.compute(to_compute)[0] @@ -536,15 +550,18 @@ def test_slice_chunks(): } -def test_recover_failed_chunks(): +@pytest.mark.parametrize("delayed_taskgraph_calc", [True, False]) +def test_recover_failed_chunks(delayed_taskgraph_calc): with Client() as _: - to_compute = apply_to_fileset( + _, to_compute, reports = apply_to_fileset( NanoEventsProcessor(), _starting_fileset_with_steps, schemaclass=NanoAODSchema, uproot_options={"allow_read_errors_with_report": True}, + parallelize_with_dask=delayed_taskgraph_calc, + return_events=True, ) - out, reports = dask.compute(*to_compute) + out, reports = dask.compute(to_compute, reports) failed_fset = get_failed_steps_for_fileset(_starting_fileset_with_steps, reports) assert failed_fset == { @@ -566,3 +583,51 @@ def test_recover_failed_chunks(): } } } + + +@pytest.mark.parametrize( + "proc_and_schema", + [(NanoTestProcessor, BaseSchema), (NanoEventsProcessor, NanoAODSchema)], +) +@pytest.mark.parametrize( + "with_report", + [True, False], +) +def test_task_graph_serialization(proc_and_schema, with_report): + proc, schemaclass = proc_and_schema + + with Client() as _: + output = apply_to_fileset( + proc(), + _runnable_result, + schemaclass=schemaclass, + parallelize_with_dask=False, + uproot_options={"allow_read_errors_with_report": with_report}, + return_events=True, + ) + + events = output[0] + to_compute = output[1:] + + save_taskgraph( + "./test_task_graph_serialization.hlg", + events, + to_compute, + optimize_graph=False, + ) + + _, to_compute_serdes, is_optimized = load_taskgraph( + "./test_task_graph_serialization.hlg" + ) + + print(to_compute_serdes) + + if len(to_compute_serdes) > 1: + (out, _) = dask.compute(*to_compute_serdes) + else: + (out,) = dask.compute(*to_compute_serdes) + + assert out["ZJets"]["cutflow"]["ZJets_pt"] == 18 + assert out["ZJets"]["cutflow"]["ZJets_mass"] == 6 + assert out["Data"]["cutflow"]["Data_pt"] == 84 + assert out["Data"]["cutflow"]["Data_mass"] == 66