HPC Cluster Executor#

In contrast to the Single Node Executor and the HPC Job Executor, the HPC Cluster Executors do not communicate via the zero message queue (ZeroMQ). Instead they store the Python functions on the file system and use the job scheduler to handle the dependencies between them. Consequently, block allocation (block_allocation) and the init function (init_function) are not available in the HPC Cluster Executors. In return, it is possible to close the Python process which created the Executor, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.

Internally the HPC submission mode uses the Python simple queuing system adapter (pysqa) to connect to HPC job schedulers and the h5py package for serializing the Python functions so that they can be stored on the file system. Both packages are optional dependencies of executorlib. Their installation is covered in the installation section.

SLURM#

The Simple Linux Utility for Resource Management (SLURM) job scheduler is currently the most commonly used job scheduler for HPC clusters. On shared HPC systems users cannot access compute nodes directly — SLURM acts as the resource controller, accepting job requests, managing the queue, and assigning work to nodes when resources become free.

In the HPC submission mode executorlib internally uses the sbatch command. This is in contrast to the HPC Job Executor, which internally uses the srun command.

The connection to the job scheduler is based on the Python simple queuing system adapter (pysqa). It provides a default configuration for the most commonly used job schedulers, including SLURM. In addition, the submission template can be provided as part of the resource dictionary resource_dict, or a configuration directory can be specified with the pysqa_config_directory parameter. All three options are covered in more detail in the pysqa documentation.

Background#

Three commands cover the day-to-day SLURM workflow:

sbatch submits a batch script from the login node. The script carries resource directives on lines starting with #SBATCH:

#!/bin/bash
#SBATCH --job-name=my_job
#SBATCH --output=my_job.out
#SBATCH --ntasks=4           # total MPI ranks
#SBATCH --cpus-per-task=1    # CPU threads per rank
#SBATCH --time=00:30:00      # wall-clock limit (HH:MM:SS)
#SBATCH --partition=regular

srun --mpi=pmix python my_script.py

sbatch job.sh   # submit the script; returns a job ID immediately

Key #SBATCH directives:

| Directive | Meaning |
| --- | --- |
| --ntasks=N | Total MPI ranks (processes) |
| --cpus-per-task=N | CPU threads available to each rank |
| --mem=NG | Memory per node (e.g. 8G) |
| --time=HH:MM:SS | Maximum wall-clock time |
| --partition=name | Queue / partition to target |
| --dependency=afterok:JOBID | Run only after another job succeeds |
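When batch scripts are generated from Python, the directives listed above can be assembled programmatically. The following is a minimal sketch; render_sbatch_script is a hypothetical helper written for this illustration and is not part of executorlib or pysqa, which use their own Jinja2 templates.

```python
def render_sbatch_script(command, **directives):
    """Build the text of a SLURM batch script from keyword directives.

    Underscores in keyword names become hyphens, so cpus_per_task=1
    is written as "#SBATCH --cpus-per-task=1".
    """
    lines = ["#!/bin/bash"]
    for key, value in directives.items():
        lines.append(f"#SBATCH --{key.replace('_', '-')}={value}")
    lines.append("")       # blank line between the header and the payload
    lines.append(command)  # the command executed on the compute node
    return "\n".join(lines) + "\n"


script = render_sbatch_script(
    "srun --mpi=pmix python my_script.py",
    job_name="my_job",
    ntasks=4,
    cpus_per_task=1,
    time="00:30:00",
    partition="regular",
)
print(script)
```

The printed text can be written to a file such as job.sh and submitted with sbatch.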

srun launches parallel tasks inside an existing allocation. Multiple srun calls can run concurrently using shell backgrounding:

srun --mpi=pmix -n 4 python task_a.py &
srun --mpi=pmix -n 4 python task_b.py &
wait   # block until both finish

squeue and sacct let you inspect the queue and verify resource assignments:

squeue --me                                              # your running/pending jobs
sacct -j 12345 --format=JobID,State,AllocCPUS,Elapsed   # accounting for job 12345

Common squeue state codes: PD (pending), R (running), CG (completing).

MPI-parallel Python#

The Message Passing Interface (MPI) is the dominant parallelisation standard on HPC systems. mpi4py provides Python bindings. A minimal example:

# script.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
print(f"rank {comm.Get_rank()} of {comm.Get_size()}")

srun --mpi=pmix -n 4 python script.py

When multiple independent groups of ranks need to run inside one allocation there are two approaches:

| Approach | How | Cross-group communication |
| --- | --- | --- |
| Multiple srun calls | Each srun gets its own communicator | Not possible |
| MPI_Comm_split | One srun, split in Python | Possible via MPI.COMM_WORLD |

# communicator splitting: 8 ranks split into two groups of 4
from mpi4py import MPI

comm = MPI.COMM_WORLD
color = comm.Get_rank() // 4   # group 0 or group 1
sub_comm = comm.Split(color)   # ranks with the same color share a communicator
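The grouping produced by the colour rule can be checked without an MPI installation. The sketch below only reproduces the arithmetic rank // 4 in plain Python; split_layout is a hypothetical helper for this illustration and does not create any communicator.

```python
def split_layout(world_size, group_size):
    """Return {color: [world ranks]} for the rule color = rank // group_size."""
    groups = {}
    for rank in range(world_size):
        color = rank // group_size
        groups.setdefault(color, []).append(rank)
    return groups


layout = split_layout(world_size=8, group_size=4)
print(layout)  # {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}
```

With the default key of comm.Split, ranks inside each new communicator keep their relative order, so world rank 5 becomes rank 1 of group 1.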

SlurmClusterExecutor#

from executorlib import SlurmClusterExecutor

In comparison to the SingleNodeExecutor, the only change for the SlurmClusterExecutor is that the cache directory has to be specified, here with cache_directory="./cache". The rest of the syntax remains exactly the same, which simplifies the up-scaling of simulation workflows.

with SlurmClusterExecutor(cache_directory="./cache") as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    print([f.result() for f in future_lst])

Specific parameters for SlurmClusterExecutor like the maximum run time "run_time_max", the maximum memory "memory_max" or the submission template for the job submission script "submission_template" can be specified as part of the resource dictionary. Again it is possible to specify the resource dictionary resource_dict either for each function in the submit() function or during the initialization of the SlurmClusterExecutor.

submission_template = """\
#!/bin/bash
#SBATCH --output=time.out
#SBATCH --job-name={{job_name}}
#SBATCH --chdir={{working_directory}}
#SBATCH --get-user-env=L
#SBATCH --partition={{partition}}
{%- if run_time_max %}
#SBATCH --time={{ [1, run_time_max // 60]|max }}
{%- endif %}
{%- if dependency %}
#SBATCH --dependency=afterok:{{ dependency | join(',') }}
{%- endif %}
{%- if memory_max %}
#SBATCH --mem={{memory_max}}G
{%- endif %}
#SBATCH --ntasks={{cores}}

{{command}}
"""

with SlurmClusterExecutor(cache_directory="./cache") as exe:
    future = exe.submit(
        sum, [4, 4], 
        resource_dict={
            "submission_template": submission_template, 
            "run_time_max": 180,  # in seconds  
            "partition": "s.cmfe",
        })
    print(future.result())

The template uses Jinja2 syntax. executorlib fills {{cores}} into --ntasks, so cores=1 requests one serial process and cores=4 requests four MPI ranks. With pmi_mode="pmix" the executor additionally wraps the function call in srun --mpi=pmix -n <cores>:

def mpi_calc(i):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return {"rank": comm.Get_rank(), "size": comm.Get_size(), "input": i}

with SlurmClusterExecutor(pmi_mode="pmix", cache_directory="./cache") as exe:
    future = exe.submit(
        mpi_calc, 42,
        resource_dict={"submission_template": submission_template, "cores": 4, "partition": "regular", "run_time_max": 120})
    print(future.result())
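The --time line of the submission template converts run_time_max from seconds to whole minutes with the expression [1, run_time_max // 60]|max. The same arithmetic in plain Python is sketched below; slurm_time_minutes is a hypothetical name used only for this illustration.

```python
def slurm_time_minutes(run_time_max):
    """Mirror the template expression [1, run_time_max // 60]|max."""
    return max(1, run_time_max // 60)


for seconds in (30, 180, 200):
    print(f"{seconds} s -> --time={slurm_time_minutes(seconds)}")
```

Because the integer division rounds down, 200 seconds is submitted as a 3 minute limit, so it can be safer to choose run_time_max as a multiple of 60 with some margin.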

With these options executorlib in combination with the SLURM job scheduler provides a lot of flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler.

If the submission goes wrong, for example when no submission template is defined and executorlib falls back to the default submission template, a CalledProcessError is raised:

def echo(i):
    return i

with SlurmClusterExecutor() as exe:
    f1 = exe.submit(echo, 1)  # echo() requires one argument
    print(f1.result())

The error message would be something like:

CalledProcessError: Command '['sbatch', '--parsable', '/home/jovyan/executorlib_cache/echoe5873521b7330f831bed941744079e83/run_queue.sh']' returned non-zero exit status 1.

This is not really helpful, as it only says that the sbatch command was not successful; the error message which sbatch printed on the command line is not part of it. To read that message, retrieve the exception from the future object and print its output attribute:

excep = f1.exception()
print(excep.output)

This gives:

sbatch: error: invalid partition specified: (null)
sbatch: error: Batch job submission failed: Invalid partition name specified

So the submission failed because no partition was provided, which can be corrected by adding the --partition flag in the submission template, as demonstrated above.
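This pattern can be wrapped in a small helper that returns the scheduler message whenever a future failed at submission. The sketch below runs without a cluster: a plain concurrent.futures.Future carrying a hand-built CalledProcessError stands in for the future returned by exe.submit(), and submission_error is a hypothetical helper name.

```python
from concurrent.futures import Future
from subprocess import CalledProcessError


def submission_error(future):
    """Return the scheduler output if the future failed at submission, else None."""
    exc = future.exception()  # blocks until the future is done
    if isinstance(exc, CalledProcessError):
        return exc.output
    return None


# Stand-in for a failed submission (assumption: the real future carries the
# same exception type, as shown in the example above).
failed = Future()
failed.set_exception(
    CalledProcessError(
        returncode=1,
        cmd=["sbatch", "--parsable", "run_queue.sh"],
        output="sbatch: error: invalid partition specified: (null)\n",
    )
)
print(submission_error(failed))
```

For a future that completed normally, exception() returns None and the helper returns None as well.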

Verifying the Resource Assignment#

After the submission it is often useful to confirm that the job scheduler actually assigned the requested resources. For the SlurmClusterExecutor the SLURM job identifier is stored in the cache and can be retrieved with the get_cache_data() function. This job identifier queue_id can then be passed to the SLURM sacct command to inspect the accounting record of the job:

from executorlib import get_cache_data
import subprocess

for entry in get_cache_data(cache_directory="./cache"):
    if "calc" in str(entry["function"]):
        job_id = entry["queue_id"]
        print(subprocess.check_output(
            ["sacct", "-j", str(job_id), "--format=JobID,State,AllocCPUS,Elapsed"],
            universal_newlines=True,
        ))

The AllocCPUS column reports the number of CPU cores SLURM allocated for the function. In addition to the queue_id each cache entry also contains the full resource_dict, the runtime and the path of the result file, which together provide a complete audit trail of the submission - this is the same information returned by the get_cache_data() function demonstrated for the Single Node Executor.
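To compare the allocation with the request programmatically, the sacct output can be parsed. The sketch below assumes sacct is called with the additional options --parsable2 and --noheader, which print one pipe-separated row per job or job step. The sample text is made up for this illustration, and parse_sacct and allocated_cpus are hypothetical helpers.

```python
FIELDS = ("JobID", "State", "AllocCPUS", "Elapsed")


def parse_sacct(text, fields=FIELDS):
    """Turn pipe-separated sacct rows into a list of dictionaries."""
    rows = []
    for line in text.splitlines():
        if line.strip():
            rows.append(dict(zip(fields, line.split("|"))))
    return rows


def allocated_cpus(rows, job_id):
    """Return AllocCPUS of the row whose JobID equals job_id, or None."""
    for row in rows:
        if row["JobID"] == str(job_id):
            return int(row["AllocCPUS"])
    return None


# Made-up sample in the --parsable2 --noheader layout, not real accounting data.
sample = "12345|COMPLETED|4|00:00:12\n12345.batch|COMPLETED|4|00:00:12\n"
rows = parse_sacct(sample)
print(allocated_cpus(rows, 12345))
```

The returned number can then be compared with the "cores" entry of the resource_dict stored in the same cache entry.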

Flux#

Flux is a modern HPC resource manager developed at Lawrence Livermore National Laboratory (LLNL). On many systems it runs as a secondary scheduler inside a SLURM allocation, enabling fine-grained hierarchical task distribution. Unlike SLURM, Flux can also be installed locally via conda — making it especially suitable for demonstrations, testing, and continuous integration:

conda install -c conda-forge flux-core
flux start   # launch a local Flux instance

This simple installation is explained in the installation section. The features demonstrated below using Flux apply equally to SLURM.

Background#

The key Flux commands map closely onto their SLURM equivalents:

| Flux command | SLURM equivalent | Description |
| --- | --- | --- |
| flux resource list | sinfo | Show available nodes, cores, and GPUs |
| flux jobs -a | squeue | List all jobs (running and completed) |
| flux submit | sbatch | Submit a non-blocking job; returns a job ID immediately |
| flux run | srun | Launch a blocking job; waits for completion |
| flux job attach <ID> | | Stream output of a previously submitted job |

# submit a non-blocking 4-rank MPI job
flux submit -o pmi=pmix --ntasks=4 python script.py

# run a blocking 4-rank MPI job (waits for output)
flux run -o pmi=pmix -n 4 python script.py

The -o pmi=pmix flag matches what SLURM’s --mpi=pmix provides — the same mpi4py scripts run unchanged under both schedulers.
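Since the two launchers differ only in their prefix, a script that has to run under either scheduler can build the command line from one function. This is a minimal sketch using only the flags shown above; mpi_launch_command is a hypothetical helper and not part of executorlib.

```python
def mpi_launch_command(scheduler, ntasks, script):
    """Return the blocking MPI launch command as an argument list."""
    if scheduler == "slurm":
        prefix = ["srun", "--mpi=pmix", "-n", str(ntasks)]
    elif scheduler == "flux":
        prefix = ["flux", "run", "-o", "pmi=pmix", "-n", str(ntasks)]
    else:
        raise ValueError(f"unknown scheduler: {scheduler!r}")
    return prefix + ["python", script]


print(" ".join(mpi_launch_command("slurm", 4, "script.py")))
print(" ".join(mpi_launch_command("flux", 4, "script.py")))
```

The returned list can be passed directly to subprocess.run() inside an allocation of the respective scheduler.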

Dependencies#

As already demonstrated for the SingleNodeExecutor, the Executor classes from executorlib are capable of resolving the dependencies of serial functions when concurrent.futures Future objects are used as inputs for subsequent function calls. For the HPC submission these dependencies are communicated to the job scheduler. This makes it possible to stop the Python process which created the Executor class, wait until the execution of the submitted Python functions is completed, and afterwards restart the Python process and reload the calculation results from the cache defined by the cache_directory parameter.

from executorlib import FluxClusterExecutor

def add_funct(a, b):
    return a + b

with FluxClusterExecutor(cache_directory="./file") as exe:
    future = 0
    for i in range(4, 8):
        # each call takes the previous future as input, forming a dependency chain
        future = exe.submit(add_funct, i, future)
    print(future.result())

22

Resource Assignment#

In analogy to the SingleNodeExecutor the resource assignment for the FluxClusterExecutor is handled by either including the resource dictionary parameter resource_dict in the initialization of the FluxClusterExecutor class or in every call of the submit() function.

This is demonstrated below by assigning multiple CPU cores to a Python function which internally uses the message passing interface (MPI) via the mpi4py package.

def calc(i):
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank

with FluxClusterExecutor(cache_directory="./file") as exe:
    fs = exe.submit(calc, 3, resource_dict={"cores": 2})
    print(fs.result())

[(3, 2, 0), (3, 2, 1)]

Beyond CPU cores and threads, which were previously introduced for the Single Node Executor, the HPC Cluster Executors also provide the option to select accelerator cards or GPUs by specifying the "gpus_per_core" parameter in the resource dictionary resource_dict. For demonstration, we create a Python function which reads the GPU device IDs and submit it to the FluxClusterExecutor class:

def get_available_gpus():
    import socket
    from tensorflow.python.client import device_lib
    local_device_protos = device_lib.list_local_devices()
    return [
        (x.name, x.physical_device_desc, socket.gethostname()) 
        for x in local_device_protos if x.device_type == 'GPU'
    ]

with FluxClusterExecutor(
    cache_directory="./cache",
    resource_dict={"gpus_per_core": 1}
) as exe:
    fs_1 = exe.submit(get_available_gpus)
    fs_2 = exe.submit(get_available_gpus)
    print(fs_1.result(), fs_2.result())

Disconnecting and Reconnecting#

A key advantage of the HPC Cluster Executors over the HPC Job Executors is that the Python process which created the executor does not need to stay alive while the submitted functions are running. As the functions are submitted as individual scheduler jobs and the results are stored on the file system, the Python process can be closed after the submission - for example to log out of the login node overnight - and the results can be reloaded later. This is controlled with the shutdown() method of the executor, which provides the same wait and cancel_futures parameters as the Executor interface of the Python standard library.

To submit a set of functions and disconnect without waiting, the executor is created without a with statement and shutdown() is called with wait=False and cancel_futures=False:

from executorlib import SlurmClusterExecutor

exe = SlurmClusterExecutor(cache_directory="./cache")
future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
exe.shutdown(wait=False, cancel_futures=False)

The submitted jobs remain in the queue of the job scheduler and continue to run. At a later point - even from a new Python process - the same functions are submitted again using the same cache_directory. executorlib recognises the cached results and returns them immediately instead of submitting the functions a second time:

from executorlib import SlurmClusterExecutor

with SlurmClusterExecutor(cache_directory="./cache") as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    print([f.result() for f in future_lst])

The behaviour of the shutdown() method is summarized in the following table:

| shutdown() call | Effect |
| --- | --- |
| shutdown(wait=True) | Wait until all submitted functions are finished before continuing - this is the default and also what the with statement does. |
| shutdown(wait=False, cancel_futures=False) | Return immediately and leave the submitted jobs running in the scheduler queue - used to disconnect from a running workflow. |
| shutdown(wait=False, cancel_futures=True) | Cancel the submitted jobs which have not started or finished yet. |

This disconnect-and-reconnect capability is available for both the SlurmClusterExecutor and the FluxClusterExecutor, as both communicate via the file system rather than via sockets.
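The meaning of the wait and cancel_futures parameters can be tried out without a cluster by using ThreadPoolExecutor from the Python standard library as a stand-in. This is only an analogy under stated assumptions: a thread pool has no scheduler queue and its tasks do not survive the Python process, so the sketch shows the parameter semantics, not the disconnect itself.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()  # set once the first task is running
release = threading.Event()  # lets the first task finish


def first_task():
    started.set()
    release.wait(timeout=5)
    return "first done"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(first_task)
started.wait(timeout=5)  # the first task now occupies the only worker
queued = [pool.submit(sum, [i, i]) for i in range(1, 4)]

# Return immediately and cancel everything that has not started yet.
pool.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())                 # first done
print([f.cancelled() for f in queued])  # [True, True, True]
```

The task that was already running is not interrupted, while the tasks still waiting in the queue are cancelled. The cancel_futures parameter requires Python 3.9 or newer.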

Combine both#

While SlurmClusterExecutor submits each Python function as a separate sbatch job from the login node, the HPC Job Executor (SlurmJobExecutor) runs inside an already-running SLURM allocation and dispatches tasks as srun steps — no new jobs enter the queue.

from executorlib import SlurmJobExecutor

# This code runs inside an existing SLURM job
with SlurmJobExecutor(max_workers=4) as exe:
    futures = [exe.submit(sum, [i, i], resource_dict={"cores": 1}) for i in range(4)]
    print([f.result() for f in futures])

The two executor types can be nested: a function submitted via SlurmClusterExecutor (running as an sbatch job) can itself create a SlurmJobExecutor to parallelise sub-tasks as srun steps within the same allocation:

from executorlib import SlurmClusterExecutor, SlurmJobExecutor

def parallel_workflow(n):
    with SlurmJobExecutor(max_workers=n) as inner:
        futures = [inner.submit(sum, [i, i], resource_dict={"cores": 1}) for i in range(n)]
        return [f.result() for f in futures]

with SlurmClusterExecutor(cache_directory="./cache") as outer:
    future = outer.submit(
        parallel_workflow, 4,
        resource_dict={"cores": 1, "partition": "regular", "run_time_max": 300})
    print(future.result())

The sacct output for such a job will show the outer sbatch job together with numbered srun steps (e.g. 12345.0, 12345.1, …), all completing within the same allocation.

| Executor | Scheduler command | Typical use |
| --- | --- | --- |
| SlurmClusterExecutor | sbatch (one job per function) | Submit from login node |
| SlurmJobExecutor | srun steps within current job | Inside an existing allocation |
| FluxClusterExecutor | flux submit | Flux-managed allocation or local testing |

Cleaning Cache#

Finally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the cache_directory parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the cloudpickle library for serialization. This format is designed for caching but not for long-term storage. The user is responsible for the long-term storage of their data.

import os
import shutil

cache_dir = "./file"
if os.path.exists(cache_dir):
    print(os.listdir(cache_dir))
    try:
        shutil.rmtree(cache_dir)
    except OSError:
        pass

['add_functcb272924f36cbaa9ac79f8d42b6771c8', 'add_funct67b8245bf71c3c6dcb2018663939c72d', 'add_funct144ecf19b5020fccad214df3f4bdabd0', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667', 'add_funct67b8245bf71c3c6dcb2018663939c72d_o.h5', 'add_funct144ecf19b5020fccad214df3f4bdabd0_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667_o.h5', 'add_functcb272924f36cbaa9ac79f8d42b6771c8_o.h5']
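Before deleting a cache directory it can be useful to check which tasks already have a result. The sketch below assumes, based only on the listing above, that each task has a directory named after it and that a file with the suffix _o.h5 marks a stored result; this naming is an observation, not a documented interface. cache_status is a hypothetical helper, and the demonstration uses a temporary directory with made-up task names.

```python
import tempfile
from pathlib import Path

SUFFIX = "_o.h5"  # assumption: suffix of result files, as seen in the listing


def cache_status(cache_dir):
    """Return (finished, unfinished) task names found in cache_dir."""
    cache = Path(cache_dir)
    finished = {p.name[: -len(SUFFIX)] for p in cache.glob("*" + SUFFIX)}
    submitted = {p.name for p in cache.iterdir() if p.is_dir()}
    return sorted(finished), sorted(submitted - finished)


# Demonstration on a temporary directory with made-up task names.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "add_functaaa").mkdir()
    (Path(tmp) / "calcbbb").mkdir()
    (Path(tmp) / ("add_functaaa" + SUFFIX)).touch()
    finished, unfinished = cache_status(tmp)

print(finished, unfinished)
```

Removing the directory only once the second list is empty avoids deleting the inputs of jobs that are still waiting in the queue.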