Source code for executorlib.task_scheduler.interactive.spawner_slurm
import os
from typing import Optional
from executorlib.standalone.command import generate_slurm_command
from executorlib.standalone.interactive.spawner import SubprocessSpawner
[docs]
def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
env = os.environ
if "SLURM_NTASKS" in env and "SLURM_CPUS_PER_TASK" in env:
cores_total = int(env["SLURM_NTASKS"]) * int(env["SLURM_CPUS_PER_TASK"])
cores_requested = max_workers * cores * threads_per_core
if cores_total < cores_requested:
raise ValueError(
"The number of requested cores is larger than the available cores "
+ str(cores_total)
+ " < "
+ str(cores_requested)
)
[docs]
class SrunSpawner(SubprocessSpawner):
[docs]
def __init__(
self,
cwd: Optional[str] = None,
cores: int = 1,
threads_per_core: int = 1,
gpus_per_core: int = 0,
num_nodes: Optional[int] = None,
worker_id: int = 0,
exclusive: bool = False,
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
pmi_mode: Optional[str] = None,
run_time_max: Optional[int] = None,
):
"""
Srun interface implementation.
Args:
cwd (str, optional): The current working directory. Defaults to None.
cores (int, optional): The number of cores to use. Defaults to 1.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
worker_id (int): The worker ID. Defaults to 0.
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False.
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
run_time_max (int): The maximum runtime in seconds for each task. Default: None
"""
super().__init__(
cwd=cwd,
cores=cores,
worker_id=worker_id,
openmpi_oversubscribe=openmpi_oversubscribe,
threads_per_core=threads_per_core,
)
self._gpus_per_core = gpus_per_core
self._slurm_cmd_args = slurm_cmd_args
self._num_nodes = num_nodes
self._exclusive = exclusive
self._pmi_mode = pmi_mode
self._run_time_max = run_time_max
[docs]
def generate_command(self, command_lst: list[str]) -> list[str]:
"""
Generate the command list for the Srun interface.
Args:
command_lst (list[str]): The command list.
Returns:
list[str]: The generated command list.
"""
command_prepend_lst = generate_slurm_command(
cores=self._cores,
cwd=self._cwd,
threads_per_core=self._threads_per_core,
gpus_per_core=self._gpus_per_core,
num_nodes=self._num_nodes,
exclusive=self._exclusive,
openmpi_oversubscribe=self._openmpi_oversubscribe,
slurm_cmd_args=self._slurm_cmd_args,
pmi_mode=self._pmi_mode,
run_time_max=self._run_time_max,
)
return super().generate_command(
command_lst=command_prepend_lst + command_lst,
)