feat: Taskvine executor backport #1289

Open: wants to merge 7 commits into base: backports-v0.7.x
24 changes: 24 additions & 0 deletions .github/workflows/ci.yml
@@ -161,6 +161,30 @@ jobs:
conda-pack --output coffea-env.tar.gz
python wq.py coffea-env.tar.gz

testtaskvine:
runs-on: ubuntu-latest
timeout-minutes: 15
needs: pre-commit
strategy:
matrix:
python-version: ["3.11"]
name: test coffea-taskvine

steps:
- uses: actions/checkout@v3
- name: Set up Conda
uses: conda-incubator/[email protected]
with:
auto-update-conda: true
python-version: ${{ matrix.python-version }}
- name: Test TaskVine Executor
shell: bash -l {0}
run: |
conda create --yes --name coffea-env -c conda-forge python=${{ matrix.python-version }} ndcctools pytest 'setuptools<71'
conda activate coffea-env
python -m pip install --ignore-installed .
python -m pytest tests/test_taskvine_executor.py


# testskyhookjob:
# runs-on: ubuntu-latest
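The new job mirrors the Work Queue job above it: build a conda environment with ndcctools, install the branch, and run the dedicated test module. A minimal sketch of driving the same test from Python, assuming ndcctools, pytest, and this coffea branch are installed (for instance in a conda environment built like the CI job's):

```python
# Run the TaskVine executor tests outside CI.
# Assumes ndcctools, pytest, and this coffea branch are installed,
# e.g. in a conda environment built like the CI job's.
import pytest

raise SystemExit(pytest.main(["-v", "tests/test_taskvine_executor.py"]))
```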
2 changes: 2 additions & 0 deletions coffea/processor/__init__.py
@@ -13,6 +13,7 @@
DaskExecutor,
ParslExecutor,
WorkQueueExecutor,
TaskVineExecutor,
Runner,
run_spark_job,
)
@@ -128,6 +129,7 @@ def _run_x_job(
"DaskExecutor",
"ParslExecutor",
"WorkQueueExecutor",
"TaskVineExecutor",
"Runner",
"run_spark_job",
"accumulate",
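With these two additions, the new executor is part of the public API of coffea.processor. A quick check, assuming coffea is installed from this branch:

```python
# Sanity check that the new name is exported from the package.
# Assumes coffea is installed from this backport branch.
import coffea.processor as processor

assert "TaskVineExecutor" in processor.__all__
print(processor.TaskVineExecutor)
```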
193 changes: 190 additions & 3 deletions coffea/processor/executor.py
@@ -453,6 +453,187 @@ def _wait_for_merges(FH: _FuturesHolder, executor: ExecutorBase) -> Accumulatabl
)


@dataclass
class TaskVineExecutor(ExecutorBase):
"""Execute using Work Queue

For more information, see :ref:`intro-coffea-vine`

Parameters
----------
items : sequence or generator
Sequence of input arguments
function : callable
A function to be called on each input, which returns an accumulator instance
accumulator : Accumulatable
An accumulator to collect the output of the function
status : bool
If true (default), enable progress bar
unit : str
Label of progress bar unit
desc : str
Label of progress bar description
compression : int, optional
Compress accumulator outputs in flight with LZ4, at the level specified (default 1).
``None`` sets the level to 1 (minimal compression).
# TaskVine-specific options:
cores : int
Maximum number of cores for a TaskVine task. If unset, use a whole worker.
memory : int
Maximum amount of memory (in MB) for a TaskVine task. If unset, use a whole worker.
disk : int
Maximum amount of disk space (in MB) for a TaskVine task. If unset, use a whole worker.
gpus : int
Number of GPUs to allocate to each task. If unset, use zero.
resource_monitor : str
If given, one of 'off', 'measure', or 'watchdog'. Default is 'off'.
- 'off': turns off resource monitoring. Overridden to 'watchdog' if resources_mode
is not set to 'fixed'.
- 'measure': turns on resource monitoring for TaskVine. The
resources used per task are measured.
- 'watchdog': in addition to measuring resources, tasks are terminated if they
go above the cores, memory, or disk specified.
resources_mode : str
one of 'fixed', 'max-seen', or 'max-throughput'. Default is 'max-seen'.
Sets the strategy to automatically allocate resources to tasks.
- 'fixed': allocate cores, memory, and disk specified for each task.
- 'max-seen' or 'auto': use the cores, memory, and disk given as maximum values to allocate,
but first try each task by allocating the maximum values seen. Leads
to a good compromise between parallelism and number of retries.
- 'max-throughput': Like max-seen, but first tries the task with an
allocation that maximizes overall throughput.
If resources_mode is other than 'fixed', preprocessing and
accumulation tasks always use the 'max-seen' strategy, as
preprocessing tasks always use the same resources and accumulation
tasks have a resource footprint that grows over time.
split_on_exhaustion: bool
Whether to split a processing task in half according to its chunksize when it exhausts
the cores, memory, or disk allocated to it. If False, a task that exhausts resources
permanently fails. Default is True.
fast_terminate_workers: int
Terminate workers on which tasks have been running longer than average.
The time limit is computed by multiplying the average runtime of tasks
by the value of 'fast_terminate_workers'. Since there are
legitimately slow tasks, no task can trigger fast termination on
two distinct workers. A value less than 1 disables fast termination.
checkpoint_proportion: float
Whether to bring accumulation results back to the manager. If the proportion of
an accumulation's inputs that are already checkpointed is less than this value,
the accumulation result is checkpointed. A value >= 1 forces every accumulation to
be checkpointed; a value <= 0 checkpoints none. Default is 0.5.

manager_name : str
Name to refer to this TaskVine manager.
Sets port to 0 (any available port) if port not given.
port : int or tuple(int, int)
Port number or range (inclusive) for the TaskVine manager program.
Defaults to 9123 if manager_name not given.
password_file: str
Location of a file containing a password used to authenticate workers.
ssl: bool or tuple(str, str)
Enable SSL encryption between manager and workers. If a tuple, it
should be of the form (key, cert), where key and cert are paths to the files
containing the key and certificate in PEM format. If True, a self-signed temporary
key and cert are generated for the session.

extra_input_files: list
A list of files in the current working directory to send along with each task.
Useful for small custom libraries and configuration files needed by the processor.
x509_proxy : str
Path to the X509 user proxy. If None (the default), use the value of the
environment variable X509_USER_PROXY, or fall back to the file /tmp/x509up_u${UID} if it
exists. If False, disables the default behavior and no proxy is sent.

environment_file : optional, str
Conda python environment tarball to use. If not given, assume that
the python environment is already set up at the execution site.

treereduction : int
Number of processed chunks per accumulation task. Default is 10.
concurrent_reads : int
Number of processed chunks concurrently read per accumulation task. Default is 2.
replicas : int
Number of replicas for temporary results in the cluster before checkpointing to manager
with an accumulation. If less than 2, only one copy of a result is kept, which reduces
cluster disk usage, but results need to be regenerated if workers are lost.

verbose : bool
If true, emit a message on each task submission and completion.
Default is false.
print_stdout : bool
If true, print the standard output of each TaskVine task on completion. Default is false.

filepath: str
Path to the parent directory in which to create the staging directory.
Default is "." (current working directory).

custom_init : function, optional
A function that takes as an argument the TaskVine manager object.
The function is called just before the first work unit is submitted
to the manager.
"""

# Standard executor options:
compression: Optional[int] = 1
retries: int = 2 # task executes at most 3 times
# taskvine executor options:
manager_name: Optional[str] = None
port: Optional[Union[int, Tuple[int, int]]] = None
filepath: str = "."
events_total: Optional[int] = None
x509_proxy: Optional[str] = None
verbose: bool = False
print_stdout: bool = False
status_display_interval: Optional[int] = 10
password_file: Optional[str] = None
ssl: Union[bool, Tuple[str, str]] = False
environment_file: Optional[str] = None
extra_input_files: List = field(default_factory=list)
resource_monitor: Optional[str] = "off"
resources_mode: Optional[str] = "max-seen"
split_on_exhaustion: Optional[bool] = True
fast_terminate_workers: Optional[int] = None
checkpoint_proportion: Optional[float] = 0.5
cores: Optional[int] = None
memory: Optional[int] = None
disk: Optional[int] = None
gpus: Optional[int] = None
treereduction: int = 10
concurrent_reads: int = 2
replicas: int = 3
chunksize: int = 100000
dynamic_chunksize: Optional[Dict] = None
custom_init: Optional[Callable] = None

# deprecated
bar_format: Optional[str] = None
chunks_accum_in_mem: Optional[int] = None
master_name: Optional[str] = None
chunks_per_accum: Optional[int] = None
wrapper: Optional[str] = None
debug_log: Optional[str] = None
stats_log: Optional[str] = None
transactions_log: Optional[str] = None
tasks_accum_log: Optional[str] = None

def __call__(
self,
items: Iterable,
function: Callable,
accumulator: Accumulatable,
):
from .taskvine_executor import run

return (
run(
self,
items,
function,
accumulator,
),
0,
)
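For reviewers who want to try the backport, a minimal sketch of driving the Runner with the new executor follows. The processor class, fileset contents, and resource numbers are illustrative only, and workers have to be started separately (for example with the vine_worker program from ndcctools, pointed at this manager):

```python
# Minimal sketch of using TaskVineExecutor with the 0.7.x Runner.
# The fileset, resource numbers, and processor below are placeholders.
import coffea.processor as processor


class MyProcessor(processor.ProcessorABC):
    def process(self, events):
        return {"entries": len(events)}

    def postprocess(self, accumulator):
        return accumulator


fileset = {"dataset": ["file.root"]}  # illustrative

executor = processor.TaskVineExecutor(
    manager_name="coffea-vine-example",  # workers locate the manager by name
    cores=2,        # per-task maximum; unset uses a whole worker
    memory=4000,    # MB
    disk=4000,      # MB
    resources_mode="max-seen",
    verbose=True,
)

runner = processor.Runner(executor=executor, chunksize=100_000)

out = runner(fileset, treename="Events", processor_instance=MyProcessor())
```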


@dataclass
class WorkQueueExecutor(ExecutorBase):
"""Execute using Work Queue
@@ -1814,7 +1995,9 @@ def run(
processor_instance=pi_to_send,
)

if self.format == "root" and isinstance(self.executor, WorkQueueExecutor):
if self.format == "root" and isinstance(
self.executor, (TaskVineExecutor, WorkQueueExecutor)
):
# keep chunks in generator, use a copy to count number of events
# this is cheap, as we are reading from the cache
chunks_to_count = self.preprocess(fileset, treename)
@@ -1829,7 +2012,7 @@
"unit": "chunk",
"function_name": type(processor_instance).__name__,
}
if isinstance(self.executor, WorkQueueExecutor):
if isinstance(self.executor, (TaskVineExecutor, WorkQueueExecutor)):
exe_args.update(
{
"unit": "event",
@@ -1856,7 +2039,11 @@
processor_instance.postprocess(wrapped_out["out"])

if "metrics" in wrapped_out.keys():
wrapped_out["metrics"]["chunks"] = len(chunks)
if isinstance(self.executor, (TaskVineExecutor, WorkQueueExecutor)):
wrapped_out["metrics"]["chunks"] = len(wrapped_out["processed"])
else:
wrapped_out["metrics"]["chunks"] = len(chunks_to_count)

for k, v in wrapped_out["metrics"].items():
if isinstance(v, set):
wrapped_out["metrics"][k] = list(v)