ParallelProcessingTools.jl

This Julia package provides some tools to ease multithreaded and distributed programming.

Distributed computing

Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools adds some machinery to make some aspects of this even easier.

An internal elastic cluster manager (ppt_cluster_manager, a modified version of ParallelProcessingTools.ElasticManager), started on demand, allows for starting (runworkers) and stopping (stopworkers) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session (worker_start_command and write_worker_start_script). This can be useful for adding workers to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally (OnLocalhost), via SLURM (SlurmRun), or via HTCondor (HTCondorRun). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).

The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool (ppt_worker_pool) of type FlexWorkerPool that optionally supports oversubscription. Users can take! workers from the pool and put! them back, or use onworker to send work to workers in the pool without exceeding their maximum occupancy.

Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard Distributed.@everywhere macro is problematic, as workers added afterwards won't be initialized. ParallelProcessingTools provides the macro @always_everywhere to run code globally on all current processes and also store the code so it can be run again on future worker processes. Workers that are part of a FlexWorkerPool will be updated automatically on take! and onworker. You can also use ensure_procinit to manually bring all workers up to date with all @always_everywhere code used so far.

AutoThreadPinning, in conjunction with the package ThreadPinning, provides a convenient way to perform automatic thread pinning (e.g. inside of @always_everywhere, to apply thread pinning to all processes). Note that ThreadPinning.pinthreads(AutoThreadPinning()) works on a best-effort basis and that advanced applications may require customized thread pinning for best performance.

Some batch system configurations can result in whole Julia processes, or even a whole batch job, being terminated if a process exceeds its memory limit. In such cases, you can try to gain a softer failure mode by setting a custom (slightly smaller) memory limit using memory_limit!.

For example:

ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
ENV["JULIA_WORKER_TIMEOUT"] = "120"

using ParallelProcessingTools, Distributed

@always_everywhere begin
    using ParallelProcessingTools
    using Statistics

    import ThreadPinning
    pinthreads_auto()

    # Optional: Set a custom memory limit for worker processes:
    # myid() != 1 && memory_limit!(8 * 1000^3) # 8 GB
end

runmode = OnLocalhost(n = 4)
# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)

display(worker_start_command(runmode))

# Add some workers and initialize with all `@always_everywhere` code:
old_nprocs = nprocs()
_, n = runworkers(runmode)
@wait_while nprocs() < old_nprocs + n
ensure_procinit()

# Show worker resources:
pool = ppt_worker_pool()
display(pool)
display(worker_resources())

# Confirm that Statistics is loaded on a worker:
worker = last(workers())
@fetchfrom worker mean(rand(100))

# Some more init code
@always_everywhere begin
    X = rand(100)
end

# Add some more workers, we won't run `ensure_procinit()` manually this time:
old_nprocs = nprocs()
_, n = runworkers(runmode)
@wait_while nprocs() < old_nprocs + n

# Worker hasn't run @always_everywhere code yet, so it doesn't have `mean`:
worker = last(workers())
display(@return_exceptions @userfriendly_exceptions begin
    @fetchfrom worker mean(X)
end)

# Using `take!` on a `FlexWorkerPool` automatically runs init code as necessary:
pid = take!(pool)
try
    remotecall_fetch(() -> mean(X), pid)
finally
    put!(pool, pid)
end

# `onworker` (using the default `FlexWorkerPool` here) does the same:
onworker(mean, X)

# If we don't need worker processes for a while, let's stop them:
stopworkers()

We can also use SLURM batch scripts, like this (e.g. "batchtest.jl"):

#!/usr/bin/env julia
#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G --time=00:15:00

using Pkg; pkg"activate @SOME_JULIA_ENVIRONMENT"

ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
ENV["JULIA_WORKER_TIMEOUT"] = "120"

using ParallelProcessingTools, Distributed

@always_everywhere begin
    using ParallelProcessingTools
    import ThreadPinning
    pinthreads_auto()
end

_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
@wait_while maxtime=240 nprocs() < n + 1

resources = worker_resources()
display(resources)

stopworkers()

This should run with a simple

sbatch -o out.txt batchtest.jl

and "out.txt" should then contain debugging output and a list of the worker resources.

Multithreading

To test multithreading performance and help debug and optimize multithreaded code, ParallelProcessingTools provides the utility macro @onthreads to run code explicitly on the selected Julia threads (all threads can be listed using allthreads).

You can use the macro @critical to prevent code that may suffer from race conditions from running in parallel to other code fenced by @critical.
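As a rough illustration of what such a fence protects against, the following sketch uses only Base Julia. A ReentrantLock plays the role of the fence here, and the function name safe_count is made up for this example. This is an analogy and makes no claim about how @critical is implemented.

```julia
using Base.Threads

# Hypothetical helper (not part of ParallelProcessingTools): increments a
# shared counter from many loop iterations that may run on different threads.
# The lock serializes the update, which is the job of a critical section.
function safe_count(n::Integer)
    counter = Ref(0)
    lk = ReentrantLock()
    @threads for i in 1:n
        lock(lk) do
            counter[] += 1   # only one task at a time executes this line
        end
    end
    return counter[]
end

println(safe_count(10_000))  # 10000, regardless of the number of threads
```

Without the lock, the read-modify-write on `counter[]` could interleave between threads and lose updates when Julia runs with more than one thread.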

The macro @mt_out_of_order is useful to run different code in parallel on Julia threads.
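The general idea of running unrelated pieces of code concurrently can be sketched with Base Julia's Threads.@spawn. The functions slow_square and slow_negate are hypothetical stand-ins for independent workloads, and the sketch is an analogy rather than a description of the macro's internals.

```julia
using Base.Threads

# Hypothetical, independent workloads (assumptions for this example).
slow_square(x) = (sleep(0.05); x^2)
slow_negate(x) = (sleep(0.05); -x)

# Start both computations as separate tasks; they may finish in any order.
task_a = Threads.@spawn slow_square(3)
task_b = Threads.@spawn slow_negate(3)

# `fetch` waits for the given task and returns its result.
a = fetch(task_a)
b = fetch(task_b)
println((a, b))  # (9, -3)
```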

Waiting and sleeping

In a parallel computing scenario, on threads, distributed processes or both, or when dealing with I/O operations, code often needs to wait. In addition, a timeout mechanism is often necessary. Julia's standard wait function can only wait for a single object, without a timeout (waitany, which requires Julia >= v1.12, can be used to wait for multiple tasks).

ParallelProcessingTools provides a very flexible macro @wait_while to wait for custom conditions with an optional timeout, as well as the functions wait_for_all and wait_for_any that can wait for different kinds of objects, also with an optional timeout.
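For comparison, Base Julia offers a basic form of this pattern in timedwait, which polls a condition function until it returns true or a timeout elapses. The sketch below uses only Base; the flag done and the delays are made up for the example. Note that the logic is inverted relative to @wait_while: timedwait waits until its condition holds, whereas @wait_while waits for as long as its condition holds.

```julia
# A flag that a background task sets after a short delay.
done = Threads.Atomic{Bool}(false)
@async begin
    sleep(0.1)
    done[] = true
end

# Poll every 10 ms until `done` is set, giving up after 5 seconds.
status = timedwait(() -> done[], 5.0; pollint = 0.01)
println(status)  # ok

# A condition that never becomes true runs into the timeout instead.
status_timeout = timedwait(() -> false, 0.2; pollint = 0.01)
println(status_timeout)  # timed_out
```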

The functions sleep_ns and idle_sleep can be used to implement custom scenarios that require precise sleeping for both very short and long intervals.
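To illustrate why dedicated sleep functions can be useful: Base Julia's sleep is documented with a minimum sleep time of one millisecond, so shorter or more exact waits need a different approach. One common technique, sketched below with Base Julia only, combines a coarse sleep with a short polling loop. The helper precise_sleep_ns and its 2 ms margin are assumptions for this example and are not claimed to match how sleep_ns or idle_sleep work internally.

```julia
# Hypothetical helper (not the package's `sleep_ns`): waits at least `t_ns`
# nanoseconds by combining a coarse `sleep` with a short polling loop.
function precise_sleep_ns(t_ns::Integer)
    t_end = time_ns() + UInt64(t_ns)
    # Let the scheduler handle the bulk of long waits, keeping a 2 ms margin.
    margin_ns = 2_000_000
    if t_ns > margin_ns
        sleep((t_ns - margin_ns) / 1e9)
    end
    # Poll the clock for the remainder, yielding so other tasks can run.
    while time_ns() < t_end
        yield()
    end
    return nothing
end

t0 = time_ns()
precise_sleep_ns(5_000_000)        # wait for 5 ms
elapsed_ns = time_ns() - t0
println(elapsed_ns >= 5_000_000)   # true
```

The polling loop trades some CPU time for accuracy, which is why it only covers the final stretch of the wait.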

Exception handling

Exceptions thrown during remote code execution can be complex, nested and sometimes hard to understand. You can use the functions inner_exception, onlyfirst_exception and original_exception to get to the underlying reason of a failure more easily. The macro @userfriendly_exceptions automates this to some extent for a given piece of code.

To get an exception "in hand" for further analysis, you can use the macro @return_exceptions to make (possibly failing) code return the exception instead of throwing it.
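The kind of nesting involved can be reproduced with Base Julia tasks alone. In the sketch below, try_fetch and unwrap_exception are hypothetical helpers written for this example; they only handle TaskFailedException and CompositeException and are not the package's implementation (remote calls additionally wrap errors in Distributed.RemoteException, which is not covered here).

```julia
# Hypothetical helper: return the exception instead of throwing it.
function try_fetch(task::Task)
    try
        return fetch(task)
    catch err
        return err
    end
end

# Hypothetical helper: peel off wrapper exceptions to find the root cause.
unwrap_exception(err) = err
unwrap_exception(err::TaskFailedException) = unwrap_exception(err.task.result)
function unwrap_exception(err::CompositeException)
    isempty(err.exceptions) ? err : unwrap_exception(first(err.exceptions))
end

# A task that fails because the task it waits on fails: two layers of wrapping.
nested = Threads.@spawn fetch(Threads.@spawn error("boom"))

err = try_fetch(nested)
println(err isa TaskFailedException)  # true

root = unwrap_exception(err)
println(root.msg)  # boom
```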

File I/O

File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files. Workers may also become disconnected but still write files and clash with restarted code, which leads to race conditions and may also result in corrupt files.

ParallelProcessingTools provides the functions write_files and read_files to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems).
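A common technique behind this kind of file handling is to write to a temporary file first and move it to its final name only once writing has succeeded. The following sketch shows that idea with Base Julia only. The helper atomic_write is hypothetical and is not the signature or implementation of write_files; whether the final replacement step is truly atomic depends on the Julia version, operating system and file system.

```julia
# Hypothetical helper: write via a temporary file in the target directory,
# then move it into place, so a half-written file never appears under the
# final name.
function atomic_write(f, filename::AbstractString)
    dir = dirname(abspath(filename))
    tmpname, io = mktemp(dir; cleanup = false)
    try
        f(io)
        close(io)
        mv(tmpname, filename; force = true)
    catch
        close(io)
        rm(tmpname; force = true)   # remove the incomplete temporary file
        rethrow()
    end
    return filename
end

dir = mktempdir()
target = joinpath(dir, "result.txt")

atomic_write(target) do io
    write(io, "complete content")
end
println(read(target, String))  # complete content

# A failing writer leaves the existing file untouched and no debris behind.
failed = try
    atomic_write(io -> error("simulated crash"), target)
    false
catch
    true
end
println(readdir(dir))  # ["result.txt"]
```

Creating the temporary file in the same directory as the target keeps the final move on a single file system, which avoids a slow copy across devices.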