Channel模式 #
一、生产者消费者模式 #
1.1 基本模式 #
go
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
1.2 多生产者 #
go
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- id*10 + i
}
}
func main() {
ch := make(chan int, 10)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
go func() {
wg.Wait()
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
}
1.3 多消费者 #
go
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range ch {
fmt.Printf("Consumer %d: %d\n", id, v)
}
}
func main() {
ch := make(chan int, 10)
var wg sync.WaitGroup
go func() {
for i := 0; i < 20; i++ {
ch <- i
}
close(ch)
}()
for i := 0; i < 3; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
wg.Wait()
}
二、Worker Pool模式 #
2.1 基本Worker Pool #
go
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for i := 0; i < 3; i++ {
go worker(i, jobs, results)
}
for i := 0; i < 10; i++ {
jobs <- i
}
close(jobs)
for i := 0; i < 10; i++ {
fmt.Println(<-results)
}
}
2.2 优雅关闭 #
go
type WorkerPool struct {
jobs chan Job
results chan Result
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
jobs: make(chan Job, 100),
results: make(chan Result, 100),
workers: workers,
}
}
func (p *WorkerPool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
for job := range p.jobs {
p.results <- job.Process()
}
}
func (p *WorkerPool) Stop() {
close(p.jobs)
p.wg.Wait()
close(p.results)
}
三、Pipeline模式 #
3.1 基本Pipeline #
go
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
nums := generate(1, 2, 3, 4, 5)
squares := square(nums)
for v := range squares {
fmt.Println(v)
}
}
3.2 多阶段Pipeline #
go
func stage1(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n + 1
}
}()
return out
}
func stage2(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * 2
}
}()
return out
}
func main() {
in := generate(1, 2, 3)
out := stage2(stage1(in))
for v := range out {
fmt.Println(v)
}
}
四、Fan-out/Fan-in模式 #
4.1 Fan-out #
多个goroutine从同一个channel读取:
go
func fanOut(in <-chan int, n int) []<-chan int {
var channels []<-chan int
for i := 0; i < n; i++ {
channels = append(channels, worker(in))
}
return channels
}
func worker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
4.2 Fan-in #
多个channel合并到一个channel:
go
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
4.3 完整示例 #
go
func main() {
in := generate(1, 2, 3, 4, 5)
// Fan-out
ch1 := worker(in)
ch2 := worker(in)
ch3 := worker(in)
// Fan-in
result := fanIn(ch1, ch2, ch3)
for v := range result {
fmt.Println(v)
}
}
五、取消模式 #
5.1 done通道 #
go
func worker(done <-chan struct{}, jobs <-chan int) {
for {
select {
case job, ok := <-jobs:
if !ok {
return
}
process(job)
case <-done:
return
}
}
}
5.2 Context取消 #
go
func worker(ctx context.Context, jobs <-chan int) {
for {
select {
case job, ok := <-jobs:
if !ok {
return
}
process(job)
case <-ctx.Done():
return
}
}
}
六、超时模式 #
6.1 单次超时 #
go
func withTimeout(ch <-chan int, timeout time.Duration) (int, error) {
select {
case v := <-ch:
return v, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
6.2 心跳超时 #
go
func monitor(heartbeat <-chan time.Time, timeout time.Duration) {
for {
select {
case <-heartbeat:
// 正常
case <-time.After(timeout):
fmt.Println("No heartbeat")
return
}
}
}
七、实际应用 #
7.1 并发爬虫 #
go
type Crawler struct {
workers int
jobs chan string
results chan Page
}
func (c *Crawler) Run(urls []string) {
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range c.jobs {
page := fetch(url)
c.results <- page
}
}()
}
go func() {
for _, url := range urls {
c.jobs <- url
}
close(c.jobs)
}()
go func() {
wg.Wait()
close(c.results)
}()
for page := range c.results {
process(page)
}
}
7.2 并发下载 #
go
func downloadAll(urls []string, concurrency int) [][]byte {
jobs := make(chan string, len(urls))
results := make(chan []byte, len(urls))
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range jobs {
data, _ := http.Get(url)
results <- data
}
}()
}
for _, url := range urls {
jobs <- url
}
close(jobs)
go func() {
wg.Wait()
close(results)
}()
var all [][]byte
for data := range results {
all = append(all, data)
}
return all
}
八、最佳实践 #
8.1 关闭channel #
- 由发送方关闭
- 使用defer确保关闭
- 检查是否已关闭
8.2 避免goroutine泄漏 #
go
func safeWorker(done <-chan struct{}, jobs <-chan int) {
for {
select {
case <-done:
return
case job, ok := <-jobs:
if !ok {
return
}
process(job)
}
}
}
8.3 合理设置缓冲 #
go
ch := make(chan int, runtime.NumCPU()*10)
九、总结 #
Channel模式要点:
| 模式 | 说明 |
|---|---|
| 生产者消费者 | 解耦生产和消费 |
| Worker Pool | 复用goroutine |
| Pipeline | 数据处理流水线 |
| Fan-out/Fan-in | 并行处理和合并 |
| 取消模式 | 优雅退出 |
关键点:
- 生产者消费者:解耦生产者和消费者
- Worker Pool:复用goroutine,控制并发数
- Pipeline:多阶段数据处理
- Fan-out/Fan-in:并行处理和结果合并
- 取消模式:优雅退出goroutine
准备好学习错误处理了吗?让我们进入下一章!
最后更新:2026-03-26