Source code for executorlib.task_scheduler.file.spawner_subprocess

import os
import subprocess
import time
from typing import Optional

from executorlib.standalone.hdf import dump
from executorlib.standalone.inputcheck import check_file_exists
from executorlib.standalone.interactive.spawner import (
    set_current_directory_in_environment,
)


[docs] def execute_in_subprocess( command: list, file_name: str, data_dict: dict, cache_directory: Optional[str] = None, task_dependent_lst: Optional[list] = None, resource_dict: Optional[dict] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. Args: command (list): The command to be executed. file_name (str): Name of the HDF5 file which contains the Python function data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}} cache_directory (str): The directory to store the HDF5 files. task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. Example resource dictionary: { cwd: None, } config_directory (str, optional): path to the config directory. backend (str, optional): name of the backend used to spawn tasks. Returns: subprocess.Popen: The subprocess object. """ if task_dependent_lst is None: task_dependent_lst = [] if os.path.exists(file_name): os.remove(file_name) dump(file_name=file_name, data_dict=data_dict) check_file_exists(file_name=file_name) while len(task_dependent_lst) > 0: task_dependent_lst = [ task for task in task_dependent_lst if task.poll() is None ] if config_directory is not None: raise ValueError( "config_directory parameter is not supported for subprocess spawner." ) if backend is not None: raise ValueError("backend parameter is not supported for subprocess spawner.") cwd = _get_working_directory( cache_directory=cache_directory, resource_dict=resource_dict ) if cwd is not None: os.makedirs(cwd, exist_ok=True) set_current_directory_in_environment() return subprocess.Popen(command, universal_newlines=True, cwd=cwd)
[docs] def terminate_subprocess(task): """ Terminate a subprocess and wait for it to complete. Args: task (subprocess.Popen): The subprocess.Popen instance to terminate """ task.terminate() while task.poll() is None: time.sleep(0.1)
def _get_working_directory( cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None ): if resource_dict is None: resource_dict = {} if "cwd" in resource_dict and resource_dict["cwd"] is not None: return resource_dict["cwd"] else: return cache_directory