协程 #
一、Task基础 #
1.1 创建Task #
julia
t = Task(() -> println("Hello from task"))
schedule(t)
1.2 @task宏 #
julia
t = @task println("Hello from task")
schedule(t)
1.3 @async宏 #
julia
@async println("Hello from async")
1.4 等待Task完成 #
julia
t = @async begin
sleep(1)
println("Task done")
42
end
wait(t)
fetch(t)
二、Channel #
2.1 创建Channel #
julia
ch = Channel{Int}(10)
2.2 发送和接收 #
julia
ch = Channel{Int}(10)
put!(ch, 1)
put!(ch, 2)
take!(ch)
take!(ch)
2.3 关闭Channel #
julia
ch = Channel{Int}(10)
put!(ch, 1)
close(ch)
take!(ch)
take!(ch)
2.4 迭代Channel #
julia
function producer(ch::Channel)
for i in 1:5
put!(ch, i)
sleep(0.1)
end
end
ch = Channel(10)
@async producer(ch)
for x in ch
println(x)
end
三、生产者-消费者模式 #
3.1 基本模式 #
julia
function producer(ch::Channel)
for i in 1:10
put!(ch, i^2)
end
close(ch)
end
function consumer(ch::Channel)
for x in ch
println("Consumed: $x")
end
end
ch = Channel(5)
@async producer(ch)
consumer(ch)
3.2 多消费者 #
julia
function producer(ch::Channel, count)
for i in 1:count
put!(ch, i)
end
close(ch)
end
function consumer(ch::Channel, id)
for x in ch
println("Consumer $id: $x")
end
end
ch = Channel(10)
@async producer(ch, 20)
@async consumer(ch, 1)
@async consumer(ch, 2)
四、异步编程 #
4.1 @async #
julia
tasks = [@async begin
sleep(rand())
println("Task $i done")
end for i in 1:5]
foreach(wait, tasks)
4.2 @sync #
julia
@sync begin
for i in 1:5
@async begin
sleep(rand())
println("Task $i done")
end
end
end
4.3 超时处理 #
julia
function with_timeout(f, timeout)
t = @async f()
result = Ref{Any}(nothing)
done = Ref(false)
@async begin
result[] = fetch(t)
done[] = true
end
sleep(timeout)
if done[]
return result[]
else
return nothing
end
end
五、实践练习 #
5.1 练习1:并行下载 #
julia
function parallel_download(urls)
results = Vector{String}(undef, length(urls))
@sync for (i, url) in enumerate(urls)
@async begin
results[i] = download(url)
end
end
return results
end
5.2 练习2:工作队列 #
julia
function worker(ch::Channel, results::Channel, id)
for job in ch
result = process_job(job)
put!(results, (id, result))
end
end
function process_job(job)
sleep(0.1)
job * 2
end
jobs = Channel(10)
results = Channel(10)
for i in 1:3
@async worker(jobs, results, i)
end
for i in 1:10
put!(jobs, i)
end
close(jobs)
for _ in 1:10
println(take!(results))
end
5.3 练习3:管道 #
julia
function stage1(input::Channel, output::Channel)
for x in input
put!(output, x * 2)
end
close(output)
end
function stage2(input::Channel, output::Channel)
for x in input
put!(output, x + 1)
end
close(output)
end
function pipeline(input_data)
ch1 = Channel(10)
ch2 = Channel(10)
ch3 = Channel(10)
@async begin
for x in input_data
put!(ch1, x)
end
close(ch1)
end
@async stage1(ch1, ch2)
@async stage2(ch2, ch3)
return collect(ch3)
end
pipeline([1, 2, 3, 4, 5])
六、总结 #
本章我们学习了:
- Task:创建和调度协程
- Channel:协程间通信
- 生产者-消费者:并发模式
- 异步编程:@async和@sync
接下来让我们学习Julia的多线程!
最后更新:2026-03-27