Source code for executorlib.task_scheduler.file.task_scheduler

from threading import Thread
from typing import Callable, Optional

from executorlib.standalone.inputcheck import (
    check_executor,
    check_flux_log_files,
    check_hostname_localhost,
    check_max_workers_and_cores,
    check_nested_flux_executor,
    check_pmi_mode,
)
from executorlib.task_scheduler.base import TaskSchedulerBase, validate_resource_dict
from executorlib.task_scheduler.file.shared import execute_tasks_h5
from executorlib.task_scheduler.file.spawner_subprocess import (
    execute_in_subprocess,
    terminate_subprocess,
)

# Optional pysqa-based spawner: import the pysqa execute/terminate helpers when
# they are importable.
try:
    from executorlib.standalone.scheduler import terminate_with_pysqa
    from executorlib.task_scheduler.file.spawner_pysqa import execute_with_pysqa
except ImportError:
    # If pysqa is not available (either import above fails), fall back to
    # executing tasks in a subprocess: ``execute_with_pysqa`` becomes an alias of
    # ``execute_in_subprocess`` and no pysqa terminate function is available
    # (``terminate_with_pysqa`` is None).
    execute_with_pysqa = execute_in_subprocess  # type: ignore
    terminate_with_pysqa = None  # type: ignore


class FileTaskScheduler(TaskSchedulerBase):
    """Task scheduler that processes submitted tasks in a background thread.

    The background ``Thread`` runs ``execute_tasks_h5`` with the keyword
    arguments collected in ``self._process_kwargs``, including this scheduler's
    ``_future_queue`` as the source of submitted tasks.
    """

    def __init__(
        self,
        executor_kwargs: Optional[dict] = None,
        execute_function: Callable = execute_with_pysqa,
        terminate_function: Optional[Callable] = None,
        pysqa_config_directory: Optional[str] = None,
        backend: Optional[str] = None,
        disable_dependencies: bool = False,
        pmi_mode: Optional[str] = None,
        wait: bool = True,
        refresh_rate: float = 0.01,
        validator: Callable = validate_resource_dict,
    ):
        """
        Initialize the FileTaskScheduler.

        Args:
            executor_kwargs (dict, optional): A dictionary of executor arguments required by the task.
                The dictionary is copied, so the caller's dict is never modified. Keys that are
                missing are filled in with these defaults:
                - cores (int): number of MPI cores to be used for each function call (default 1)
                - cwd (str/None): current working directory where the parallel python task is
                  executed (default None)
                - cache_directory (str): The directory to store cache files
                  (default "executorlib_cache")
                - exclusive (bool): default False
                - openmpi_oversubscribe (bool): default False
            execute_function (Callable, optional): The function to execute tasks. Defaults to
                execute_with_pysqa, which is an alias of execute_in_subprocess when pysqa cannot
                be imported.
            terminate_function (Callable, optional): The function to terminate the tasks.
            pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            backend (str, optional): name of the backend used to spawn tasks.
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
            wait (bool): Whether to wait for the completion of all tasks before shutting down the executor.
            refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
            validator (Callable, optional): Validation function forwarded to TaskSchedulerBase.
                Defaults to validate_resource_dict.
        """
        super().__init__(max_cores=None, validator=validator)
        default_resource_dict = {
            "cores": 1,
            "cwd": None,
            "cache_directory": "executorlib_cache",
            "exclusive": False,
            "openmpi_oversubscribe": False,
        }
        # Work on a shallow copy so filling in defaults does not mutate the
        # dictionary owned by the caller; user-supplied keys keep precedence.
        executor_kwargs = {} if executor_kwargs is None else dict(executor_kwargs)
        for key, value in default_resource_dict.items():
            executor_kwargs.setdefault(key, value)
        self._process_kwargs = {
            "executor_kwargs": executor_kwargs,
            "future_queue": self._future_queue,
            "execute_function": execute_function,
            "terminate_function": terminate_function,
            "pysqa_config_directory": pysqa_config_directory,
            "backend": backend,
            "disable_dependencies": disable_dependencies,
            "pmi_mode": pmi_mode,
            "refresh_rate": refresh_rate,
            "wait": wait,
        }
        self._set_process(
            Thread(
                target=execute_tasks_h5,
                kwargs=self._process_kwargs,
            )
        )
def create_file_executor(
    executor_kwargs: dict,
    max_workers: Optional[int] = None,
    backend: Optional[str] = None,
    max_cores: Optional[int] = None,
    cache_directory: Optional[str] = None,
    pmi_mode: Optional[str] = None,
    flux_executor=None,
    flux_executor_nesting: bool = False,
    flux_log_files: bool = False,
    pysqa_config_directory: Optional[str] = None,
    hostname_localhost: Optional[bool] = None,
    block_allocation: bool = False,
    init_function: Optional[Callable] = None,
    disable_dependencies: bool = False,
    execute_function: Callable = execute_with_pysqa,
    wait: bool = True,
    refresh_rate: float = 0.01,
    validator: Callable = validate_resource_dict,
):
    """
    Validate the options for the file based backend and create a FileTaskScheduler.

    Args:
        executor_kwargs (dict): Executor arguments for the tasks. The dictionary is copied, so
            the caller's dict is never modified. ``None`` is treated as an empty dict.
        max_workers (int, optional): Only validated (when ``backend`` is None), not forwarded.
        backend (str, optional): name of the backend used to spawn tasks. When None, the
            remaining scheduler options are checked by the ``check_*`` input validators.
        max_cores (int, optional): Only validated (when ``backend`` is None), not forwarded.
        cache_directory (str, optional): If given, overrides ``executor_kwargs["cache_directory"]``.
        pmi_mode (str, optional): PMI interface to use; validated when ``backend`` is None.
        flux_executor: Only validated (when ``backend`` is None), not forwarded.
        flux_executor_nesting (bool): Only validated (when ``backend`` is None), not forwarded.
        flux_log_files (bool): Only validated (when ``backend`` is None), not forwarded.
        pysqa_config_directory (str, optional): path to the pysqa config directory.
        hostname_localhost (bool, optional): Only validated (when ``backend`` is None), not forwarded.
        block_allocation (bool): Must be False; not supported by this backend.
        init_function (Callable, optional): Must be None; not supported by this backend.
        disable_dependencies (bool): Disable resolving future objects during the submission.
        execute_function (Callable): The function to execute tasks. Defaults to
            execute_with_pysqa (an alias of execute_in_subprocess when pysqa cannot be imported).
        wait (bool): Whether to wait for the completion of all tasks before shutting down.
        refresh_rate (float): The rate at which to refresh the result. Defaults to 0.01.
        validator (Callable): Validation function forwarded to FileTaskScheduler.

    Returns:
        FileTaskScheduler: the configured scheduler.

    Raises:
        ValueError: if ``block_allocation`` is True or ``init_function`` is given.
    """
    if block_allocation:
        raise ValueError(
            "The option block_allocation is not available with the pysqa based backend."
        )
    if init_function is not None:
        raise ValueError(
            "The option to specify an init_function is not available with the pysqa based backend."
        )
    # Copy so that overriding cache_directory below does not mutate the
    # dictionary owned by the caller.
    executor_kwargs = {} if executor_kwargs is None else dict(executor_kwargs)
    if cache_directory is not None:
        executor_kwargs["cache_directory"] = cache_directory
    if backend is None:
        check_pmi_mode(pmi_mode=pmi_mode)
        check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers)
        check_hostname_localhost(hostname_localhost=hostname_localhost)
        check_executor(executor=flux_executor)
        check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
        check_flux_log_files(flux_log_files=flux_log_files)
    # Pair the terminate function with the spawner: subprocess spawning uses
    # terminate_subprocess, anything else uses the pysqa terminate function
    # (which is None when pysqa could not be imported).
    terminate_function: Optional[Callable]
    if execute_function is execute_in_subprocess:
        terminate_function = terminate_subprocess
    else:
        terminate_function = terminate_with_pysqa
    return FileTaskScheduler(
        executor_kwargs=executor_kwargs,
        pysqa_config_directory=pysqa_config_directory,
        backend=backend,
        disable_dependencies=disable_dependencies,
        execute_function=execute_function,
        terminate_function=terminate_function,
        pmi_mode=pmi_mode,
        wait=wait,
        refresh_rate=refresh_rate,
        validator=validator,
    )