Source code for executorlib.task_scheduler.file.backend
import os
import time
from typing import Any
from executorlib.standalone.error import backend_write_error_file
from executorlib.standalone.hdf import dump, load
from executorlib.task_scheduler.file.shared import FutureItem
[docs]
def backend_load_file(file_name: str) -> dict:
    """
    Load a task description from an HDF5 file and unwrap FutureItem arguments.

    Every positional and keyword argument that is recognised as a FutureItem
    (via the module-level ``_isinstance`` check) is replaced by the value
    returned from its ``result()`` method; all other arguments are kept as-is.

    Args:
        file_name (str): The name of the HDF5 file.

    Returns:
        dict: The loaded data with "args" and "kwargs" replaced by their
            unwrapped counterparts.
    """

    def _unwrap(value: Any) -> Any:
        # Swap a FutureItem for its result, pass everything else through.
        if _isinstance(value, FutureItem):
            return value.result()
        return value

    task = load(file_name=file_name)
    task["args"] = [_unwrap(item) for item in task["args"]]
    task["kwargs"] = {
        name: _unwrap(item) for name, item in task["kwargs"].items()
    }
    return task
[docs]
def backend_write_file(file_name: str, output: Any, runtime: float) -> None:
    """
    Write the output of a task to an HDF5 file.

    The given file is first renamed to ``<stem>_r.h5``, the output is dumped
    into it, and the file is finally renamed to ``<stem>_o.h5``. ``<stem>`` is
    the file name without its extension and without its last two characters
    (presumably a two-character suffix such as ``_i`` - confirm against the
    caller).

    Args:
        file_name (str): The name of the HDF5 file.
        output (Any): Container holding either a "result" entry or an "error"
            entry.
        runtime (float): Time for executing function.

    Returns:
        None
    """
    stem = os.path.splitext(file_name)[0][:-2]
    running_file = stem + "_r.h5"
    finished_file = stem + "_o.h5"

    os.rename(file_name, running_file)
    try:
        if "result" in output:
            payload = {"output": output["result"], "runtime": runtime}
        else:
            payload = {"error": output["error"], "runtime": runtime}
        dump(file_name=running_file, data_dict=payload)
    except Exception as serialize_error:
        # Writing the payload failed: store the failure itself instead, so
        # that an output file is still produced for this task.
        dump(
            file_name=running_file,
            data_dict={"error": serialize_error, "runtime": runtime},
        )
    os.rename(running_file, finished_file)
[docs]
def backend_execute_task_in_file(file_name: str) -> None:
    """
    Execute the task stored in a given HDF5 file and write back its outcome.

    The task is loaded with ``backend_load_file`` and its "fn" entry is called
    with the stored "args" and "kwargs". Any ``Exception`` raised while loading
    or calling is recorded as the outcome and additionally passed to
    ``backend_write_error_file``. The outcome is then handed to
    ``backend_write_file`` together with the elapsed time, which is measured
    from before the task is loaded.

    Args:
        file_name (str): The file name of the HDF5 file as an absolute path.

    Returns:
        None
    """
    started_at = time.time()
    # Start with an empty dict so the error handler always has something to
    # pass on, even when loading the task itself fails.
    task = {}
    try:
        task = backend_load_file(file_name=file_name)
        func = task["fn"]
        outcome = {"result": func.__call__(*task["args"], **task["kwargs"])}
    except Exception as error:
        outcome = {"error": error}
        backend_write_error_file(error=error, apply_dict=task)

    elapsed = time.time() - started_at
    backend_write_file(file_name=file_name, output=outcome, runtime=elapsed)
def _isinstance(obj: Any, cls: type) -> bool:
return str(obj.__class__) == str(cls)