多线程 #

一、线程基础 #

1.1 启动多线程 #

设置环境变量:

bash
export JULIA_NUM_THREADS=4
julia

或在启动时指定:

bash
julia --threads=4

1.2 查看线程数 #

julia
using Base.Threads

nthreads()
threadid()

二、@threads宏 #

2.1 基本使用 #

julia
using Base.Threads

@threads for i in 1:10
    println("Thread $(threadid()) processing $i")
end

2.2 并行计算 #

julia
using Base.Threads

function parallel_sum(arr)
    total = Threads.Atomic{Int}(0)
    @threads for x in arr
        Threads.atomic_add!(total, x)
    end
    return total[]
end

parallel_sum(1:1000)

2.3 线程本地存储 #

julia
using Base.Threads

function thread_local_example()
    locals = [0 for _ in 1:nthreads()]
    
    @threads for i in 1:100
        locals[threadid()] += 1
    end
    
    return locals
end

thread_local_example()

三、线程安全 #

3.1 Atomic类型 #

julia
using Base.Threads

counter = Threads.Atomic{Int}(0)

@threads for _ in 1:1000
    Threads.atomic_add!(counter, 1)
end

counter[]

3.2 锁 #

julia
using Base.Threads

lock = ReentrantLock()
counter = 0

@threads for _ in 1:1000
    Base.lock(lock)
    try
        counter += 1
    finally
        Base.unlock(lock)
    end
end

counter

3.3 线程安全集合 #

julia
using Base.Threads

function thread_safe_push()
    lock = ReentrantLock()
    results = Int[]
    
    @threads for i in 1:100
        Base.lock(lock)
        try
            push!(results, i)
        finally
            Base.unlock(lock)
        end
    end
    
    return results
end

四、并行算法 #

4.1 并行映射 #

julia
using Base.Threads

function parallel_map(f, arr)
    results = similar(arr, Any)
    @threads for i in eachindex(arr)
        results[i] = f(arr[i])
    end
    return results
end

parallel_map(x -> x^2, [1, 2, 3, 4, 5])

4.2 并行归约 #

julia
using Base.Threads

function parallel_reduce(f, arr, init)
    n = nthreads()
    partial = [init for _ in 1:n]
    
    @threads for i in eachindex(arr)
        partial[threadid()] = f(partial[threadid()], arr[i])
    end
    
    return reduce(f, partial)
end

parallel_reduce(+, 1:1000, 0)

4.3 FJPattern (分治) #

julia
using Base.Threads

function parallel_fib(n)
    if n < 30
        return n <= 1 ? n : parallel_fib(n-1) + parallel_fib(n-2)
    end
    
    t = @task parallel_fib(n - 1)
    schedule(t)
    y = parallel_fib(n - 2)
    return fetch(t) + y
end

五、实践练习 #

5.1 练习1:并行矩阵乘法 #

julia
using Base.Threads

function parallel_matrix_multiply(A, B)
    rows_A, cols_A = size(A)
    rows_B, cols_B = size(B)
    
    C = zeros(rows_A, cols_B)
    
    @threads for i in 1:rows_A
        for j in 1:cols_B
            for k in 1:cols_A
                C[i, j] += A[i, k] * B[k, j]
            end
        end
    end
    
    return C
end

5.2 练习2:并行文件处理 #

julia
using Base.Threads

function parallel_process_files(files)
    results = Vector{String}(undef, length(files))
    
    @threads for i in eachindex(files)
        results[i] = read(files[i], String)
    end
    
    return results
end

5.3 练习3:并行蒙特卡洛 #

julia
using Base.Threads

function parallel_monte_carlo_pi(n)
    hits = Threads.Atomic{Int}(0)
    
    @threads for _ in 1:n
        x, y = rand(), rand()
        if x^2 + y^2 <= 1
            Threads.atomic_add!(hits, 1)
        end
    end
    
    return 4 * hits[] / n
end

parallel_monte_carlo_pi(10^7)

六、总结 #

本章我们学习了:

  1. 线程基础:启动和查看线程
  2. @threads宏:并行循环
  3. 线程安全:Atomic和锁
  4. 并行算法:映射、归约、分治

接下来让我们学习Julia的分布式计算!

最后更新:2026-03-27