API

Modules

    Types and constants

    Functions and macros

    Documentation

    ParallelProcessingTools.AbstractThreadLocalType
    abstract type AbstractThreadLocal{T} end

    Abstract type for thread-local values of type T.

    The value for the current thread is accessed via getindex(::AbstractThreadLocal) and setindex!(::AbstractThreadLocal, x).

    To access both regular and thread-local values in a unified manner, use the function getlocalvalue.

    To get all values across all threads, use the function getallvalues.

    Default implementation is ThreadLocal.

    source
    ParallelProcessingTools.AutoThreadPinningType
    struct AutoThreadPinning

    ParallelProcessingTools default thread pinning mode.

    Constructor:

    AutoThreadPinning(; random::Bool = false, pin_blas::Bool = false)

    Arguments:

    • random: Use system topology based random thread pinning if no thread affinity mask is set (e.g. via SLURM, taskset).

    • pin_blas: Try to pin BLAS threads. Not fully functional due to bugs in BLAS thread pinning (see ThreadPinning issue #105).

    Use with ThreadPinning.pinthreads:

    using ParallelProcessingTools, ThreadPinning
    pinthreads(AutoThreadPinning())
    source
    ParallelProcessingTools.CreateOrIgnoreType
    CreateOrIgnore() isa WriteMode

    Indicates that new files should be created, and that nothing should be done if the files already exist.

    Causes an error to be thrown if only some of the files exist, to indicate an inconsistent state.

    CreateOrIgnore() is the recommended default when creating files in a parallel computing context, especially if failure or timeouts might result in re-tries. This way, if multiple workers try to create the same file(s), only one file or consistent set of files will be created under the target filenames.

    See WriteMode and write_files.

    source
    ParallelProcessingTools.DynamicAddProcsModeType
    abstract type ParallelProcessingTools.DynamicAddProcsMode <: ParallelProcessingTools.RunProcsMode

    Abstract supertype for worker start modes that use an elastic cluster manager that enables dynamic addition and removal of worker processes.

    Subtypes must implement:

    • ParallelProcessingTools.worker_start_command(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
    • ParallelProcessingTools.runworkers(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
    source
    ParallelProcessingTools.FilesToReadType
    struct ParallelProcessingTools.FilesToRead

    Created by read_files, represents a set of (temporary) files to read from.

    With ftr::FilesToRead, use collect(ftr) or iterate(ftr) to access the filenames to read from. Use close(ftr) or close(ftr, true) to close things in good order, indicating success, and use close(ftr, false) or close(ftr, err::Exception) to abort, indicating failure.

    See read_files for example code.

    If aborted or if the Julia process exits without ftr being closed, temporary files are still cleaned up, unless read_files was used with delete_tmp_onerror = false.

    source
    ParallelProcessingTools.FilesToWriteType
    struct ParallelProcessingTools.FilesToWrite

    Created by write_files, represents a set of (temporary) files to write to.

    With ftw::FilesToWrite, use collect(ftw) or iterate(ftw) to access the filenames to write to. Use close(ftw) or close(ftw, true) to close things in good order, indicating success, and use close(ftw, false) or close(ftw, err::Exception) to abort, indicating failure.

    See write_files for example code.

    If aborted or if the Julia process exits without ftw being closed, temporary files are still cleaned up, unless write_files was used with delete_tmp_onerror = false.

    source
    ParallelProcessingTools.FlexWorkerPoolType
    FlexWorkerPool{WP<:AbstractWorkerPool}(
        worker_pids::AbstractVector{Int};
        label::AbstractString = "", maxoccupancy::Int = 1, init_workers::Bool = true
    )::AbstractWorkerPool
    
    FlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...)

    A flexible worker pool, intended to work with cluster managers that may add and remove Julia processes dynamically.

    If the current process (Distributed.myid()) is part of the pool, or if withmyid is true, it will be used as a fallback when no other workers are members of the pool (e.g. because no other processes have been added yet or because all other processes in the pool have terminated and been removed from it). The current process will not be used as a fallback when all other workers are currently in use.

    If caching is true, the pool will use a Distributed.CachingPool as the underlying pool, otherwise a Distributed.WorkerPool.

    If maxoccupancy is greater than one, individual workers can be used maxoccupancy times in parallel, so take!(pool) may return the same process ID pid multiple times without a put!(pool, pid) in between. Such an (ideally moderate) oversubscription can be useful to reduce latency-related idle times on workers, e.g. if communication latency to the worker is not short compared to the runtime of the function called on it, or if the remote functions are often blocked waiting for I/O. Note: In total, workers must still be put back the same number of times they were taken from the pool.

    If init_workers is true, workers taken from the pool will be guaranteed to be initialized to the current global initialization level (see @always_everywhere).

    WP is the type of the underlying worker pool used, e.g. Distributed.WorkerPool (default) or Distributed.CachingPool.

    Example:

    using ParallelProcessingTools, Distributed
    
    pool = FlexWorkerPool(withmyid = true, maxoccupancy = 3)
    
    workers(pool)
    
    pids = [take!(pool) for _ in 1:3]
    @assert pids == repeat([myid()], 3)
    foreach(pid -> put!(pool, pid), pids)
    
    addprocs(4)
    worker_procs = workers()
    push!.(Ref(pool), worker_procs)
    
    pids = [take!(pool) for _ in 1:4*3]
    @assert pids == repeat(worker_procs, 3)
    foreach(pid -> put!(pool, pid), pids)
    rmprocs(worker_procs)
    
    pids = [take!(pool) for _ in 1:3]
    @assert pids == repeat([myid()], 3)
    foreach(pid -> put!(pool, pid), pids)
    source
    ParallelProcessingTools.HTCondorRunType
    HTCondorRun(;
        n::Int = 1,
        condor_flags::Cmd = _default_condor_flags(),
        condor_settings::Dict{String,String} = Dict{String,String}(),
        julia_flags::Cmd = _default_julia_flags(),
        julia_depot::Vector{String} = DEPOT_PATH,
        jobfile_dir = homedir(),
        env::Dict{String,String} = Dict{String,String}(),
        redirect_output::Bool = true
    )

    Mode to add worker processes via HTCondor condor_submit.

    Condor submit script and steering .sh files are stored in jobfile_dir.

    Example:

    julia> runmode = HTCondorRun(n = 10, condor_settings = Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
    
    julia> runworkers(runmode)
    [ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
    Submitting job(s)..........
    10 job(s) submitted to cluster 3198291.
    [ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
    (nothing, 10)
    
    julia> sleep(10)
    
    julia> nworkers()
    10

    Workers can also be started manually: use worker_start_command(runmode) to get the condor_submit start command and run it from a separate process.

    source
    ParallelProcessingTools.NonZeroExitCodeType
    ParallelProcessingTools.NonZeroExitCode(cmd::Cmd, exitcode::Integer) isa Exception

    Exception to indicate that an external process running cmd failed with the given exit code (not equal to zero).

    source
    ParallelProcessingTools.OnLocalhostType
    OnLocalhost(;
        n::Integer = 1,
        env::Dict{String,String} = Dict{String,String}()
    ) isa DynamicAddProcsMode

    Mode that runs n worker processes on the current host.

    Example:

    runmode = OnLocalhost(n = 4)
    task, n = runworkers(runmode)
    
    Threads.@async begin
        wait(task)
        @info "Local workers have terminated."
    end
    
    @wait_while nprocs()-1 < n

    Workers can also be started manually: use worker_start_command(runmode) to get the system (shell) command and run it from a separate process.

    source
    ParallelProcessingTools.RunProcsModeType
    abstract type ParallelProcessingTools.RunProcsMode

    Abstract supertype for worker process run modes.

    Subtypes must implement:

    • ParallelProcessingTools.runworkers(runmode::SomeRunProcsMode, manager::Distributed.AbstractClusterManager)
    source
    ParallelProcessingTools.SlurmRunType
    SlurmRun(;
        slurm_flags::Cmd = {defaults},
        julia_flags::Cmd = {defaults},
        dir = pwd(),
        env::Dict{String,String} = Dict{String,String}(),
        redirect_output::Bool = true
    )

    Mode to add worker processes via SLURM srun.

    The srun and Julia worker command line flags are inferred from SLURM environment variables (e.g. when inside an salloc session or batch job), as well as from slurm_flags and julia_flags.

    Workers are started with current directory set to dir.

    Example:

    runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
    task, n = runworkers(runmode)
    
    Threads.@async begin
        wait(task)
        @info "SLURM workers have terminated."
    end
    
    @wait_while nprocs()-1 < n

    Workers can also be started manually: use worker_start_command(runmode) to get the srun start command and run it from a separate process.

    source
    ParallelProcessingTools.ThreadLocalType
    ThreadLocal{T} <: AbstractThreadLocal{T}

    Represents a thread-local value. See AbstractThreadLocal for the API.

    Constructors:

    ThreadLocal{T}() where {T}
    ThreadLocal(value::T) where {T}
    ThreadLocal{T}(f::Base.Callable) where {T}

    Examples:

    tlvalue = ThreadLocal(0)
    @onthreads allthreads() tlvalue[] = Base.Threads.threadid()
    getallvalues(tlvalue) == allthreads()
    rand_value_on_each_thread = ThreadLocal{Float64}(rand)
    all(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))
    source
    ParallelProcessingTools.@always_everywhereMacro
    @always_everywhere(expr)

    Runs expr on all current Julia processes, and also on all future Julia processes, either after a call to ensure_procinit() or automatically when they are managed using a FlexWorkerPool.

    Similar to Distributed.@everywhere, but also stores expr so that ensure_procinit can execute it on future worker processes.

    Example:

    @always_everywhere begin
        using SomePackage
        using SomeOtherPackage
        
        some_global_variable = 42
    end

    See also ParallelProcessingTools.add_procinit_code and ParallelProcessingTools.ensure_procinit.

    source
    ParallelProcessingTools.@criticalMacro
    @critical expr

    Mark code in expr as a critical section. Code in critical sections will never be executed in parallel (via multithreading) to any other critical section.

    @critical is very useful to mark non-threadsafe code.

    Example:

    @onthreads allthreads() begin
        @critical @info Base.Threads.threadid()
    end
    
    Without @critical, the above will typically crash Julia.
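
    The guarantee described above resembles guarding every critical section with one shared lock. The following is a minimal sketch of that idea using only Base Julia. It does not show how @critical is implemented, and the names critical_lock and counter are illustrative assumptions.

    ```julia
    # One lock shared by all "critical sections" (illustrative name, an assumption).
    critical_lock = ReentrantLock()

    counter = Ref(0)

    Threads.@threads for i in 1:1000
        # Only one task at a time can hold the lock, so the increment is race-free.
        lock(critical_lock) do
            counter[] += 1
        end
    end

    counter[] == 1000
    ```

    Without the lock, the unsynchronized read-modify-write on counter could lose updates when Julia runs with more than one thread.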
    source
    ParallelProcessingTools.@mt_out_of_orderMacro
    @mt_out_of_order begin expr... end

    Runs all top-level expressions in begin expr... end on parallel multi-threaded tasks.

    Example:

    @mt_out_of_order begin
        a = foo()
        bar()
        c = baz()
    end

    will run a = foo(), bar() and c = baz() in parallel and in arbitrary order. Results of assignments will appear in the outside scope.
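
    As a rough mental model (an assumption, not the macro's actual expansion), the effect is comparable to spawning one task per top-level expression and fetching the results afterwards. The functions foo, bar and baz below are placeholders defined only for this sketch.

    ```julia
    # Placeholder functions standing in for foo, bar and baz (assumptions for this sketch).
    foo() = 1
    bar() = nothing
    baz() = 3

    # Start each expression as its own task; the scheduler may run them in any order.
    task_a = Threads.@spawn foo()
    task_b = Threads.@spawn bar()
    task_c = Threads.@spawn baz()

    # Collect the results; fetch blocks until the corresponding task is done.
    a = fetch(task_a)
    fetch(task_b)
    c = fetch(task_c)

    (a, c) == (1, 3)
    ```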

    source
    ParallelProcessingTools.@onprocsMacro
    @onprocs procsel expr

    Executes expr in parallel on all processes in procsel. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if procsel itself is a scalar).

    Example:

    using Distributed
    addprocs(2)
    workers() == @onprocs workers() myid()
    source
    ParallelProcessingTools.@onthreadsMacro
    @onthreads threadsel expr

    Execute code in expr in parallel on the threads in threadsel.

    threadsel should be a single thread ID or a range (or array) of thread IDs. If threadsel == Base.Threads.threadid(), expr is run on the current thread with only minimal overhead.

    Example 1:

    tlsum = ThreadLocal(0.0)
    data = rand(100)
    @onthreads allthreads() begin
        tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
    end
    sum(getallvalues(tlsum)) ≈ sum(data)

    Example 2:

    # Assuming 4 threads:
    tl = ThreadLocal(42)
    threadsel = 2:3
    @onthreads threadsel begin
        tl[] = Base.Threads.threadid()
    end
    getallvalues(tl)[threadsel] == [2, 3]
    getallvalues(tl)[[1,4]] == [42, 42]
    source
    ParallelProcessingTools.@userfriendly_exceptionsMacro
    @userfriendly_exceptions expr

    Transforms exceptions originating from expr into more user-friendly ones.

    If multiple exceptions originate from parallel code in expr, only one is rethrown, and TaskFailedExceptions and RemoteExceptions are replaced by the original exceptions that caused them.

    See inner_exception and onlyfirst_exception.

    source
    ParallelProcessingTools.@wait_whileMacro
    @wait_while [maxtime=nothing] [timeout_error=false] cond

    Wait while cond is true, using slowly increasing sleep times in between evaluating cond.

    cond may be an arbitrary Julia expression.

    If maxtime is given as a real value, will wait for at most maxtime seconds. If the value is zero or negative, will not wait at all.

    If timeout_error is true, will throw a TimelimitExceeded exception if the maximum waiting time is exceeded.

    Example, wait for a task with a maxtime:

    task = Threads.@spawn sleep(10)
    timer = Timer(2)
    @wait_while !istaskdone(task) && isopen(timer)
    istaskdone(task) == false
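
    The polling behaviour described above can be sketched as a plain function. This is a hypothetical helper, not the macro's implementation: the name wait_while_sketch, the use of Inf instead of nothing for "no limit", and the sleep constants are all assumptions made for illustration.

    ```julia
    # Hypothetical helper, not part of ParallelProcessingTools.
    # Returns true if cond() became false, and false if maxtime seconds elapsed first.
    function wait_while_sketch(cond; maxtime::Real = Inf)
        t_start = time()
        n_idle = 0
        while cond()
            # With maxtime <= 0 this returns on the first check, i.e. without waiting.
            time() - t_start >= maxtime && return false
            n_idle += 1
            # Sleep time grows slowly with the number of checks, capped at 0.1 s
            # (both constants are arbitrary choices for this sketch).
            sleep(min(log2(n_idle + 1) * 0.001, 0.1))
        end
        return true
    end

    task = Threads.@spawn sleep(0.2)
    finished = wait_while_sketch(() -> !istaskdone(task))      # waits until the task is done
    timed_out = !wait_while_sketch(() -> true; maxtime = 0.05) # gives up after about 0.05 s
    ```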
    source
    ParallelProcessingTools.clear_worker_caches!Function
    clear_worker_caches!(pool::AbstractWorkerPool)

    Clear the worker caches (cached function closures, etc.) on the workers in pool.

    Does nothing if the pool doesn't perform any on-worker caching.

    source
    ParallelProcessingTools.ensure_procinitFunction
    ensure_procinit(pid::Int)
    ensure_procinit(pids::AbstractVector{Int} = workers())

    Runs process initialization code on the given process(es) if necessary and returns after initialization is complete.

    When using a FlexWorkerPool, worker initialization can safely be run in the background, as the pool will only offer workers (via take!(pool)) after it has fully initialized them.

    See also ParallelProcessingTools.get_procinit_code, ParallelProcessingTools.add_procinit_code, ParallelProcessingTools.global_procinit_level and ParallelProcessingTools.current_procinit_level.

    source
    ParallelProcessingTools.getallvaluesFunction
    getallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}

    Access all values (one for each thread) of a thread-local value as a vector. Can only be called in single-threaded code sections.

    source
    ParallelProcessingTools.getlabelFunction
    ParallelProcessingTools.getlabel(obj)

    Returns a descriptive label for obj suitable for using in exceptions and logging messages. Defaults to string(obj).

    source
    ParallelProcessingTools.hasfailedFunction
    ParallelProcessingTools.hasfailed(obj)::Bool

    Checks if obj has failed in some way.

    Supports Task and Process and may be extended to other object types.

    Returns false if isnothing(obj) or ismissing(obj).

    source
    ParallelProcessingTools.idle_sleepMethod
    idle_sleep(n_idle::Integer, t_interval_s, t_max_s)

    Sleep because something has been idle for n_idle times.

    Will sleep for log2(n_idle + 1) * t_interval_s seconds, but at most for t_max_s seconds.

    Guaranteed to yield() at least once, even if n_idle is zero.
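
    To make the formula above concrete, the following sketch computes only the sleep duration, without sleeping. The helper name idle_sleep_duration is hypothetical and not part of the package.

    ```julia
    # Hypothetical helper: the sleep duration described above, in seconds.
    idle_sleep_duration(n_idle::Integer, t_interval_s::Real, t_max_s::Real) =
        min(log2(n_idle + 1) * t_interval_s, t_max_s)

    d0 = idle_sleep_duration(0, 0.1, 2.0)      # log2(1) = 0, so no sleep time at all
    d3 = idle_sleep_duration(3, 0.1, 2.0)      # log2(4) * 0.1 = 0.2
    dmax = idle_sleep_duration(10^9, 0.1, 2.0) # capped at t_max_s = 2.0
    ```

    The logarithm makes the sleep time grow quickly at first and then flatten out, so long idle periods do not lead to excessively long sleeps even before the cap applies.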

    source
    ParallelProcessingTools.inner_exceptionFunction
    ParallelProcessingTools.inner_exception(err)

    Replaces exceptions like a TaskFailedException or a RemoteException with their underlying cause. Leaves other exceptions unchanged.

    source
    ParallelProcessingTools.isactiveFunction
    ParallelProcessingTools.isactive(obj)::Bool

    Checks if obj is still active or running, as applicable to the type of obj.

    Supports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.

    Returns false if isnothing(obj) and true if ismissing(obj).

    source
    ParallelProcessingTools.memory_limitFunction
    memory_limit()

    Gets the virtual memory limit for the current Julia process.

    Returns a tuple (soft_limit::Int64, hard_limit::Int64) (in units of bytes). Values of -1 mean unlimited.

    Note

    Currently only works on Linux. Simply returns (Int64(-1), Int64(-1)) on other operating systems.

    source
    ParallelProcessingTools.memory_limit!Function
    memory_limit!(soft_limit::Integer, hard_limit::Integer = -1)

    Sets the virtual memory limit for the current Julia process.

    soft_limit and hard_limit are in units of bytes. Values of -1 mean unlimited. hard_limit must not be stricter than soft_limit, and should typically be set to -1.

    Returns (soft_limit::Int64, hard_limit::Int64).

    Note

    Currently only has an effect on Linux. Does nothing and simply returns (Int64(-1), Int64(-1)) on other operating systems.

    source
    ParallelProcessingTools.onworkerFunction
    onworker(
        f::Function, args...;
        pool::AbstractWorkerPool = ppt_worker_pool(),
        maxtime::Real = 0, tries::Integer = 1, label::AbstractString = ""
    )

    Runs f(args...) on an available worker process from the given pool and returns the result.

    If maxtime > 0, a maximum time for the activity is set. If the activity takes longer than maxtime seconds, the process running it (if not the main process) will be terminated.

    label is used for debug-logging.

    If a problem occurs (maxtime or worker failure) while running the activity, reschedules the task if the maximum number of tries has not yet been reached, otherwise throws an exception.

    source
    ParallelProcessingTools.original_exceptionFunction
    ParallelProcessingTools.original_exception(err)

    Replaces (possibly nested) exceptions like a TaskFailedException or a RemoteException with the innermost exception, likely to be the one that was thrown originally. Leaves other exceptions unchanged.
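
    The difference between unwrapping one level (as inner_exception is described to do) and unwrapping down to the innermost exception can be illustrated with a small sketch. The helpers unwrap_once and unwrap_all are hypothetical and only mimic the documented behaviour; they are not the package's implementation.

    ```julia
    using Distributed  # provides RemoteException

    # Hypothetical helpers, not the package's implementation.
    unwrap_once(err) = err                                    # other exceptions stay unchanged
    unwrap_once(err::TaskFailedException) = err.task.result   # a failed task stores its exception as result
    unwrap_once(err::RemoteException) = err.captured.ex       # exception captured on the remote process

    function unwrap_all(err)
        inner = unwrap_once(err)
        return inner === err ? err : unwrap_all(inner)
    end

    # A task that fails because another task failed yields nested TaskFailedExceptions.
    inner_task = Threads.@spawn error("boom")
    outer_task = Threads.@spawn fetch(inner_task)
    err = try
        fetch(outer_task)
    catch e
        e
    end

    unwrap_once(err) isa TaskFailedException  # one level removed, still a wrapper
    unwrap_all(err) isa ErrorException        # the exception that was thrown originally
    ```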

    source
    ParallelProcessingTools.ppt_worker_poolMethod
    ppt_worker_pool()

    Returns the default ParallelProcessingTools worker pool.

    If the default instance doesn't exist yet, then a FlexWorkerPool will be created that initially contains Distributed.myid() as the only worker.

    source
    ParallelProcessingTools.printoverMethod
    ParallelProcessingTools.printover(f_show::Function, io::IOBuffer)

    Runs f_show(tmpio) with an IO buffer, then clears the required number of lines on io (typically stdout) and prints the output over them.

    source
    ParallelProcessingTools.read_filesFunction
    function read_files(
        [f_read, ], filenames::AbstractString...;
        use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(),
        create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,
        verbose::Bool = false
    )

    Reads filenames in an atomic fashion (i.e. only if all filenames exist) on a best-effort basis (depending on the OS and file-system used).

    If a reading function f_read is given, calls f_read(filenames...). The return value of f_read is passed through.

    If use_cache is true, then the files are first copied to the cache directory cache_dir under temporary names, and then read via f_read(temporary_filenames...). The temporary files are deleted after f_read exits (except if an exception is thrown during reading and delete_tmp_onerror is set to false).

    Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools" to see a log of all intermediate steps.

    For example:

    write("foo.txt", "Hello"); write("bar.txt", "World")
    
    result = read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar
        read(foo, String) * " " * read(bar, String)
    end

    If no reading function f_read is given, then read_files returns an object of type FilesToRead that holds the temporary filenames. Closing it will clean up temporary files, as described above. So

    ftr = read_files("foo.txt", "bar.txt"; use_cache = true)
    result = try
        foo, bar = collect(ftr)
        data_read = read(foo, String) * " " * read(bar, String)
        close(ftr)
        data_read
    catch err
        close(ftr, err)
        rethrow()
    end

    is equivalent to the example using read_files(f_read, ...) above.

    If create_cachedir is true, then cache_dir will be created if it doesn't exist yet.

    If verbose is true, uses log-level Logging.Info to log file reading, otherwise Logging.Debug.

    On Linux you can set use_cache = true and cache_dir = "/dev/shm" to use the default Linux RAM disk as an intermediate directory.

    See also write_files and ParallelProcessingTools.default_cache_dir.

    source
    ParallelProcessingTools.runworkersFunction
    runworkers(
        runmode::ParallelProcessingTools.RunProcsMode,
        manager::Distributed.AbstractClusterManager = ppt_cluster_manager()
    )

    Run Julia worker processes.

    By default ensures that all worker processes use the same Julia project environment as the current process (requires that file system paths are consistent across compute hosts).

    The new workers are managed via ppt_cluster_manager() and automatically added to the ppt_worker_pool().

    Returns a tuple (task, n). Here, task::Task is done when all workers have terminated. n is either an Integer, if the number of workers that will be started is known, or Nothing, if the number of workers can't be predicted (accurately).

    Example:

    task, n = runworkers(OnLocalhost(n = 4))

    See also worker_resources().

    source
    ParallelProcessingTools.sleep_nsMethod
    sleep_ns(t_in_ns::Real)

    Sleep for t_in_ns nanoseconds, using a mixture of yield(), sleep(0) and sleep(t) to be able to sleep for short times as well as long times with good relative precision.

    Guaranteed to yield() at least once, even if t_in_ns is zero.

    source
    ParallelProcessingTools.split_basename_extMethod
    ParallelProcessingTools.split_basename_ext(file_basename_with_ext::AbstractString)

    Splits a filename (given without its directory path) into a basename without file extension and the file extension. Returns a tuple (basename_noext, ext).

    Example:

    ParallelProcessingTools.split_basename_ext("myfile.tar.gz") == ("myfile", ".tar.gz")
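
    For comparison, Base.splitext only splits off the last extension. The sketch below contrasts it with a hypothetical helper that splits at the first dot, which reproduces the example above. The name split_at_first_dot and the split-at-first-dot rule are assumptions, not the package's implementation.

    ```julia
    # Base.splitext only removes the last extension.
    base_result = splitext("myfile.tar.gz")   # ("myfile.tar", ".gz")

    # Hypothetical sketch: split at the first dot instead.
    function split_at_first_dot(name::AbstractString)
        i = findfirst(==('.'), name)
        # No dot at all: the whole name is the basename and the extension is empty.
        isnothing(i) && return (String(name), "")
        return (name[1:prevind(name, i)], name[i:end])
    end

    sketch_result = split_at_first_dot("myfile.tar.gz")   # ("myfile", ".tar.gz")
    ```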
    source
    ParallelProcessingTools.tmp_filenameFunction
    ParallelProcessingTools.tmp_filename(fname::AbstractString)
    ParallelProcessingTools.tmp_filename(fname::AbstractString, dir::AbstractString)

    Returns a temporary filename, based on fname.

    By default, the temporary filename is in the same directory as fname. If dir is given, it is in dir instead.

    Does not create the temporary file, only returns the filename (including directory path).

    source
    ParallelProcessingTools.wait_for_allFunction
    wait_for_all(
        objs...;
        maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
    )
    
    wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)

    Wait for all of the objs to become ready.

    Readiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.

    See @wait_while for the effects of maxtime and timeout_error.

    Example, wait for two tasks to finish:

    task1 = Threads.@spawn sleep(10)
    task2 = Threads.@spawn sleep(2)
    wait_for_all(task1, task2)
    source
    ParallelProcessingTools.wait_for_anyFunction
    wait_for_any(
        objs...;
        maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
    )
    
    wait_for_any(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)

    Wait for any of the objects objs to become ready.

    Readiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.

    See @wait_while for the effects of maxtime and timeout_error.

    Example, wait for a task with a timeout:

    task1 = Threads.@spawn sleep(1.0)
    task2 = Threads.@spawn sleep(5.0)
    wait_for_any(task1, task2, maxtime = 3.0)
    istaskdone(task1) == true
    istaskdone(task2) == false

    Similar to waitany (new in Julia v1.12), but applies to a wider range of object types.

    source
    ParallelProcessingTools.whyfailedFunction
    ParallelProcessingTools.whyfailed(obj)::Exception

    Returns a reason, as an Exception instance, why obj has failed.

    Supports Task and Process and may be extended to other object types.

    obj must not be nothing or missing.

    source
    ParallelProcessingTools.worker_local_startcmdMethod
    ParallelProcessingTools.worker_local_startcmd(
        manager::Distributed.ClusterManager;
        julia_cmd::Cmd = _default_julia_cmd(),
        julia_flags::Cmd = _default_julia_flags(),
        julia_project::AbstractString = _default_julia_project(),
        redirect_output::Bool = true,
        env::AbstractDict{<:AbstractString,<:AbstractString} = ...,
    )::Cmd

    Return the system command required to start a Julia worker process locally on some host, so that it will connect to manager.

    source
    ParallelProcessingTools.worker_resourcesMethod
    worker_resources()

    Get the distributed Julia worker process resources currently available.

    This may take some time as some code needs to be loaded on all processes. Automatically runs ensure_procinit() before querying worker resources.

    Note: CPU ID information will only be available if ThreadPinning is loaded.

    source
    ParallelProcessingTools.worker_start_commandFunction
    worker_start_command(
        runmode::DynamicAddProcsMode,
        manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
    )::Tuple{Cmd,Integer,Integer}

    Return a tuple (cmd, m, n), with system command cmd that needs to be run m times (in parallel) to start n workers.

    source
    ParallelProcessingTools.workpartFunction
    workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}

    Get the part of data that the execution unit current_worker is responsible for. Implies a partition of data across the workers listed in workersel.

    For generic data arrays, workpart will return a view. If data is a Range (e.g. indices to be processed), a sub-range will be returned.

    Type W will typically be Int and workersel will usually be a range/array of thread/process IDs.

    Note: workersel is required to be sorted in ascending order and to contain no duplicate entries.

    Examples:

    using Distributed, Base.Threads
    A = rand(100)
    # ...
    sub_A = workpart(A, workers(), myid())
    # ...
    idxs = workpart(eachindex(sub_A), allthreads(), threadid())
    for i in idxs
        # ...
    end
    source
    ParallelProcessingTools.wouldwaitFunction
    ParallelProcessingTools.wouldwait(obj)::Bool

    Returns true if wait(obj) would result in waiting and false if wait(obj) would return (almost) immediately.

    Supports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.

    Returns false if isnothing(obj) but obj must not be missing.

    source
    ParallelProcessingTools.write_filesFunction
    function write_files(
        [f_write,] filenames::AbstractString...;
        mode::WriteMode = CreateOrIgnore(),
        use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(),
        create_dirs::Bool = true, delete_tmp_onerror::Bool=true,
        verbose::Bool = false
    )

    Writes to filenames in an atomic fashion, on a best-effort basis (depending on the OS and file-system used).

    mode determines how to handle pre-existing files, it may be CreateOrIgnore() (default), CreateNew(), CreateOrReplace(), CreateOrModify() or ModifyExisting().

    If a writing function f_write is given, calls f_write(temporary_filenames...). If f_write doesn't throw an exception, the files temporary_filenames are renamed to filenames, otherwise the temporary files are either deleted (if delete_tmp_onerror is true) or left in place (e.g. for debugging purposes).

    Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools" to see a log of all intermediate steps.

    For example:

    write_files("foo.txt", "bar.txt", use_cache = true) do tmp_foo, tmp_bar
        write(tmp_foo, "Hello")
        write(tmp_bar, "World")
    end

    write_files(f_write, filenames...) returns either filenames, if the files were (re-)written or nothing if there was nothing to do (depending on mode).

    If no writing function f_write is given, then write_files returns an object of type FilesToWrite that holds the temporary filenames. Closing it will, like above, either rename temporary files to filenames or remove them. So

    ftw = write_files("foo.txt", "bar.txt")
    if !isnothing(ftw)
        try
            foo, bar = ftw
            write(foo, "Hello")
            write(bar, "World")
            close(ftw)
        catch err
            close(ftw, err)
            rethrow()
        end
    end

    is equivalent to the example using write_files(f_write, ...) above.

    When modifying files, write_files first copies existing files filenames to temporary_filenames and otherwise behaves as described above.

    If use_cache is true, the temporary_filenames are located in cache_dir and then atomically moved to filenames, otherwise they are located next to filenames (i.e. in the same directories).

    If create_dirs is true, target and cache directory paths are created if necessary.

    If verbose is true, uses log-level Logging.Info to log file creation, otherwise Logging.Debug.

    On Linux you can set use_cache = true and cache_dir = "/dev/shm" to use the default Linux RAM disk as an intermediate directory.

    See also read_files and ParallelProcessingTools.default_cache_dir.
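
    The write-to-temporary-then-rename pattern described above can be sketched with Base Julia alone. The helper atomic_write_sketch and its temporary naming scheme are hypothetical. Unlike write_files, it handles a single file, has no write modes or cache directory, and simply fails if the target already exists.

    ```julia
    # Hypothetical helper, not part of ParallelProcessingTools.
    function atomic_write_sketch(f_write, filename::AbstractString)
        # Temporary file next to the target; the naming scheme is an arbitrary choice.
        tmp = string(filename, ".tmp-", string(rand(UInt32), base = 16))
        try
            f_write(tmp)
            # Move the finished file into place. Without force, mv throws if the target exists.
            mv(tmp, filename)
        catch
            # On failure, remove the temporary file and leave the target untouched.
            rm(tmp; force = true)
            rethrow()
        end
        return filename
    end

    dir = mktempdir()
    target = joinpath(dir, "foo.txt")
    atomic_write_sketch(target) do tmp
        write(tmp, "Hello")
    end
    content = read(target, String)   # "Hello"
    ```

    Readers of the target directory either see no file or the complete file, never a partially written one, since the content only appears under the final name once writing has finished.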

    source
    ParallelProcessingTools.write_worker_start_scriptFunction
    write_worker_start_script(
        filename::AbstractString,
        runmode::DynamicAddProcsMode,
        manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
    )

    Writes the system command to start worker processes to a shell script.

    source