Source code for executorlib.standalone.command

import importlib.util
import os
import sys
from typing import Optional

# Name of the SLURM launcher executable; generate_slurm_command() places it
# at the front of the command list it builds.
SLURM_COMMAND = "srun"


def get_command_path(executable: str) -> str:
    """
    Resolve the absolute path of a script inside the ``backend`` directory.

    The path is built relative to this module's own file: two levels up from
    ``__file__`` and then into ``backend``.

    Args:
        executable (str): File name of the backend script, for example
            ``cache_serial.py`` or ``interactive_parallel.py``.

    Returns:
        str: Absolute, normalised path to the requested backend script.
    """
    # Joining onto __file__ itself means the first ".." strips the module file
    # name and the second one steps out of the module's directory.
    unresolved_path = os.path.join(
        __file__, os.pardir, os.pardir, "backend", executable
    )
    return os.path.abspath(unresolved_path)
def get_cache_execute_command(
    file_name: str,
    cores: int = 1,
    backend: Optional[str] = None,
    exclusive: bool = False,
    openmpi_oversubscribe: bool = False,
    pmi_mode: Optional[str] = None,
) -> list:
    """
    Build the command line that runs a cached task file with the backend scripts.

    For a single core the command is the Python interpreter followed by
    ``cache_serial.py`` and the file name. For more than one core the
    interpreter call is wrapped in an MPI launcher (``mpiexec``, ``srun`` or
    ``flux run``) and ``cache_parallel.py`` is used instead.

    Args:
        file_name (str): The name of the file.
        cores (int, optional): Number of cores used to execute the task. Defaults to 1.
        backend (str, optional): Name of the backend used to spawn tasks, one of
            ``None`` (plain ``mpiexec``), ``"slurm"`` or ``"flux"``. Only
            evaluated when ``cores`` is greater than one.
        exclusive (bool): Whether to exclusively reserve the compute nodes, or allow
            sharing compute nodes. Defaults to False.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores.
            Defaults to False.
        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix), default is None.

    Returns:
        list[str]: Launcher arguments (if any), the Python executable path, the
        backend script to execute and the file name.

    Raises:
        ImportError: If more than one core is requested but mpi4py is not installed.
        ValueError: If ``backend`` is not None, "slurm" or "flux", or if
            ``openmpi_oversubscribe`` / ``exclusive`` are combined with the flux backend.
    """
    interpreter = [sys.executable]

    # Serial execution needs neither MPI nor a launcher.
    if not cores > 1:
        return interpreter + [
            get_command_path(executable="cache_serial.py"),
            file_name,
        ]

    if importlib.util.find_spec("mpi4py") is None:
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )

    if backend is None:
        launcher = ["mpiexec", "-n", str(cores)]
    elif backend == "slurm":
        launcher = ["srun", "-n", str(cores)]
        if pmi_mode is not None:
            launcher.append("--mpi=" + pmi_mode)
        if openmpi_oversubscribe:
            launcher.append("--oversubscribe")
        if exclusive:
            launcher.append("--exact")
    elif backend == "flux":
        launcher = ["flux", "run"]
        if pmi_mode is not None:
            launcher.extend(["-o", "pmi=" + pmi_mode])
        if openmpi_oversubscribe:
            raise ValueError(
                "The option openmpi_oversubscribe is not available with the flux backend."
            )
        if exclusive:
            raise ValueError(
                "The option exclusive is not available with the flux backend."
            )
        launcher.extend(["-n", str(cores)])
    else:
        raise ValueError(f"backend should be None, slurm or flux, not {backend}")

    # Every parallel backend ends with the same interpreter + script + file tail.
    return (
        launcher
        + interpreter
        + [get_command_path(executable="cache_parallel.py"), file_name]
    )
def get_interactive_execute_command(
    cores: int,
) -> list:
    """
    Build the command that starts the interactive backend script.

    Args:
        cores (int): Number of cores used to execute the task. If it is greater
            than one, ``interactive_parallel.py`` is selected, otherwise
            ``interactive_serial.py``.

    Returns:
        list[str]: Two strings, the Python executable path followed by the
        absolute path of the backend script to execute.

    Raises:
        ImportError: If more than one core is requested but mpi4py is not installed.
    """
    if not cores > 1:
        script_name = "interactive_serial.py"
    elif importlib.util.find_spec("mpi4py") is None:
        # The parallel backend script cannot run without mpi4py.
        raise ImportError(
            "mpi4py is required for parallel calculations. Please install mpi4py."
        )
    else:
        script_name = "interactive_parallel.py"
    return [sys.executable, get_command_path(executable=script_name)]
def generate_slurm_command(
    cores: int,
    cwd: Optional[str],
    threads_per_core: int = 1,
    gpus_per_core: int = 0,
    num_nodes: Optional[int] = None,
    exclusive: bool = False,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: Optional[list[str]] = None,
    pmi_mode: Optional[str] = None,
    run_time_max: Optional[int] = None,
) -> list[str]:
    """
    Generate the command list for the SLURM interface.

    The list starts with the SLURM launcher and ``-n <cores>``; every other
    option is appended only when it differs from its default.

    Args:
        cores (int): The number of cores.
        cwd (str): The current working directory, passed as ``-D``. Skipped when None.
        threads_per_core (int, optional): The number of threads per core. Passed as
            ``--cpus-per-task`` only when greater than 1. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Passed as
            ``--gpus-per-task`` only when greater than 0. Defaults to 0.
        num_nodes (int, optional): The number of compute nodes to use for executing
            the task. Defaults to None.
        exclusive (bool): Whether to exclusively reserve the compute nodes, or allow
            sharing compute nodes. Adds ``--exact``. Defaults to False.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores.
            Defaults to False.
        slurm_cmd_args (list[str], optional): Additional command line arguments,
            appended last. Defaults to None.
        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix), default is None.
        run_time_max (int): The maximum runtime in seconds for each task. It is
            converted to whole minutes as ``run_time_max // 60 + 1`` for ``--time``.
            Default: None

    Returns:
        list[str]: The generated command list.
    """
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
    if cwd is not None:
        command_prepend_lst += ["-D", cwd]
    if pmi_mode is not None:
        command_prepend_lst += ["--mpi=" + pmi_mode]
    if num_nodes is not None:
        command_prepend_lst += ["-N", str(num_nodes)]
    # Only request CPUs per task for genuinely multi-threaded tasks: the default
    # of one thread must not add a flag, just as gpus_per_core skips its default.
    if threads_per_core > 1:
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if exclusive:
        command_prepend_lst += ["--exact"]
    if openmpi_oversubscribe:
        command_prepend_lst += ["--oversubscribe"]
    if run_time_max is not None:
        # Seconds -> minutes; the "+ 1" keeps the limit above the requested time.
        command_prepend_lst += ["--time=" + str(run_time_max // 60 + 1)]
    # User-supplied arguments go last; None and an empty list are both skipped.
    if slurm_cmd_args:
        command_prepend_lst += slurm_cmd_args
    return command_prepend_lst