executorlib.task_scheduler.interactive.onetoone.OneProcessTaskScheduler#
- class executorlib.task_scheduler.interactive.onetoone.OneProcessTaskScheduler(max_cores: int | None = None, max_workers: int | None = None, executor_kwargs: dict | None = None, spawner: type[~executorlib.standalone.interactive.spawner.BaseSpawner] = <class 'executorlib.standalone.interactive.spawner.MpiExecSpawner'>, validator: ~typing.Callable = <function validate_resource_dict>)[source]#
Bases:
TaskSchedulerBaseThe executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib executor to distribute python tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor can be executed in a serial python process and does not require the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used in combination with Jupyter notebooks.
- Parameters:
max_cores (int) – defines the number workers which can execute functions in parallel
executor_kwargs (dict) – keyword arguments for the executor
spawner (BaseSpawner) – interface class to initiate python processes
Examples
>>> import numpy as np >>> from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler >>> >>> def calc(i, j, k): >>> from mpi4py import MPI >>> size = MPI.COMM_WORLD.Get_size() >>> rank = MPI.COMM_WORLD.Get_rank() >>> return np.array([i, j, k]), size, rank >>> >>> with OneProcessTaskScheduler(max_cores=2) as p: >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) >>> print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
- __init__(max_cores: int | None = None, max_workers: int | None = None, executor_kwargs: dict | None = None, spawner: type[~executorlib.standalone.interactive.spawner.BaseSpawner] = <class 'executorlib.standalone.interactive.spawner.MpiExecSpawner'>, validator: ~typing.Callable = <function validate_resource_dict>)[source]#
Initialize the TaskSchedulerBase.
- Parameters:
max_cores (int, optional) – Maximum number of cores available to the scheduler. Tasks requesting more cores than this will be rejected. Defaults to None (unlimited).
validator (Callable) – Function used to validate per-task resource dicts before submission. Defaults to the no-op validate_resource_dict.
Methods
__init__([max_cores, max_workers, ...])Initialize the TaskSchedulerBase.
batched(iterable, n)Batch futures from the iterable into tuples of length n.
map(fn, *iterables[, timeout, chunksize])Returns an iterator equivalent to map(fn, iter).
shutdown([wait, cancel_futures])Clean-up the resources associated with the Executor.
submit(fn, /, *args[, resource_dict])Submits a callable to be executed with the given arguments.
Attributes
Get the future queue.
Get the information about the executor.
Return the configured number of parallel workers, or None if unconstrained.
- batched(iterable: list[Future], n: int) list[Future]#
Batch futures from the iterable into tuples of length n. The last batch may be shorter than n.
- Parameters:
iterable (list) – list of future objects to batch based on which future objects finish first
n (int) – badge size
- Returns:
list of future objects one for each batch
- Return type:
list[Future]
- property future_queue: Queue | None#
Get the future queue.
- Returns:
The future queue.
- Return type:
queue.Queue
- property info: dict | None#
Get the information about the executor.
- Returns:
Information about the executor.
- Return type:
Optional[dict]
- map(fn: Callable, *iterables, timeout: float | None = None, chunksize: int = 1)#
Returns an iterator equivalent to map(fn, iter).
- Parameters:
fn – A callable that will take as many arguments as there are passed iterables.
timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.
chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
- Returns:
map(func, *iterables) but the calls may be evaluated out-of-order.
- Return type:
An iterator equivalent to
- Raises:
TimeoutError – If the entire result iterator could not be generated before the given timeout.
Exception – If fn(*args) raises for any values.
- property max_workers: int | None#
Return the configured number of parallel workers, or None if unconstrained.
- Returns:
The max_workers value stored in process kwargs, or None.
- Return type:
Optional[int]
- shutdown(wait: bool = True, *, cancel_futures: bool = False)#
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters:
wait (bool) – If True then shutdown will not return until all running futures have finished executing and the resources used by the parallel_executors have been reclaimed.
cancel_futures (bool) – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
- submit(fn: Callable, /, *args, resource_dict: dict | None = None, **kwargs) Future#
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.
- Parameters:
fn (callable) – function to submit for execution
args – arguments for the submitted function
kwargs – keyword arguments for the submitted function
resource_dict (dict) –
A dictionary of resources required by the task. With the following keys: - cores (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_core (int): number of GPUs per worker - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - openmpi_oversubscribe (bool): adds the –oversubscribe command line flag (OpenMPI and
SLURM only) - default False
slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- Returns:
A Future representing the given call.
- Return type:
Future