Source code for executorlib.task_scheduler.interactive.blockallocation

import queue
import random
from concurrent.futures import Future
from threading import Event, Lock, Thread
from typing import Callable, Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.inputcheck import (
    check_resource_dict,
    check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import (
    ExecutorlibSocketError,
    SocketInterface,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.task_scheduler.base import TaskSchedulerBase, validate_resource_dict
from executorlib.task_scheduler.interactive.shared import (
    execute_task_dict,
    reset_task_dict,
    task_done,
)

# Module-level registry of "interrupt bootup" flags. Each BlockAllocationTaskScheduler
# instance stores an entry under a random 128-bit id drawn in __init__ (initially False);
# shutdown() flips the entry to True, and the `stop_function` callback handed to the
# worker threads reads it.
_interrupt_bootup_dict: dict = {}


class BlockAllocationTaskScheduler(TaskSchedulerBase):
    """
    The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib executor to distribute python
    tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor
    the executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not
    require the python script to be executed with MPI. Consequently, it is primarily an abstraction of its
    functionality to improve the usability, in particular when used in combination with Jupyter notebooks.

    Args:
        max_workers (int): defines the number of workers which can execute functions in parallel
        executor_kwargs (dict): keyword arguments for the executor. The dictionary is copied, the caller's object is
                                not modified.
        spawner (BaseSpawner): interface class to initiate python processes
        validator (Callable): callable forwarded to TaskSchedulerBase as ``validator``
        restart_limit (int): The maximum number of restarting worker processes.

    Examples:

        >>> import numpy as np
        >>> from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> def init_k():
        >>>     return {"k": 3}
        >>>
        >>> with BlockAllocationTaskScheduler(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
        >>>     fs = p.submit(calc, 2, j=4)
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_workers: int = 1,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
        validator: Callable = validate_resource_dict,
        restart_limit: int = 0,
    ):
        # Work on a private copy: the entries added below (queue, spawner, ...) must not
        # leak into a dictionary the caller may reuse for another scheduler.
        executor_kwargs = {} if executor_kwargs is None else dict(executor_kwargs)
        super().__init__(
            max_cores=executor_kwargs.get("max_cores"), validator=validator
        )
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["queue_join_on_shutdown"] = False
        executor_kwargs["restart_limit"] = restart_limit
        self._process_kwargs = executor_kwargs
        self._max_workers = max_workers
        # Random id used as the key of this instance in the module-level interrupt registry.
        self_id = random.getrandbits(128)
        self._self_id = self_id
        _interrupt_bootup_dict[self._self_id] = False
        # Single-element list so that all worker threads share one mutable counter.
        alive_workers = [max_workers]
        alive_workers_lock = Lock()
        # One event per worker: worker i waits on its own event and sets the event of
        # worker i + 1 once it has booted, so the workers boot in worker_id order.
        bootup_events = [Event() for _ in range(self._max_workers)]
        bootup_events[0].set()
        self._set_process(
            process=[
                Thread(
                    target=_execute_multiple_tasks,
                    kwargs=executor_kwargs
                    | {
                        "worker_id": worker_id,
                        "stop_function": lambda: _interrupt_bootup_dict[self_id],
                        "bootup_event": bootup_events[worker_id],
                        "next_bootup_event": (
                            bootup_events[worker_id + 1]
                            if worker_id + 1 < self._max_workers
                            else None
                        ),
                        "alive_workers": alive_workers,
                        "alive_workers_lock": alive_workers_lock,
                    },
                )
                for worker_id in range(self._max_workers)
            ],
        )

    @property
    def max_workers(self) -> int:
        """Number of worker threads the scheduler is configured to run."""
        return self._max_workers

    @max_workers.setter
    def max_workers(self, max_workers: int):
        """
        Shrink or grow the pool of worker threads.

        Shrinking places one shutdown message per surplus worker at the front of the task queue and waits until
        enough worker threads have terminated. Growing starts additional worker threads with the stored executor
        keyword arguments. Nothing happens once the scheduler has been shut down.

        Args:
            max_workers (int): new number of workers
        """
        if isinstance(self._future_queue, queue.Queue) and isinstance(
            self._process, list
        ):
            if self._max_workers > max_workers:
                task_queue = self._future_queue
                # Insert under the queue's own mutex and mirror the bookkeeping of
                # Queue.put(): without the notify, idle workers blocked in get() would
                # never see the message, and without the unfinished_tasks increment the
                # task_done() issued by the worker would be unbalanced.
                with task_queue.mutex:
                    for _ in range(self._max_workers - max_workers):
                        task_queue.queue.insert(0, {"shutdown": True, "wait": True})
                        task_queue.unfinished_tasks += 1
                        task_queue.not_empty.notify()
                while len(self._process) > max_workers:
                    # Block briefly on one thread instead of spinning at full speed.
                    self._process[0].join(timeout=0.05)
                    self._process = [
                        process for process in self._process if process.is_alive()
                    ]
            elif self._max_workers < max_workers:
                new_process_lst = [
                    Thread(
                        target=_execute_multiple_tasks,
                        kwargs=self._process_kwargs,
                    )
                    for _ in range(max_workers - self._max_workers)
                ]
                for process_instance in new_process_lst:
                    process_instance.start()
                self._process += new_process_lst
            self._max_workers = max_workers

    def submit(  # type: ignore
        self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
    ) -> Future:
        """
        Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Args:
            fn (Callable): function to submit for execution
            args: arguments for the submitted function
            kwargs: keyword arguments for the submitted function
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
                              - cwd (str/None): current working directory where the parallel python task is executed
                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
                                                              and SLURM only) - default False
                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
                                                       only)
                              - error_log_file (str): Name of the error log file to use for storing exceptions
                                                      raised by the Python functions submitted to the Executor.

        Returns:
            Future: A Future representing the given call.
        """
        if resource_dict is None:
            resource_dict = {}
        check_resource_dict_is_empty(resource_dict=resource_dict)
        check_resource_dict(function=fn)
        f: Future = Future()
        # After shutdown() the queue is None; the future is then returned without being queued.
        if self._future_queue is not None:
            self._future_queue.put(
                {"fn": fn, "args": args, "kwargs": kwargs, "future": f}
            )
        return f

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait (bool): If True then shutdown will not return until all running
                         futures have finished executing and the resources used by the
                         parallel_executors have been reclaimed.
            cancel_futures (bool): If True then shutdown will cancel all pending
                                   futures. Futures that are completed or running will not be
                                   cancelled.
        """
        if self._future_queue is not None:
            if cancel_futures:
                cancel_items_in_queue(que=self._future_queue)
            if isinstance(self._process, list):
                # Signal workers that are still booting to stop via their stop_function.
                _interrupt_bootup_dict[self._self_id] = True
                # One shutdown message per worker thread.
                for _ in range(len(self._process)):
                    self._future_queue.put({"shutdown": True, "wait": wait})
                if wait:
                    for process in self._process:
                        process.join()
                    self._future_queue.join()
        self._process = None
        self._future_queue = None

    def _set_process(self, process: list[Thread]):  # type: ignore
        """
        Store the worker threads of the executor and start each of them.

        Args:
            process (list[Thread]): The worker threads for the executor.
        """
        self._process = process
        for process_instance in self._process:
            process_instance.start()
def _execute_multiple_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: int = 0,
    stop_function: Optional[Callable] = None,
    restart_limit: int = 0,
    bootup_event: Optional[Event] = None,
    next_bootup_event: Optional[Event] = None,
    alive_workers: Optional[list] = None,
    alive_workers_lock: Optional[Lock] = None,
    **kwargs,
) -> None:
    """
    Worker loop: boot an interface and execute the tasks received from the queue one after another until a
    shutdown message arrives or the interface is permanently dead.

    Args:
       future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
       cores (int): defines the total number of MPI ranks to use
       spawner (BaseSpawner): Spawner to start process on selected compute resources
       hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                     context of an HPC cluster this essential to be able to communicate to an
                                     Executor running on a different compute node within the same allocation. And
                                     in principle any computer should be able to resolve that their own hostname
                                     points to the same address as localhost. Still MacOS >= 12 seems to disable
                                     this look up for security reasons. So on MacOS it is required to set this
                                     option to true
       init_function (Callable): optional function to preset arguments for functions which are submitted later
       cache_directory (str, optional): The directory to store cache files. Defaults to None in this function.
       cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                  overwritten by setting the cache_key.
       queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
       log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
       error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                             submitted to the Executor.
       worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                        distribution.
       stop_function (Callable): Function to stop the interface.
       restart_limit (int): The maximum number of restarting worker processes.
       bootup_event (Event): Event to wait on before submitting the job to the scheduler, ensuring workers are
                             submitted in worker_id order.
       next_bootup_event (Event): Event to signal after job submission, unblocking the next worker. It is also
                                  signalled when the bootup raises, so later workers are never left waiting.
       alive_workers (list): Single-element list [N] tracking how many worker threads are still alive. Shared across
                             all worker threads; decremented when a worker is permanently dead.
       alive_workers_lock (Lock): Lock protecting alive_workers from concurrent modification.
       **kwargs: additional keyword arguments forwarded to the spawner.
    """
    if bootup_event is not None:
        bootup_event.wait()
    try:
        interface = interface_bootup(
            command_lst=get_interactive_execute_command(
                cores=cores,
            ),
            connections=spawner(cores=cores, worker_id=worker_id, **kwargs),
            hostname_localhost=hostname_localhost,
            log_obj_size=log_obj_size,
            worker_id=worker_id,
            stop_function=stop_function,
        )
    finally:
        # Release the next worker even when the bootup raised: otherwise every later
        # worker would block forever on its bootup_event and joining them would hang.
        if next_bootup_event is not None:
            next_bootup_event.set()
    interface_initialization_exception = _set_init_function(
        interface=interface,
        init_function=init_function,
    )
    restart_counter = 0
    while True:
        if not interface.status and restart_counter >= restart_limit:
            # Restart budget exhausted: hand the remaining work over and terminate.
            _drain_dead_worker(
                future_queue=future_queue,
                alive_workers=alive_workers,
                alive_workers_lock=alive_workers_lock,
            )
            break
        elif not interface.status:
            # Interface is down but restarts are still allowed: boot it again.
            interface.bootup()
            interface_initialization_exception = _set_init_function(
                interface=interface,
                init_function=init_function,
            )
            restart_counter += 1
        else:  # interface.status == True
            task_dict = future_queue.get()
            if "shutdown" in task_dict and task_dict["shutdown"]:
                if interface.status:
                    interface.shutdown(wait=task_dict["wait"])
                task_done(future_queue=future_queue)
                if queue_join_on_shutdown:
                    future_queue.join()
                break
            elif "fn" in task_dict and "future" in task_dict:
                f = task_dict.pop("future")
                if interface_initialization_exception is not None:
                    # The init function failed: report that failure for every task.
                    f.set_exception(exception=interface_initialization_exception)
                else:
                    # The interface failed during the execution
                    interface.status = execute_task_dict(
                        task_dict=task_dict,
                        future_obj=f,
                        interface=interface,
                        cache_directory=cache_directory,
                        cache_key=cache_key,
                        error_log_file=error_log_file,
                    )
                    if not interface.status:
                        reset_task_dict(
                            future_obj=f, future_queue=future_queue, task_dict=task_dict
                        )
                # Acknowledge the dequeued item exactly once, whatever its outcome.
                task_done(future_queue=future_queue)


def _drain_dead_worker(
    future_queue: queue.Queue,
    alive_workers: Optional[list] = None,
    alive_workers_lock: Optional[Lock] = None,
) -> None:
    """Handle a permanently dead worker by recycling or failing its tasks.

    If healthy workers remain, tasks are recycled back into the shared queue
    so they can be picked up. If all workers are dead, tasks are failed
    immediately with ExecutorlibSocketError. In both cases, the worker's
    shutdown message is consumed to prevent hangs in shutdown().

    Args:
        future_queue (queue.Queue): task queue shared by all worker threads
        alive_workers (list): single-element list holding the number of workers still alive; decremented here
        alive_workers_lock (Lock): lock protecting alive_workers. When either of the two is None, the worker is
                                   treated as the last one and all tasks are failed.
    """
    has_counter = alive_workers is not None and alive_workers_lock is not None
    if has_counter:
        with alive_workers_lock:
            if alive_workers[0] > 0:
                alive_workers[0] -= 1
    while True:
        try:
            task_dict = future_queue.get(timeout=1)
        except queue.Empty:
            continue
        if "shutdown" in task_dict and task_dict["shutdown"]:
            task_done(future_queue=future_queue)
            break
        elif "fn" in task_dict and "future" in task_dict:
            if has_counter:
                with alive_workers_lock:
                    has_healthy_workers = alive_workers[0] > 0
            else:
                has_healthy_workers = False
            if has_healthy_workers:
                # Put the task back so that one of the healthy workers can take it.
                future_queue.put(task_dict)
            else:
                f = task_dict.pop("future")
                f.set_exception(
                    ExecutorlibSocketError("SocketInterface crashed during execution.")
                )
            task_done(future_queue=future_queue)


def _set_init_function(
    interface: SocketInterface,
    init_function: Optional[Callable] = None,
) -> Optional[Exception]:
    """
    Send the init function to the interface, if one is given and the interface is up.

    Args:
        interface (SocketInterface): interface to the worker process
        init_function (Callable): optional function to preset arguments for functions which are submitted later

    Returns:
        Optional[Exception]: the exception raised while sending the init function, or None when no init function
                             was sent or the call succeeded.
    """
    interface_initialization_exception = None
    if init_function is not None and interface.status:
        try:
            _ = interface.send_and_receive_dict(
                input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
            )
        except Exception as init_exception:
            # Kept rather than raised: the caller attaches it to the submitted futures.
            interface_initialization_exception = init_exception
    return interface_initialization_exception