import contextlib
import os
from typing import Callable, Optional
import flux
import flux.job
from executorlib.standalone.interactive.spawner import (
BaseSpawner,
set_current_directory_in_environment,
)
[docs]
def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
    """
    Check that the requested number of cores fits into the Flux allocation.

    The request is computed as ``max_workers * cores * threads_per_core`` and
    compared against the number of cores Flux reports as "up".

    Args:
        max_workers (int): Number of workers to be started.
        cores (int): Number of cores per worker.
        threads_per_core (int): Number of threads per core.

    Raises:
        ValueError: If more cores are requested than Flux reports as available.
    """
    # Query the Flux instance for the cores that are currently up.
    resources = flux.resource.list.resource_list(flux.Flux()).get()
    available = resources.up.ncores
    requested = max_workers * cores * threads_per_core
    if requested > available:
        raise ValueError(
            "The number of requested cores is larger than the available cores "
            f"{available} < {requested}"
        )
[docs]
class FluxPythonSpawner(BaseSpawner):
    """
    Spawner which launches the client process as a job in the Flux framework.

    Args:
        cwd (str, optional): Working directory of the job; created on bootup if it does not exist.
            Defaults to None.
        cores (int, optional): Number of cores, submitted to Flux as the number of tasks (or slots when
            nesting is enabled). Defaults to 1.
        threads_per_core (int, optional): Number of threads per core, submitted to Flux as cores per
            task (or per slot). Defaults to 1.
        gpus_per_core (int, optional): Number of GPUs per core, submitted to Flux as GPUs per task
            (or per slot). Defaults to 0.
        num_nodes (int, optional): The number of compute nodes to use for executing the task.
            Defaults to None.
        worker_id (int): The worker ID, also used in the names of the Flux log files. Defaults to 0.
        exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute
            nodes. Defaults to False.
        priority (int, optional): Job urgency 0 (lowest) through 31 (highest) (default is 16).
            Priorities 0 through 15 are restricted to the instance owner. Forwarded to Flux as
            ``urgency``; when None, no urgency is passed.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Not supported by this
            spawner - bootup raises a ValueError when enabled. Defaults to False.
        pmi_mode (str, optional): Value for the ``pmi`` shell option of the job. Defaults to None.
        flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance to submit to; a new
            one is created on bootup when None. Defaults to None.
        flux_executor_nesting (bool, optional): Whether to build the jobspec with
            ``JobspecV1.from_nest_command`` instead of ``JobspecV1.from_command``. Defaults to False.
        flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
        run_time_max (int, optional): The maximum runtime in seconds for each task, set as the jobspec
            duration. Defaults to None.
    """

    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        num_nodes: Optional[int] = None,
        worker_id: int = 0,
        exclusive: bool = False,
        priority: Optional[int] = None,
        openmpi_oversubscribe: bool = False,
        pmi_mode: Optional[str] = None,
        flux_executor: Optional[flux.job.FluxExecutor] = None,
        flux_executor_nesting: bool = False,
        flux_log_files: bool = False,
        run_time_max: Optional[int] = None,
    ):
        super().__init__(
            cwd=cwd,
            cores=cores,
            worker_id=worker_id,
            openmpi_oversubscribe=openmpi_oversubscribe,
        )
        # Resource request
        self._threads_per_core = threads_per_core
        self._gpus_per_core = gpus_per_core
        self._num_nodes = num_nodes
        self._exclusive = exclusive
        # Scheduling options
        self._priority = priority
        self._pmi_mode = pmi_mode
        self._run_time_max = run_time_max
        # Flux specific configuration
        self._flux_executor = flux_executor
        self._flux_executor_nesting = flux_executor_nesting
        self._flux_log_files = flux_log_files
        # Future of the submitted job; None until bootup() has been called.
        self._future = None

    def bootup(
        self,
        command_lst: list[str],
        stop_function: Optional[Callable] = None,
    ) -> bool:
        """
        Submit the client process as a Flux job.

        Args:
            command_lst (list[str]): List of strings to start the client process.
            stop_function (Callable, optional): Function to stop the interface. Not used by this
                implementation.

        Raises:
            ValueError: If oversubscribing was requested, as it is not supported for the Flux adapter.

        Returns:
            bool: Result of poll() after submission - True while the submitted job is not done.
        """
        if self._openmpi_oversubscribe:
            raise ValueError(
                "Oversubscribing is currently not supported for the Flux adapter."
            )
        if self._flux_executor is None:
            self._flux_executor = flux.job.FluxExecutor()

        if self._flux_executor_nesting:
            jobspec = flux.job.JobspecV1.from_nest_command(
                command=command_lst,
                num_slots=self._cores,
                cores_per_slot=self._threads_per_core,
                gpus_per_slot=self._gpus_per_core,
                num_nodes=self._num_nodes,
                exclusive=self._exclusive,
            )
        else:
            jobspec = flux.job.JobspecV1.from_command(
                command=command_lst,
                num_tasks=self._cores,
                cores_per_task=self._threads_per_core,
                gpus_per_task=self._gpus_per_core,
                num_nodes=self._num_nodes,
                exclusive=self._exclusive,
            )

        # The job receives a copy of the current process environment, taken
        # after set_current_directory_in_environment() had the chance to modify it.
        set_current_directory_in_environment()
        jobspec.environment = dict(os.environ)

        if self._pmi_mode is not None:
            jobspec.setattr_shell_option("pmi", self._pmi_mode)
        if self._cwd is not None:
            jobspec.cwd = self._cwd
            os.makedirs(self._cwd, exist_ok=True)
        if self._run_time_max is not None:
            jobspec.duration = self._run_time_max

        if self._flux_log_files:
            log_stem = f"flux_{self._worker_id}"
            if self._cwd is None:
                # No working directory given: resolve against the current directory.
                jobspec.stderr = os.path.abspath(log_stem + ".err")
                jobspec.stdout = os.path.abspath(log_stem + ".out")
            else:
                jobspec.stderr = os.path.join(self._cwd, log_stem + ".err")
                jobspec.stdout = os.path.join(self._cwd, log_stem + ".out")

        # The urgency is only forwarded when a priority was set explicitly.
        submit_kwargs = {"jobspec": jobspec}
        if self._priority is not None:
            submit_kwargs["urgency"] = self._priority
        self._future = self._flux_executor.submit(**submit_kwargs)

        if self._future is not None:
            # Return value is intentionally discarded.
            # NOTE(review): presumably this waits until Flux assigned a job ID - confirm.
            self._future.jobid()
        return self.poll()

    def shutdown(self, wait: bool = True):
        """
        Shutdown the FluxPythonInterface.

        A job which is still running is cancelled, afterwards the result of the future is
        awaited. Nothing happens when no job was submitted.

        Args:
            wait (bool, optional): Whether to wait for the execution to complete. Not used by this
                implementation. Defaults to True.
        """
        if self._future is None:
            return
        if self.poll():
            self._future.cancel()
        # The flux future objects are not instantly updated,
        # still showing running after cancel was called,
        # so we wait until the execution is completed.
        with contextlib.suppress(flux.job.event.JobException):
            self._future.result()

    def poll(self):
        """
        Check if the FluxPythonInterface is running.

        Returns:
            bool: True if a job was submitted and its future is not done yet, False otherwise.
        """
        if self._future is None:
            return False
        return not self._future.done()