Channel模式 #

一、生产者消费者模式 #

1.1 基本模式 #

go
// producer sends the integers 0 through 9 on ch in ascending order and
// closes ch on return so that a ranging receiver can terminate.
func producer(ch chan<- int) {
    defer close(ch)

    next := 0
    for next < 10 {
        ch <- next
        next++
    }
}

// consumer prints every value received from ch, one per line, and returns
// once ch has been closed and fully drained.
func consumer(ch <-chan int) {
    for {
        v, ok := <-ch
        if !ok {
            return
        }
        fmt.Println(v)
    }
}

// main connects one producer goroutine to a consumer that runs on the main
// goroutine, using a channel buffered for five values.
func main() {
    const bufferSize = 5
    ch := make(chan int, bufferSize)

    go producer(ch)
    consumer(ch) // returns once producer has closed ch
}

1.2 多生产者 #

go
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        ch <- id*10 + i
    }
}

// main starts three producers that share one channel, closes the channel
// after all of them have finished, and prints every value received.
func main() {
    const producers = 3

    var wg sync.WaitGroup
    ch := make(chan int, 10)

    for id := 0; id < producers; id++ {
        wg.Add(1)
        go producer(id, ch, &wg)
    }

    // A dedicated goroutine closes ch only after every producer is done, so
    // no producer can send on a closed channel.
    go func() {
        defer close(ch)
        wg.Wait()
    }()

    for {
        v, ok := <-ch
        if !ok {
            break
        }
        fmt.Println(v)
    }
}

1.3 多消费者 #

go
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range ch {
        fmt.Printf("Consumer %d: %d\n", id, v)
    }
}

// main feeds twenty integers into a shared channel from one goroutine and
// lets three consumers compete for them, then waits for all consumers.
func main() {
    const (
        consumers = 3
        items     = 20
    )

    var wg sync.WaitGroup
    ch := make(chan int, 10)

    // Single sender: it owns ch and closes it once all items are sent.
    go func() {
        defer close(ch)
        for n := 0; n < items; n++ {
            ch <- n
        }
    }()

    for id := 0; id < consumers; id++ {
        wg.Add(1)
        go consumer(id, ch, &wg)
    }

    wg.Wait()
}

二、Worker Pool模式 #

2.1 基本Worker Pool #

go
// worker reads jobs until the jobs channel is closed and sends each job
// doubled on results. The id parameter is accepted but not used.
func worker(id int, jobs <-chan int, results chan<- int) {
    for {
        job, ok := <-jobs
        if !ok {
            return
        }
        results <- job * 2
    }
}

// main runs a pool of three workers over ten jobs and prints exactly ten
// results, in whatever order the workers produce them.
func main() {
    const (
        numWorkers = 3
        numJobs    = 10
    )

    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for id := 0; id < numWorkers; id++ {
        go worker(id, jobs, results)
    }

    for job := 0; job < numJobs; job++ {
        jobs <- job
    }
    close(jobs) // lets each worker's receive loop end

    // results is never closed, so count the receives instead of ranging.
    for received := 0; received < numJobs; received++ {
        r := <-results
        fmt.Println(r)
    }
}

2.2 优雅关闭 #

go
// WorkerPool holds the state of a fixed-size pool of goroutines that read
// Job values from one channel and write Result values to another.
type WorkerPool struct {
    workers int            // number of goroutines launched by Start
    wg      sync.WaitGroup // tracks running workers so Stop can wait for them
    jobs    chan Job       // incoming work; closed by Stop
    results chan Result    // processed output; closed by Stop after workers exit
}

// NewWorkerPool returns a pool configured for the given number of workers.
// Both the jobs and results channels are buffered with capacity 100. No
// goroutines are started here.
func NewWorkerPool(workers int) *WorkerPool {
    p := new(WorkerPool)
    p.workers = workers
    p.jobs = make(chan Job, 100)
    p.results = make(chan Result, 100)
    return p
}

// Start launches p.workers worker goroutines, registering each one on the
// pool's WaitGroup before it is started.
func (p *WorkerPool) Start() {
    count := p.workers
    for id := 0; id < count; id++ {
        // Add must happen before the goroutine starts so that a concurrent
        // Wait cannot observe a zero counter too early.
        p.wg.Add(1)
        go p.worker(id)
    }
}

// worker processes jobs until the jobs channel is closed, sending the
// outcome of each job.Process call on results. It marks itself done on the
// pool's WaitGroup when it returns. The id parameter is not used.
func (p *WorkerPool) worker(id int) {
    defer p.wg.Done()

    for {
        job, ok := <-p.jobs
        if !ok {
            return
        }
        p.results <- job.Process()
    }
}

// Stop shuts the pool down in three ordered steps: it closes the jobs
// channel so each worker's range loop ends, waits for every worker to
// return, and only then closes the results channel.
//
// NOTE(review): workers block when sending on a full results channel, so
// Stop presumably needs something draining results while it waits. Confirm
// against callers. Calling Stop a second time would close an already closed
// channel.
func (p *WorkerPool) Stop() {
    close(p.jobs)    // no more work: lets workers finish their range loops
    p.wg.Wait()      // after this, no worker can still send on results
    close(p.results) // safe only once all senders have exited
}

三、Pipeline模式 #

3.1 基本Pipeline #

go
// generate returns an unbuffered channel that yields nums in order and is
// closed after the last value has been sent. Sending happens in a separate
// goroutine, so generate itself returns immediately.
func generate(nums ...int) <-chan int {
    out := make(chan int)

    emit := func() {
        defer close(out)
        for i := 0; i < len(nums); i++ {
            out <- nums[i]
        }
    }
    go emit()

    return out
}

// square returns a channel that yields the square of every value received
// from in. The returned channel is closed once in is closed and drained.
func square(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            out <- n * n
        }
    }()

    return out
}

func main() {
    nums := generate(1, 2, 3, 4, 5)
    squares := square(nums)
    
    for v := range squares {
        fmt.Println(v)
    }
}

3.2 多阶段Pipeline #

go
// stage1 returns a channel that yields each value from in incremented by
// one. The returned channel is closed once in is closed and drained.
func stage1(in <-chan int) <-chan int {
    out := make(chan int)

    increment := func() {
        defer close(out)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            out <- n + 1
        }
    }
    go increment()

    return out
}

// stage2 returns a channel that yields each value from in multiplied by
// two. The returned channel is closed once in is closed and drained.
func stage2(in <-chan int) <-chan int {
    out := make(chan int)

    double := func() {
        defer close(out)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            out <- n * 2
        }
    }
    go double()

    return out
}

// main chains generate, stage1 and stage2 into one pipeline over the values
// 1, 2, 3 and prints what comes out of the final stage.
func main() {
    source := generate(1, 2, 3)
    incremented := stage1(source)
    doubled := stage2(incremented)

    for {
        v, ok := <-doubled
        if !ok {
            break
        }
        fmt.Println(v)
    }
}

四、Fan-out/Fan-in模式 #

4.1 Fan-out #

多个goroutine从同一个channel读取:

go
// fanOut starts n workers that all read from the same input channel and
// returns their output channels. Each input value is handled by exactly one
// worker, since a channel value is received only once.
func fanOut(in <-chan int, n int) []<-chan int {
    var outputs []<-chan int
    for remaining := n; remaining > 0; remaining-- {
        out := worker(in)
        outputs = append(outputs, out)
    }
    return outputs
}

// worker returns a channel that yields the square of each value it receives
// from in. The returned channel is closed once in is closed and drained.
func worker(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for {
            n, ok := <-in
            if !ok {
                return
            }
            out <- n * n
        }
    }()

    return out
}

4.2 Fan-in #

多个channel合并到一个channel:

go
// fanIn merges any number of input channels into one output channel. One
// forwarding goroutine runs per input, and the output is closed after every
// input has been closed and drained. Values from different inputs arrive in
// no particular order.
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var pending sync.WaitGroup

    forward := func(src <-chan int) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }

    for i := range channels {
        pending.Add(1)
        go forward(channels[i])
    }

    // Close merged only after every forwarder has finished, so none of them
    // can send on a closed channel.
    go func() {
        defer close(merged)
        pending.Wait()
    }()

    return merged
}

4.3 完整示例 #

go
// main squares the values 1 through 5 using three workers that share one
// input channel, merges their outputs and prints every result. Output order
// is not deterministic.
func main() {
    in := generate(1, 2, 3, 4, 5)

    // Fan-out: three workers compete for values from the same channel.
    first, second, third := worker(in), worker(in), worker(in)

    // Fan-in: merge the three output channels into one.
    merged := fanIn(first, second, third)

    for {
        v, ok := <-merged
        if !ok {
            break
        }
        fmt.Println(v)
    }
}

五、取消模式 #

5.1 done通道 #

go
// worker processes jobs until either the jobs channel is closed or the done
// channel becomes readable (typically because it was closed). When both are
// ready at once, select picks one of them at random.
func worker(done <-chan struct{}, jobs <-chan int) {
    for {
        select {
        case <-done:
            return
        case job, open := <-jobs:
            if !open {
                return
            }
            process(job)
        }
    }
}

5.2 Context取消 #

go
// worker processes jobs until either the jobs channel is closed or ctx is
// cancelled. When both are ready at once, select picks one of them at
// random.
func worker(ctx context.Context, jobs <-chan int) {
    cancelled := ctx.Done()
    for {
        select {
        case <-cancelled:
            return
        case job, open := <-jobs:
            if !open {
                return
            }
            process(job)
        }
    }
}

六、超时模式 #

6.1 单次超时 #

go
func withTimeout(ch <-chan int, timeout time.Duration) (int, error) {
    select {
    case v := <-ch:
        return v, nil
    case <-time.After(timeout):
        return 0, errors.New("timeout")
    }
}

6.2 心跳超时 #

go
func monitor(heartbeat <-chan time.Time, timeout time.Duration) {
    for {
        select {
        case <-heartbeat:
            // 正常
        case <-time.After(timeout):
            fmt.Println("No heartbeat")
            return
        }
    }
}

七、实际应用 #

7.1 并发爬虫 #

go
// Crawler holds the configuration and channels for a concurrent crawl.
type Crawler struct {
    jobs    chan string // URLs waiting to be fetched; closed by Run
    results chan Page   // fetched pages; closed by Run once all workers exit
    workers int         // number of fetch goroutines started by Run
}

// Run fetches every URL in urls using c.workers goroutines and passes each
// fetched page to process on the calling goroutine. It returns after all
// pages have been processed.
//
// Run closes both c.jobs and c.results, so the channels must be initialized
// before the call and the same Crawler cannot be run a second time.
func (c *Crawler) Run(urls []string) {
    var wg sync.WaitGroup

    // fetchLoop is the body of one worker: fetch URLs until jobs is closed.
    fetchLoop := func() {
        defer wg.Done()
        for url := range c.jobs {
            c.results <- fetch(url)
        }
    }

    for started := 0; started < c.workers; started++ {
        wg.Add(1)
        go fetchLoop()
    }

    // Feed URLs from a separate goroutine so that Run can start draining
    // results right away.
    go func() {
        defer close(c.jobs)
        for i := range urls {
            c.jobs <- urls[i]
        }
    }()

    // Close results only after every worker has finished sending.
    go func() {
        defer close(c.results)
        wg.Wait()
    }()

    for {
        page, ok := <-c.results
        if !ok {
            return
        }
        process(page)
    }
}

7.2 并发下载 #

go
// downloadAll fetches every URL in urls with at most concurrency parallel
// workers and returns the response bodies.
//
// Bodies are returned in completion order, not in the order of urls. A URL
// whose request or body read fails is skipped, so the result may hold fewer
// entries than urls. A concurrency below 1 is treated as 1.
//
// Requires the "io", "net/http" and "sync" packages.
func downloadAll(urls []string, concurrency int) [][]byte {
    if concurrency < 1 {
        concurrency = 1
    }

    // Both channels are sized to len(urls), so queuing jobs and emitting
    // results never block.
    jobs := make(chan string, len(urls))
    results := make(chan []byte, len(urls))

    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for url := range jobs {
                data, err := fetchBody(url)
                if err != nil {
                    // Best effort: one failed download must not abort the rest.
                    continue
                }
                results <- data
            }
        }()
    }

    for _, url := range urls {
        jobs <- url
    }
    close(jobs)

    // Close results once every worker has exited so the loop below ends.
    go func() {
        wg.Wait()
        close(results)
    }()

    var all [][]byte
    for data := range results {
        all = append(all, data)
    }
    return all
}

// fetchBody performs a GET request and returns the complete response body.
// http.Get uses http.DefaultClient, which has no timeout; production code
// should use an http.Client with Timeout set.
func fetchBody(url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    return data, nil
}

八、最佳实践 #

8.1 关闭channel #

  • 由发送方关闭
  • 使用defer确保关闭
  • 接收时用 `v, ok := <-ch` 或 `for range` 判断channel是否已关闭;不要向已关闭的channel发送数据,也不要重复关闭,否则会panic

8.2 避免goroutine泄漏 #

go
// safeWorker processes jobs until the jobs channel is closed or the done
// channel becomes readable, so the goroutine always has a way to exit
// instead of blocking forever on jobs.
func safeWorker(done <-chan struct{}, jobs <-chan int) {
    for {
        select {
        case job, open := <-jobs:
            if !open {
                return
            }
            process(job)
        case <-done:
            return
        }
    }
}

8.3 合理设置缓冲 #

go
// Scale the buffer with the machine: ten slots per logical CPU reported by
// runtime.NumCPU.
ch := make(chan int, runtime.NumCPU()*10)

九、总结 #

Channel模式要点:

| 模式 | 说明 |
| --- | --- |
| 生产者消费者 | 解耦生产和消费 |
| Worker Pool | 复用goroutine |
| Pipeline | 数据处理流水线 |
| Fan-out/Fan-in | 并行处理和合并 |
| 取消模式 | 优雅退出 |

关键点:

  1. 生产者消费者:解耦生产者和消费者
  2. Worker Pool:复用goroutine,控制并发数
  3. Pipeline:多阶段数据处理
  4. Fan-out/Fan-in:并行处理和结果合并
  5. 取消模式:优雅退出goroutine

准备好学习错误处理了吗?让我们进入下一章!

最后更新:2026-03-26