Source code for executorlib.backend.interactive_parallel
import pickle
import sys
from os.path import abspath
from typing import Optional
import cloudpickle
import zmq
from executorlib.standalone.error import backend_write_error_file
from executorlib.standalone.interactive.backend import call_funct, parse_arguments
from executorlib.standalone.interactive.communication import (
interface_connect,
interface_receive,
interface_send,
interface_shutdown,
)
[docs]
def main() -> None:
"""
Entry point of the program.
This function initializes MPI, sets up the necessary communication, and executes the requested functions.
Returns:
None
"""
from mpi4py import MPI
MPI.pickle.__init__( # type: ignore
cloudpickle.dumps,
cloudpickle.loads,
pickle.HIGHEST_PROTOCOL,
)
mpi_rank_zero = MPI.COMM_WORLD.Get_rank() == 0
mpi_size_larger_one = MPI.COMM_WORLD.Get_size() > 1
argument_dict = parse_arguments(argument_lst=sys.argv)
context: Optional[zmq.Context] = None
socket: Optional[zmq.Socket] = None
if mpi_rank_zero:
context, socket = interface_connect(
host=argument_dict["host"], port=argument_dict["zmqport"]
)
memory = {"executorlib_worker_id": int(argument_dict["worker_id"])}
# required for flux interface - otherwise the current path is not included in the python path
cwd = abspath(".")
if cwd not in sys.path:
sys.path.insert(1, cwd)
while True:
# Read from socket
input_dict: dict = {}
if mpi_rank_zero:
input_dict = interface_receive(socket=socket)
input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)
# Parse input
if "shutdown" in input_dict and input_dict["shutdown"]:
if mpi_rank_zero:
interface_send(socket=socket, result_dict={"result": True})
interface_shutdown(socket=socket, context=context)
MPI.COMM_WORLD.Barrier()
break
elif (
"fn" in input_dict
and "init" not in input_dict
and "args" in input_dict
and "kwargs" in input_dict
):
# Execute function
try:
output = call_funct(input_dict=input_dict, funct=None, memory=memory)
if mpi_size_larger_one:
output_reply = MPI.COMM_WORLD.gather(output, root=0)
else:
output_reply = output
except Exception as error:
if mpi_rank_zero:
interface_send(
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
# Send output
if mpi_rank_zero:
interface_send(socket=socket, result_dict={"result": output_reply})
elif (
"init" in input_dict
and input_dict["init"]
and "args" in input_dict
and "kwargs" in input_dict
):
try:
memory.update(
call_funct(input_dict=input_dict, funct=None, memory=memory)
)
except Exception as error:
if mpi_rank_zero:
interface_send(
socket=socket,
result_dict={"error": error},
)
backend_write_error_file(
error=error,
apply_dict=input_dict,
)
else:
if mpi_rank_zero:
interface_send(socket=socket, result_dict={"result": True})
if __name__ == "__main__":
main()