Source code for executorlib.task_scheduler.interactive.onetoone

import queue
from concurrent.futures import Future
from threading import Thread
from typing import Callable, Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.interactive.communication import (
    ExecutorlibSocketError,
    interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.task_scheduler.base import TaskSchedulerBase, validate_resource_dict
from executorlib.task_scheduler.interactive.shared import execute_task_dict


class OneProcessTaskScheduler(TaskSchedulerBase):
    """
    Task scheduler which starts one separate Python process per submitted task.

    In contrast to the mpi4py.futures.MPIPoolExecutor the OneProcessTaskScheduler can be executed in a serial
    python process and does not require the python script to be executed with MPI. Consequently, it is primarily
    an abstraction of its functionality to improve the usability in particular when used in combination with
    Jupyter notebooks.

    Args:
        max_cores (int): defines the number of cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, limits the number of tasks
                           which are executed in parallel. It is only considered when max_cores is None.
        executor_kwargs (dict): keyword arguments for the executor. The dictionary is copied, the object handed
                                in by the caller is not modified.
        spawner (BaseSpawner): interface class to initiate python processes
        validator (Callable): passed on unchanged to the TaskSchedulerBase constructor

    Examples:

        >>> import numpy as np
        >>> from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
        >>>
        >>> def calc(i, j, k):
        >>>     from mpi4py import MPI
        >>>     size = MPI.COMM_WORLD.Get_size()
        >>>     rank = MPI.COMM_WORLD.Get_rank()
        >>>     return np.array([i, j, k]), size, rank
        >>>
        >>> with OneProcessTaskScheduler(max_cores=2) as p:
        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
        >>>     print(fs.result())
        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

    """

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: Optional[dict] = None,
        spawner: type[BaseSpawner] = MpiExecSpawner,
        validator: Callable = validate_resource_dict,
    ):
        # Work on a private copy: the scheduler injects its own entries below
        # ("future_queue", "spawner", ...) and these must not leak back into a
        # dictionary the caller may reuse, e.g. for a second scheduler.
        executor_kwargs = {} if executor_kwargs is None else executor_kwargs.copy()
        # NOTE(review): the base class receives the "max_cores" entry of
        # executor_kwargs, not the max_cores argument of this constructor -
        # kept as is, confirm this is intended.
        super().__init__(
            max_cores=executor_kwargs.get("max_cores"), validator=validator
        )
        executor_kwargs.update(
            {
                "future_queue": self._future_queue,
                "spawner": spawner,
                "max_cores": max_cores,
                "max_workers": max_workers,
            }
        )
        self._process_kwargs = executor_kwargs
        # The background thread consumes the future queue and launches the tasks.
        self._set_process(
            Thread(
                target=_execute_single_task,
                kwargs=executor_kwargs,
            )
        )
def _execute_single_task(
    future_queue: queue.Queue,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
    **kwargs,
):
    """
    Consume the task queue and start every submitted task in its own thread / process.

    The loop runs until a dictionary with a truthy "shutdown" entry is received.

    Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        spawner (BaseSpawner): Interface to start process on selected compute resources
        max_cores (int): defines the number cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers limits the number of
                           tasks which are executed in parallel. It is only considered when max_cores is None.
                           Using max_cores is recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        **kwargs: default keyword arguments for every task. A missing "cores" entry is set to 1.
    """
    active_task_dict: dict = {}
    process_lst: list = []
    kwargs.setdefault("cores", 1)
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown"):
            if task_dict["wait"]:
                for process in process_lst:
                    process.join()
            future_queue.task_done()
            future_queue.join()
            break
        if "fn" in task_dict and "future" in task_dict:
            process, active_task_dict = _wrap_execute_task_in_separate_process(
                task_dict=task_dict,
                active_task_dict=active_task_dict,
                spawner=spawner,
                executor_kwargs=kwargs,
                max_cores=max_cores,
                max_workers=max_workers,
                hostname_localhost=hostname_localhost,
            )
            # Forget threads which already finished, they do not have to be
            # joined on shutdown and would otherwise accumulate forever.
            process_lst = [p for p in process_lst if p.is_alive()]
            process_lst.append(process)
        # Every get() is matched by a task_done() - also for unrecognised
        # items - otherwise future_queue.join() on shutdown blocks forever.
        future_queue.task_done()


def _wait_for_free_slots(
    active_task_dict: dict,
    cores_requested: int,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
) -> dict:
    """
    Block until enough computing resources are available for the next task.

    When max_cores is set, the sum of the slots of all active tasks plus cores_requested has to fit into
    max_cores. Otherwise, when max_workers is set, the number of active tasks plus one has to fit into
    max_workers. Without any limit the function returns immediately.

    A request which exceeds the limit on its own can never be satisfied by waiting. In this case the function
    returns as soon as no other task is active rather than waiting forever.

    Args:
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        cores_requested (int): Number of cores required for executing the next task
        max_cores (int): Maximum number cores which can be used
        max_workers (int): Maximum number of tasks which are executed in parallel, only considered when
                           max_cores is None.

    Returns:
        dict: Dictionary containing the future objects and the number of cores they require. The input dictionary
              is returned unchanged when no waiting was required, otherwise a new dictionary without the
              completed futures is returned.
    """
    from concurrent.futures import FIRST_COMPLETED, wait

    if max_cores is not None:

        def exceeds_limit(tasks: dict) -> bool:
            return sum(tasks.values()) + cores_requested > max_cores

    elif max_workers is not None:

        def exceeds_limit(tasks: dict) -> bool:
            return len(tasks) + 1 > max_workers

    else:
        return active_task_dict

    # A cancelled future which was never picked up by a worker reports done()
    # but does not wake up wait(), so wait() is used with a timeout and the
    # futures are polled with done() afterwards.
    poll_interval = 0.1
    while exceeds_limit(active_task_dict):
        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
        if not active_task_dict:
            # Nothing is running anymore, further waiting cannot free any slots.
            break
        if exceeds_limit(active_task_dict):
            wait(
                active_task_dict,
                timeout=poll_interval,
                return_when=FIRST_COMPLETED,
            )
    return active_task_dict


def _wrap_execute_task_in_separate_process(
    task_dict: dict,
    active_task_dict: dict,
    spawner: type[BaseSpawner],
    executor_kwargs: dict,
    max_cores: Optional[int] = None,
    max_workers: Optional[int] = None,
    hostname_localhost: Optional[bool] = None,
):
    """
    Submit function to be executed in separate Python process

    The entries "resource_dict" and "future" are removed from task_dict before it is handed to the worker
    thread.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}, "future": Future}
        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
        spawner (BaseSpawner): Interface to start process on selected compute resources
        executor_kwargs (dict): keyword parameters used to initialize the Executor, has to contain "cores"
        max_cores (int): defines the number cores which can be used in parallel
        max_workers (int): for backwards compatibility with the standard library, max_workers limits the number of
                           tasks which are executed in parallel. It is only considered when max_cores is None.
                           Using max_cores is recommended, as computers have a limited number of compute cores.
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true

    Returns:
        Thread, dict: already started thread for communicating with the python process which is executing the
                      function and dictionary containing the future objects and the number of cores they require
    """
    resource_dict = task_dict.pop("resource_dict").copy()
    f = task_dict.pop("future")
    # A task which does not request cores - or requests a single core - uses
    # the number of cores defined on the executor level.
    if "cores" not in resource_dict or (
        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
    ):
        resource_dict["cores"] = executor_kwargs["cores"]
    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
    active_task_dict = _wait_for_free_slots(
        active_task_dict=active_task_dict,
        cores_requested=slots_required,
        max_cores=max_cores,
        max_workers=max_workers,
    )
    active_task_dict[f] = slots_required
    # Task specific resources overwrite the executor defaults, the entries
    # required by the worker thread overwrite both.
    task_kwargs = {
        **executor_kwargs,
        **resource_dict,
        "task_dict": task_dict,
        "spawner": spawner,
        "hostname_localhost": hostname_localhost,
        "future_obj": f,
    }
    process = Thread(
        target=_execute_task_in_thread,
        kwargs=task_kwargs,
    )
    process.start()
    return process, active_task_dict


def _execute_task_in_thread(
    task_dict: dict,
    future_obj: Future,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
):
    """
    Start a new interface for a single task and execute the task with it.

    When execute_task_dict() returns a falsy value an ExecutorlibSocketError is set on the future object.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}}
        future_obj (Future): A Future representing the given call.
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        cache_directory (str, optional): The directory to store cache files, passed on to execute_task_dict().
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                              functions submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
        **kwargs: additional keyword arguments for the constructor of the spawner
    """
    interface = interface_bootup(
        command_lst=get_interactive_execute_command(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        log_obj_size=log_obj_size,
        worker_id=worker_id,
    )
    task_completed = execute_task_dict(
        task_dict=task_dict,
        future_obj=future_obj,
        interface=interface,
        cache_directory=cache_directory,
        cache_key=cache_key,
        error_log_file=error_log_file,
    )
    if not task_completed:
        future_obj.set_exception(
            ExecutorlibSocketError("SocketInterface crashed during execution.")
        )