API
Modules
Types and constants
ParallelProcessingTools.AbstractThreadLocal
ParallelProcessingTools.AutoThreadPinning
ParallelProcessingTools.CreateNew
ParallelProcessingTools.CreateOrIgnore
ParallelProcessingTools.CreateOrModify
ParallelProcessingTools.CreateOrReplace
ParallelProcessingTools.DynamicAddProcsMode
ParallelProcessingTools.FilesToRead
ParallelProcessingTools.FilesToWrite
ParallelProcessingTools.FlexWorkerPool
ParallelProcessingTools.MaxTriesExceeded
ParallelProcessingTools.ModifyExisting
ParallelProcessingTools.NonZeroExitCode
ParallelProcessingTools.OnHTCondor
ParallelProcessingTools.OnLocalhost
ParallelProcessingTools.OnSlurm
ParallelProcessingTools.RunProcsMode
ParallelProcessingTools.ThreadLocal
ParallelProcessingTools.TimelimitExceeded
ParallelProcessingTools.WriteMode
Functions and macros
ParallelProcessingTools.@always_everywhere
ParallelProcessingTools.@critical
ParallelProcessingTools.@mt_out_of_order
ParallelProcessingTools.@onprocs
ParallelProcessingTools.@onthreads
ParallelProcessingTools.@return_exceptions
ParallelProcessingTools.@userfriendly_exceptions
ParallelProcessingTools.@wait_while
ParallelProcessingTools.add_procinit_code
ParallelProcessingTools.allprocs_management_lock
ParallelProcessingTools.allthreads
ParallelProcessingTools.clear_worker_caches!
ParallelProcessingTools.current_procinit_level
ParallelProcessingTools.default_cache_dir
ParallelProcessingTools.default_cache_dir!
ParallelProcessingTools.ensure_procinit
ParallelProcessingTools.ensure_procinit_or_kill
ParallelProcessingTools.get_procinit_code
ParallelProcessingTools.getallvalues
ParallelProcessingTools.getlabel
ParallelProcessingTools.getlocalvalue
ParallelProcessingTools.global_procinit_level
ParallelProcessingTools.hasfailed
ParallelProcessingTools.idle_sleep
ParallelProcessingTools.in_vscode_notebook
ParallelProcessingTools.inner_exception
ParallelProcessingTools.isactive
ParallelProcessingTools.isvalid_pid
ParallelProcessingTools.memory_limit
ParallelProcessingTools.memory_limit!
ParallelProcessingTools.onlyfirst_exception
ParallelProcessingTools.onworker
ParallelProcessingTools.original_exception
ParallelProcessingTools.ppt_cluster_manager
ParallelProcessingTools.ppt_cluster_manager!
ParallelProcessingTools.ppt_worker_pool
ParallelProcessingTools.ppt_worker_pool!
ParallelProcessingTools.printover
ParallelProcessingTools.proc_management_lock
ParallelProcessingTools.read_files
ParallelProcessingTools.runworkers
ParallelProcessingTools.sleep_ns
ParallelProcessingTools.split_basename_ext
ParallelProcessingTools.stopworkers
ParallelProcessingTools.tmp_filename
ParallelProcessingTools.wait_for_all
ParallelProcessingTools.wait_for_any
ParallelProcessingTools.whyfailed
ParallelProcessingTools.worker_local_startcmd
ParallelProcessingTools.worker_resources
ParallelProcessingTools.worker_start_command
ParallelProcessingTools.workpart
ParallelProcessingTools.wouldwait
ParallelProcessingTools.write_files
ParallelProcessingTools.write_worker_start_script
Documentation
ParallelProcessingTools.AbstractThreadLocal — Type
abstract type AbstractThreadLocal{T} end
Abstract type for thread-local values of type T.
The value for the current thread is accessed via getindex(::AbstractThreadLocal) and setindex!(::AbstractThreadLocal, x).
To access both regular and thread-local values in a unified manner, use the function getlocalvalue.
To get the values across all threads, use the function getallvalues.
Default implementation is ThreadLocal.
ParallelProcessingTools.AutoThreadPinning — Type
struct AutoThreadPinning
ParallelProcessingTools default thread pinning mode.
Constructor:
AutoThreadPinning(; random::Bool = false, pin_blas::Bool = false)
Arguments:
random: Use system-topology-based random thread pinning if no thread affinity mask is set (e.g. via SLURM or taskset).
pin_blas: Try to pin BLAS threads. Not fully functional due to bugs in BLAS thread pinning (see ThreadPinning issue #105).
Use with ThreadPinning.pinthreads:
using ParallelProcessingTools, ThreadPinning
pinthreads(AutoThreadPinning())
ParallelProcessingTools.CreateNew — Type
CreateNew() isa WriteMode
Indicates that new files should be created and that an error should be thrown if the files already exist.
See WriteMode and write_files.
ParallelProcessingTools.CreateOrIgnore — Type
CreateOrIgnore() isa WriteMode
Indicates that new files should be created, and that nothing should be done if the files already exist.
Causes an error to be thrown if only some of the files exist, to indicate an inconsistent state.
CreateOrIgnore() is the recommended default when creating files in a parallel computing context, especially if failure or timeouts might result in re-tries. This way, if multiple workers try to create the same file(s), only one file or consistent set of files will be created under the target filenames.
See WriteMode and write_files.
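For instance, a retried worker task can safely attempt to (re-)create the same output file. A minimal sketch using write_files (documented further below; the filename is only an illustration):
write_files("results.txt"; mode = CreateOrIgnore()) do tmp_results
    write(tmp_results, "some result data")   # only performed if "results.txt" does not exist yet
end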
ParallelProcessingTools.CreateOrModify — Type
CreateOrModify() isa WriteMode
Indicates that either new files should be created, or that existing files should be modified.
Causes an error to be thrown if only some of the files exist already, to indicate an inconsistent state.
See WriteMode and write_files.
ParallelProcessingTools.CreateOrReplace — Type
CreateOrReplace() isa WriteMode
Indicates that new files should be created and existing files should be replaced.
See WriteMode and write_files.
ParallelProcessingTools.DynamicAddProcsMode — Type
abstract type ParallelProcessingTools.DynamicAddProcsMode <: ParallelProcessingTools.RunProcsMode
Abstract supertype for worker start modes that use an elastic cluster manager that enables dynamic addition and removal of worker processes.
Subtypes must implement:
ParallelProcessingTools.worker_start_command(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
ParallelProcessingTools.runworkers(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)
ParallelProcessingTools.FilesToRead — Type
struct ParallelProcessingTools.FilesToRead
Created by read_files, represents a set of (temporary) files to read from.
With ftr::FilesToRead, use collect(ftr) or iterate(ftr) to access the filenames to read from. Use close(ftr) or close(ftr, true) to close things in good order, indicating success, and use close(ftr, false) or close(ftr, err::Exception) to abort, indicating failure.
See read_files for example code.
If aborted or if the Julia process exits without ftr being closed, temporary files are still cleaned up, unless read_files was used with delete_tmp_onerror = false.
ParallelProcessingTools.FilesToWrite — Type
struct ParallelProcessingTools.FilesToWrite
Created by write_files, represents a set of (temporary) files to write to.
With ftw::FilesToWrite, use collect(ftw) or iterate(ftw) to access the filenames to write to. Use close(ftw) or close(ftw, true) to close things in good order, indicating success, and use close(ftw, false) or close(ftw, err::Exception) to abort, indicating failure.
See write_files for example code.
If aborted or if the Julia process exits without ftw being closed, temporary files are still cleaned up, unless write_files was used with delete_tmp_onerror = false.
ParallelProcessingTools.FlexWorkerPool — Type
FlexWorkerPool{WP<:AbstractWorkerPool}(
worker_pids::AbstractVector{Int};
label::AbstractString = "", maxoccupancy::Int = 1, init_workers::Bool = true
)::AbstractWorkerPool
FlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...)
A flexible worker pool, intended to work with cluster managers that may add and remove Julia processes dynamically.
If the current process (Distributed.myid()) is part of the pool, resp. if withmyid is true, it will be used as a fallback when no other workers are members of the pool (e.g. because no other processes have been added yet, or because all other processes in the pool have terminated and been removed from it). The current process will not be used as a fallback when all other workers are currently in use.
If caching is true, the pool will use a Distributed.CachingPool as the underlying pool, otherwise a Distributed.WorkerPool.
If maxoccupancy is greater than one, individual workers can be used maxoccupancy times in parallel. So take!(pool) may return the same process ID pid multiple times without a put!(pool, pid) in between. Such an (ideally moderate) oversubscription can be useful to reduce latency-related idle times on workers: e.g. if communication latency to a worker is not short compared to the runtime of the function called on it, or if the remote functions are often blocked waiting for I/O. Note: Workers still must be put back the same number of times they were taken from the pool, in total.
If init_workers is true, workers taken from the pool will be guaranteed to be initialized to the current global initialization level (see @always_everywhere).
WP is the type of the underlying worker pool used, e.g. Distributed.WorkerPool (default) or Distributed.CachingPool.
Example:
using ParallelProcessingTools, Distributed
pool = FlexWorkerPool(withmyid = true, maxoccupancy = 3)
workers(pool)
pids = [take!(pool) for _ in 1:3]
@assert pids == repeat([myid()], 3)
foreach(pid -> put!(pool, pid), pids)
addprocs(4)
worker_procs = workers()
push!.(Ref(pool), worker_procs)
pids = [take!(pool) for _ in 1:4*3]
@assert pids == repeat(worker_procs, 3)
foreach(pid -> put!(pool, pid), pids)
rmprocs(worker_procs)
pids = [take!(pool) for _ in 1:3]
@assert pids == repeat([myid()], 3)
foreach(pid -> put!(pool, pid), pids)
ParallelProcessingTools.MaxTriesExceeded — Type
MaxTriesExceeded <: Exception
Exception thrown when a number of (re-)tries was exceeded.
ParallelProcessingTools.ModifyExisting — Type
ModifyExisting() isa WriteMode
Indicates that existing files should be modified.
Causes an error to be thrown if not all of the files exist already.
See WriteMode and write_files.
ParallelProcessingTools.NonZeroExitCode — Type
ParallelProcessingTools.NonZeroExitCode(cmd::Cmd, exitcode::Integer) isa Exception
Exception to indicate that an external process running cmd failed with the given exit code (not equal to zero).
ParallelProcessingTools.OnHTCondor — Type
OnHTCondor(;
n::Int = 1
condor_flags::Cmd = _default_condor_flags()
condor_settings::Dict{String,String} = Dict{String,String}()
julia_flags::Cmd = _default_julia_flags()
julia_depot::Vector{String} = DEPOT_PATH
jobfile_dir = homedir()
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
)
Mode to add worker processes via HTCondor condor_submit.
Condor submit script and steering .sh files are stored in jobfile_dir.
Example:
julia> runmode = OnHTCondor(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
task = runworkers(runmode)
julia> runworkers(runmode)
[ Info: Submitting HTCondor job: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
Submitting job(s)..........
10 job(s) submitted to cluster 3198291.
[ Info: HTCondor job submitted: `condor_submit /home/jiling/jl_rAHyFadwHa.sub`
(nothing, 10)
julia> sleep(10)
julia> nworkers()
10
Workers can also be started manually: use worker_start_command(runmode) to get the condor_submit start command and run it from a separate process or shell.
ParallelProcessingTools.OnLocalhost — Type
OnLocalhost(;
n::Integer = 1
env::Dict{String,String} = Dict{String,String}()
julia_flags::Cmd = _default_julia_flags()
dir = pwd()
) isa DynamicAddProcsMode
Mode that runs n worker processes on the current host.
Example:
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)
Threads.@async begin
wait(task)
@info "SLURM workers have terminated."
end
@wait_while nprocs()-1 < n
Workers can also be started manually: use worker_start_command(runmode) to get the system (shell) command and run it from a separate process or shell.
ParallelProcessingTools.OnSlurm — Type
OnSlurm(;
slurm_flags::Cmd = {defaults}
julia_flags::Cmd = {defaults}
dir = pwd()
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
)
Mode to add worker processes via SLURM srun.
The srun and Julia worker command line flags are inferred from SLURM environment variables (e.g. when inside of an salloc or batch job), as well as from slurm_flags and julia_flags.
Workers are started with current directory set to dir.
Example:
runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
task = runworkers(runmode)
Threads.@async begin
wait(task)
@info "SLURM workers have terminated."
end
@wait_while nprocs()-1 < n
Workers can also be started manually: use worker_start_command(runmode) to get the srun start command and run it from a separate process or shell.
ParallelProcessingTools.RunProcsMode — Type
abstract type ParallelProcessingTools.RunProcsMode
Abstract supertype for worker process run modes.
Subtypes must implement:
ParallelProcessingTools.runworkers(runmode::SomeRunProcsMode, manager::Distributed.AbstractClusterManager)
ParallelProcessingTools.ThreadLocal — Type
ThreadLocal{T} <: AbstractThreadLocal{T}
Represents a thread-local value. See AbstractThreadLocal for the API.
Constructors:
ThreadLocal{T}() where {T}
ThreadLocal(value::T) where {T}
ThreadLocal{T}(f::Base.Callable) where {T}
Examples:
tlvalue = ThreadLocal(0)
@onthreads allthreads() tlvalue[] = Base.Threads.threadid()
getallvalues(tlvalue) == allthreads()

rand_value_on_each_thread = ThreadLocal{Float64}(rand)
all(x -> 0 < x < 1, getallvalues(rand_value_on_each_thread))
ParallelProcessingTools.TimelimitExceeded — Type
TimelimitExceeded <: Exception
Exception thrown when something timed out.
ParallelProcessingTools.WriteMode — Type
abstract type WriteMode
Abstract type for write modes.
May be one of the following subtypes: CreateNew, CreateOrIgnore, CreateOrReplace, CreateOrModify, ModifyExisting.
Used by write_files.
ParallelProcessingTools.@always_everywhere — Macro
@always_everywhere(expr)
Runs expr on all current Julia processes, but also on all future Julia processes after an ensure_procinit() when managed using a FlexWorkerPool.
Similar to Distributed.@everywhere, but also stores expr so that ensure_procinit can execute it on future worker processes.
Example:
@always_everywhere begin
using SomePackage
using SomeOtherPackage
some_global_variable = 42
end
See also ParallelProcessingTools.add_procinit_code and ParallelProcessingTools.ensure_procinit.
ParallelProcessingTools.@critical — Macro
@critical expr
Mark code in expr as a critical section. Code in critical sections will never be executed in parallel (via multithreading) to any other critical section.
@critical is very useful to mark non-threadsafe code.
Example:
@onthreads allthreads() begin
@critical @info Base.Threads.threadid()
end
Without @critical, the above will typically crash Julia.
ParallelProcessingTools.@mt_out_of_order — Macro
@mt_out_of_order begin expr... end
Runs all top-level expressions in begin expr... end on parallel multi-threaded tasks.
Example:
@mt_out_of_order begin
    a = foo()
    bar()
    c = baz()
end
will run a = foo(), bar() and c = baz() in parallel and in arbitrary order; the results of the assignments will appear in the outside scope.
ParallelProcessingTools.@onprocs — Macro
@onprocs procsel expr
Executes expr in parallel on all processes in procsel. Waits until all processes are done. Returns all results as a vector (or as a single scalar value, if procsel itself is a scalar).
Example:
using Distributed
addprocs(2)
workers() == @onprocs workers() myid()
ParallelProcessingTools.@onthreads — Macro
@onthreads threadsel expr
Execute code in expr in parallel on the threads in threadsel.
threadsel should be a single thread-ID or a range (or array) of thread-IDs. If threadsel == Base.Threads.threadid(), expr is run on the current thread with only minimal overhead.
Example 1:
tlsum = ThreadLocal(0.0)
data = rand(100)
@onthreads allthreads() begin
tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid()))
end
sum(getallvalues(tlsum)) ≈ sum(data)
Example 2:
# Assuming 4 threads:
tl = ThreadLocal(42)
threadsel = 2:3
@onthreads threadsel begin
tl[] = Base.Threads.threadid()
end
getallvalues(tl)[threadsel] == [2, 3]
getallvalues(tl)[[1,4]] == [42, 42]
ParallelProcessingTools.@return_exceptions — Macro
@return_exceptions expr
Runs expr and catches and returns exceptions as values instead of having them thrown.
Useful for user-side debugging, especially of parallel and/or remote code execution.
See also @userfriendly_exceptions.
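A minimal usage sketch (the failing call is just an illustration):
result = @return_exceptions sqrt(-1.0)   # sqrt(-1.0) throws a DomainError internally
if result isa Exception
    @warn "Computation failed" result
end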
ParallelProcessingTools.@userfriendly_exceptions — Macro
@userfriendly_exceptions expr
Transforms exceptions originating from expr into more user-friendly ones.
If multiple exceptions originate from parallel code in expr, only one is rethrown, and TaskFailedExceptions and RemoteExceptions are replaced by the original exceptions that caused them.
See inner_exception and onlyfirst_exception.
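As a sketch of the intended effect, wrapping the fetch of a failing task should surface the original error rather than the wrapping TaskFailedException:
try
    @userfriendly_exceptions fetch(Threads.@spawn error("worker-side failure"))
catch err
    err isa ErrorException   # rather than a TaskFailedException
end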
ParallelProcessingTools.@wait_while — Macro
@wait_while [maxtime=nothing] [timeout_error=false] cond
Wait while cond is true, using slowly increasing sleep times in between evaluating cond.
cond may be an arbitrary Julia expression.
If maxtime is given a real value, will only wait for maxtime seconds; if the value is zero or negative, will not wait at all.
If timeout_error is true, will throw a TimelimitExceeded exception if the maximum waiting time is exceeded.
Example, wait for a task with a maxtime:
task = Threads.@spawn sleep(10)
timer = Timer(2)
@wait_while !istaskdone(task) && isopen(timer)
istaskdone(task) == false
ParallelProcessingTools.add_procinit_code — Method
ParallelProcessingTools.add_procinit_code(expr; run_everywhere::Bool = false)
Add expr to the process init code. expr is run on the current process immediately, but not automatically on remote processes unless run_everywhere is true.
User code should typically not need to call this function, but should use @always_everywhere instead.
See also ParallelProcessingTools.get_procinit_code and ParallelProcessingTools.ensure_procinit.
ParallelProcessingTools.allprocs_management_lock — Method
ParallelProcessingTools.allprocs_management_lock()::ReentrantLock
Returns the global process operations lock. This lock is used to protect operations that concern the management of all processes.
ParallelProcessingTools.allthreads — Method
allthreads()
Convenience function, returns an equivalent of 1:Base.Threads.nthreads().
ParallelProcessingTools.clear_worker_caches! — Function
clear_worker_caches!(pool::AbstractWorkerPool)
Clear the worker caches (cached function closures, etc.) on the workers in pool.
Does nothing if the pool doesn't perform any on-worker caching.
ParallelProcessingTools.current_procinit_level — Method
ParallelProcessingTools.current_procinit_level()
Return the init level of the current process.
See also global_procinit_level.
ParallelProcessingTools.default_cache_dir! — Method
ParallelProcessingTools.default_cache_dir!(dir::AbstractString)
Sets the default cache directory to dir and returns it.
See also default_cache_dir.
ParallelProcessingTools.default_cache_dir — Method
ParallelProcessingTools.default_cache_dir()::String
Returns the default cache directory, e.g. for write_files and read_files.
See also default_cache_dir!.
ParallelProcessingTools.ensure_procinit — Function
ensure_procinit(pid::Int)
ensure_procinit(pids::AbstractVector{Int} = workers())
Run process initialization code on the given process(es) if necessary, returns after initialization is complete.
When using a FlexWorkerPool, worker initialization can safely be run in the background, as the pool will only offer workers (via take!(pool)) after it has fully initialized them.
See also ParallelProcessingTools.get_procinit_code and ParallelProcessingTools.add_procinit_code.
See also ParallelProcessingTools.get_procinit_code, ParallelProcessingTools.ensure_procinit, ParallelProcessingTools.global_procinit_level and ParallelProcessingTools.current_procinit_level.
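A small usage sketch, assuming workers are added dynamically:
using Distributed, ParallelProcessingTools

@always_everywhere using Statistics   # raises the global init level

addprocs(2)
ensure_procinit()   # brings the new workers up to the current init level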
ParallelProcessingTools.ensure_procinit_or_kill — Function
ParallelProcessingTools.ensure_procinit_or_kill(pid::Int)
ParallelProcessingTools.ensure_procinit_or_kill(pids::AbstractVector{Int} = workers())
Run process initialization code on the given process(es) if necessary; kill and remove process(es) for which initialization fails.
See also ParallelProcessingTools.ensure_procinit.
ParallelProcessingTools.get_procinit_code — Method
ParallelProcessingTools.get_procinit_code()
Returns the code that should be run on each process to ensure that desired packages are loaded and global variables are set up as expected.
See also ParallelProcessingTools.add_procinit_code and ParallelProcessingTools.ensure_procinit.
ParallelProcessingTools.getallvalues — Function
getallvalues(v::AbstractThreadLocal{T})::AbstractVector{T}
Access all values (one for each thread) of a thread-local value as a vector. Can only be called in single-threaded code sections.
ParallelProcessingTools.getlabel — Function
ParallelProcessingTools.getlabel(obj)
Returns a descriptive label for obj, suitable for use in exceptions and logging messages. Defaults to string(obj).
ParallelProcessingTools.getlocalvalue — Function
getlocalvalue(x::Any) = x
getlocalvalue(x::ThreadLocal) = x[]
Access plain values and thread-local values in a unified fashion.
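For example (a minimal sketch):
plain_value = 4.2
tl_value = ThreadLocal(4.2)
getlocalvalue(plain_value) == getlocalvalue(tl_value) == 4.2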
ParallelProcessingTools.global_procinit_level — Method
ParallelProcessingTools.global_procinit_level()
Return the global process init level.
Returns, e.g., the number of times add_procinit_code resp. @always_everywhere have been called.
See also current_procinit_level.
ParallelProcessingTools.hasfailed — Function
ParallelProcessingTools.hasfailed(obj)::Bool
Checks if obj has failed in some way.
Supports Task and Process and may be extended to other object types.
Returns false if isnothing(obj) or ismissing(obj).
ParallelProcessingTools.idle_sleep — Method
idle_sleep(n_idle::Integer, t_interval_s, t_max_s)
Sleep because something has been idle for n_idle times.
Will sleep for log2(n_idle + 1) * t_interval_s seconds, but at most for t_max_s seconds.
Guaranteed to yield() at least once, even if n_idle is zero.
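A typical polling-loop sketch (the channel name is hypothetical):
n_idle = 0
while !isready(request_channel)
    n_idle += 1
    idle_sleep(n_idle, 0.1, 2.0)   # back off gradually, sleeping at most 2 s per iteration
end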
ParallelProcessingTools.in_vscode_notebook — Method
ParallelProcessingTools.in_vscode_notebook()::Bool
Test if running within a Visual Studio Code notebook.
ParallelProcessingTools.inner_exception — Function
ParallelProcessingTools.inner_exception(err)
Replaces exceptions like a TaskFailedException or a RemoteException with their underlying cause. Leaves other exceptions unchanged.
ParallelProcessingTools.isactive — Function
ParallelProcessingTools.isactive(obj)::Bool
Checks if obj is still active, running, or whatever applies to the type of obj.
Supports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.
Returns false if isnothing(obj) and true if ismissing(obj).
ParallelProcessingTools.isvalid_pid — Function
isvalid_pid(pid::Int)::Bool
Tests if pid is a valid Julia process ID.
Equivalent to pid in Distributed.procs(), but faster.
ParallelProcessingTools.memory_limit — Function
memory_limit()
Gets the virtual memory limit for the current Julia process.
Returns a tuple (soft_limit::Int64, hard_limit::Int64) (in units of bytes). Values of -1 mean unlimited.
ParallelProcessingTools.memory_limit! — Function
memory_limit!(soft_limit::Integer, hard_limit::Integer = -1)
Sets the virtual memory limit for the current Julia process.
soft_limit and hard_limit are in units of bytes. Values of -1 mean unlimited. hard_limit must not be stricter than soft_limit, and should typically be set to -1.
Returns (soft_limit::Int64, hard_limit::Int64).
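For example, to set a soft limit of 8 GiB while leaving the hard limit unlimited (a sketch; actual enforcement depends on OS support):
memory_limit!(8 * 1024^3)   # soft limit in bytes, hard limit stays -1 (unlimited)
memory_limit()              # query the current limits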
ParallelProcessingTools.onlyfirst_exception — Function
ParallelProcessingTools.onlyfirst_exception(err)
Replaces CompositeExceptions with their first exception.
Also employs inner_exception if simplify is true.
ParallelProcessingTools.onworker — Function
onworker(
f::Function, args...;
pool::AbstractWorkerPool = ppt_worker_pool(),
maxtime::Real = 0, tries::Integer = 1, label::AbstractString = ""
)
Runs f(args...) on an available worker process from the given pool and returns the result.
If maxtime > 0, a maximum time for the activity is set. If the activity takes longer than maxtime seconds, the process running it (if not the main process) will be terminated.
label is used for debug-logging.
If a problem occurs (maxtime or worker failure) while running the activity, reschedules the task if the maximum number of tries has not yet been reached, otherwise throws an exception.
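A minimal usage sketch, based on the documented signature:
pool = ppt_worker_pool()
result = onworker(sum, rand(10^6); pool = pool, maxtime = 60, tries = 3, label = "sum_example")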
ParallelProcessingTools.original_exception — Function
ParallelProcessingTools.original_exception(err)
Replaces (possibly nested) exceptions like a TaskFailedException or a RemoteException with the innermost exception, likely to be the one that was thrown originally. Leaves other exceptions unchanged.
ParallelProcessingTools.ppt_cluster_manager — Function
ParallelProcessingTools.ppt_cluster_manager()
ParallelProcessingTools.ppt_cluster_manager(manager::ClusterManager)
Get the default ParallelProcessingTools cluster manager.
ParallelProcessingTools.ppt_cluster_manager! — Method
ParallelProcessingTools.ppt_cluster_manager!(manager::ElasticClusterManager.ElasticManager)
Set the default ParallelProcessingTools cluster manager.
ParallelProcessingTools.ppt_worker_pool! — Method
ppt_worker_pool!(wp::FlexWorkerPool)
Sets the default ParallelProcessingTools worker pool to wp and returns it.
See ppt_worker_pool().
ParallelProcessingTools.ppt_worker_pool — Method
ppt_worker_pool()
Returns the default ParallelProcessingTools worker pool.
If the default instance doesn't exist yet, then a FlexWorkerPool will be created that initially contains Distributed.myid() as the only worker.
ParallelProcessingTools.printover — Method
ParallelProcessingTools.printover(f_show::Function, io::IOBuffer)
Runs f_show(tmpio) with an IO buffer, then clears the required number of lines on io (typically stdout) and prints the output over them.
ParallelProcessingTools.proc_management_lock — Method
ParallelProcessingTools.proc_management_lock(pid::Integer)::ReentrantLock
Returns a process-specific lock. This lock is used to protect operations that concern the management of process pid.
ParallelProcessingTools.read_files — Function
function read_files(
[f_read, ], filenames::AbstractString...;
use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(),
create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,
verbose::Bool = false
)
Reads filenames in an atomic fashion (i.e. only if all filenames exist) on a best-effort basis (depending on the OS and file-system used).
If a reading function f_read is given, calls f_read(filenames...). The return value of f_read is passed through.
If use_cache is true, then the files are first copied to the cache directory cache_dir under temporary names, and then read via f_read(temporary_filenames...). The temporary files are deleted after f_read exits (except if an exception is thrown during reading and delete_tmp_onerror is set to false).
Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools" to see a log of all intermediate steps.
For example:
write("foo.txt", "Hello"); write("bar.txt", "World")
result = read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar
read(foo, String) * " " * read(bar, String)
end
If no reading function f_read is given, then read_files returns an object of type FilesToRead that holds the temporary filenames. Closing it will clean up temporary files, as described above. So
ftr = read_files("foo.txt", "bar.txt"; use_cache = true)
result = try
foo, bar = collect(ftr)
data_read = read(foo, String) * " " * read(bar, String)
close(ftr)
data_read
catch err
close(ftr, err)
rethrow()
end
is equivalent to the example using read_files(f_read, ...) above.
If create_cachedir is true, then cache_dir will be created if it doesn't exist yet.
If verbose is true, uses log-level Logging.Info to log file reading, otherwise Logging.Debug.
On Linux you can set use_cache = true and cache_dir = "/dev/shm" to use the default Linux RAM disk as an intermediate directory.
See also write_files and ParallelProcessingTools.default_cache_dir.
ParallelProcessingTools.runworkers — Function
runworkers(
    runmode::ParallelProcessingTools.RunProcsMode,
manager::Distributed.AbstractClusterManager = ppt_cluster_manager()
)
Run Julia worker processes.
By default ensures that all worker processes use the same Julia project environment as the current process (requires that file system paths are consistent across compute hosts).
The new workers are managed via ppt_cluster_manager() and automatically added to the ppt_worker_pool().
Returns a tuple (task, n). Here, task::Task is done when all workers have terminated. n is either an Integer, if the number of workers that will be started is known, or Nothing, if the number of workers can't be predicted (accurately).
Example:
task, n = runworkers(OnLocalhost(n = 4))
See also worker_resources().
ParallelProcessingTools.sleep_ns — Method
sleep_ns(t_in_ns::Real)
Sleep for t_in_ns nanoseconds, using a mixture of yield(), sleep(0) and sleep(t) to be able to sleep for short times as well as long times with good relative precision.
Guaranteed to yield() at least once, even if t_in_ns is zero.
ParallelProcessingTools.split_basename_ext — Method
ParallelProcessingTools.split_basename_ext(file_basename_with_ext::AbstractString)
Splits a filename (given without its directory path) into a basename without file extension and the file extension. Returns a tuple (basename_noext, ext).
Example:
ParallelProcessingTools.split_basename_ext("myfile.tar.gz") == ("myfile", ".tar.gz")ParallelProcessingTools.stopworkers — Functionstopworkers()
stopworkers(pid::Int)
stopworkers(pids::AbstractVector{Int})
Stops all or the specified worker processes. The current process is ignored.
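A minimal usage sketch:
task, n = runworkers(OnLocalhost(n = 2))
# ... use the workers ...
stopworkers()   # stops all workers, the current process keeps running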
ParallelProcessingTools.tmp_filename — Function
ParallelProcessingTools.tmp_filename(fname::AbstractString)
ParallelProcessingTools.tmp_filename(fname::AbstractString, dir::AbstractString)
Returns a temporary filename, based on fname.
By default, the temporary filename is in the same directory as fname, otherwise in dir.
Does not create the temporary file, only returns the filename (including directory path).
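A sketch of the expected behavior (the exact temporary name is implementation-defined):
tmp = ParallelProcessingTools.tmp_filename("results/out.h5")
dirname(tmp) == "results"   # same directory as fname by default
# tmp is only a name; no file has been created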
ParallelProcessingTools.wait_for_all — Function
wait_for_all(
objs...;
maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
)
wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)
Wait for all of the objs to become ready.
Readiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.
See @wait_while for the effects of maxtime and timeout_error.
Example, wait for two tasks to finish:
task1 = Threads.@spawn sleep(10)
task2 = Threads.@spawn sleep(2)
wait_for_all(task1, task2)
ParallelProcessingTools.wait_for_any — Function
wait_for_any(
objs...;
maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false
)
wait_for_any(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...)
Wait for any of the objects objs to become ready.
Readiness of objects is as defined by wouldwait. Objects that are Nothing are ignored, i.e. not waited for.
See @wait_while for the effects of maxtime and timeout_error.
Example, wait for a task with a timeout:
task1 = Threads.@spawn sleep(1.0)
task2 = Threads.@spawn sleep(5.0)
wait_for_any(task1, task2, maxtime = 3.0)
istaskdone(task1) == true
istaskdone(task2) == false
Similar to waitany (new in Julia v1.12), but applies to a wider range of object types.
ParallelProcessingTools.whyfailed — Function
ParallelProcessingTools.whyfailed(obj)::Exception
Returns a reason, as an Exception instance, why obj has failed.
Supports Task and Process and may be extended to other object types.
obj must not be nothing or missing.
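A sketch combining hasfailed and whyfailed (the exact exception wrapping may differ):
task = Threads.@spawn error("boom")
@wait_while !istaskdone(task)
if hasfailed(task)
    reason = whyfailed(task)   # an Exception describing the failure
end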
ParallelProcessingTools.worker_local_startcmd — Method
ParallelProcessingTools.worker_local_startcmd(
manager::Distributed.ClusterManager;
julia_cmd::Cmd = _default_julia_cmd(),
julia_flags::Cmd = _default_julia_flags(),
julia_project::AbstractString = _default_julia_project(),
redirect_output::Bool = true,
env::AbstractDict{<:AbstractString,<:AbstractString} = ...,
)::Cmd
Return the system command required to start a Julia worker process locally on some host, so that it will connect to manager.
ParallelProcessingTools.worker_resources — Method
worker_resources
Get the distributed Julia worker process resources currently available.
This may take some time as some code needs to be loaded on all processes. Automatically runs ensure_procinit() before querying worker resources.
Note: CPU ID information will only be available if ThreadPinning is loaded.
ParallelProcessingTools.worker_start_command — Function
worker_start_command(
runmode::DynamicAddProcsMode,
manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
)::Tuple{Cmd,Integer,Integer}
Return a tuple (cmd, m, n), with system command cmd that needs to be run m times (in parallel) to start n workers.
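A usage sketch:
runmode = OnSlurm()   # or OnHTCondor(), OnLocalhost(), ...
cmd, m, n = worker_start_command(runmode)
# run cmd m times (e.g. from separate shells) to start n workers in total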
ParallelProcessingTools.workpart — Function
workpart(data::AbstractArray, workersel::AbstractVector{W}, current_worker::W) where {W}
Get the part of data that the execution unit current_worker is responsible for. Implies a partition of data across the workers listed in workersel.
For generic data arrays, workpart will return a view. If data is a Range (e.g. indices to be processed), a sub-range will be returned.
Type W will typically be Int and workersel will usually be a range/array of thread/process IDs.
Note: workersel is required to be sorted in ascending order and to contain no duplicate entries.
Examples:
using Distributed, Base.Threads
A = rand(100)
# ...
sub_A = workpart(A, workers(), myid())
# ...
idxs = workpart(eachindex(sub_A), allthreads(), threadid())
for i in idxs
# ...
end
ParallelProcessingTools.wouldwait — Function
ParallelProcessingTools.wouldwait(obj)::Bool
Returns true if wait(obj) would result in waiting and false if wait(obj) would return (almost) immediately.
Supports Task, Process, Future, Channel, Timer, Base.AsyncCondition and may be extended to other object types.
Returns false if isnothing(obj) but obj must not be missing.
ParallelProcessingTools.write_files — Function
function write_files(
[f_write,] filenames::AbstractString...;
mode::WriteMode = CreateOrIgnore(),
use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(),
create_dirs::Bool = true, delete_tmp_onerror::Bool=true,
verbose::Bool = false
)
Writes to filenames in an atomic fashion, on a best-effort basis (depending on the OS and file-system used).
mode determines how to handle pre-existing files, it may be CreateOrIgnore() (default), CreateNew(), CreateOrReplace(), CreateOrModify() or ModifyExisting().
If a writing function f_write is given, calls f_write(temporary_filenames...). If f_write doesn't throw an exception, the files temporary_filenames are renamed to filenames, otherwise the temporary files are either deleted (if delete_tmp_onerror is true) or left in place (e.g. for debugging purposes).
Set ENV["JULIA_DEBUG"] = "ParallelProcessingTools" to see a log of all intermediate steps.
For example:
write_files("foo.txt", "bar.txt", use_cache = true) do tmp_foo, tmp_bar
write(tmp_foo, "Hello")
write(tmp_bar, "World")
end
write_files(f_write, filenames...) returns either filenames, if the files were (re-)written, or nothing if there was nothing to do (depending on mode).
If no writing function f_write is given, then write_files returns an object of type FilesToWrite that holds the temporary filenames. Closing it will, as described above, either rename the temporary files to filenames or remove them. So
ftw = write_files("foo.txt", "bar.txt")
if !isnothing(ftw)
try
foo, bar = ftw
write(foo, "Hello")
write(bar, "World")
close(ftw)
catch err
close(ftw, err)
rethrow()
end
end
is equivalent to the example using write_files(f_write, ...) above.
When modifying files, write_files first copies existing files filenames to temporary_filenames and otherwise behaves as described above.
If use_cache is true, the temporary_filenames are located in cache_dir and then atomically moved to filenames, otherwise they are located next to filenames (i.e. in the same directories).
If create_dirs is true, target and cache directory paths are created if necessary.
If verbose is true, uses log-level Logging.Info to log file creation, otherwise Logging.Debug.
On Linux you can set use_cache = true and cache_dir = "/dev/shm" to use the default Linux RAM disk as an intermediate directory.
See also read_files and ParallelProcessingTools.default_cache_dir.
ParallelProcessingTools.write_worker_start_script — Function
write_worker_start_script(
filename::AbstractString,
runmode::DynamicAddProcsMode,
manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager()
)
Writes the system command to start worker processes to a shell script.