多线程 #
一、线程基础 #
1.1 启动多线程 #
设置环境变量:
bash
export JULIA_NUM_THREADS=4
julia
或在启动时通过命令行参数指定(需要 Julia 1.5 及以上版本):
bash
julia --threads=4
1.2 查看线程数 #
julia
using Base.Threads
# Number of threads available to this Julia session.
Threads.nthreads()
# Id of the thread that is executing the current task.
Threads.threadid()
二、@threads宏 #
2.1 基本使用 #
julia
using Base.Threads
# Demo: spread ten iterations over the available threads and report which thread
# handled each one. The order of the printed lines is not deterministic.
Threads.@threads for i in 1:10
    message = "Thread $(threadid()) processing $i"
    println(message)
end
2.2 并行计算 #
julia
using Base.Threads
"""
    parallel_sum(arr)

Return the sum of the elements of `arr`, computed in parallel.

`arr` is split into at most `nthreads()` contiguous chunks; each chunk is summed
by its own task and the partial sums are added together at the end. Compared to
funnelling every element through a single `Atomic{Int}`, this works for any
numeric element type (not just `Int`) and avoids contention on one shared
memory location.

Returns `0` for an empty collection.
"""
function parallel_sum(arr)
    # `max(1, ...)` keeps the chunk length valid when `arr` is empty.
    chunk_len = max(1, cld(length(arr), nthreads()))
    tasks = map(Iterators.partition(arr, chunk_len)) do chunk
        Threads.@spawn sum(chunk)
    end
    # `init=0` makes the empty case well-defined without changing numeric results.
    return reduce(+, map(fetch, tasks); init=0)
end

parallel_sum(1:1000)
2.3 线程本地存储 #
julia
using Base.Threads
"""
    thread_local_example()

Count 100 work items using one private accumulator per chunk of work and return
the vector of per-chunk counts (length `nthreads()`, summing to 100).

The accumulators are indexed by chunk number rather than by `threadid()`:
a task may migrate between threads, and `threadid()` is not guaranteed to lie in
`1:nthreads()`, so `buffer[threadid()]` is not a safe way to get private storage.
Here every slot is written by exactly one loop iteration, so no synchronization
is needed.
"""
function thread_local_example()
    nchunks = nthreads()
    locals = zeros(Int, nchunks)
    @threads for c in 1:nchunks
        # Chunk `c` owns items c, c + nchunks, c + 2nchunks, ... and slot `c`.
        for _ in c:nchunks:100
            locals[c] += 1
        end
    end
    return locals
end

thread_local_example()
三、线程安全 #
3.1 Atomic类型 #
julia
using Base.Threads
# Shared counter that every iteration increments atomically, so no update is
# lost even though iterations run on different threads.
counter = Atomic{Int}(0)
Threads.@threads for _ in 1:1000
    atomic_add!(counter, 1)
end
# Read the final value out of the atomic cell.
counter[]
3.2 锁 #
julia
using Base.Threads
# Protect a plain integer counter with a lock.
#
# The lock is not named `lock`, so it does not shadow `Base.lock`.
counter_lock = ReentrantLock()
counter = 0
@threads for _ in 1:1000
    # The do-block form acquires the lock, runs the body, and always releases
    # the lock afterwards, even if the body throws.
    lock(counter_lock) do
        # The loop body runs inside a closure; without `global`, the assignment
        # would target a new local variable instead of the top-level counter.
        global counter += 1
    end
end
counter
3.3 线程安全集合 #
julia
using Base.Threads
"""
    thread_safe_push()

Push the integers `1:100` into a shared `Vector{Int}` from a threaded loop and
return that vector. Every `push!` is performed while holding a lock, so the
vector is never mutated concurrently. The order of the elements depends on
scheduling and is not deterministic.
"""
function thread_safe_push()
    guard = ReentrantLock()
    collected = Int[]
    @threads for i in 1:100
        # Acquire `guard`, push, and release it again (also on error).
        lock(guard) do
            push!(collected, i)
        end
    end
    return collected
end
四、并行算法 #
4.1 并行映射 #
julia
using Base.Threads
"""
    parallel_map(f, arr)

Apply `f` to every element of `arr` in a threaded loop and return the results in
an array with the same shape as `arr` and element type `Any`.

Each iteration writes only to its own index of the output, so no locking is
required. `f` must be safe to call from several threads at once.
"""
function parallel_map(f, arr)
    out = similar(arr, Any)
    Threads.@threads for idx in eachindex(arr)
        value = f(arr[idx])
        out[idx] = value
    end
    return out
end

parallel_map(x -> x^2, [1, 2, 3, 4, 5])
4.2 并行归约 #
julia
using Base.Threads
"""
    parallel_reduce(f, arr, init)

Reduce `arr` with the binary operator `f` in parallel, starting from `init`.

`arr` is split into at most `nthreads()` contiguous chunks; each chunk is
reduced by its own task, and the partial results are then folded together from
left to right. `init` is applied exactly once, so the result equals
`foldl(f, arr; init=init)` whenever `f` is associative (which any parallel
reduction requires).

Returns `init` when `arr` is empty.
"""
function parallel_reduce(f, arr, init)
    isempty(arr) && return init
    chunk_len = cld(length(arr), nthreads())
    tasks = map(Iterators.partition(arr, chunk_len)) do chunk
        # Chunks are never empty, so no per-chunk `init` is needed; this avoids
        # folding `init` into the result once per chunk.
        Threads.@spawn reduce(f, chunk)
    end
    return foldl(f, map(fetch, tasks); init=init)
end

parallel_reduce(+, 1:1000, 0)
4.3 Fork-Join 模式(分治) #
julia
using Base.Threads
"""
    parallel_fib(n)

Return the `n`-th Fibonacci number using fork-join parallelism.

For `n < 30` the computation is plain recursion, because the work is too small
to pay for creating a task. For larger `n`, `parallel_fib(n - 1)` is forked
into a new task while the current task computes `parallel_fib(n - 2)`, and the
two results are joined with `fetch`.
"""
function parallel_fib(n)
    if n < 30
        return n <= 1 ? n : parallel_fib(n - 1) + parallel_fib(n - 2)
    end
    # `Threads.@spawn` creates a task that may run on any available thread.
    # A task made with `@task` + `schedule` stays on the parent's thread and
    # therefore gives no parallel speed-up.
    t = Threads.@spawn parallel_fib(n - 1)
    y = parallel_fib(n - 2)
    return fetch(t) + y
end
五、实践练习 #
5.1 练习1:并行矩阵乘法 #
julia
using Base.Threads
"""
    parallel_matrix_multiply(A, B)

Return the matrix product `A * B`, computing the columns of the result in
parallel.

The result has element type `Float64` for real integer or floating-point input
up to `Float64`, and a wider type (for example `ComplexF64`) when the inputs
require it.

Throws a `DimensionMismatch` if the number of columns of `A` differs from the
number of rows of `B`.
"""
function parallel_matrix_multiply(A, B)
    rows_A, cols_A = size(A)
    rows_B, cols_B = size(B)
    if cols_A != rows_B
        throw(DimensionMismatch(
            "A has $cols_A columns but B has $rows_B rows"))
    end
    # Accumulate in at least Float64; widen for inputs such as complex numbers.
    # Fall back to Float64 if promotion does not yield a concrete type.
    T = promote_type(Float64, eltype(A), eltype(B))
    C = zeros(isconcretetype(T) ? T : Float64, rows_A, cols_B)
    # Julia arrays are column-major: each task owns whole columns of C, and the
    # innermost loop walks down a column of A and C. For every (i, j) the terms
    # are still accumulated in increasing k order.
    @threads for j in 1:cols_B
        for k in 1:cols_A
            b = B[k, j]
            for i in 1:rows_A
                C[i, j] += A[i, k] * b
            end
        end
    end
    return C
end
5.2 练习2:并行文件处理 #
julia
using Base.Threads
"""
    parallel_process_files(files)

Read every file named in `files` in a threaded loop and return a
`Vector{String}` holding each file's full contents, in the same order as
`files`. Each iteration writes only to its own slot of the output.
"""
function parallel_process_files(files)
    contents = Vector{String}(undef, length(files))
    Threads.@threads for idx in eachindex(files)
        path = files[idx]
        contents[idx] = read(path, String)
    end
    return contents
end
5.3 练习3:并行蒙特卡洛 #
julia
using Base.Threads
"""
    parallel_monte_carlo_pi(n)

Estimate π by drawing `n` uniformly random points in the unit square and
counting how many fall inside the quarter unit circle; the estimate is
`4 * hits / n`.

The `n` samples are divided among `nthreads()` tasks. Each task counts its hits
in a local variable and the counts are added once at the end, so the hot loop
contains no atomic operation or lock.
"""
function parallel_monte_carlo_pi(n)
    nchunks = nthreads()
    tasks = map(1:nchunks) do c
        Threads.@spawn begin
            local_hits = 0
            # Task `c` handles samples c, c + nchunks, ...; together the tasks
            # draw exactly `n` samples.
            for _ in c:nchunks:n
                x, y = rand(), rand()
                if x^2 + y^2 <= 1
                    local_hits += 1
                end
            end
            local_hits
        end
    end
    hits = sum(fetch, tasks)
    return 4 * hits / n
end

parallel_monte_carlo_pi(10^7)
六、总结 #
本章我们学习了:
- 线程基础:启动和查看线程
- @threads宏:并行循环
- 线程安全:Atomic和锁
- 并行算法:映射、归约、分治
接下来让我们学习Julia的分布式计算!
最后更新:2026-03-27