API
Modules
ParallelProcessingTools
Types and constants
ParallelProcessingTools.AbstractThreadLocal
ParallelProcessingTools.AddProcsMode
ParallelProcessingTools.ElasticAddProcsMode
ParallelProcessingTools.ExternalProcesses
ParallelProcessingTools.LocalProcesses
ParallelProcessingTools.SlurmRun
ParallelProcessingTools.ThreadLocal
Functions and macros
ParallelProcessingTools.@always_everywhere
ParallelProcessingTools.@critical
ParallelProcessingTools.@mt_out_of_order
ParallelProcessingTools.@onprocs
ParallelProcessingTools.@onthreads
ParallelProcessingTools.addworkers
ParallelProcessingTools.allthreads
ParallelProcessingTools.default_elastic_manager
ParallelProcessingTools.elastic_addprocs_timeout
ParallelProcessingTools.elastic_localworker_startcmd
ParallelProcessingTools.getallvalues
ParallelProcessingTools.getlocalvalue
ParallelProcessingTools.pinthreads_auto
ParallelProcessingTools.pinthreads_distributed
ParallelProcessingTools.shutdown_workers_atexit
ParallelProcessingTools.start_elastic_workers
ParallelProcessingTools.worker_init_code
ParallelProcessingTools.worker_resources
ParallelProcessingTools.worker_start_command
ParallelProcessingTools.workpart
Documentation
ParallelProcessingTools.AbstractThreadLocal — Type

```julia
abstract type AbstractThreadLocal{T} end
```

Abstract type for thread-local values of type `T`.

The value for the current thread is accessed via `getindex(::AbstractThreadLocal)` and `setindex!(::AbstractThreadLocal, x)`.

To access both regular and thread-local values in a unified manner, use the function `getlocalvalue`.

To get all values across all threads, use the function `getallvalues`.

The default implementation is `ThreadLocal`.
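A brief sketch of this accessor API, using the default `ThreadLocal` implementation (the values shown depend on the current thread):

```julia
using ParallelProcessingTools

tl = ThreadLocal(0)         # default AbstractThreadLocal implementation
tl[] = 7                    # setindex!: set the value for the current thread
tl[]                        # getindex: value for the current thread, now 7
getlocalvalue(tl) == tl[]   # unified access for thread-local values
getallvalues(tl)            # one value per thread (single-threaded sections only)
```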
ParallelProcessingTools.AddProcsMode — Type

```julia
abstract type ParallelProcessingTools.AddProcsMode
```

Abstract supertype for worker process addition modes.

Subtypes must implement:

- `ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)`

and may want to specialize:

- `ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)`

A minimal sketch of a custom mode follows below.
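The following is illustrative only; `MyLocalMode` is not part of the package, and a real mode would typically start workers itself instead of delegating:

```julia
using ParallelProcessingTools

# Hypothetical custom mode, for illustration:
struct MyLocalMode <: ParallelProcessingTools.AddProcsMode
    n::Int
end

# Required: how to actually add the workers
# (sketched here as delegation to the built-in LocalProcesses mode):
ParallelProcessingTools.addworkers(mode::MyLocalMode) =
    addworkers(LocalProcesses(nprocs = mode.n))
```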
ParallelProcessingTools.ElasticAddProcsMode — Type

```julia
abstract type ParallelProcessingTools.ElasticAddProcsMode <: ParallelProcessingTools.AddProcsMode
```

Abstract supertype for worker process addition modes that use the elastic cluster manager.

Subtypes must implement:

- `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)`
- `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)`

and may want to specialize:

- `ParallelProcessingTools.elastic_addprocs_timeout(mode::SomeElasticAddProcsMode)`

A minimal sketch of a custom elastic mode follows below.
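The following is illustrative only; `MyElasticMode` is not part of the package, and the sketch assumes workers run on the local host so that `elastic_localworker_startcmd` (documented below) applies:

```julia
using ParallelProcessingTools

# Hypothetical elastic mode, for illustration:
struct MyElasticMode <: ParallelProcessingTools.ElasticAddProcsMode
    nprocs::Int
end

function ParallelProcessingTools.worker_start_command(mode::MyElasticMode, manager)
    cmd = ParallelProcessingTools.elastic_localworker_startcmd(manager)
    return cmd, mode.nprocs
end

function ParallelProcessingTools.start_elastic_workers(mode::MyElasticMode, manager)
    cmd, n = ParallelProcessingTools.worker_start_command(mode, manager)
    for _ in 1:n
        run(cmd; wait = false)  # spawn each worker asynchronously
    end
    return n  # number of workers expected to connect
end
```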
ParallelProcessingTools.ExternalProcesses — Type

```julia
ParallelProcessingTools.ExternalProcesses(;
    nprocs::Integer = ...
)
```

Add worker processes by starting them externally.

Will log (via `@info`) a worker start command and then wait for the workers to connect. The user is responsible for starting the specified number of workers externally using that start command.

Example:

```julia
mode = ExternalProcesses(nprocs = 4)
addworkers(mode)
```

The user now has to start 4 Julia worker processes externally, using the logged start command. This start command can also be retrieved via `worker_start_command(mode)`.
ParallelProcessingTools.LocalProcesses — Type

```julia
LocalProcesses(;
    nprocs::Integer = 1
)
```

Mode to add `nprocs` worker processes on the current host.
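For example, to add four local workers via the documented `addworkers` entry point:

```julia
using ParallelProcessingTools

addworkers(LocalProcesses(nprocs = 4))  # add 4 worker processes on this host
```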
ParallelProcessingTools.SlurmRun — Type

```julia
SlurmRun(;
    slurm_flags::Cmd = {defaults}
    julia_flags::Cmd = {defaults}
    dir = pwd()
    user_start::Bool = false
    timeout::Real = 60
)
```

Mode to add worker processes via SLURM `srun`.

The `srun` and Julia worker `julia` command line flags are inferred from SLURM environment variables (e.g. when running inside an `salloc` or batch job), as well as from `slurm_flags` and `julia_flags`.

Workers are started with their current directory set to `dir`.

Example:

```julia
mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
addworkers(mode)
```

If `user_start` is `true`, the SLURM srun-command will not be run automatically; instead it will be logged via `@info` and the user is responsible for running it. This srun-command can also be retrieved via `worker_start_command(mode)`.
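For instance, a sketch of the `user_start` variant (the flags here are placeholders, and `worker_start_command` falls back to the default elastic manager):

```julia
mode = SlurmRun(slurm_flags = `--ntasks=4`, user_start = true)
cmd, n = worker_start_command(mode)  # inspect the srun command and worker count
addworkers(mode)                     # logs the command, then waits for workers to connect
```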
ParallelProcessingTools.ThreadLocal — Type

```julia
ThreadLocal{T} <: AbstractThreadLocal{T}
```

Represents a thread-local value. See `AbstractThreadLocal` for the API.

Constructors:

```julia
ThreadLocal{T}() where {T}
ThreadLocal(value::T) where {T}
ThreadLocal{T}(f::Base.Callable) where {T}
```

Examples:

```julia
tlvalue = ThreadLocal(0)
@onthreads allthreads() tlvalue[] = Base.Threads.threadid()
getallvalues(tlvalue) == allthreads()
```

```julia
rand_value_on_each_thread = ThreadLocal{Float64}(rand)
all(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))
```
ParallelProcessingTools.@always_everywhere — Macro

```julia
@always_everywhere(expr)
```

Runs `expr` on all current Julia processes, and also on all future Julia processes added via `addworkers`.

Similar to `Distributed.@everywhere`, but also stores `expr` so that `addworkers` can execute it automatically on new worker processes.
ParallelProcessingTools.@critical — Macro

```julia
@critical expr
```

Marks code in `expr` as a critical section. Code in critical sections will never be executed in parallel (via multithreading) with any other critical section.

`@critical` is very useful to protect non-threadsafe code.

Example:

```julia
@onthreads allthreads() begin
    @critical @info Base.Threads.threadid()
end
```

Without `@critical`, the above will typically crash Julia.
ParallelProcessingTools.@mt_out_of_order — Macro

```julia
@mt_out_of_order begin expr... end
```

Runs all top-level expressions in `begin expr... end` on parallel multi-threaded tasks.

Example:

```julia
@mt_out_of_order begin
    a = foo()
    bar()
    c = baz()
end
```

This will run `a = foo()`, `bar()` and `c = baz()` in parallel and in arbitrary order; the results of the assignments will appear in the enclosing scope.
ParallelProcessingTools.@onprocs — Macro

```julia
@onprocs procsel expr
```

Executes `expr` in parallel on all processes in `procsel`. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if `procsel` itself is a scalar).

Example:

```julia
using Distributed
addprocs(2)
workers() == @onprocs workers() myid()
```
ParallelProcessingTools.@onthreads — Macro

```julia
@onthreads threadsel expr
```

Execute code in `expr` in parallel on the threads in `threadsel`.

`threadsel` should be a single thread-ID or a range (or array) of thread-IDs. If `threadsel == Base.Threads.threadid()`, `expr` is run on the current thread with only minimal overhead.

Example 1:

```julia
tlsum = ThreadLocal(0.0)
data = rand(100)
@onthreads allthreads() begin
    tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
end
sum(getallvalues(tlsum)) ≈ sum(data)
```

Example 2:

```julia
# Assuming 4 threads:
tl = ThreadLocal(42)
threadsel = 2:3
@onthreads threadsel begin
    tl[] = Base.Threads.threadid()
end
getallvalues(tl)[threadsel] == [2, 3]
getallvalues(tl)[[1,4]] == [42, 42]
```
ParallelProcessingTools.addworkers — Function

```julia
addworkers(mode::ParallelProcessingTools.AddProcsMode)
```

Add Julia worker processes for LEGEND data processing.

By default, ensures that all worker processes use the same Julia project environment as the current process (this requires that file system paths are consistent across compute hosts).

Use `@always_everywhere` to run initialization code on all current processes and all future processes added via `addworkers`:

```julia
using Distributed, ParallelProcessingTools

@always_everywhere begin
    using SomePackage
    import SomeOtherPackage

    get_global_value() = 42
end

# ... some code ...

addworkers(LocalProcesses(nprocs = 4))

# `get_global_value` is available even though workers were added later:
remotecall_fetch(get_global_value, last(workers()))
```

See also `worker_resources()`.
ParallelProcessingTools.allthreads — Method

```julia
allthreads()
```

Convenience function, returns the equivalent of `1:Base.Threads.nthreads()`.
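For example:

```julia
using ParallelProcessingTools
using Base.Threads: nthreads

allthreads() == 1:nthreads()  # true
```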
ParallelProcessingTools.default_elastic_manager — Function

```julia
ParallelProcessingTools.default_elastic_manager()
ParallelProcessingTools.default_elastic_manager(manager::ClusterManagers.ElasticManager)
```

Get or set the default elastic cluster manager.
ParallelProcessingTools.elastic_addprocs_timeout — Function

```julia
ParallelProcessingTools.elastic_addprocs_timeout(mode::ElasticAddProcsMode)
```

Get the timeout in seconds for waiting for worker processes to connect.
ParallelProcessingTools.elastic_localworker_startcmd — Method

```julia
ParallelProcessingTools.elastic_localworker_startcmd(
    manager::Distributed.ClusterManager;
    julia_cmd::Cmd = _default_julia_cmd(),
    julia_flags::Cmd = _default_julia_flags(),
    julia_project::AbstractString = _default_julia_project()
)::Cmd
```

Return the system command required to start, on the current host, a Julia worker process that will connect to `manager`.
ParallelProcessingTools.getallvalues — Function

```julia
getallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}
```

Access all values (one for each thread) of a thread-local value as a vector. May only be called in single-threaded code sections.
ParallelProcessingTools.getlocalvalue — Function

```julia
getlocalvalue(x::Any) = x
getlocalvalue(x::ThreadLocal) = x[]
```

Access plain values and thread-local values in a unified fashion.
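For example:

```julia
using ParallelProcessingTools

tl = ThreadLocal(42)
getlocalvalue(tl) == 42    # thread-local value: unwrapped for the current thread
getlocalvalue(4.2) == 4.2  # plain value: passed through unchanged
```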
ParallelProcessingTools.pinthreads_auto — Method

```julia
pinthreads_auto()
```

Use the default thread-pinning strategy for the current Julia process.
ParallelProcessingTools.pinthreads_distributed — Method

```julia
ParallelProcessingTools.pinthreads_distributed(procs::AbstractVector{<:Integer} = Distributed.procs())
```

Use the default thread-pinning strategy on all Julia processes in `procs`.
ParallelProcessingTools.shutdown_workers_atexit — Method

```julia
ParallelProcessingTools.shutdown_workers_atexit()
```

Ensure worker processes are shut down when Julia exits.
ParallelProcessingTools.start_elastic_workers — Function

```julia
ParallelProcessingTools.start_elastic_workers(mode::ElasticAddProcsMode, manager::ClusterManagers.ElasticManager)::Int
```

Spawn worker processes as specified by `mode` and return the number of expected additional workers.
ParallelProcessingTools.worker_init_code — Function

```julia
ParallelProcessingTools.worker_init_code(::AddProcsMode)::Expr
```

Get a Julia code expression to run on new worker processes even before running `@always_everywhere` code on them.
ParallelProcessingTools.worker_resources — Method

```julia
worker_resources()
```

Get the distributed Julia process resources currently available.
ParallelProcessingTools.worker_start_command — Function

```julia
ParallelProcessingTools.worker_start_command(
    mode::ElasticAddProcsMode,
    manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager()
)::Tuple{Cmd,Integer}
```

Return the system command to start worker processes, as well as the number of workers to start.
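For example, with the `ExternalProcesses` mode documented above (the default elastic manager is used implicitly):

```julia
mode = ExternalProcesses(nprocs = 4)
cmd, n = worker_start_command(mode)  # the command to run externally, and how many workers
```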
ParallelProcessingTools.workpart — Function

```julia
workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}
```

Get the part of `data` that the execution unit `current_worker` is responsible for. Implies a partition of `data` across the workers listed in `workersel`.

For generic `data` arrays, `workpart` will return a view. If `data` is a `Range` (e.g. indices to be processed), a sub-range will be returned.

Type `W` will typically be `Int` and `workersel` will usually be a range/array of thread/process IDs.

Note: `workersel` is required to be sorted in ascending order and to contain no duplicate entries.

Examples:

```julia
using Distributed, Base.Threads

A = rand(100)

# ...

sub_A = workpart(A, workers(), myid())

# ...

idxs = workpart(eachindex(sub_A), allthreads(), threadid())
for i in idxs
    # ...
end
```
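As a more concrete sketch of the partition semantics (assuming, per the above, a contiguous in-order split across the selected workers):

```julia
using ParallelProcessingTools

data = collect(1:10)
workersel = 2:4  # e.g. three worker IDs

# Each worker gets its own part; together the parts cover all of data:
parts = [workpart(data, workersel, w) for w in workersel]
vcat(parts...) == data  # true
```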