Source code for executorlib.backend.interactive_serial
import sys
from os.path import abspath
from typing import Optional
from executorlib.standalone.error import backend_write_error_file
from executorlib.standalone.interactive.backend import call_funct, parse_arguments
from executorlib.standalone.interactive.communication import (
interface_connect,
interface_receive,
interface_send,
interface_shutdown,
)
[docs]
def main(argument_lst: Optional[list[str]] = None):
"""
The main function of the program.
Args:
argument_lst (Optional[List[str]]): List of command line arguments. If None, sys.argv will be used.
Returns:
None
"""
if argument_lst is None:
argument_lst = sys.argv
argument_dict = parse_arguments(argument_lst=argument_lst)
context, socket = interface_connect(
host=argument_dict["host"], port=argument_dict["zmqport"]
)
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}
# required for flux interface - otherwise the current path is not included in the python path
cwd = abspath(".")
if cwd not in sys.path:
sys.path.insert(1, cwd)
while True:
# Read from socket
input_dict = interface_receive(socket=socket)
# Parse input
if "shutdown" in input_dict and input_dict["shutdown"]:
interface_send(socket=socket, result_dict={"result": True})
interface_shutdown(socket=socket, context=context)
break
elif (
"fn" in input_dict
and "init" not in input_dict
and "args" in input_dict
and "kwargs" in input_dict
):
# Execute function
try:
output = call_funct(input_dict=input_dict, funct=None, memory=memory)
except Exception as error:
interface_send(
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
# Send output
interface_send(socket=socket, result_dict={"result": output})
elif (
"init" in input_dict
and input_dict["init"]
and "args" in input_dict
and "kwargs" in input_dict
):
try:
memory.update(
call_funct(input_dict=input_dict, funct=None, memory=memory)
)
except Exception as error:
interface_send(
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
interface_send(socket=socket, result_dict={"result": True})
if __name__ == "__main__":
main(argument_lst=sys.argv)