feat: calculate analysis task graphs in parallel via dask.delayed #1019

Open
wants to merge 12 commits into base: master
9 changes: 8 additions & 1 deletion src/coffea/dataset_tools/__init__.py
@@ -1,4 +1,9 @@
from coffea.dataset_tools.apply_processor import apply_to_dataset, apply_to_fileset
from coffea.dataset_tools.apply_processor import (
apply_to_dataset,
apply_to_fileset,
load_taskgraph,
save_taskgraph,
)
from coffea.dataset_tools.manipulations import (
filter_files,
get_failed_steps_for_dataset,
@@ -14,6 +19,8 @@
"preprocess",
"apply_to_dataset",
"apply_to_fileset",
"save_taskgraph",
"load_taskgraph",
"max_chunks",
"slice_chunks",
"filter_files",
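For orientation, the newly exported helpers live alongside the existing dataset tools; a minimal import sketch, assuming this branch is installed:

from coffea.dataset_tools import (
    apply_to_fileset,
    load_taskgraph,
    preprocess,
    save_taskgraph,
)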
222 changes: 207 additions & 15 deletions src/coffea/dataset_tools/apply_processor.py
@@ -1,11 +1,15 @@
from __future__ import annotations

import copy
from functools import partial
from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union

import awkward
import cloudpickle
import dask.base
import dask.delayed
import dask_awkward
import lz4.frame

from coffea.dataset_tools.preprocess import (
DatasetSpec,
@@ -15,7 +19,7 @@
)
from coffea.nanoevents import BaseSchema, NanoAODSchema, NanoEventsFactory
from coffea.processor import ProcessorABC
from coffea.util import decompress_form
from coffea.util import decompress_form, load, save

DaskOutputBaseType = Union[
dask.base.DaskMethodsMixin,
@@ -31,13 +35,62 @@
GenericHEPAnalysis = Callable[[dask_awkward.Array], DaskOutputType]


def _pack_meta_to_wire(*collections):
unpacked, repacker = dask.base.unpack_collections(*collections)

output = []
for i in range(len(unpacked)):
output.append(unpacked[i])
if isinstance(
unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar)
):
output[-1]._meta = awkward.Array(
unpacked[i]._meta.layout.form.length_zero_array(),
behavior=unpacked[i]._meta.behavior,
attrs=unpacked[i]._meta.attrs,
)
packed_out = repacker(output)
return packed_out


def _unpack_meta_from_wire(*collections):
unpacked, repacker = dask.base.unpack_collections(*collections)

output = []
for i in range(len(unpacked)):
output.append(unpacked[i])
if isinstance(
unpacked[i], (dask_awkward.Array, dask_awkward.Record, dask_awkward.Scalar)
):
output[-1]._meta = awkward.Array(
unpacked[i]._meta.layout.to_typetracer(forget_length=True),
behavior=unpacked[i]._meta.behavior,
attrs=unpacked[i]._meta.attrs,
)
packed_out = repacker(output)
return packed_out


def _apply_analysis_wire(analysis, events_wire):
(events,) = _unpack_meta_from_wire(events_wire)
events._meta.attrs["@original_array"] = events
out = analysis(events)
out_wire = _pack_meta_to_wire(out)
return out_wire
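Taken together, these three helpers form the serialization round trip used below: typetracer metas are swapped for concrete length-zero arrays so collections can be pickled, then swapped back on the other side. A minimal sketch of the intended flow, assuming `events` is a dask_awkward collection and `analysis` a callable that returns dask collections:

(wired,) = _pack_meta_to_wire(events)             # typetracer meta -> concrete length-zero array
out_wire = _apply_analysis_wire(analysis, wired)  # restore the meta, build the task graph, re-pack
payload = cloudpickle.dumps(out_wire)             # now safe to pickle and ship between processes
(out,) = _unpack_meta_from_wire(*cloudpickle.loads(payload))  # meta -> typetracer again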


def apply_to_dataset(
data_manipulation: ProcessorABC | GenericHEPAnalysis,
dataset: DatasetSpec | DatasetSpecOptional,
schemaclass: BaseSchema = NanoAODSchema,
metadata: dict[Hashable, Any] = {},
uproot_options: dict[str, Any] = {},
) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]:
parallelize_with_dask: bool = False,
) -> (
DaskOutputType
| tuple[DaskOutputType, dask_awkward.Array]
| tuple[dask_awkward.Array, DaskOutputType, dask_awkward.Array]
):
"""
Apply the supplied function or processor to the supplied dataset.
Parameters
@@ -52,6 +105,10 @@ def apply_to_dataset(
Metadata for the dataset that is accessible by the input analysis. Should also be dask-serializable.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
parallelize_with_dask: bool, default False
Create dask.delayed objects that, when computed, return the computable dask collections for the analysis.

Returns
-------
@@ -64,37 +121,67 @@
if maybe_base_form is not None:
maybe_base_form = awkward.forms.from_json(decompress_form(maybe_base_form))
files = dataset["files"]
events = NanoEventsFactory.from_root(
events_and_maybe_report = NanoEventsFactory.from_root(
files,
metadata=metadata,
schemaclass=schemaclass,
known_base_form=maybe_base_form,
uproot_options=uproot_options,
).events()

events = events_and_maybe_report
report = None
if isinstance(events, tuple):
events, report = events

out = None
analysis = None
if isinstance(data_manipulation, ProcessorABC):
out = data_manipulation.process(events)
analysis = data_manipulation.process
elif isinstance(data_manipulation, Callable):
out = data_manipulation(events)
analysis = data_manipulation
else:
raise ValueError("data_manipulation must either be a ProcessorABC or Callable")

out = None
if parallelize_with_dask:
(wired_events,) = _pack_meta_to_wire(events)
out = dask.delayed(
lambda: lz4.frame.compress(
cloudpickle.dumps(
partial(_apply_analysis_wire, analysis, wired_events)()
),
compression_level=6,
)
)()
if hasattr(dask.base, "function_cache"):
dask.base.function_cache.clear()
else:
out = analysis(events)
if not isinstance(out, tuple):
out = (out,)

if report is not None:
return out, report
return (out,)
return events, out, report
return events, out
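A hedged usage sketch of the two return modes above; `my_processor` is a placeholder ProcessorABC instance and `dataset` a placeholder DatasetSpec (in practice one entry of a preprocessed fileset):

# default path: the task graph is built in the calling process
events, out = apply_to_dataset(my_processor, dataset, metadata={"dataset": "ttbar"})

# parallel path: the second return value is a dask.delayed object whose computation
# runs the analysis remotely and yields the compressed, pickled task graph
events, delayed_graph = apply_to_dataset(
    my_processor, dataset, metadata={"dataset": "ttbar"}, parallelize_with_dask=True
)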


def apply_to_fileset(
data_manipulation: ProcessorABC | GenericHEPAnalysis,
fileset: FilesetSpec | FilesetSpecOptional,
schemaclass: BaseSchema = NanoAODSchema,
uproot_options: dict[str, Any] = {},
) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]:
parallelize_with_dask: bool = False,
scheduler: Callable | str | None = None,
return_events: bool = False,
) -> (
dict[str, DaskOutputType]
| tuple[dict[str, DaskOutputType], dict[str, dask_awkward.Array]]
| tuple[
dict[str, dask_awkward.Array],
dict[str, DaskOutputType],
dict[str, dask_awkward.Array],
]
):
"""
Apply the supplied function or processor to the supplied fileset (set of datasets).
Parameters
@@ -107,28 +194,133 @@ def apply_to_fileset(
The nanoevents schema to interpret the input dataset with.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
parallelize_with_dask: bool, default False
Create dask.delayed objects that, when computed, return the computable dask collections for the analysis.
scheduler: Callable | str | None, default None
If parallelize_with_dask is True, the dask scheduler used to calculate the task graphs in parallel.
return_events: bool, default False
Also return the created NanoEvents objects, keyed by dataset name.

Returns
-------
events: dict[str, dask_awkward.Array], optional
The NanoEvents objects the analysis function was applied to, keyed by dataset name. Only returned when return_events is True.
out : dict[str, DaskOutputType]
The output of the analysis workflow applied to the datasets, keyed by dataset name.
report : dict[str, dask_awkward.Array], optional
The file access reports for running the analysis on the input datasets, keyed by dataset name. These need to be computed simultaneously with the analysis to be accurate.
"""
events = {}
out = {}
analyses_to_compute = {}
report = {}
for name, dataset in fileset.items():
metadata = copy.deepcopy(dataset.get("metadata", {}))
if metadata is None:
metadata = {}
metadata.setdefault("dataset", name)
dataset_out = apply_to_dataset(
data_manipulation, dataset, schemaclass, metadata, uproot_options
data_manipulation,
dataset,
schemaclass,
metadata,
uproot_options,
parallelize_with_dask,
)
if isinstance(dataset_out, tuple) and len(dataset_out) > 1:
out[name], report[name] = dataset_out

if parallelize_with_dask:
if len(dataset_out) == 3:
events[name], analyses_to_compute[name], report[name] = dataset_out
elif len(dataset_out) == 2:
events[name], analyses_to_compute[name] = dataset_out
else:
raise ValueError(
"apply_to_dataset only returns (events, outputs) or (events, outputs, reports)"
)
elif isinstance(dataset_out, tuple) and len(dataset_out) == 3:
events[name], out[name], report[name] = dataset_out
elif isinstance(dataset_out, tuple) and len(dataset_out) == 2:
events[name], out[name] = dataset_out
else:
out[name] = dataset_out[0]
raise ValueError(
"apply_to_dataset only returns (events, outputs) or (events, outputs, reports)"
)

if parallelize_with_dask:
(calculated_graphs,) = dask.compute(analyses_to_compute, scheduler=scheduler)
for name, compressed_taskgraph in calculated_graphs.items():
dataset_out_wire = cloudpickle.loads(
lz4.frame.decompress(compressed_taskgraph)
)
(out[name],) = _unpack_meta_from_wire(*dataset_out_wire)

for name in out:
if isinstance(out[name], tuple) and len(out[name]) == 1:
out[name] = out[name][0]

if len(report) > 0:
return out, report
return out
return (events, out, report) if return_events else (out, report)
return (events, out) if return_events else out
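A hedged end-to-end sketch of the parallel graph-building path; `my_processor` and `fileset` are placeholders, with the fileset normally coming from preprocess:

out, report = apply_to_fileset(
    my_processor,
    fileset,
    uproot_options={"allow_read_errors_with_report": True},
    parallelize_with_dask=True,   # build each dataset's task graph in its own dask task
    scheduler="processes",        # any dask scheduler; None falls back to the default
)
results, reports = dask.compute(out, report)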


def save_taskgraph(filename, events, *data_products, optimize_graph=False):
"""
Save a task graph and its originating nanoevents to a file.
Parameters
----------
filename: str
Where to save the resulting serialized task graph and nanoevents.
The suggested suffix is ".hlg", after dask's HighLevelGraph object.
events: dict[str, dask_awkward.Array]
A dictionary of nanoevents objects.
data_products: dict[str, DaskOutputBaseType]
The data products resulting from applying an analysis to
a NanoEvents object. This may include report objects.
optimize_graph: bool, default False
Whether or not to save the task graph in its optimized form.

Returns
-------
"""
(events_wire,) = _pack_meta_to_wire(events)

if len(data_products) == 0:
raise ValueError(
"You must supply at least one analysis data product to save a task graph!"
)

data_products_out = data_products
if optimize_graph:
data_products_out = dask.optimize(data_products)

data_products_wire = _pack_meta_to_wire(*data_products_out)

save(
{
"events": events_wire,
"data_products": data_products_wire,
"optimized": optimize_graph,
},
filename,
)


def load_taskgraph(filename):
"""
Load a task graph and its originating nanoevents from a file.
Parameters
----------
filename: str
The file from which to load the task graph.
Returns
-------
events: dict[str, dask_awkward.Array]
The reconstituted NanoEvents objects, keyed by dataset name.
data_products:
The analysis data products that were saved alongside the events.
optimized: bool
Whether the task graph was saved in its optimized form.
"""
graph_information_wire = load(filename)

(events,) = _unpack_meta_from_wire(graph_information_wire["events"])
(data_products,) = _unpack_meta_from_wire(*graph_information_wire["data_products"])
optimized = graph_information_wire["optimized"]

for dataset_name in events:
events[dataset_name]._meta.attrs["@original_array"] = events[dataset_name]

return events, data_products, optimized
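A hedged round-trip sketch for the two helpers above, assuming a single data product (the output dictionary of apply_to_fileset); the filename and `my_processor`/`fileset` are placeholders:

events, out = apply_to_fileset(my_processor, fileset, return_events=True)
save_taskgraph("analysis.hlg", events, out)   # ".hlg" is only the suggested suffix
events, out, optimized = load_taskgraph("analysis.hlg")
(results,) = dask.compute(out)                # the reloaded graph is still computable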
11 changes: 7 additions & 4 deletions src/coffea/lumi_tools/lumi_tools.py
@@ -170,10 +170,12 @@ def __call__(self, runs, lumis):
"""

def apply(runs, lumis):
backend = awkward.backend(runs)
# fill numba typed dict
_masks = Dict.empty(key_type=types.uint32, value_type=types.uint32[:])
for k, v in self._masks.items():
_masks[k] = v
if backend != "typetracer":
for k, v in self._masks.items():
_masks[k] = v

runs_orig = runs
if isinstance(runs, awkward.highlevel.Array):
@@ -185,10 +187,11 @@ def apply(runs, lumis):
awkward.typetracer.length_zero_if_typetracer(lumis)
)
mask_out = numpy.zeros(dtype="bool", shape=runs.shape)
LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out)
if backend != "typetracer":
LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out)
if isinstance(runs_orig, awkward.Array):
mask_out = awkward.Array(mask_out)
if awkward.backend(runs_orig) == "typetracer":
if backend == "typetracer":
mask_out = awkward.Array(
mask_out.layout.to_typetracer(forget_length=True)
)
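The typetracer guards above let a LumiMask call be traced during dask-awkward graph construction without running the numba kernel; a hedged sketch of the call pattern, with illustrative file and branch names (`events` is a NanoEvents object):

from coffea.lumi_tools import LumiMask

lumimask = LumiMask("golden_json.txt")               # path to a certified golden-JSON file
mask = lumimask(events.run, events.luminosityBlock)  # with typetracer-backed inputs this now only
                                                     # propagates types; the kernel is skipped
good_events = events[mask]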
4 changes: 3 additions & 1 deletion src/coffea/ml_tools/xgboost_wrapper.py
@@ -29,8 +29,10 @@ def __init__(self, fname):
)
raise _xgboost_import_error

nonserializable_attribute.__init__(self, ["xgbooster"])
nonserializable_attribute.__init__(self, [])

self.xgboost_file = fname
self.xgbooster = self._create_xgbooster()

def _create_xgbooster(self) -> xgboost.Booster:
# Automatic detection of compressed model file
3 changes: 3 additions & 0 deletions src/coffea/processor/test_items/NanoEventsProcessor.py
@@ -69,6 +69,9 @@ def process(self, events):
# print(dimuon["0"].behavior)
dimuon = dimuon["0"] + dimuon["1"]

if "GenPart" in events.fields:
_ = events.GenPart.parent.pt

output["pt"].fill(dataset=dataset, pt=dak.flatten(muon_pt))
output["mass"].fill(dataset=dataset, mass=dak.flatten(dimuon.mass))
output["cutflow"]["%s_pt" % dataset] = dak.sum(dak.num(events.Muon, axis=1))