Skip to content
This repository was archived by the owner on Mar 29, 2023. It is now read-only.

Latest commit

Β 

History

History
457 lines (240 loc) Β· 15.3 KB

lazycluster.cluster.runtime_cluster.md

File metadata and controls

457 lines (240 loc) Β· 15.3 KB

module lazycluster.cluster.runtime_cluster

Module comprising the abstract RuntimeCluster class with its related launcher strategy classes.

Note: The design of the launcher classes follows the strategy pattern.


class MasterLauncher

Abstract class for implementing the strategy for launching the master instance of the cluster.

method __init__

__init__(runtime_group: RuntimeGroup)

Initialization method.

Args:

  • runtime_group: The group where the workers will be started.

property port

The port where the master instance is started on. Will be None if not yet started.

Returns:

  • Optional[int]: The master port.

property process

The process object where the master instance was started in.

Returns:

  • Optional[Popen]: The process object or None if not yet started.

method cleanup

cleanup() β†’ None

Release all resources.


method start

start(
    ports: Union[List[int], int],
    timeout: int = 3,
    debug: bool = False
) β†’ List[int]

Launch a master instance.

Note:

If you create a custom subclass of MasterLauncher which will not start the master instance on localhost then you should pass the debug flag on to execute_task() of the RuntimeGroup or Runtime so that you can benefit from the debug feature of RuntimeTask.execute().

Args:

  • ports: Port where the master should be started. If a list is given then the first port that is free in the RuntimeGroup will be used. The actual chosen port can requested via the property port.
  • timeout: Timeout (s) after which an MasterStartError is raised if master instance not started yet.
  • debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If, False then the stdout/stderr will be added to python logger with level debug after each task step. Defaults to False.

Returns:

  • List[int]: In case a port list was given the updated port list will be returned. Otherwise an empty list.

Raises:

  • PortInUseError: If a single port is given and it is not free in the RuntimeGroup.
  • NoPortsLeftError: If a port list was given and none of the ports is actually free in the RuntimeGroup.
  • MasterStartError: If master was not started after the specified timeout.

class WorkerLauncher

Abstract class for implementing the strategy for launching worker instances within a RuntimeGroup.

In order to implement a new concrete WorkerLauncher subclass you need to implement the start method. Please consider the comments of the start method because some internal variables need to be set in the concrete implementation.

Moreover, the setup_worker_ssh_tunnels() method can be used to setup ssh tunnels so that all entities can talk to each other.

method __init__

__init__(runtime_group: RuntimeGroup)

Initialization method.

Args:

  • runtime_group: The group where the workers will be started in.

property ports_per_host

Dictionary with the host as key and a port list as value. The list contains all ports where a worker instance is reachable on the respective host.

Returns:

  • Dict[str, List[int]]: The ports per host as a dictionary.

method cleanup

cleanup() β†’ None

Release all resources.


method setup_worker_ssh_tunnels

setup_worker_ssh_tunnels() β†’ None

Set up ssh tunnel for workers such that all communication is routed over the local machine and all entities can talk to each other on localhost.

Note:

This method needs to be called if the communication between the worker instances is necessary, e.g. in case of DASK or Apache Flink, where data needs to be shuffled between the different entities.

Raises:

  • ValueError: If host is not contained.
  • PortInUseError: If group_port is occupied on the local machine.
  • NoPortsLeftError: If group_ports was given and none of the ports was free.

method start

start(
    worker_count: int,
    master_port: int,
    ports: List[int],
    debug: bool = False
) β†’ List[int]

Launches the worker instances in the RuntimeGroup.

Args:

  • worker_count: The number of worker instances to be started in the group.
  • master_port: The port of the master instance.
  • ports: The ports to be used for starting the workers. Only ports from the list will be chosen that are actually free.
  • debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If, False then the stdout/stderr will be added to python logger with level debug after each task step. Defaults to False.

Returns:

  • List[int]: The updated port list after starting the workers, i.e. the used ones were removed.

Raises:

  • NoPortsLeftError: If there are not enough free ports for starting all workers.

class RuntimeCluster

Abstract cluster class.

All further cluster implementations should inherit from this class either directly (e.g. the abstract class MasterWorkerCluster) or indirectly (e.g. the DaskCluster which is an concrete implementation of the MasterWorkerCluster).


class MasterWorkerCluster

Class for clusters following a master-worker architecture.

Usually you want to inherit from this class and do not want to use it directly. It is recommended to treat this class as an abstract class or an interface.

Examples: Create a cluster with all Runtimes detected by the RuntimeManager.

     from lazycluster import RuntimeManager
     cluster = MyMasterWorkerClusterImpl(RuntimeManager().create_group())
     cluster.start()
    ``` 

 Use different strategies for launching the master and the worker instance as the default ones by providing  custom implementation of `MasterLauncher` and `WorkerLauncher`. 

```python
     cluster = MyMasterWorkerClusterImpl(RuntimeManager().create_group(),
                                         MyMasterLauncherImpl(),
                                         MyWorkerLauncherImpl()
     cluster.start()
    ``` 

<a href="https://github.com/ml-tooling/lazycluster/blob/main/src/lazycluster/cluster/runtime_cluster.py#L224"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>method</kbd> `__init__`

```python
__init__(
    runtime_group: RuntimeGroup,
    ports: Optional[List[int]] = None,
    master_launcher: Optional[MasterLauncher] = None,
    worker_launcher: Optional[WorkerLauncher] = None
)

Initialization method.

Args:

  • runtime_group: The RuntimeGroup contains all Runtimes which can be used for starting the cluster entities.
  • ports: The list of ports which will be used to instantiate a cluster. Defaults to list(range(self.DEFAULT_PORT_RANGE_START, self.DEFAULT_PORT_RANGE_END).
  • master_launcher: Optionally, an instance implementing the MasterLauncher interface can be given, which implements the strategy for launching the master instances in the cluster. If None, then the default of the concrete cluster implementation will be chosen.
  • worker_launcher: Optionally, an instance implementing the WorkerLauncher interface can be given, which implements the strategy for launching the worker instances. If None, then the default of the concrete cluster implementation will be chosen.

property master_port

The port where the master instance was started. None, if not yet started.

Returns:

  • Optional[int]: The master port.

property runtime_group

The RuntimeGroup.

Returns:

  • RuntimeGroup: The used group.

method cleanup

cleanup() β†’ None

Release all resources.


method print_log

print_log() β†’ None

Print the execution log.

Note:

This method is a convenient wrapper for the equivalent method of the contained RuntimeGroup.


method start

start(
    worker_count: Optional[int] = None,
    master_port: Optional[int] = None,
    debug: bool = False
) β†’ None

Convenient method for launching the cluster.

Internally, self.start_master() and self.start_workers() will be called.

Args:

  • master_port: Port of the cluster master. Will be passed on to self.start(), hence see respective method for further details.
  • worker_count: The number of worker instances to be started in the cluster. Will be passed on to self.start(), hence see respective method for further details.
  • debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If, False then the stdout/stderr will be added to python logger with level debug after each RuntimeTask step. Defaults to False.

method start_master

start_master(
    master_port: Optional[int] = None,
    timeout: int = 3,
    debug: bool = False
) β†’ None

Start the master instance.

Note:

How the master is actually started is determined by the the actual MasterLauncher implementation. Another implementation adhering to the MasterLauncher interface can be provided in the constructor of the cluster class.

Args:

  • master_port: Port of the master instance. Defaults to self.DEFAULT_MASTER_PORT, but another one is chosen if the port is not free within the group. The actual chosen port can be requested via self.master_port.
  • timeout: Timeout (s) after which an MasterStartError is raised if master instance not started yet.
  • debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. Has no effect for if the master instance is started locally, what default MasterLauncher implementations usually do.

Raises:

  • PortInUseError: If a single port is given and it is not free in the RuntimeGroup.
  • NoPortsLeftError: If there are no free ports left in the port list for instantiating the master.
  • MasterStartError: If master was not started after the specified timeout.

method start_workers

start_workers(count: Optional[int] = None, debug: bool = False) β†’ None

Start the worker instances.

Note:

How workers are actually started is determined by the the actual WorkerLauncher implementation. Another implementation adhering to the WorkerLauncher interface can be provided in the constructor of the cluster class.

Args:

  • count: The number of worker instances to be started in the cluster. Defaults to the number of runtimes in the cluster.
  • debug: If True, stdout/stderr from the runtime will be printed to stdout of localhost. If, False then the stdout/stderr will be added to python logger with level debug after each RuntimeTask step. Defaults to False.

Raises:

  • NoPortsLeftError: If there are no free ports left in the port list for instantiating new worker entities.

This file was automatically generated via lazydocs.