executorlib.task_scheduler.interactive.blockallocation.BlockAllocationTaskScheduler

class executorlib.task_scheduler.interactive.blockallocation.BlockAllocationTaskScheduler(max_workers: int = 1, executor_kwargs: dict | None = None, spawner: type[executorlib.standalone.interactive.spawner.BaseSpawner] = <class 'executorlib.standalone.interactive.spawner.MpiExecSpawner'>, validator: typing.Callable = <function validate_resource_dict>, restart_limit: int = 0)

Bases: TaskSchedulerBase

The executorlib.interactive.executor.InteractiveExecutor leverages the executorlib executor to distribute Python tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor, the executorlib.interactive.executor.InteractiveExecutor can be executed in a serial Python process and does not require the Python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality that improves usability, in particular when used in combination with Jupyter notebooks.

Parameters:
  • max_workers (int) – defines the number of workers which can execute functions in parallel

  • executor_kwargs (dict) – keyword arguments for the executor

  • spawner (BaseSpawner) – interface class to initiate python processes

  • restart_limit (int) – maximum number of restarts for worker processes

Examples

>>> import numpy as np
>>> from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
>>>
>>> def calc(i, j, k):
...     from mpi4py import MPI
...     size = MPI.COMM_WORLD.Get_size()
...     rank = MPI.COMM_WORLD.Get_rank()
...     return np.array([i, j, k]), size, rank
...
>>> def init_k():
...     return {"k": 3}
...
>>> with BlockAllocationTaskScheduler(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
...     fs = p.submit(calc, 2, j=4)
...     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
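
In the example above, calc is submitted with i=2 and j=4 only, while k=3 comes from the dictionary returned by init_k. The following plain-Python sketch illustrates that idea in isolation. It is not executorlib's implementation: call_with_memory is a hypothetical helper, and the assumption is that values returned by the init function fill in parameters, matched by name, that the submit call leaves out.

```python
import inspect


def call_with_memory(fn, args, kwargs, memory):
    """Hypothetical helper: fill parameters missing from the call with
    same-named entries of `memory` (assumed behaviour, for illustration)."""
    signature = inspect.signature(fn)
    # Parameters already provided positionally or by keyword.
    bound = signature.bind_partial(*args, **kwargs).arguments
    # Only pass memory entries that the function accepts and still lacks.
    extra = {
        name: value
        for name, value in memory.items()
        if name in signature.parameters and name not in bound
    }
    return fn(*args, **kwargs, **extra)


def init_k():
    return {"k": 3}


def calc(i, j, k):
    return [i, j, k]


memory = init_k()
result = call_with_memory(calc, (2,), {"j": 4}, memory)
```

Here result holds the three arguments in order, with k supplied from memory rather than from the call.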
__init__(max_workers: int = 1, executor_kwargs: dict | None = None, spawner: type[executorlib.standalone.interactive.spawner.BaseSpawner] = <class 'executorlib.standalone.interactive.spawner.MpiExecSpawner'>, validator: typing.Callable = <function validate_resource_dict>, restart_limit: int = 0)

Initialize the TaskSchedulerBase.

Parameters:
  • max_cores (int, optional) – Maximum number of cores available to the scheduler. Tasks requesting more cores than this will be rejected. Defaults to None (unlimited).

  • validator (Callable) – Function used to validate per-task resource dicts before submission. Defaults to the no-op validate_resource_dict.

Methods

__init__([max_workers, executor_kwargs, ...])

Initialize the TaskSchedulerBase.

batched(iterable, n)

Batch futures from the iterable into tuples of length n.

map(fn, *iterables[, timeout, chunksize])

Returns an iterator equivalent to map(fn, iter).

shutdown([wait, cancel_futures])

Clean-up the resources associated with the Executor.

submit(fn, *args[, resource_dict])

Submits a callable to be executed with the given arguments.

Attributes

future_queue

Get the future queue.

info

Get the information about the executor.

max_workers

Return the configured number of parallel workers, or None if unconstrained.

batched(iterable: list[Future], n: int) → list[Future]

Batch futures from the iterable into tuples of length n. The last batch may be shorter than n.

Parameters:
  • iterable (list) – list of future objects to batch based on which future objects finish first

  • n (int) – batch size

Returns:

list of future objects, one for each batch

Return type:

list[Future]
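
The batching behaviour described above (groups of n in completion order, with a possibly shorter last batch) can be sketched with the standard library alone. This is only an illustration under stated assumptions: batch_results is a hypothetical helper, ThreadPoolExecutor stands in for the scheduler, and the sketch returns plain result tuples rather than the list of futures that batched() is documented to return.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import islice


def batch_results(futures, n):
    """Hypothetical helper: collect results in completion order and
    group them into tuples of length n (the last one may be shorter)."""
    finished = (future.result() for future in as_completed(futures))
    batches = []
    while True:
        batch = tuple(islice(finished, n))
        if not batch:
            break
        batches.append(batch)
    return batches


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(pow, i, 2) for i in range(5)]
    batches = batch_results(futures, n=2)

batch_sizes = [len(batch) for batch in batches]
```

Five futures with n=2 give two full batches and one batch of length one. Which results land in which batch depends on completion order.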

property future_queue: Queue | None

Get the future queue.

Returns:

The future queue.

Return type:

queue.Queue

property info: dict | None

Get the information about the executor.

Returns:

Information about the executor.

Return type:

Optional[dict]

map(fn: Callable, *iterables, timeout: float | None = None, chunksize: int = 1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.
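
The map() description above matches the interface of concurrent.futures.Executor.map. As a minimal sketch, the call pattern can be shown with the standard-library ThreadPoolExecutor as a stand-in. It is an assumption here that BlockAllocationTaskScheduler accepts the same arguments in the same way.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # One iterable per positional argument of `add`. The timeout (in seconds)
    # bounds how long iteration over the results may block.
    sums = list(pool.map(add, [1, 2, 3], [10, 20, 30], timeout=30))
```

The calls may run in any order, but the iterator yields the results in the order of the inputs.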

property max_workers: int

Return the configured number of parallel workers, or None if unconstrained.

Returns:

The max_workers value stored in process kwargs, or None.

Return type:

Optional[int]

shutdown(wait: bool = True, *, cancel_futures: bool = False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait (bool) – If True then shutdown will not return until all running futures have finished executing and the resources used by the parallel_executors have been reclaimed.

  • cancel_futures (bool) – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
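
The shutdown semantics described above follow the concurrent.futures.Executor contract. The sketch below demonstrates them with the standard-library ThreadPoolExecutor as a stand-in. Whether BlockAllocationTaskScheduler raises the same exception on a late submit is an assumption, not something this page states.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(sum, [1, 2, 3])

# wait=True blocks until running futures have finished.
pool.shutdown(wait=True)
# Calling shutdown a second time is safe and has no further effect.
pool.shutdown(wait=True)

# After shutdown, new work is rejected (RuntimeError for ThreadPoolExecutor).
try:
    pool.submit(sum, [4])
    rejected = False
except RuntimeError:
    rejected = True
```

Using the scheduler as a context manager, as in the class-level example, calls shutdown automatically on exit.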

submit(fn: Callable, *args, resource_dict: dict | None = None, **kwargs) → Future

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Parameters:
  • fn (Callable) – function to submit for execution

  • args – arguments for the submitted function

  • kwargs – keyword arguments for the submitted function

  • resource_dict (dict) – A dictionary of resources required by the task, with the following keys:

    • cores (int): number of MPI cores to be used for each function call

    • threads_per_core (int): number of OpenMP threads to be used for each function call

    • gpus_per_core (int): number of GPUs per worker, defaults to 0

    • cwd (str/None): current working directory where the parallel Python task is executed

    • openmpi_oversubscribe (bool): adds the --oversubscribe command line flag (OpenMPI and SLURM only), default False

    • slurm_cmd_args (list): additional command line arguments for the srun call (SLURM only)

    • error_log_file (str): name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor

Returns:

A Future representing the given call.

Return type:

Future
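
As a rough sketch of how a resource_dict for submit() might be assembled, the snippet below builds one from the keys listed above and checks it for misspelled keys before use. DOCUMENTED_KEYS and unknown_keys are hypothetical names introduced for illustration; they are not part of executorlib, and the library's own validator may behave differently.

```python
# Keys taken from the resource_dict description on this page.
DOCUMENTED_KEYS = {
    "cores",
    "threads_per_core",
    "gpus_per_core",
    "cwd",
    "openmpi_oversubscribe",
    "slurm_cmd_args",
    "error_log_file",
}


def unknown_keys(resource_dict):
    """Hypothetical helper: return keys that are not in the documented list."""
    return sorted(set(resource_dict) - DOCUMENTED_KEYS)


resource_dict = {"cores": 2, "threads_per_core": 1, "cwd": None}
typo_dict = {"core": 2}  # misspelled key, caught by the check below

# With executorlib installed and a scheduler instance named `scheduler`
# (an assumption), the call would follow the documented signature:
# future = scheduler.submit(calc, 2, j=4, resource_dict=resource_dict)
```

Checking keys up front is a design choice for the sketch only: it surfaces a typo such as "core" before a task is handed to a worker.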