API
Modules
Types and constants
ParallelProcessingTools.AbstractThreadLocal
ParallelProcessingTools.AutoThreadPinning
ParallelProcessingTools.CreateNew
ParallelProcessingTools.CreateOrIgnore
ParallelProcessingTools.CreateOrModify
ParallelProcessingTools.CreateOrReplace
ParallelProcessingTools.DynamicAddProcsMode
ParallelProcessingTools.FilesToRead
ParallelProcessingTools.FilesToWrite
ParallelProcessingTools.FlexWorkerPool
ParallelProcessingTools.HTCondorRun
ParallelProcessingTools.MaxTriesExceeded
ParallelProcessingTools.ModifyExisting
ParallelProcessingTools.NonZeroExitCode
ParallelProcessingTools.OnLocalhost
ParallelProcessingTools.RunProcsMode
ParallelProcessingTools.SlurmRun
ParallelProcessingTools.ThreadLocal
ParallelProcessingTools.TimelimitExceeded
ParallelProcessingTools.WriteMode
Functions and macros
ParallelProcessingTools.@always_everywhere
ParallelProcessingTools.@critical
ParallelProcessingTools.@mt_out_of_order
ParallelProcessingTools.@onprocs
ParallelProcessingTools.@onthreads
ParallelProcessingTools.@return_exceptions
ParallelProcessingTools.@userfriendly_exceptions
ParallelProcessingTools.@wait_while
ParallelProcessingTools.add_procinit_code
ParallelProcessingTools.allprocs_management_lock
ParallelProcessingTools.allthreads
ParallelProcessingTools.clear_worker_caches!
ParallelProcessingTools.current_procinit_level
ParallelProcessingTools.default_cache_dir
ParallelProcessingTools.default_cache_dir!
ParallelProcessingTools.ensure_procinit
ParallelProcessingTools.ensure_procinit_or_kill
ParallelProcessingTools.get_procinit_code
ParallelProcessingTools.getallvalues
ParallelProcessingTools.getlabel
ParallelProcessingTools.getlocalvalue
ParallelProcessingTools.global_procinit_level
ParallelProcessingTools.hasfailed
ParallelProcessingTools.idle_sleep
ParallelProcessingTools.in_vscode_notebook
ParallelProcessingTools.inner_exception
ParallelProcessingTools.isactive
ParallelProcessingTools.isvalid_pid
ParallelProcessingTools.memory_limit
ParallelProcessingTools.memory_limit!
ParallelProcessingTools.onlyfirst_exception
ParallelProcessingTools.onworker
ParallelProcessingTools.original_exception
ParallelProcessingTools.ppt_cluster_manager
ParallelProcessingTools.ppt_cluster_manager!
ParallelProcessingTools.ppt_worker_pool
ParallelProcessingTools.ppt_worker_pool!
ParallelProcessingTools.printover
ParallelProcessingTools.proc_management_lock
ParallelProcessingTools.read_files
ParallelProcessingTools.runworkers
ParallelProcessingTools.sleep_ns
ParallelProcessingTools.split_basename_ext
ParallelProcessingTools.stopworkers
ParallelProcessingTools.tmp_filename
ParallelProcessingTools.wait_for_all
ParallelProcessingTools.wait_for_any
ParallelProcessingTools.whyfailed
ParallelProcessingTools.worker_local_startcmd
ParallelProcessingTools.worker_resources
ParallelProcessingTools.worker_start_command
ParallelProcessingTools.workpart
ParallelProcessingTools.wouldwait
ParallelProcessingTools.write_files
ParallelProcessingTools.write_worker_start_script
Documentation
ParallelProcessingTools.AbstractThreadLocal
— Typeabstract type AbstractThreadLocal{T} end
Abstract type for thread-local values of type T
.
The value for the current thread is accessed via getindex(::AbstractThreadLocal)
and setindex!(::AbstractThreadLocal, x).
To access both regular and thread-local values in a unified manner, use the function getlocalvalue
.
To get all values across all threads, use the function getallvalues
.
Default implementation is ThreadLocal
.
ParallelProcessingTools.AutoThreadPinning
— Typestruct AutoThreadPinning
ParallelProcessingTools default thread pinning mode.
Constructor:
AutoThreadPinning(; random::Bool = false, pin_blas::Bool = false)
Arguments:
random
: Use system-topology-based random thread pinning if no thread affinity mask is set (e.g. via SLURM or taskset
).
pin_blas
: Try to pin BLAS threads. Not fully functional due to bugs in BLAS thread pinning (see ThreadPinning issue #105).
Use with ThreadPinning.pinthreads
:
using ParallelProcessingTools, ThreadPinning
pinthreads(AutoThreadPinning())
ParallelProcessingTools.CreateNew
— TypeCreateNew() isa WriteMode
Indicates that new files should be created and that an error should be thrown if the files already exist.
See WriteMode
and write_files
.
ParallelProcessingTools.CreateOrIgnore
— TypeCreateOrIgnore() isa WriteMode
Indicates that new files should be created, and that nothing should be done if the files already exist.
Causes an error to be thrown if only some of the files exist, to indicate an inconsistent state.
CreateOrIgnore()
is the recommended default when creating files in a parallel computing context, especially if failure or timeouts might result in re-tries. This way, if multiple workers try to create the same file(s), only one file or consistent set of files will be created under the target filenames.
See WriteMode
and write_files
.
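The create-or-ignore idea can be illustrated for a single file with a small sketch in plain Julia. This is not how write_files is implemented: the helper name create_or_ignore is hypothetical, and the check-then-rename step is only best-effort, not a strict guarantee against races.

```julia
# Hypothetical helper, not part of ParallelProcessingTools:
# write `content` to `target` only if `target` does not exist yet.
function create_or_ignore(target::AbstractString, content::AbstractString)
    isfile(target) && return false              # already created, nothing to do
    tmp = tempname(dirname(abspath(target)))    # temporary name in the same directory
    write(tmp, content)                         # write the full content first
    if isfile(target)                           # someone else finished in the meantime
        rm(tmp)
        return false
    end
    mv(tmp, target)                             # publish under the final name
    return true
end
```

Writing to a temporary name first means that readers never see a half-written file under the target name.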
ParallelProcessingTools.CreateOrModify
— TypeCreateOrModify() isa WriteMode
Indicates that either new files should be created, or that existing files should be modified.
Causes an error to be thrown if only some of the files exist already, to indicate an inconsistent state.
See WriteMode
and write_files
.
ParallelProcessingTools.CreateOrReplace
— TypeCreateOrReplace() isa WriteMode
Indicates that new files should be created and existing files should be replaced.
See WriteMode
and write_files
.
ParallelProcessingTools.DynamicAddProcsMode
— Typeabstract type ParallelProcessingTools.DynamicAddProcsMode <: ParallelProcessingTools.RunProcsMode
Abstract supertype for worker start modes that use an elastic cluster manager that enables dynamic addition and removal of worker processes.
Subtypes must implement:
ParallelProcessingTools.worker_start_command(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
ParallelProcessingTools.runworkers(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
ParallelProcessingTools.FilesToRead
— Typestruct ParallelProcessingTools.FilesToRead
Created by read_files
, represents a set of (temporary) files to read from.
With ftr::FilesToRead
, use collect(ftr)
or iterate(ftr)
to access the filenames to read from. Use close(ftr)
or close(ftr, true)
to close things in good order, indicating success, and use close(ftr, false)
or close(ftr, err::Exception)
to abort, indicating failure.
See read_files
for example code.
If aborted or if the Julia process exits without ftr
being closed, temporary files are still cleaned up, unless read_files
was used with delete_tmp_onerror = false
.
ParallelProcessingTools.FilesToWrite
— Typestruct ParallelProcessingTools.FilesToWrite
Created by write_files
, represents a set of (temporary) files to write to.
With ftw::FilesToWrite
, use collect(ftw)
or iterate(ftw)
to access the filenames to write to. Use close(ftw)
or close(ftw, true)
to close things in good order, indicating success, and use close(ftw, false)
or close(ftw, err::Exception)
to abort, indicating failure.
See write_files
for example code.
If aborted or if the Julia process exits without ftw
being closed, temporary files are still cleaned up, unless write_files
was used with delete_tmp_onerror = false
.
ParallelProcessingTools.FlexWorkerPool
— TypeFlexWorkerPool{WP<:AbstractWorkerPool}(
worker_pids::AbstractVector{Int};
label::AbstractString = "", maxoccupancy::Int = 1, init_workers::Bool = true
)::AbstractWorkerPool
FlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...)
A flexible worker pool, intended to work with cluster managers that may add and remove Julia processes dynamically.
If the current process (Distributed.myid()
) is part of the pool, resp. if withmyid
is true
, it will be used as a fallback when no other workers are members of the pool (e.g. because no other processes have been added yet or because all other processes in the pool have terminated and been removed from it). The current process will not be used as a fallback when all other workers are currently in use.
If caching
is true, the pool will use a Distributed.CachingPool
as the underlying pool, otherwise a Distributed.WorkerPool
.
If maxoccupancy
is greater than one, individual workers can be used maxoccupancy
times in parallel. So take!(pool)
may return the same process ID pid
multiple times without a put!(pool, pid)
in between. Such an (ideally moderate) oversubscription can be useful to reduce latency-related idle times on workers, e.g. if communication latency to the workers is not short compared to the runtime of the functions called on them, or if the remote functions are often blocked waiting for I/O. Note: Workers still must be put back the same number of times they were taken from the pool, in total.
If init_workers
is true
, workers taken from the pool will be guaranteed to be initialized to the current global initialization level (see @always_everywhere
).
WP
is the type of the underlying worker pool used, e.g. Distributed.WorkerPool
(default) or Distributed.CachingPool
.
Example:
using ParallelProcessingTools, Distributed
pool = FlexWorkerPool(withmyid = true, maxoccupancy = 3)
workers(pool)
pids = [take!(pool) for _ in 1:3]
@assert pids == repeat([myid()], 3)
foreach(pid -> put!(pool, pid), pids)
addprocs(4)
worker_procs = workers()
push!.(Ref(pool), worker_procs)
pids = [take!(pool) for _ in 1:4*3]
@assert pids == repeat(worker_procs, 3)
foreach(pid -> put!(pool, pid), pids)
rmprocs(worker_procs)
pids = [take!(pool) for _ in 1:3]
@assert pids == repeat([myid()], 3)
foreach(pid -> put!(pool, pid), pids)
ParallelProcessingTools.HTCondorRun
— TypeHTCondorRun(;
n::Int = 1
condor_flags::Cmd = _default_condor_flags()
condor_settings::Dict{String,String} = Dict{String,String}()
julia_flags::Cmd = _default_julia_flags()
julia_depot::Vector{String} = DEPOT_PATH
jobfile_dir = homedir()
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
)
Mode to add worker processes via HTCondor condor_submit
.
Condor submit script and steering .sh
files are stored in jobfile_dir
.
Example:
julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
task = runworkers(runmode)
julia> runworkers(runmode)
[ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
Submitting job(s)..........
10 job(s) submitted to cluster 3198291.
[ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
(nothing, 10)
julia> sleep(10)
julia> nworkers()
10
Workers can also be started manually: use worker_start_command(runmode)
to get the condor_submit
start command and run it from a separate process.
ParallelProcessingTools.MaxTriesExceeded
— TypeMaxTriesExceeded <: Exception
Exception thrown when a number of (re-)tries was exceeded.
ParallelProcessingTools.ModifyExisting
— TypeModifyExisting() isa WriteMode
Indicates that existing files should be modified.
Causes an error to be thrown if not all of the files exist already.
See WriteMode
and write_files
.
ParallelProcessingTools.NonZeroExitCode
— TypeParallelProcessingTools.NonZeroExitCode(cmd::Cmd, exitcode::Integer) isa Exception
Exception to indicate that an external process running cmd
failed with the given exit code (not equal to zero).
ParallelProcessingTools.OnLocalhost
— TypeOnLocalhost(;
n::Integer = 1
env::Dict{String,String} = Dict{String,String}()
) isa DynamicAddProcsMode
Mode that runs n
worker processes on the current host.
Example:
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)
Threads.@async begin
wait(task)
@info "Local workers have terminated."
end
@wait_while nprocs()-1 < n
Workers can also be started manually: use worker_start_command(runmode)
to get the system (shell) command and run it from a separate process.
ParallelProcessingTools.RunProcsMode
— Typeabstract type ParallelProcessingTools.RunProcsMode
Abstract supertype for worker process run modes.
Subtypes must implement:
ParallelProcessingTools.runworkers(runmode::SomeRunProcsMode, manager::Distributed.AbstractClusterManager)
ParallelProcessingTools.SlurmRun
— TypeSlurmRun(;
slurm_flags::Cmd = {defaults}
julia_flags::Cmd = {defaults}
dir = pwd()
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
)
Mode to add worker processes via SLURM srun
.
srun
and Julia worker julia
command line flags are inferred from SLURM environment variables (e.g. when inside of an salloc
or batch job), as well as slurm_flags
and julia_flags
.
Workers are started with current directory set to dir
.
Example:
runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
task, n = runworkers(runmode)
Threads.@async begin
wait(task)
@info "SLURM workers have terminated."
end
@wait_while nprocs()-1 < n
Workers can also be started manually: use worker_start_command(runmode)
to get the srun
start command and run it from a separate process.
ParallelProcessingTools.ThreadLocal
— TypeThreadLocal{T} <: AbstractThreadLocal{T}
Represents a thread-local value. See AbstractThreadLocal
for the API.
Constructors:
ThreadLocal{T}() where {T}
ThreadLocal(value::T) where {T}
ThreadLocal{T}(f::Base.Callable) where {T}
Examples:
tlvalue = ThreadLocal(0)
@onthreads allthreads() tlvalue[] = Base.Threads.threadid()
getallvalues(tlvalue) == allthreads()
rand_value_on_each_thread = ThreadLocal{Float64}(rand)
all(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))
ParallelProcessingTools.TimelimitExceeded
— TypeTimelimitExceeded <: Exception
Exception thrown when something timed out.
ParallelProcessingTools.WriteMode
— Typeabstract type WriteMode
Abstract type for write modes.
May be one of the following subtypes: CreateNew
, CreateOrIgnore
, CreateOrReplace
, CreateOrModify
, ModifyExisting
.
Used by write_files
.
ParallelProcessingTools.@always_everywhere
— Macro@always_everywhere(expr)
Runs expr
on all current Julia processes, and also on all future Julia processes after an ensure_procinit()
or when they are managed using a FlexWorkerPool
.
Similar to Distributed.@everywhere
, but also stores expr
so that ensure_procinit
can execute it on future worker processes.
Example:
@always_everywhere begin
using SomePackage
using SomeOtherPackage
some_global_variable = 42
end
See also ParallelProcessingTools.add_procinit_code
and ParallelProcessingTools.ensure_procinit
.
ParallelProcessingTools.@critical
— Macro@critical expr
Mark code in expr
as a critical section. Code in critical sections will never be executed in parallel (via multithreading) with any other critical section.
@critical
is very useful to mark non-threadsafe code.
Example:
@onthreads allthreads() begin
@critical @info Base.Threads.threadid()
end
Without `@critical`, the above will typically crash Julia.
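Conceptually, a critical section behaves as if every such block were guarded by one shared lock. The following plain-Julia sketch shows that idea with a ReentrantLock; the function name count_with_lock is hypothetical, and the macro's actual implementation may differ.

```julia
# Conceptual sketch: a single lock serializes all "critical" updates.
function count_with_lock(n::Integer)
    critical_lock = ReentrantLock()    # stands in for the shared critical-section lock
    counter = Ref(0)
    @sync for _ in 1:n
        Threads.@spawn begin
            lock(critical_lock) do
                counter[] += 1         # non-atomic update, safe only under the lock
            end
        end
    end
    return counter[]
end

count_with_lock(1000)  # every increment is applied exactly once
```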
ParallelProcessingTools.@mt_out_of_order
— Macro@mt_out_of_order begin expr... end
Runs all top-level expressions in begin expr... end
on parallel multi-threaded tasks.
Example:
@mt_out_of_order begin
a = foo()
bar()
c = baz()
end
will run a = foo()
, bar()
and c = baz()
in parallel and in arbitrary order, results of assignments will appear in the outside scope.
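Roughly, the effect corresponds to spawning one task per top-level expression and then collecting the results. The sketch below spells this out by hand in plain Julia; foo, bar and baz are hypothetical stand-ins, and the macro's actual expansion may differ.

```julia
# Hypothetical stand-ins for the functions used in the example above
foo() = 1
bar() = nothing
baz() = 3

# One task per expression; they may run in any order and in parallel
t_a = Threads.@spawn foo()
t_b = Threads.@spawn bar()
t_c = Threads.@spawn baz()

# Collect results so that `a` and `c` are available in the enclosing scope
a = fetch(t_a)
fetch(t_b)
c = fetch(t_c)
```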
ParallelProcessingTools.@onprocs
— Macro@onprocs procsel expr
Executes expr
in parallel on all processes in procsel
. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if procsel
itself is a scalar).
Example:
using Distributed
addprocs(2)
workers() == @onprocs workers() myid()
ParallelProcessingTools.@onthreads
— Macro@onthreads threadsel expr
Execute code in expr
in parallel on the threads in threadsel
.
threadsel
should be a single thread-ID or a range (or array) of thread-ids. If threadsel == Base.Threads.threadid()
, expr
is run on the current thread with only minimal overhead.
Example 1:
tlsum = ThreadLocal(0.0)
data = rand(100)
@onthreads allthreads() begin
tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
end
sum(getallvalues(tlsum)) ≈ sum(data)
Example 2:
# Assuming 4 threads:
tl = ThreadLocal(42)
threadsel = 2:3
@onthreads threadsel begin
tl[] = Base.Threads.threadid()
end
getallvalues(tl)[threadsel] == [2, 3]
getallvalues(tl)[[1,4]] == [42, 42]
ParallelProcessingTools.@return_exceptions
— Macro@return_exceptions expr
Runs expr
and catches and returns exceptions as values instead of having them thrown.
Useful for user-side debugging, especially of parallel and/or remote code execution.
See also @userfriendly_exceptions
.
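The idea of returning exceptions as values can be shown with a small function in plain Julia. The name return_exception is hypothetical and this is a sketch of the concept, not the macro's implementation.

```julia
# Hypothetical function form: call `f` and return either its result
# or the exception it threw, instead of letting the exception propagate.
return_exception(f) = try
    f()
catch err
    err
end

ok  = return_exception(() -> 1 + 1)          # regular result
bad = return_exception(() -> error("boom"))  # an ErrorException value
```

The caller can then inspect `bad` like any other value, for example with `bad isa Exception`.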
ParallelProcessingTools.@userfriendly_exceptions
— Macro@userfriendly_exceptions expr
Transforms exceptions originating from expr
into more user-friendly ones.
If multiple exceptions originate from parallel code in expr
, only one is rethrown, and TaskFailedException
s and RemoteException
s are replaced by the original exceptions that caused them.
See inner_exception
and onlyfirst_exception
.
ParallelProcessingTools.@wait_while
— Macro@wait_while [maxtime=nothing] [timeout_error=false] cond
Wait while cond
is true, using slowly increasing sleep times in between evaluating cond
.
cond
may be an arbitrary Julia expression.
If maxtime
is given as a real value, will only wait for maxtime
seconds; if the value is zero or negative, will not wait at all.
If timeout_error
is true
, will throw a TimelimitExceeded
exception if the maximum waiting time is exceeded.
Example, wait for a task with a maxtime:
task = Threads.@spawn sleep(10)
timer = Timer(2)
@wait_while !istaskdone(task) && isopen(timer)
istaskdone(task) == false
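The waiting behaviour described above can be sketched as a plain polling loop. The helper wait_while_sketch, its initial delay, the doubling step and the cap are all assumptions made for illustration; the macro's real sleep schedule is not specified here.

```julia
# Hypothetical helper: poll `cond()` with growing sleep times.
# Returns true if `cond()` became false, false if `maxtime` ran out first.
function wait_while_sketch(cond; maxtime::Union{Real,Nothing} = nothing)
    t0 = time()
    delay = 0.001                          # assumed initial sleep time in seconds
    while cond()
        if maxtime !== nothing && time() - t0 >= maxtime
            return false                   # zero or negative maxtime: no waiting at all
        end
        sleep(delay)
        delay = min(2 * delay, 0.1)        # increase step by step, with an assumed cap
    end
    return true
end
```

A variant matching timeout_error = true would throw an exception instead of returning false.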
ParallelProcessingTools.add_procinit_code
— MethodParallelProcessingTools.add_procinit_code(expr; run_everywhere::Bool = false)
Add expr
to process init code. expr
is run on the current process immediately, but not automatically on remote processes unless run_everywhere
is true
.
User code should typically not need to call this function, but should use @always_everywhere
instead.
See also ParallelProcessingTools.get_procinit_code
and ParallelProcessingTools.ensure_procinit
.
ParallelProcessingTools.allprocs_management_lock
— MethodParallelProcessingTools.allprocs_management_lock()::ReentrantLock
Returns the global process operations lock. This lock is used to protect operations that concern the management of all processes.
ParallelProcessingTools.allthreads
— Methodallthreads()
Convenience function, returns an equivalent of 1:Base.Threads.nthreads()
.
ParallelProcessingTools.clear_worker_caches!
— Functionclear_worker_caches!(pool::AbstractWorkerPool)
Clear the worker caches (cached function closures, etc.) on the workers in pool
.
Does nothing if the pool doesn't perform any on-worker caching.
ParallelProcessingTools.current_procinit_level
— MethodParallelProcessingTools.current_procinit_level()
Return the init level of the current process.
See also global_procinit_level
.
ParallelProcessingTools.default_cache_dir!
— MethodParallelProcessingTools.default_cache_dir!(dir::AbstractString)
Sets the default cache directory to dir
and returns it.
See also default_cache_dir
.
ParallelProcessingTools.default_cache_dir
— MethodParallelProcessingTools.default_cache_dir()::String
Returns the default cache directory, e.g. for write_files
and read_files
.
See also default_cache_dir!
.
ParallelProcessingTools.ensure_procinit
— Functionensure_procinit(pid::Int)
ensure_procinit(pids::AbstractVector{Int} = workers())
Run process initialization code on the given process(es) if necessary, returns after initialization is complete.
When using a FlexWorkerPool
, worker initialization can safely be run in the background though, as the pool will only offer workers (via take!(pool)
) after it has fully initialized them.
See also ParallelProcessingTools.get_procinit_code
, ParallelProcessingTools.add_procinit_code
, ParallelProcessingTools.global_procinit_level
and ParallelProcessingTools.current_procinit_level
.
ParallelProcessingTools.ensure_procinit_or_kill
— FunctionParallelProcessingTools.ensure_procinit_or_kill(pid::Int)
ParallelProcessingTools.ensure_procinit_or_kill(pids::AbstractVector{Int} = workers())
Run process initialization code on the given process(es) if necessary, kill and remove process(es) for which initialization fails.
See also ParallelProcessingTools.ensure_procinit
.
ParallelProcessingTools.get_procinit_code
— MethodParallelProcessingTools.get_procinit_code()
Returns the code that should be run on each process to ensure that desired packages are loaded and global variables are set up as expected.
See also ParallelProcessingTools.add_procinit_code
and ParallelProcessingTools.ensure_procinit
.
ParallelProcessingTools.getallvalues
— Functiongetallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}
Access all values (one for each thread) of a thread-local value as a vector. Can only be called in single-threaded code sections.
ParallelProcessingTools.getlabel
— FunctionParallelProcessingTools.getlabel(obj)
Returns a descriptive label for obj
suitable for using in exceptions and logging messages. Defaults to string(obj)
.
ParallelProcessingTools.getlocalvalue
— Functiongetlocalvalue(x::Any) = x
getlocalvalue(x::ThreadLocal) = x[]
Access plain values and thread-local values in a unified fashion.
ParallelProcessingTools.global_procinit_level
— MethodParallelProcessingTools.global_procinit_level()
Return the global process init level.
Returns, e.g., the number of times add_procinit_code
resp. @always_everywhere
have been called.
See also current_procinit_level
.
ParallelProcessingTools.hasfailed
— FunctionParallelProcessingTools.hasfailed(obj)::Bool
Checks if obj
has failed in some way.
Supports Task
and Process
and may be extended to other object types.
Returns false
if isnothing(obj)
or ismissing(obj)
.
ParallelProcessingTools.idle_sleep
— Methodidle_sleep(n_idle::Integer, t_interval_s, t_max_s)
Sleep because something has been idle for n_idle
times.
Will sleep for log2(n_idle + 1) * t_interval_s
seconds, but at most for t_max_s
seconds.
Guaranteed yield()
at least once, even if n_idle
is zero.
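The sleep duration formula can be checked with a few numbers. The helper idle_sleep_duration below is hypothetical and only computes the duration stated above; it does not sleep or yield.

```julia
# Sleep duration as described above: log2(n_idle + 1) * t_interval_s, capped at t_max_s.
idle_sleep_duration(n_idle::Integer, t_interval_s::Real, t_max_s::Real) =
    min(log2(n_idle + 1) * t_interval_s, t_max_s)

idle_sleep_duration(0, 0.1, 1.0)      # 0.0: no sleeping, only the yield() remains
idle_sleep_duration(3, 0.1, 1.0)      # log2(4) * 0.1 = 0.2
idle_sleep_duration(10^6, 0.1, 1.0)   # capped at t_max_s = 1.0
```

The logarithm makes the waiting time grow quickly at first and then only slowly, so a long idle period does not lead to very long sleeps even before the cap applies.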
ParallelProcessingTools.in_vscode_notebook
— MethodParallelProcessingTools.in_vscode_notebook()::Bool
Test if running within a Visual Studio Code notebook.
ParallelProcessingTools.inner_exception
— FunctionParallelProcessingTools.inner_exception(err)
Replaces exceptions like a TaskFailedException
or a RemoteException
with their underlying cause. Leaves other exceptions unchanged.
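To illustrate what unwrapping means, the sketch below handles only TaskFailedException, using plain Julia. The name unwrap_once is hypothetical; the package function also covers RemoteException, which is left out here.

```julia
# Hypothetical stand-in for the unwrapping step (TaskFailedException only)
unwrap_once(err) = err                                       # other exceptions stay unchanged
unwrap_once(err::TaskFailedException) = err.task.exception   # exception that failed the task

task = Threads.@spawn error("boom")
err = try
    fetch(task)          # throws a TaskFailedException wrapping the failed task
catch e
    e
end
cause = unwrap_once(err) # the ErrorException raised inside the task
```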
ParallelProcessingTools.isactive
— FunctionParallelProcessingTools.isactive(obj)::Bool
Checks if obj
is still active, running or whatever applies to the type of obj
.
Supports Task
, Process
, Future
, Channel
, Timer
, Base.AsyncCondition
and may be extended to other object types.
Returns false
if isnothing(obj)
and true
if ismissing(obj)
.
ParallelProcessingTools.isvalid_pid
— Functionisvalid_pid(pid::Int)::Bool
Tests if pid
is a valid Julia process ID.
Equivalent to pid in Distributed.procs()
, but faster.
ParallelProcessingTools.memory_limit
— Functionmemory_limit()
Gets the virtual memory limit for the current Julia process.
Returns a tuple (soft_limit::Int64, hard_limit::Int64)
(in units of bytes). Values of -1
mean unlimited.
Currently only works on Linux, simply returns (Int64(-1), Int64(-1))
on other operating systems.
ParallelProcessingTools.memory_limit!
— Functionmemory_limit!(soft_limit::Integer, hard_limit::Integer = -1)
Sets the virtual memory limit for the current Julia process.
soft_limit
and hard_limit
are in units of bytes. Values of -1
mean unlimited. hard_limit
must not be stricter than soft_limit
, and should typically be set to -1
.
Returns (soft_limit::Int64, hard_limit::Int64)
.
Currently only has an effect on Linux, does nothing and simply returns (Int64(-1), Int64(-1))
on other operating systems.
ParallelProcessingTools.onlyfirst_exception
— FunctionParallelProcessingTools.onlyfirst_exception(err)
Replaces CompositeException
s with their first exception.
Also employs inner_exception
if simplify
is true
.
ParallelProcessingTools.onworker
— Functiononworker(
f::Function, args...;
pool::AbstractWorkerPool = ppt_worker_pool(),
maxtime::Real = 0, tries::Integer = 1, label::AbstractString = ""
)
Runs f(args...)
on an available worker process from the given pool
and returns the result.
If maxtime > 0
, a maximum time for the activity is set. If the activity takes longer than maxtime
seconds, the process running it (if not the main process) will be terminated.
label
is used for debug-logging.
If a problem occurs (maxtime or worker failure) while running the activity, reschedules the task if the maximum number of tries has not yet been reached, otherwise throws an exception.
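The retry behaviour can be pictured with a local sketch in plain Julia. The helper call_with_retries is hypothetical and runs f in the current process; unlike onworker, which is described as rescheduling after timeouts and worker failures, this sketch retries after any exception.

```julia
# Hypothetical local stand-in for the retry logic described above
function call_with_retries(f, args...; tries::Integer = 1)
    tries >= 1 || throw(ArgumentError("tries must be at least 1"))
    last_err = nothing
    for attempt in 1:tries
        try
            return f(args...)      # success: return the result immediately
        catch err
            last_err = err         # remember the failure and try again
        end
    end
    throw(last_err)                # all tries used up
end
```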
ParallelProcessingTools.original_exception
— FunctionParallelProcessingTools.original_exception(err)
Replaces (possibly nested) exceptions like a TaskFailedException
or RemoteException
s with the innermost exception, likely to be the one that was thrown originally. Leaves other exceptions unchanged.
ParallelProcessingTools.ppt_cluster_manager
— FunctionParallelProcessingTools.ppt_cluster_manager()
ParallelProcessingTools.ppt_cluster_manager(manager::ClusterManager)
Get the default ParallelProcessingTools cluster manager.
ParallelProcessingTools.ppt_cluster_manager!
— MethodParallelProcessingTools.ppt_cluster_manager!(manager::CustomClusterManagers.ElasticManager)
Set the default ParallelProcessingTools cluster manager.
ParallelProcessingTools.ppt_worker_pool!
— Methodppt_worker_pool!(wp::FlexWorkerPool)
Sets the default ParallelProcessingTools worker pool to wp
and returns it.
See ppt_worker_pool()
.
ParallelProcessingTools.ppt_worker_pool
— Methodppt_worker_pool()
Returns the default ParallelProcessingTools worker pool.
If the default instance doesn't exist yet, then a FlexWorkerPool
will be created that initially contains Distributed.myid()
as the only worker.
ParallelProcessingTools.printover
— MethodParallelProcessingTools.printover(f_show::Function, io::IOBuffer)
Runs f_show(tmpio)
with an IO buffer, then clears the required number of lines on io
(typically stdout
) and prints the output over them.
ParallelProcessingTools.proc_management_lock
— MethodParallelProcessingTools.proc_management_lock(pid::Integer)::ReentrantLock
Returns a process-specific lock. This lock is used to protect operations that concern the management of process pid
.
ParallelProcessingTools.read_files
— Functionfunction read_files(
[f_read, ], filenames::AbstractString...;
use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(),
create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,
verbose::Bool = false
)
Reads filenames
in an atomic fashion (i.e. only if all filenames
exist) on a best-effort basis (depending on the OS and file-system used).
If a reading function f_read
is given, calls f_read(filenames...)
. The return value of f_read
is passed through.
If use_cache
is true
, then the files are first copied to the cache directory cache_dir
under temporary names, and then read via f_read(temporary_filenames...)
. The temporary files are deleted after f_read
exits (except if an exception is thrown during reading and delete_tmp_onerror
is set to false
).
Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools"
to see a log of all intermediate steps.
For example:
write("foo.txt", "Hello"); write("bar.txt", "World")
result = read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar
read(foo, String) * " " * read(bar, String)
end
If no reading function f_read
is given, then read_files
returns an object of type FilesToRead
that holds the temporary filenames. Closing it will clean up temporary files, as described above. So
ftr = read_files("foo.txt", "bar.txt"; use_cache = true)
result = try
foo, bar = collect(ftr)
data_read = read(foo, String) * " " * read(bar, String)
close(ftr)
data_read
catch err
close(ftr, err)
rethrow()
end
is equivalent to the example using read_files(f_read, ...)
above.
If create_cachedir
is true
, then cache_dir
will be created if it doesn't exist yet.
If verbose
is true
, uses log-level Logging.Info
to log file reading, otherwise Logging.Debug
.
On Linux you can set use_cache = true
and cache_dir = "/dev/shm"
to use the default Linux RAM disk as an intermediate directory.
See also write_files
and ParallelProcessingTools.default_cache_dir
.
ParallelProcessingTools.runworkers
— Functionrunworkers(
runmode::ParallelProcessingTools.RunProcsMode,
manager::Distributed.AbstractClusterManager = ppt_cluster_manager()
)
Run Julia worker processes.
By default, ensures that all worker processes use the same Julia project environment as the current process (requires that file system paths are consistent across compute hosts).
The new workers are managed via ppt_cluster_manager()
and automatically added to the ppt_worker_pool()
.
Returns a tuple (task, n)
. Here, task::Task
is done when all workers have terminated. n
is either an Integer
, if the number of workers that will be started is known, or Nothing
, if the number of workers can't be predicted (accurately).
Example:
task, n = runworkers(OnLocalhost(n = 4))
See also worker_resources()
.
ParallelProcessingTools.sleep_ns
— Methodsleep_ns(t_in_ns::Real)
Sleep for t_in_ns
nanoseconds, using a mixture of yield()
, sleep(0)
and sleep(t)
to be able to sleep for short times as well as long times with good relative precision.
Guaranteed to yield()
at least once, even if t_in_ns
is zero.
ParallelProcessingTools.split_basename_ext
— MethodParallelProcessingTools.split_basename_ext(file_basename_with_ext::AbstractString)
Splits a filename (given without its directory path) into a basename without file extension and the file extension. Returns a tuple (basename_noext, ext)
.
Example:
ParallelProcessingTools.split_basename_ext("myfile.tar.gz") == ("myfile", ".tar.gz")
ParallelProcessingTools.stopworkers
— Functionstopworkers()
stopworkers(pid::Int)
stopworkers(pids::AbstractVector{Int})
Stops all or the specified worker processes. The current process is ignored.
ParallelProcessingTools.tmp_filename
— FunctionParallelProcessingTools.tmp_filename(fname::AbstractString)
ParallelProcessingTools.tmp_filename(fname::AbstractString, dir::AbstractString)
Returns a temporary filename, based on fname
.
By default, the temporary filename is in the same directory as fname
, otherwise in dir
.
Does not create the temporary file, only returns the filename (including directory path).
ParallelProcessingTools.wait_for_all
— Functionwait_for_all(
objs...;
maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
)
wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)
Wait for all of the objs
to become ready.
Readiness of objects is as defined by wouldwait
. Objects that are Nothing
are ignored, i.e. not waited for.
See @wait_while
for the effects of maxtime
and timeout_error
.
Example, wait for two tasks to finish:
task1 = Threads.@spawn sleep(10)
task2 = Threads.@spawn sleep(2)
wait_for_all(task1, task2)
ParallelProcessingTools.wait_for_any
— Functionwait_for_any(
objs...;
maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
)
wait_for_any(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)
Wait for any of the objects objs
to become ready.
Readiness of objects is as defined by wouldwait
. Objects that are Nothing
are ignored, i.e. not waited for.
See @wait_while
for the effects of maxtime
and timeout_error
.
Example, wait for a task with a timeout:
task1 = Threads.@spawn sleep(1.0)
task2 = Threads.@spawn sleep(5.0)
wait_for_any(task1, task2, maxtime = 3.0)
istaskdone(task1) == true
istaskdone(task2) == false
Similar to waitany
(new in Julia v1.12), but applies to a wider range of object types.
ParallelProcessingTools.whyfailed
— FunctionParallelProcessingTools.whyfailed(obj)::Exception
Returns a reason, as an Exception
instance, why obj
has failed.
Supports Task
and Process
and may be extended to other object types.
obj
must not be nothing
or missing
.
ParallelProcessingTools.worker_local_startcmd
— MethodParallelProcessingTools.worker_local_startcmd(
manager::Distributed.ClusterManager;
julia_cmd::Cmd = _default_julia_cmd(),
julia_flags::Cmd = _default_julia_flags(),
julia_project::AbstractString = _default_julia_project(),
redirect_output::Bool = true,
env::AbstractDict{<:AbstractString,<:AbstractString} = ...,
)::Cmd
Return the system command required to start a Julia worker process locally on some host, so that it will connect to manager.
ParallelProcessingTools.worker_resources
— Method
worker_resources
Get the distributed Julia worker process resources currently available.
This may take some time as some code needs to be loaded on all processes. Automatically runs ensure_procinit() before querying worker resources.
Note: CPU ID information will only be available if ThreadPinning is loaded.
ParallelProcessingTools.worker_start_command
— Function
worker_start_command(
runmode::DynamicAddProcsMode,
manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
)::Tuple{Cmd,Integer,Integer}
Return a tuple (cmd, m, n), with system command cmd that needs to be run m times (in parallel) to start n workers.
ParallelProcessingTools.workpart
— Function
workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}
Get the part of data that the execution unit current_worker is responsible for. Implies a partition of data across the workers listed in workersel.
For generic data arrays, workpart will return a view. If data is a Range (e.g. indices to be processed), a sub-range will be returned.
Type W will typically be Int and workersel will usually be a range/array of thread/process IDs.
Note: workersel is required to be sorted in ascending order and to contain no duplicate entries.
Examples:
using Distributed, Base.Threads
using ParallelProcessingTools
A = rand(100)
# ...
sub_A = workpart(A, workers(), myid())
# ...
idxs = workpart(eachindex(sub_A), allthreads(), threadid())
for i in idxs
    # ...
end
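To make the partitioning concrete, here is a single-process sketch that assumes the package is installed. The worker IDs 1:3 are hypothetical stand-ins for real thread or process IDs, and how the ten items are split between them is left to workpart.

```julia
using ParallelProcessingTools

items = 1:10        # indices to process (illustrative)
workersel = 1:3     # hypothetical worker IDs: sorted, no duplicates

# One part per worker; since `items` is a range, each part is a sub-range.
parts = [workpart(items, workersel, w) for w in workersel]

for (w, part) in zip(workersel, parts)
    println("worker ", w, " handles ", part)
end

# Because the parts form a partition, together they should contain
# every item exactly once.
covered = sort(reduce(vcat, collect.(parts)))
```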
ParallelProcessingTools.wouldwait
— Function
ParallelProcessingTools.wouldwait(obj)::Bool
Returns true if wait(obj) would result in waiting and false if wait(obj) would return (almost) immediately.
Supports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.
Returns false if isnothing(obj) but obj must not be missing.
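The following sketch probes a few of the supported object types, assuming the package is installed. The sleep time, the channel size, and the variable names are arbitrary, and the results noted in the comments are what the description above implies rather than verified output.

```julia
using ParallelProcessingTools: wouldwait

# A running task: waiting on it would block until it finishes.
t = Threads.@spawn sleep(0.5)
while_running = wouldwait(t)
wait(t)
after_done = wouldwait(t)      # the task is done, so wait(t) returns at once

# A buffered channel: waiting blocks only while it holds no data.
ch = Channel{Int}(1)
when_empty = wouldwait(ch)
put!(ch, 42)
when_filled = wouldwait(ch)

# `nothing` is accepted and never needs waiting.
for_nothing = wouldwait(nothing)
```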
ParallelProcessingTools.write_files
— Function
write_files(
[f_write,] filenames::AbstractString...;
mode::WriteMode = CreateOrIgnore(),
use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(),
create_dirs::Bool = true, delete_tmp_onerror::Bool=true,
verbose::Bool = false
)
Writes to filenames in an atomic fashion, on a best-effort basis (depending on the OS and file-system used).
mode determines how to handle pre-existing files; it may be CreateOrIgnore() (default), CreateNew(), CreateOrReplace(), CreateOrModify() or ModifyExisting().
If a writing function f_write is given, calls f_write(temporary_filenames...). If f_write doesn't throw an exception, the files temporary_filenames are renamed to filenames; otherwise the temporary files are either deleted (if delete_tmp_onerror is true) or left in place (e.g. for debugging purposes).
Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools" to see a log of all intermediate steps.
For example:
write_files("foo.txt", "bar.txt", use_cache = true) do tmp_foo, tmp_bar
    write(tmp_foo, "Hello")
    write(tmp_bar, "World")
end
write_files(f_write, filenames...) returns either filenames, if the files were (re-)written, or nothing if there was nothing to do (depending on mode).
If no writing function f_write is given, then write_files returns an object of type FilesToWrite that holds the temporary filenames. Closing it will, like above, either rename temporary files to filenames or remove them. So
ftw = write_files("foo.txt", "bar.txt")
if !isnothing(ftw)
    try
        foo, bar = ftw
        write(foo, "Hello")
        write(bar, "World")
        close(ftw)
    catch err
        close(ftw, err)
        rethrow()
    end
end
is equivalent to the example using write_files(f_write, ...) above.
When modifying files, write_files first copies existing files filenames to temporary_filenames and otherwise behaves as described above.
If use_cache is true, the temporary_filenames are located in cache_dir and then atomically moved to filenames, otherwise they are located next to filenames (so in the same directories).
If create_dirs is true, target and cache directory paths are created if necessary.
If verbose is true, uses log-level Logging.Info to log file creation, otherwise Logging.Debug.
On Linux you can set use_cache = true and cache_dir = "/dev/shm" to use the default Linux RAM disk as an intermediate directory.
See also read_files and ParallelProcessingTools.default_cache_dir.
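How mode affects a pre-existing file, and what write_files(f_write, ...) returns in each case, might be sketched as follows. This assumes the package is installed and that the WriteMode constructors are exported; the file name and scratch directory are illustrative, and the behavior noted in the comments is inferred from the description of mode and of the return value above.

```julia
using ParallelProcessingTools

out_dir = mktempdir()                     # scratch directory for this sketch
fname = joinpath(out_dir, "hello.txt")    # illustrative target file

# First call: the file does not exist yet, so it gets created.
first_result = write_files(fname) do tmp
    write(tmp, "Hello")
end

# Second call with the default CreateOrIgnore(): the file already exists,
# so there should be nothing to do.
second_result = write_files(fname) do tmp
    write(tmp, "ignored")
end

# CreateOrReplace() asks for the existing file to be overwritten.
third_result = write_files(fname; mode = CreateOrReplace()) do tmp
    write(tmp, "World")
end

content = read(fname, String)
println(content)
```

Checking the return value for nothing is a cheap way to tell whether the writing function actually ran.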
ParallelProcessingTools.write_worker_start_script
— Function
write_worker_start_script(
filename::AbstractString,
runmode::DynamicAddProcsMode,
manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
)
Writes the system command to start worker processes to a shell script.