API

    Documentation

    ParallelProcessingTools.AbstractThreadLocal (Type)
    abstract type AbstractThreadLocal{T} end

    Abstract type for thread-local values of type T.

    The value for the current thread is accessed via getindex(::AbstractThreadLocal) and setindex!(::AbstractThreadLocal, x).

    To access both regular and thread-local values in a unified manner, use the function getlocalvalue.

    To get all values across all threads, use the function getallvalues.

    Default implementation is ThreadLocal.
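
    For illustration, a minimal sketch of the access pattern described above (assuming a Julia session with ParallelProcessingTools loaded; variable names are just examples):

    using ParallelProcessingTools

    tl = ThreadLocal(0)   # default AbstractThreadLocal implementation
    tl[] = 1              # set the value for the current thread
    x = tl[]              # get the value for the current thread

    # getlocalvalue treats plain values and thread-local values uniformly:
    getlocalvalue(42) == 42
    getlocalvalue(tl) == x

    # getallvalues returns one value per thread (single-threaded code sections only):
    length(getallvalues(tl)) == Base.Threads.nthreads()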

    ParallelProcessingTools.AddProcsMode (Type)
    abstract type ParallelProcessingTools.AddProcsMode

    Abstract supertype for worker process addition modes.

    Subtypes must implement:

    • ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)

    and may want to specialize:

    • ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)
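
    A hypothetical custom mode might look like the following sketch (MyLocalMode and its method body are illustrative assumptions, not part of the package):

    import Distributed
    import ParallelProcessingTools

    # Hypothetical mode that simply adds local worker processes:
    struct MyLocalMode <: ParallelProcessingTools.AddProcsMode
        nprocs::Int
    end

    # Required: how worker processes are added for this mode.
    ParallelProcessingTools.addworkers(mode::MyLocalMode) = Distributed.addprocs(mode.nprocs)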

    ParallelProcessingTools.ElasticAddProcsMode (Type)
    abstract type ParallelProcessingTools.ElasticAddProcsMode <: ParallelProcessingTools.AddProcsMode

    Abstract supertype for worker process addition modes that use the elastic cluster manager.

    Subtypes must implement:

    • ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)
    • ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)

    and may want to specialize:

    • ParallelProcessingTools.elastic_addprocs_timeout(mode::SomeElasticAddProcsMode)
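
    As a hedged illustration of this interface, a custom elastic mode might be sketched as follows (MyElasticMode and the method bodies are assumptions for illustration only):

    import ClusterManagers
    import ParallelProcessingTools

    # Hypothetical elastic mode; not part of the package.
    struct MyElasticMode <: ParallelProcessingTools.ElasticAddProcsMode
        nworkers::Int
    end

    # Required: the command used to start workers, plus the number of workers to start.
    function ParallelProcessingTools.worker_start_command(
        mode::MyElasticMode, manager::ClusterManagers.ElasticManager
    )
        cmd = ParallelProcessingTools.elastic_localworker_startcmd(manager)
        return cmd, mode.nworkers
    end

    # Required: actually spawn the workers and return the expected number of new workers.
    function ParallelProcessingTools.start_elastic_workers(
        mode::MyElasticMode, manager::ClusterManagers.ElasticManager
    )
        cmd, n = ParallelProcessingTools.worker_start_command(mode, manager)
        for _ in 1:n
            run(cmd; wait = false)
        end
        return n
    end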

    ParallelProcessingTools.ExternalProcesses (Type)
    ParallelProcessingTools.ExternalProcesses(;
        nprocs::Integer = ...
    )

    Add worker processes by starting them externally.

    Will log (via @info) a worker start command and then wait for the workers to connect. The user is responsible for starting the specified number of workers externally using that start command.

    Example:

    mode = ExternalProcesses(nprocs = 4)
    addworkers(mode)

    The user now has to start 4 Julia worker processes externally using the logged start command. This start command can also be retrieved via worker_start_command(mode).

    ParallelProcessingTools.SlurmRun (Type)
    SlurmRun(;
        slurm_flags::Cmd = {defaults}
        julia_flags::Cmd = {defaults}
        dir = pwd()
        user_start::Bool = false
        timeout::Real = 60
    )

    Mode to add worker processes via SLURM srun.

    The srun and Julia worker command line flags are inferred from SLURM environment variables (e.g. when inside an salloc or batch job), as well as from slurm_flags and julia_flags.

    Workers are started with current directory set to dir.

    Example:

    mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
    addworkers(mode)

    If user_start is true, the SLURM srun command will not be run automatically; instead, it will be logged via @info and the user is responsible for running it. This srun command can also be retrieved via worker_start_command(mode).

    ParallelProcessingTools.ThreadLocal (Type)
    ThreadLocal{T} <: AbstractThreadLocal{T}

    Represents a thread-local value. See AbstractThreadLocal for the API.

    Constructors:

    ThreadLocal{T}() where {T}
    ThreadLocal(value::T) where {T}
    ThreadLocal{T}(f::Base.Callable) where {T}

    Examples:

    tlvalue = ThreadLocal(0)
    @onthreads allthreads() tlvalue[] = Base.Threads.threadid()
    getallvalues(tlvalue) == allthreads()
    rand_value_on_each_thread = ThreadLocal{Float64}(rand)
    all(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))

    ParallelProcessingTools.@always_everywhere (Macro)
    @always_everywhere(expr)

    Runs expr on all current Julia processes, as well as on all future Julia processes added via addworkers.

    Similar to Distributed.@everywhere, but also stores expr so that addworkers can execute it automatically on new worker processes.

    ParallelProcessingTools.@critical (Macro)
    @critical expr

    Mark code in expr as a critical section. Code in critical sections will never be executed in parallel (via multithreading) with any other critical section.

    @critical is very useful to mark non-threadsafe code.

    Example:

    @onthreads allthreads() begin
        @critical @info Base.Threads.threadid()
    end
    
    Without `@critical`, the above will typically crash Julia.

    ParallelProcessingTools.@mt_out_of_order (Macro)
    @mt_out_of_order begin expr... end

    Runs all top-level expressions in begin expr... end on parallel multi-threaded tasks.

    Example:

    @mt_out_of_order begin
        a = foo()
        bar()
        c = baz()
    end

    This will run a = foo(), bar() and c = baz() in parallel and in arbitrary order; the results of the assignments will appear in the outside scope.

    ParallelProcessingTools.@onprocs (Macro)
    @onprocs procsel expr

    Executes expr in parallel on all processes in procsel. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if procsel itself is a scalar).

    Example:

    using Distributed
    addprocs(2)
    workers() == @onprocs workers() myid()

    ParallelProcessingTools.@onthreads (Macro)
    @onthreads threadsel expr

    Execute code in expr in parallel on the threads in threadsel.

    threadsel should be a single thread ID or a range (or array) of thread IDs. If threadsel == Base.Threads.threadid(), expr is run on the current thread with only minimal overhead.

    Example 1:

    tlsum = ThreadLocal(0.0)
    data = rand(100)
    @onthreads allthreads() begin
        tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
    end
    sum(getallvalues(tlsum)) ≈ sum(data)

    Example 2:

    # Assuming 4 threads:
    tl = ThreadLocal(42)
    threadsel = 2:3
    @onthreads threadsel begin
        tl[] = Base.Threads.threadid()
    end
    getallvalues(tl)[threadsel] == [2, 3]
    getallvalues(tl)[[1,4]] == [42, 42]

    ParallelProcessingTools.addworkers (Function)
    addworkers(mode::ParallelProcessingTools.AddProcsMode)

    Add Julia worker processes for LEGEND data processing.

    By default, ensures that all worker processes use the same Julia project environment as the current process (this requires that file system paths are consistent across compute hosts).

    Use @always_everywhere to run initialization code on all current processes and all future processes added via addworkers:

    using Distributed, ParallelProcessingTools
    
    @always_everywhere begin
        using SomePackage
        import SomeOtherPackage
    
        get_global_value() = 42
    end
    
    # ... some code ...
    
    addworkers(LocalProcesses(nprocs = 4))
    
    # `get_global_value` is available even though workers were added later:
    remotecall_fetch(get_global_value, last(workers()))

    See also worker_resources().

    ParallelProcessingTools.elastic_localworker_startcmd (Method)
    ParallelProcessingTools.elastic_localworker_startcmd(
        manager::Distributed.ClusterManager;
        julia_cmd::Cmd = _default_julia_cmd(),
        julia_flags::Cmd = _default_julia_flags(),
        julia_project::AbstractString = _default_julia_project()
    )::Cmd

    Return the system command required to start, on the current host, a Julia worker process that will connect to manager.
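
    A possible use, as a hedged sketch (default_elastic_manager is referenced under worker_start_command below; starting a single worker this way is illustrative only):

    manager = ParallelProcessingTools.default_elastic_manager()
    cmd = ParallelProcessingTools.elastic_localworker_startcmd(manager)
    run(cmd; wait = false)  # start one local worker that connects back to manager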

    ParallelProcessingTools.getallvalues (Function)
    getallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}

    Access all values (one for each thread) of a thread-local value as a vector. Can only be called in single-threaded code sections.

    ParallelProcessingTools.start_elastic_workers (Function)
    ParallelProcessingTools.start_elastic_workers(mode::ElasticAddProcsMode, manager::ClusterManagers.ElasticManager)::Int

    Spawn worker processes as specified by mode and return the number of expected additional workers.

    ParallelProcessingTools.worker_start_command (Function)
    ParallelProcessingTools.worker_start_command(
        mode::ElasticAddProcsMode,
        manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager()
    )::Tuple{Cmd,Integer}

    Return the system command to start worker processes as well as the number of workers to start.
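
    For example (a sketch, using the ExternalProcesses mode documented above):

    mode = ExternalProcesses(nprocs = 4)
    cmd, n = ParallelProcessingTools.worker_start_command(mode)
    @info "Start $n workers with:" cmd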

    ParallelProcessingTools.workpart (Function)
    workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}

    Get the part of data that the execution unit current_worker is responsible for. Implies a partition of data across the workers listed in workersel.

    For generic data arrays, workpart will return a view. If data is a Range (e.g. indices to be processed), a sub-range will be returned.

    Type W will typically be Int and workersel will usually be a range/array of thread/process IDs.

    Note: workersel is required to be sorted in ascending order and to contain no duplicate entries.

    Examples:

    using Distributed, Base.Threads
    A = rand(100)
    # ...
    sub_A = workpart(A, workers(), myid())
    # ...
    idxs = workpart(eachindex(sub_A), allthreads(), threadid())
    for i in idxs
        # ...
    end