Module for conveniently managing a DASK cluster.
Concrete implementation of the MasterLauncher interface. See its documentation for a list of the inherited methods and attributes.
This class implements the logic for starting the DASK master instance (i.e. the scheduler in DASK terms) on localhost.
The port where the master instance is started on. Will be None if not yet started.
Returns:
Optional[int]: The master port.
The process object where the master instance was started in.
Returns:
Optional[Popen]: The process object or None if not yet started.
cleanup() → None
Release all resources.
start(
ports: Union[List[int], int],
timeout: int = 3,
debug: bool = False
) → List[int]
Launch a master instance.
Note:
If you create a custom subclass of MasterLauncher that does not start the master instance on localhost, you should pass the debug flag on to execute_task() of the RuntimeGroup or Runtime so that you can benefit from the debug feature of RuntimeTask.execute().
Args:
ports: Port where the master should be started. If a list is given, the first port that is free in the RuntimeGroup will be used. The port actually chosen can be read from the property port.
timeout: Timeout (s) after which a MasterStartError is raised if the master instance has not started yet.
debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If False, the stdout/stderr will be added to the Python logger with level debug after each RuntimeTask step. Defaults to False.
Returns:
List[int]: If a port list was given, the updated port list is returned. Otherwise an empty list.
Raises:
PortInUseError: If a single port is given and it is not free in the RuntimeGroup.
NoPortsLeftError: If a port list was given and none of the ports is actually free in the RuntimeGroup.
MasterStartError: If the master was not started within the specified timeout.
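The port handling described for start() can be sketched as follows. This is a minimal illustration, not the library's implementation: choose_master_port and the is_free predicate are hypothetical, the two exception classes are local stand-ins for the library's errors, and the sketch assumes that the "updated port list" is the input list without the chosen port and without any busy ports tried before it.

```python
from typing import Callable, List, Tuple, Union


class PortInUseError(Exception):
    """Local stand-in for the library's PortInUseError."""


class NoPortsLeftError(Exception):
    """Local stand-in for the library's NoPortsLeftError."""


def choose_master_port(
    ports: Union[List[int], int],
    is_free: Callable[[int], bool],
) -> Tuple[int, List[int]]:
    """Return (chosen_port, updated_port_list) following the documented rules.

    `is_free` is a hypothetical predicate reporting whether a port is free
    in the whole RuntimeGroup.
    """
    if isinstance(ports, int):
        # Single port: it must be free, and there is no list to update.
        if not is_free(ports):
            raise PortInUseError(f"Port {ports} is already in use.")
        return ports, []

    remaining = list(ports)  # Copy so the caller's list is not mutated.
    while remaining:
        candidate = remaining.pop(0)
        if is_free(candidate):
            return candidate, remaining
    raise NoPortsLeftError("None of the given ports is free.")


# Usage: ports 60000 and 60001 are simulated as busy.
busy = {60000, 60001}
port, updated = choose_master_port(
    [60000, 60001, 60002, 60003], lambda p: p not in busy
)
print(port, updated)  # 60002 [60003]
```

Passing the predicate in as an argument keeps the sketch independent of how the RuntimeGroup actually probes its hosts.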
WorkerLauncher implementation for launching DASK workers in a round robin manner. See its documentation to get a list of the inherited methods and attributes.
__init__(runtime_group: RuntimeGroup)
Initialization method.
Args:
runtime_group: The group where the workers will be started.
Dictionary with the host as key and a port list as value. The list contains all ports where a worker instance is reachable on the respective host.
Returns:
Dict[str, List[int]]: The ports per host as a dictionary.
cleanup() → None
Release all resources.
start(
worker_count: int,
master_port: int,
ports: List[int],
debug: bool = False
) → List[int]
Launches the worker instances in the RuntimeGroup.
Args:
worker_count: The number of worker instances to be started in the group.
master_port: The port of the master instance.
ports: The ports to be used for starting the workers. Only ports from the list that are actually free will be chosen.
debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If False, the stdout/stderr will be added to the Python logger with level debug after each RuntimeTask step. Defaults to False.
Returns:
List[int]: The updated port list after starting the workers, i.e. the used ones were removed.
Raises:
NoPortsLeftError: If there are not enough free ports for starting all workers.
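The round robin strategy and the resulting ports-per-host dictionary can be illustrated with a small, self-contained sketch. It is not the library's code: assign_round_robin and the is_free predicate are hypothetical, NoPortsLeftError is a local stand-in, and for simplicity a port that is busy on the current host is discarded for all hosts.

```python
from itertools import cycle
from typing import Callable, Dict, List, Tuple


class NoPortsLeftError(Exception):
    """Local stand-in for the library's NoPortsLeftError."""


def assign_round_robin(
    hosts: List[str],
    worker_count: int,
    ports: List[int],
    is_free: Callable[[str, int], bool],
) -> Tuple[Dict[str, List[int]], List[int]]:
    """Distribute workers over hosts in turn; return (ports_per_host, updated_ports)."""
    if not hosts:
        raise ValueError("At least one host is required.")

    remaining = list(ports)  # Copy so the caller's list is not mutated.
    ports_per_host: Dict[str, List[int]] = {}
    host_cycle = cycle(hosts)

    for _ in range(worker_count):
        host = next(host_cycle)
        # Simplification: a port busy on this host is dropped entirely.
        while remaining and not is_free(host, remaining[0]):
            remaining.pop(0)
        if not remaining:
            raise NoPortsLeftError("Not enough free ports for all workers.")
        ports_per_host.setdefault(host, []).append(remaining.pop(0))

    return ports_per_host, remaining


# Usage: three workers on two hosts, every port simulated as free.
assignment, updated = assign_round_robin(
    ["host-1", "host-2"],
    worker_count=3,
    ports=[60001, 60002, 60003, 60004, 60005],
    is_free=lambda host, port: True,
)
print(assignment)  # {'host-1': [60001, 60003], 'host-2': [60002]}
print(updated)     # [60004, 60005]
```

With more workers than hosts, the cycle wraps around, so the first hosts in the list receive the extra workers.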
Convenience class for launching a Dask cluster in a RuntimeGroup.
DaskCluster inherits from MasterWorkerCluster. See its documentation to get a list of the inherited methods and attributes.
The number of DASK workers defaults to the number of Runtimes in the used RuntimeGroup. This number can be adjusted so that more or fewer workers than available Runtimes are used. By default, the desired number of workers is started in a round robin way as implemented in RoundRobinLauncher. Consequently, this leads to an equal distribution of DASK workers in the RuntimeGroup. You can provide a custom implementation inheriting from the WorkerLauncher class in order to apply a different strategy for starting workers. The DASK master (i.e. scheduler) will always be started on localhost as implemented in LocalMasterLauncher. This behavior can also be changed by providing a custom implementation inheriting from the MasterLauncher.
__init__(
runtime_group: RuntimeGroup,
ports: Optional[List[int]] = None,
master_launcher: Optional[MasterLauncher] = None,
worker_launcher: Optional[WorkerLauncher] = None
)
Initialization method.
Args:
runtime_group: The RuntimeGroup contains all Runtimes which can be used for starting the DASK entities.
ports: The list of ports which will be used to instantiate a cluster. Defaults to list(range(self.DEFAULT_PORT_RANGE_START, self.DEFAULT_PORT_RANGE_END)).
master_launcher: Optionally, an instance implementing the MasterLauncher interface can be given, which implements the strategy for launching the master instances in the cluster. If None, then LocalMasterLauncher is used.
worker_launcher: Optionally, an instance implementing the WorkerLauncher interface can be given, which implements the strategy for launching the worker instances. If None, then RoundRobinLauncher is used.
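How the ports default resolves can be sketched like this. The class ClusterDefaultsSketch is a hypothetical stand-in, and the two numeric constants are placeholder values chosen for illustration; the library's actual values may differ.

```python
from typing import List, Optional


class ClusterDefaultsSketch:
    """Hypothetical stand-in showing only how the `ports` default is resolved."""

    # Placeholder values for illustration; not taken from the library.
    DEFAULT_PORT_RANGE_START = 60000
    DEFAULT_PORT_RANGE_END = 60010

    def __init__(self, ports: Optional[List[int]] = None) -> None:
        if ports is None:
            # range() excludes its end value, so DEFAULT_PORT_RANGE_END
            # itself is not part of the default list.
            ports = list(
                range(self.DEFAULT_PORT_RANGE_START, self.DEFAULT_PORT_RANGE_END)
            )
        # Copy to avoid sharing a mutable list with the caller.
        self.ports = list(ports)


default_cluster = ClusterDefaultsSketch()
custom_cluster = ClusterDefaultsSketch(ports=[8786, 8787])
print(default_cluster.ports[0], default_cluster.ports[-1])  # 60000 60009
print(custom_cluster.ports)  # [8786, 8787]
```

Using None as the default and building the list inside the constructor avoids a mutable default argument shared between instances.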
The port where the master instance was started. None, if not yet started.
Returns:
Optional[int]: The master port.
The RuntimeGroup.
Returns:
RuntimeGroup: The used group.
cleanup() → None
Release all resources.
get_client(timeout: int = 2) → Client
Get a connected Dask client.
Args:
timeout: The timeout (s) value passed on to the Dask Client constructor. Defaults to 2.
Raises:
TimeoutError: If the client connection timeout expires.
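A sketch of how the master port and the timeout might be handed to the Dask client. Both scheduler_address and connect are hypothetical helpers, and reaching the scheduler at localhost:<master_port> is an assumption based on the default LocalMasterLauncher behavior described above. The Client class from the distributed package accepts address and timeout arguments.

```python
from typing import Optional


def scheduler_address(master_port: Optional[int]) -> str:
    """Build the scheduler address, assuming the master runs on localhost."""
    if master_port is None:
        raise RuntimeError("The master instance has not been started yet.")
    return f"localhost:{master_port}"


def connect(master_port: Optional[int], timeout: int = 2):
    """Return a Dask client connected to the scheduler on localhost."""
    # Imported lazily so this sketch can be loaded without Dask installed.
    from distributed import Client

    return Client(address=scheduler_address(master_port), timeout=timeout)


print(scheduler_address(8786))  # localhost:8786
```

Calling connect() requires a running scheduler on the given port; without one, the client constructor fails once the timeout expires.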
This file was automatically generated via lazydocs.