Source code for executorlib.standalone.inputcheck

import inspect
import multiprocessing
import os.path
from concurrent.futures import Executor
from typing import Callable, Optional
from warnings import warn


[docs] def check_oversubscribe(oversubscribe: bool) -> None: """ Check if oversubscribe is True and raise a ValueError if it is. """ if oversubscribe: raise ValueError( "Oversubscribing is not supported for the executorlib.flux.PyFLuxExecutor backend." "Please use oversubscribe=False instead of oversubscribe=True." )
[docs] def check_wait_on_shutdown( wait_on_shutdown: bool, ) -> None: """ Check if wait_on_shutdown is False and raise a ValueError if it is. """ if not wait_on_shutdown: raise ValueError( "The wait_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor." )
[docs] def check_command_line_argument_lst(command_line_argument_lst: list[str]) -> None: """ Check if command_line_argument_lst is not empty and raise a ValueError if it is. """ if len(command_line_argument_lst) > 0: raise ValueError( "The command_line_argument_lst parameter is not supported for the SLURM backend." )
[docs] def check_gpus_per_worker(gpus_per_worker: int) -> None: """ Check if gpus_per_worker is not 0 and raise a TypeError if it is. """ if gpus_per_worker != 0: raise TypeError( "GPU assignment is not supported for the executorlib.mpi.PyMPIExecutor backend." "Please use gpus_per_worker=0 instead of gpus_per_worker=" + str(gpus_per_worker) + "." )
[docs] def check_executor(executor: Executor) -> None: """ Check if executor is not None and raise a ValueError if it is. """ if executor is not None: raise ValueError( "The executor parameter is only supported for the flux framework backend." )
[docs] def check_nested_flux_executor(nested_flux_executor: bool) -> None: """ Check if nested_flux_executor is True and raise a ValueError if it is. """ if nested_flux_executor: raise ValueError( "The nested_flux_executor parameter is only supported for the flux framework backend." )
[docs] def check_resource_dict(function: Callable) -> None: """ Check if the function has a parameter named 'resource_dict' and raise a ValueError if it does. """ if "resource_dict" in inspect.signature(function).parameters: raise ValueError( "The parameter resource_dict is used internally in executorlib, " "so it cannot be used as a parameter in the submitted functions." )
[docs] def check_resource_dict_is_empty(resource_dict: dict) -> None: """ Check if resource_dict is not empty and raise a ValueError if it is. """ if len(resource_dict) > 0: raise ValueError( "When block_allocation is enabled, the resource requirements have to be defined on the executor level." )
[docs] def check_refresh_rate(refresh_rate: float) -> None: """ Check if refresh_rate is not 0.01 and raise a ValueError if it is. """ if refresh_rate != 0.01: raise ValueError( "The sleep_interval parameter is only used when disable_dependencies=False." )
[docs] def check_plot_dependency_graph(plot_dependency_graph: bool) -> None: """ Check if plot_dependency_graph is True and raise a ValueError if it is. """ if plot_dependency_graph: raise ValueError( "The plot_dependency_graph parameter is only used when disable_dependencies=False." )
[docs] def check_pmi(backend: Optional[str], pmi: Optional[str]) -> None: """ Check if pmi is valid for the selected backend and raise a ValueError if it is not. """ if backend is not None: if backend != "flux_allocation" and pmi is not None: raise ValueError( "The pmi parameter is currently only implemented for flux." ) elif backend == "flux_allocation" and pmi not in ["pmix", "pmi1", "pmi2", None]: raise ValueError( "The pmi parameter supports [pmix, pmi1, pmi2], but not: " + str(pmi) )
[docs] def check_init_function( block_allocation: bool, init_function: Optional[Callable] ) -> None: """ Check if block_allocation is False and init_function is not None, and raise a ValueError if it is. """ if not block_allocation and init_function is not None: raise ValueError("")
[docs] def check_max_workers_and_cores( max_workers: Optional[int], max_cores: Optional[int] ) -> None: """ Check that neither max_workers nor max_cores is set when using a pysqa-based backend. Args: max_workers (int, optional): Maximum number of workers. max_cores (int, optional): Maximum number of cores. Raises: ValueError: If max_workers or max_cores is not None. """ if max_workers is not None: raise ValueError( "The number of workers cannot be controlled with the pysqa based backend." ) if max_cores is not None: raise ValueError( "The number of cores cannot be controlled with the pysqa based backend." )
[docs] def check_hostname_localhost(hostname_localhost: Optional[bool]) -> None: """ Check that hostname_localhost is not set when using a pysqa-based backend. Args: hostname_localhost (bool, optional): Flag to use localhost for ZMQ connections. Raises: ValueError: If hostname_localhost is not None. """ if hostname_localhost is not None: raise ValueError( "The option to connect to hosts based on their hostname is not available with the pysqa based backend." )
[docs] def check_restart_limit(restart_limit: int, block_allocation: bool = True) -> None: """ Check that restart_limit is only used together with block_allocation. Args: restart_limit (int): Maximum number of times a worker process may be restarted. block_allocation (bool): Whether block allocation is enabled. Defaults to True. Raises: ValueError: If restart_limit is non-zero and block_allocation is False. """ if not block_allocation and restart_limit != 0: raise ValueError( "The option to specify a restart limit for worker processes is only available with block_allocation=True." )
[docs] def check_pmi_mode(pmi_mode: Optional[str]) -> None: """ Check that pmi_mode is not set on a local workstation without SLURM or flux. Args: pmi_mode (str, optional): PMI interface name (e.g. "pmix", "pmi1", "pmi2"). Raises: ValueError: If pmi_mode is not None. """ if pmi_mode is not None: raise ValueError( "The option to specify the pmi mode is not available on a local workstation, it requires SLURM or flux." )
[docs] def check_flux_log_files(flux_log_files: Optional[bool]) -> None: """ Check if flux_log_files is True and raise a ValueError if it is. """ if flux_log_files: raise ValueError( "The flux_log_files parameter is only supported for the flux framework backend." )
[docs] def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: """ Check if pysqa_config_directory is None and raise a ValueError if it is not. """ if pysqa_config_directory is not None: raise ValueError( "pysqa_config_directory parameter is only supported for pysqa backend." )
[docs] def validate_number_of_cores( max_cores: Optional[int] = None, max_workers: Optional[int] = None, cores_per_worker: Optional[int] = 1, set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the number of workers to use. Precedence: max_cores / cores_per_worker > max_workers > CPU count (with warning). Args: max_cores (int, optional): Total number of cores available. max_workers (int, optional): Explicit number of parallel workers. cores_per_worker (int, optional): Number of cores allocated to each worker. Defaults to 1. set_local_cores (bool): When True, fall back to the local CPU count if neither max_cores nor max_workers is given instead of raising an error. Defaults to False. Returns: int: Number of parallel workers to use. Raises: ValueError: If neither max_cores nor max_workers is set and set_local_cores is False. """ if max_cores is not None and max_workers is None and cores_per_worker is not None: return int(max_cores / cores_per_worker) elif max_workers is not None: return int(max_workers) elif max_cores is None and max_workers is None and not set_local_cores: raise ValueError( "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined." ) else: max_workers = multiprocessing.cpu_count() warn( "max_workers parameter is not set, set default based on CPU count to: max_workers=" + str(max_workers), stacklevel=2, ) return max_workers
[docs] def check_file_exists(file_name: Optional[str]): """ Check if file exists and raise a ValueError if it does not or file_name is None. """ if file_name is None: raise ValueError("file_name is not set.") if not os.path.exists(file_name): raise ValueError("file_name is not written to the file system.")
[docs] def check_log_obj_size(log_obj_size: bool) -> None: """ Check if log_obj_size is True and raise a ValueError if it is. """ if log_obj_size: raise ValueError( "log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor." "Please use log_obj_size=False instead of log_obj_size=True." )