Source code for executorlib.executor.base

import queue
from abc import ABC
from concurrent.futures import (
    Executor as FutureExecutor,
)
from concurrent.futures import (
    Future,
)
from typing import Callable, Optional

from executorlib.task_scheduler.base import TaskSchedulerBase


class BaseExecutor(FutureExecutor, ABC):
    """
    Interface class for the executor.

    Every public call is forwarded to the wrapped task scheduler. The class itself only
    tracks whether the executor still accepts new work (``_is_active``).

    Args:
        executor (TaskSchedulerBase): internal executor
    """

    def __init__(self, executor: TaskSchedulerBase):
        """
        Wrap the given task scheduler and mark the executor as active.

        Args:
            executor (TaskSchedulerBase): internal executor all calls are delegated to
        """
        self._task_scheduler = executor
        self._is_active = True

    @property
    def max_workers(self) -> Optional[int]:
        """
        Return the number of parallel workers configured for this executor.

        Returns:
            Optional[int]: The maximum number of parallel workers, or None if unconstrained.
        """
        return self._task_scheduler.max_workers

    @max_workers.setter
    def max_workers(self, max_workers: int):
        """
        Set the number of parallel workers on the underlying task scheduler.

        Args:
            max_workers (int): New maximum number of parallel workers.
        """
        self._task_scheduler.max_workers = max_workers

    @property
    def info(self) -> Optional[dict]:
        """
        Get the information about the executor.

        Returns:
            Optional[dict]: Information about the executor.
        """
        return self._task_scheduler.info

    @property
    def future_queue(self) -> Optional[queue.Queue]:
        """
        Get the future queue.

        Returns:
            Optional[queue.Queue]: The future queue of the underlying task scheduler.
        """
        return self._task_scheduler.future_queue

    def batched(
        self,
        iterable: list[Future],
        n: int,
    ) -> list[Future]:
        """
        Batch futures from the iterable into tuples of length n. The last batch may be shorter than n.

        Args:
            iterable (list): list of future objects to batch based on which future objects finish first
            n (int): batch size

        Returns:
            list[Future]: list of future objects one for each batch
        """
        return self._task_scheduler.batched(iterable=iterable, n=n)

    def submit(  # type: ignore
        self,
        fn: Callable,
        /,
        *args,
        resource_dict: Optional[dict] = None,
        **kwargs,
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each
                                                            function call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel python
                                                    task is executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line
                                                                  flag (OpenMPI and SLURM only) - default
                                                                  False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun
                                                           call (SLURM only)
                                  - error_log_file (str): Name of the error log file to use for storing
                                                          exceptions raised by the Python functions
                                                          submitted to the Executor.

        Returns:
            Future: A Future representing the given call.

        Raises:
            RuntimeError: If the executor has already been shut down.
        """
        if not self._is_active:
            raise RuntimeError("cannot schedule new futures after shutdown")
        return self._task_scheduler.submit(
            fn, *args, resource_dict=resource_dict, **kwargs
        )

    def map(
        self,
        fn: Callable,
        *iterables,
        timeout: Optional[float] = None,
        chunksize: int = 1,
    ):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into.
                The value is forwarded unchanged to the underlying task
                scheduler.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            RuntimeError: If the executor has already been shut down.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if not self._is_active:
            raise RuntimeError("cannot schedule new futures after shutdown")
        return self._task_scheduler.map(
            fn,
            *iterables,
            timeout=timeout,
            chunksize=chunksize,
        )

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """
        Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        # Stop accepting work *before* delegating: the scheduler call may block
        # (wait=True) or raise, and in both cases new submissions must already
        # be rejected.
        self._is_active = False
        self._task_scheduler.shutdown(wait=wait, cancel_futures=cancel_futures)

    def __len__(self) -> int:
        """
        Get the length of the executor.

        Returns:
            int: The length reported by the underlying task scheduler.
        """
        return len(self._task_scheduler)

    def __bool__(self):
        """
        Always evaluate the executor as True.

        Because __len__ is defined, Python would otherwise treat an executor
        whose length is zero as falsy.

        Returns:
            bool: Always return True
        """
        return True

    def __exit__(self, *args, **kwargs) -> None:
        """
        Exit method called when exiting the context manager.
        """
        # Same ordering as shutdown(): reject new work even if the scheduler's
        # own __exit__ blocks or raises.
        self._is_active = False
        self._task_scheduler.__exit__(*args, **kwargs)