Source code for executorlib.standalone.batched
from concurrent.futures import Future
[docs]
def batched_futures(
lst: list[Future], nested_skip_lst: list[Future[list]], n: int
) -> tuple[bool, list[Future]]:
"""
Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is
not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set
then they are returned as batch.
Args:
lst (list): list of all future objects
nested_skip_lst (list): list of future objects, which contain the list of future objects ids which should be skipped for the batch
n (int): batch size
Returns:
list: results of the batched futures
"""
skip_set = {fid for f in nested_skip_lst for fid in f.result()}
done_lst = []
failed_lst = []
n_expected = min(n, len(lst) - len(skip_set))
for v in lst:
if id(v) not in skip_set and v.done():
if v.exception() is not None:
failed_lst.append(v)
elif id(v) not in skip_set and v.done():
done_lst.append(v)
if len(done_lst) == n_expected:
return True, done_lst
if (len(lst) - len(skip_set)) == len(failed_lst):
return (
False,
failed_lst[:n_expected],
) # raise the exception only after all futures have failed
else:
return True, []