协程 #

一、Task基础 #

1.1 创建Task #

julia
t = Task(() -> println("Hello from task"))

schedule(t)

1.2 @task宏 #

julia
t = @task println("Hello from task")

schedule(t)

1.3 @async宏 #

julia
@async println("Hello from async")

1.4 等待Task完成 #

julia
t = @async begin
    sleep(1)
    println("Task done")
    42
end

wait(t)
fetch(t)

二、Channel #

2.1 创建Channel #

julia
ch = Channel{Int}(10)

2.2 发送和接收 #

julia
ch = Channel{Int}(10)

put!(ch, 1)
put!(ch, 2)

take!(ch)
take!(ch)

2.3 关闭Channel #

julia
ch = Channel{Int}(10)
put!(ch, 1)
close(ch)

take!(ch)
take!(ch)

2.4 迭代Channel #

julia
function producer(ch::Channel)
    for i in 1:5
        put!(ch, i)
        sleep(0.1)
    end
end

ch = Channel(10)
@async producer(ch)

for x in ch
    println(x)
end

三、生产者-消费者模式 #

3.1 基本模式 #

julia
function producer(ch::Channel)
    for i in 1:10
        put!(ch, i^2)
    end
    close(ch)
end

function consumer(ch::Channel)
    for x in ch
        println("Consumed: $x")
    end
end

ch = Channel(5)
@async producer(ch)
consumer(ch)

3.2 多消费者 #

julia
function producer(ch::Channel, count)
    for i in 1:count
        put!(ch, i)
    end
    close(ch)
end

function consumer(ch::Channel, id)
    for x in ch
        println("Consumer $id: $x")
    end
end

ch = Channel(10)
@async producer(ch, 20)

@async consumer(ch, 1)
@async consumer(ch, 2)

四、异步编程 #

4.1 @async #

julia
tasks = [@async begin
    sleep(rand())
    println("Task $i done")
end for i in 1:5]

foreach(wait, tasks)

4.2 @sync #

julia
@sync begin
    for i in 1:5
        @async begin
            sleep(rand())
            println("Task $i done")
        end
    end
end

4.3 超时处理 #

julia
function with_timeout(f, timeout)
    t = @async f()
    result = Ref{Any}(nothing)
    done = Ref(false)
    
    @async begin
        result[] = fetch(t)
        done[] = true
    end
    
    sleep(timeout)
    
    if done[]
        return result[]
    else
        return nothing
    end
end

五、实践练习 #

5.1 练习1:并行下载 #

julia
function parallel_download(urls)
    results = Vector{String}(undef, length(urls))
    
    @sync for (i, url) in enumerate(urls)
        @async begin
            results[i] = download(url)
        end
    end
    
    return results
end

5.2 练习2:工作队列 #

julia
function worker(ch::Channel, results::Channel, id)
    for job in ch
        result = process_job(job)
        put!(results, (id, result))
    end
end

function process_job(job)
    sleep(0.1)
    job * 2
end

jobs = Channel(10)
results = Channel(10)

for i in 1:3
    @async worker(jobs, results, i)
end

for i in 1:10
    put!(jobs, i)
end
close(jobs)

for _ in 1:10
    println(take!(results))
end

5.3 练习3:管道 #

julia
function stage1(input::Channel, output::Channel)
    for x in input
        put!(output, x * 2)
    end
    close(output)
end

function stage2(input::Channel, output::Channel)
    for x in input
        put!(output, x + 1)
    end
    close(output)
end

function pipeline(input_data)
    ch1 = Channel(10)
    ch2 = Channel(10)
    ch3 = Channel(10)
    
    @async begin
        for x in input_data
            put!(ch1, x)
        end
        close(ch1)
    end
    
    @async stage1(ch1, ch2)
    @async stage2(ch2, ch3)
    
    return collect(ch3)
end

pipeline([1, 2, 3, 4, 5])

六、总结 #

本章我们学习了:

  1. Task:创建和调度协程
  2. Channel:协程间通信
  3. 生产者-消费者:并发模式
  4. 异步编程:@async和@sync

接下来让我们学习Julia的多线程!

最后更新:2026-03-27