# Source code for executorlib.standalone.interactive.arguments
from asyncio.exceptions import CancelledError
from concurrent.futures import Future, TimeoutError
from typing import Any, Union
[docs]
def get_future_objects_from_input(args: tuple, kwargs: dict) -> list[Future]:
    """
    Collect all future objects contained in the positional and keyword arguments

    The search descends into nested lists and into the values of nested dictionaries. Other containers
    (for example tuples or sets nested inside the arguments) are not searched. Whether the collected future
    objects are already done is not evaluated here.

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        list: future objects in the order they are encountered, those from args first followed by those from
              the values of kwargs
    """

    def iter_futures(items):
        # Depth-first traversal, so futures are yielded in the order they appear in the input.
        for item in items:
            if isinstance(item, Future):
                yield item
            elif isinstance(item, list):
                yield from iter_futures(item)
            elif isinstance(item, dict):
                # Only dictionary values are inspected, keys are ignored.
                yield from iter_futures(item.values())

    return [*iter_futures(args), *iter_futures(kwargs.values())]
[docs]
def check_list_of_futures_is_done(future_lst: list[Future]) -> bool:
    """
    Check if all future objects in the list of future objects are done

    Args:
        future_lst (list): list of future objects

    Returns:
        bool: True if all future objects in the list of future objects are done (also for an empty list),
              False otherwise
    """
    # all() stops at the first future which is not done and avoids building a temporary list.
    return all(future.done() for future in future_lst)
[docs]
def get_exception_lst(future_lst: list[Future]) -> list:
    """
    Collect the exceptions of all future objects in the list which raised an exception

    Args:
        future_lst (list): list of future objects

    Returns:
        list: exceptions of those future objects for which check_exception_was_raised() returns True, in the
              order of future_lst. The list is empty if no exception was raised.
    """
    exception_lst = []
    for future_obj in future_lst:
        if check_exception_was_raised(future_obj=future_obj):
            exception_lst.append(future_obj.exception())
    return exception_lst
[docs]
def check_exception_was_raised(future_obj: Future) -> bool:
    """
    Check if exception was raised by future object

    A future object which is not done yet, which was cancelled, or whose stored exception is a
    CancelledError is reported as not having raised an exception.

    Args:
        future_obj (Future): future object

    Returns:
        bool: True if exception was raised, False otherwise
    """
    # Imported locally under an alias because the module-level name CancelledError refers to the asyncio
    # class, which on Python >= 3.8 is a different class from the one Future.exception() raises for a
    # cancelled future.
    from concurrent.futures import CancelledError as FutureCancelledError

    try:
        # The tiny timeout keeps the call from blocking on a future which is still pending or running.
        excp = future_obj.exception(timeout=10**-10)
    except (FutureCancelledError, TimeoutError):
        # Cancelled or not finished yet: no exception is available.
        return False
    return excp is not None and not isinstance(excp, CancelledError)
[docs]
def update_futures_in_input(args: tuple, kwargs: dict) -> tuple[tuple, dict]:
    """
    Replace the future objects in the arguments and keyword arguments by their results

    Each future object is evaluated by calling future.result() without a timeout. Nested lists and nested
    dictionaries are rebuilt with the future objects inside them replaced, all other values are passed
    through unchanged.

    Args:
        args (tuple): function arguments
        kwargs (dict): function keyword arguments

    Returns:
        tuple, dict: arguments and keyword arguments with each future object in them being evaluated
    """

    def resolve(value: Any) -> Any:
        # Future objects are checked first, then the two supported container types.
        if isinstance(value, Future):
            return value.result()
        if isinstance(value, list):
            return [resolve(item) for item in value]
        if isinstance(value, dict):
            return {name: resolve(item) for name, item in value.items()}
        return value

    resolved_args = tuple(resolve(item) for item in args)
    resolved_kwargs = {name: resolve(item) for name, item in kwargs.items()}
    return resolved_args, resolved_kwargs