executorlib.executor.slurm.SlurmClusterExecutor
- class executorlib.executor.slurm.SlurmClusterExecutor(max_workers: int | None = None, cache_directory: str | None = None, max_cores: int | None = None, resource_dict: dict | None = None, pysqa_config_directory: str | None = None, pmi_mode: str | None = None, hostname_localhost: bool | None = None, block_allocation: bool = False, init_function: Callable | None = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: str | None = None, export_workflow_filename: str | None = None, log_obj_size: bool = False, wait: bool = True, openmpi_oversubscribe: bool = False)
Bases: BaseExecutor
The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload manager or, preferably, the flux framework for distributing Python functions within a given resource allocation. In contrast to the mpi4py.futures.MPIPoolExecutor, the executorlib.SlurmClusterExecutor can be executed in a serial Python process and does not require the Python script to be executed with MPI. It is even possible to execute the executorlib.SlurmClusterExecutor directly in an interactive Jupyter notebook.
- Parameters:
max_workers (int) – for backwards compatibility with the standard library, max_workers also defines the number of cores which can be used in parallel - just like the max_cores parameter. Using max_cores is recommended, as computers have a limited number of compute cores.
cache_directory (str, optional) – The directory to store cache files. Defaults to “executorlib_cache”.
max_cores (int) – defines the number of cores which can be used in parallel
resource_dict (dict) –
A dictionary of resources required by the task, with the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str): current working directory where the parallel Python task is executed
- cache_key (str): rather than using the internal hashing of executorlib, the user can provide an external cache_key to identify tasks on the file system.
- cache_directory (str): the directory to store cache files.
- num_nodes (int): number of compute nodes used for the evaluation of the Python function.
- exclusive (bool): boolean flag to reserve exclusive access to the selected compute nodes, so that no other tasks can use the same compute node.
- error_log_file (str): path to the error log file, primarily used to merge the logs of multiple tasks into one file.
- run_time_max (int): the maximum time in seconds the execution of the submitted Python function is allowed to take.
- priority (int): the queuing system priority assigned to a given Python function to influence the scheduling.
- slurm_cmd_args (list): additional command line arguments for the srun call (SLURM only)
pysqa_config_directory (str, optional) – path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str) – PMI interface to use (OpenMPI v5 requires pmix). Defaults to None.
hostname_localhost (boolean) – use localhost instead of the hostname to establish the zmq connection. On an HPC cluster, using the hostname is essential to communicate with an Executor running on a different compute node within the same allocation. In principle, any computer should be able to resolve its own hostname to the same address as localhost. Still, macOS >= 12 seems to disable this lookup for security reasons, so on macOS this option must be set to True.
block_allocation (boolean) – To accelerate the submission of a series of python functions with the same resource requirements, executorlib supports block allocation. In this case all resources have to be defined on the executor, rather than during the submission of the individual function.
init_function (Callable, optional) – optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean) – Disable resolving future objects during the submission.
refresh_rate (float) – Refresh rate in seconds, i.e. how frequently the input queue is checked.
plot_dependency_graph (bool) – Plot the dependencies of multiple future objects without executing them. Useful for debugging and for getting an overview of the specified dependencies.
plot_dependency_graph_filename (str) – Name of the file to store the plotted graph in.
export_workflow_filename (str) – Name of the file to store the exported workflow graph in.
log_obj_size (bool) – Enable debug mode which reports the size of the communicated objects.
wait (bool) – Whether to wait for the completion of all tasks before shutting down the executor.
openmpi_oversubscribe (bool) – adds the --oversubscribe command line flag (OpenMPI and SLURM). Defaults to False.
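As an illustration of how the parameters above fit together, the sketch below assembles keyword arguments for the constructor. It only uses resource_dict keys documented above. The helper name suggest_executor_kwargs and the placeholder values are hypothetical. The hostname_localhost heuristic (macOS, or a hostname that does not resolve) is an assumption derived from the parameter description, not executorlib's own logic.

```python
import platform
import socket


def suggest_executor_kwargs(cores=2, run_time_max=3600, cwd=None):
    """Hypothetical helper: build keyword arguments for SlurmClusterExecutor.

    Only documented resource_dict keys are used; the values are placeholders.
    """
    resource_dict = {
        "cores": cores,                # MPI cores per function call
        "threads_per_core": 1,         # OpenMP threads per MPI core
        "gpus_per_core": 0,            # documented default
        "run_time_max": run_time_max,  # maximum run time in seconds
    }
    if cwd is not None:
        resource_dict["cwd"] = cwd

    # Assumed heuristic for hostname_localhost: prefer localhost on macOS or
    # when the local hostname cannot be resolved to an address.
    try:
        socket.gethostbyname(socket.gethostname())
        hostname_resolves = True
    except OSError:  # socket.gaierror and socket.herror derive from OSError
        hostname_resolves = False
    use_localhost = platform.system() == "Darwin" or not hostname_resolves

    return {
        "cache_directory": "executorlib_cache",  # documented default
        "resource_dict": resource_dict,
        "hostname_localhost": use_localhost,
    }
```

On a cluster where executorlib and pysqa are configured, the result could be passed as SlurmClusterExecutor(**suggest_executor_kwargs()). That step is not exercised here.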
Examples
```python
>>> import numpy as np
>>> from executorlib import SlurmClusterExecutor
>>>
>>> def calc(i, j, k):
...     from mpi4py import MPI
...     size = MPI.COMM_WORLD.Get_size()
...     rank = MPI.COMM_WORLD.Get_rank()
...     return np.array([i, j, k]), size, rank
...
>>> def init_k():
...     return {"k": 3}
...
>>> with SlurmClusterExecutor(max_workers=2, init_function=init_k) as p:
...     fs = p.submit(calc, 2, j=4)
...     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
- __init__(max_workers: int | None = None, cache_directory: str | None = None, max_cores: int | None = None, resource_dict: dict | None = None, pysqa_config_directory: str | None = None, pmi_mode: str | None = None, hostname_localhost: bool | None = None, block_allocation: bool = False, init_function: Callable | None = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, plot_dependency_graph: bool = False, plot_dependency_graph_filename: str | None = None, export_workflow_filename: str | None = None, log_obj_size: bool = False, wait: bool = True, openmpi_oversubscribe: bool = False)
Methods
__init__([max_workers, cache_directory, ...]) – The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload manager or, preferably, the flux framework for distributing Python functions within a given resource allocation.
batched(iterable, n) – Batch futures from the iterable into tuples of length n.
map(fn, *iterables[, timeout, chunksize]) – Returns an iterator equivalent to map(fn, iter).
shutdown([wait, cancel_futures]) – Clean up the resources associated with the Executor.
submit(fn, /, *args[, resource_dict]) – Submits a callable to be executed with the given arguments.
Attributes
future_queue – Get the future queue.
info – Get the information about the executor.
max_workers – Return the number of parallel workers configured for this executor.
- batched(iterable: list[Future], n: int) → list[Future]
Batch futures from the iterable into tuples of length n. The last batch may be shorter than n.
- Parameters:
iterable (list) – list of future objects to batch based on which future objects finish first
n (int) – batch size
- Returns:
list of future objects, one for each batch
- Return type:
list[Future]
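The completion-order batching described above can be sketched with the standard library's concurrent.futures. batched_sketch is a hypothetical stand-in, not executorlib's implementation: it blocks until every input future has finished and then returns already-completed futures, one per batch, each resolving to a tuple of results.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed


def batched_sketch(futures, n):
    """Hypothetical, blocking sketch: group results into batches of n.

    Batches are filled in the order in which the futures finish; the last
    batch may be shorter than n.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    batches, current = [], []
    for fut in as_completed(futures):  # yields futures as they finish
        current.append(fut.result())
        if len(current) == n:
            batches.append(tuple(current))
            current = []
    if current:
        batches.append(tuple(current))

    # Wrap each batch in an already-completed Future to mirror list[Future].
    wrapped = []
    for batch in batches:
        out = Future()
        out.set_result(batch)
        wrapped.append(out)
    return wrapped


with ThreadPoolExecutor(max_workers=2) as ex:
    fs = [ex.submit(pow, i, 2) for i in range(5)]
    batches = batched_sketch(fs, n=2)
```

Because batches follow completion order, the grouping of results across batches may vary between runs; only the batch sizes (2, 2, 1 here) are fixed.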
- property future_queue: Queue | None
Get the future queue.
- Returns:
The future queue.
- Return type:
queue.Queue
- property info: dict | None
Get the information about the executor.
- Returns:
Information about the executor.
- Return type:
Optional[dict]
- map(fn: Callable, *iterables, timeout: float | None = None, chunksize: int = 1)
Returns an iterator equivalent to map(fn, iter).
- Parameters:
fn – A callable that will take as many arguments as there are passed iterables.
timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.
chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
- Returns:
An iterator equivalent to map(fn, *iterables), but the calls may be evaluated out-of-order.
- Raises:
TimeoutError – If the entire result iterator could not be generated before the given timeout.
Exception – If fn(*args) raises for any values.
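The map docstring above reads like the standard library's concurrent.futures.Executor.map, so the call pattern is shown below with ThreadPoolExecutor, which runs anywhere. That SlurmClusterExecutor.map behaves identically is an assumption, not verified here. The point is that although the calls may run out of order, results come back in the order of the input iterables, one call per tuple of items.

```python
from concurrent.futures import ThreadPoolExecutor


def square_plus(x, y):
    # Takes one item from each iterable passed to map().
    return x * x + y


with ThreadPoolExecutor(max_workers=2) as executor:
    # Calls square_plus(1, 10), square_plus(2, 20), square_plus(3, 30);
    # chunksize is ignored by ThreadPoolExecutor, as noted above.
    results = list(executor.map(square_plus, [1, 2, 3], [10, 20, 30], timeout=10))

print(results)  # [11, 24, 39]
```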
- property max_workers: int | None
Return the number of parallel workers configured for this executor.
- Returns:
The maximum number of parallel workers, or None if unconstrained.
- Return type:
Optional[int]
- shutdown(wait: bool = True, *, cancel_futures: bool = False)
Clean up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters:
wait (bool) – If True then shutdown will not return until all running futures have finished executing and the resources used by the parallel_executors have been reclaimed.
cancel_futures (bool) – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
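To illustrate wait and cancel_futures, here is a sketch with the standard library's ThreadPoolExecutor (cancel_futures requires Python 3.9 or later). One task is held running, three more wait in the queue, and shutdown(wait=False, cancel_futures=True) cancels only the queued ones while the running task completes. It is assumed, not verified, that SlurmClusterExecutor.shutdown behaves the same way.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_shutdown_cancel():
    started = threading.Event()
    release = threading.Event()

    def blocking_task():
        started.set()   # signal that this task is now running
        release.wait()  # hold the only worker thread until released
        return "done"

    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(blocking_task)
    started.wait()  # make sure the first task is running, not pending
    pending = [executor.submit(pow, 2, i) for i in range(3)]

    # Cancel the queued futures; wait=False avoids blocking on the held task.
    executor.shutdown(wait=False, cancel_futures=True)
    release.set()  # let the running task finish normally
    return running.result(), [f.cancelled() for f in pending]
```

Calling demo_shutdown_cancel() returns ("done", [True, True, True]): the running future is not cancelled, the pending ones are.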
- submit(fn: Callable, /, *args, resource_dict: dict | None = None, **kwargs) → Future
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.
- Parameters:
fn (callable) – function to submit for execution
args – arguments for the submitted function
kwargs – keyword arguments for the submitted function
resource_dict (dict) –
A dictionary of resources required by the task, with the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel Python task is executed
- openmpi_oversubscribe (bool): adds the --oversubscribe command line flag (OpenMPI and SLURM only) - defaults to False
- slurm_cmd_args (list): additional command line arguments for the srun call (SLURM only)
- error_log_file (str): name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor.
- Returns:
A Future representing the given call.
- Return type:
Future
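The disable_dependencies option ("Disable resolving future objects during the submission") suggests that, by default, a Future returned by an earlier submit() call can be passed as an argument to a later one. The standard library executors do not do this. Below is a minimal sketch with concurrent.futures of what such resolution involves. submit_with_dependencies is hypothetical and makes no claim about how executorlib implements it: it waits, via callbacks, for every Future argument to finish, substitutes the results, and only then submits fn.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def submit_with_dependencies(executor, fn, *args, **kwargs):
    """Hypothetical helper: submit fn once all Future arguments are done."""
    deps = [a for a in (*args, *kwargs.values()) if isinstance(a, Future)]
    outer = Future()  # returned immediately, completed later
    remaining = len(deps)
    lock = threading.Lock()

    def resolve(value):
        # Replace a Future argument by its result; other values pass through.
        return value.result() if isinstance(value, Future) else value

    def forward(inner):
        # Copy the outcome of the actual task onto the returned Future.
        if inner.cancelled():
            outer.cancel()
        elif inner.exception() is not None:
            outer.set_exception(inner.exception())
        else:
            outer.set_result(inner.result())

    def launch():
        try:
            inner = executor.submit(
                fn,
                *[resolve(a) for a in args],
                **{k: resolve(v) for k, v in kwargs.items()},
            )
        except BaseException as exc:  # a dependency failed or submit refused
            outer.set_exception(exc)
            return
        inner.add_done_callback(forward)

    def on_dependency_done(_):
        nonlocal remaining
        with lock:
            remaining -= 1
            ready = remaining == 0
        if ready:
            launch()

    if not deps:
        launch()
    for dep in deps:
        dep.add_done_callback(on_dependency_done)
    return outer
```

Using callbacks rather than calling .result() inside a worker avoids tying up a worker thread while dependencies are still running.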