Go内存模型 #

Go内存模型定义了在并发环境下,变量读写的可见性规则。理解内存模型是编写正确并发程序的基础。

happens-before原则 #

基本概念 #

如果操作A “happens-before” 操作B,那么A的结果对B可见。

主要规则 #

  1. goroutine创建
    • 启动goroutine的go语句 happens-before 该goroutine开始执行
go
package main

import "fmt"

var a string

// f prints the current value of the package-level variable a.
func f() {
    msg := a
    fmt.Println(msg)
}

// main assigns a and then starts f in a new goroutine. The assignment is
// sequenced before the go statement, which is the ordering this example
// illustrates.
// NOTE(review): main returns right after the go statement, so the program may
// exit before f gets a chance to print — confirm whether a wait is wanted here.
func main() {
    a = "hello, world"
    go f()
}
  2. goroutine销毁
    • goroutine的退出不保证 happens-before 任何操作
go
package main

var a string

// main starts a goroutine that assigns a and then returns without any
// synchronization, so nothing orders that assignment before later events.
func main() {
    go func() {
        a = "hello"
    }()
    
}
  3. 通道发送
    • 向通道发送 happens-before 相应的接收完成
go
package main

import "fmt"

var a string

// main writes a in a helper goroutine, signals over an unbuffered channel,
// and prints a only after that signal has been received.
func main() {
    ready := make(chan bool)

    go func(signal chan<- bool) {
        a = "hello, world"
        signal <- true
    }(ready)

    <-ready
    fmt.Println(a)
}
  4. 通道关闭
    • 关闭通道 happens-before 因通道已关闭而返回零值的接收
go
package main

import "fmt"

var a string

// main writes a in a helper goroutine that then closes the channel; main
// prints a once its receive on the closed channel returns.
func main() {
    finished := make(chan bool)

    writer := func() {
        a = "hello, world"
        close(finished)
    }
    go writer()

    <-finished
    fmt.Println(a)
}
  5. 无缓冲通道接收
    • 从无缓冲通道接收 happens-before 相应的发送完成
go
package main

import "fmt"

var a string

// main starts a goroutine that writes a and then receives from an unbuffered
// channel; main sends on that channel and prints a after the send completes.
func main() {
    handoff := make(chan bool)

    go func(in <-chan bool) {
        a = "hello, world"
        <-in
    }(handoff)

    handoff <- true
    fmt.Println(a)
}
  6. 有缓冲通道
    • 对于容量为C的通道,第k次接收 happens-before 第k+C次发送完成

sync包同步原语 #

sync.Mutex #

go
package main

import (
    "fmt"
    "sync"
)

// counter is the shared tally; mu serializes the updates made by increment.
var (
    counter int
    mu      sync.Mutex
)

// increment adds one to counter while holding mu.
func increment() {
    mu.Lock()
    counter++
    mu.Unlock()
}

// main launches 1000 goroutines that each call increment once, waits for all
// of them, and prints the final counter value.
func main() {
    var pending sync.WaitGroup

    for remaining := 1000; remaining > 0; remaining-- {
        pending.Add(1)
        go func() {
            increment()
            pending.Done()
        }()
    }

    pending.Wait()
    fmt.Println("计数器:", counter)
}

内存模型保证

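  • 对于任意 sync.Mutex 或 sync.RWMutex 变量 mu,且 n < m:第 n 次 mu.Unlock() 调用 happens-before 第 m 次 mu.Lock() 调用返回。因此,一个goroutine在 Unlock 之前写入的值(例如 x = 1),对随后成功获得同一把锁的goroutine可见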

sync.RWMutex #

go
package main

import (
    "fmt"
    "sync"
)

// SafeMap is a string-to-int map guarded by a read/write mutex.
// The zero value is an empty map that is ready to use.
type SafeMap struct {
    mu   sync.RWMutex // guards data
    data map[string]int
}

// Get returns the value stored under key and reports whether the key was
// present. It takes only the read lock.
func (m *SafeMap) Get(key string) (int, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    // Indexing a nil map is allowed and simply reports ok == false.
    val, ok := m.data[key]
    return val, ok
}

// Set stores value under key while holding the write lock. The underlying map
// is allocated on first use so that a zero-value SafeMap does not panic.
func (m *SafeMap) Set(key string, value int) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.data == nil {
        m.data = make(map[string]int)
    }
    m.data[key] = value
}

// main fills a SafeMap from ten concurrent goroutines, waits for them to
// finish, and then prints every key that is present.
func main() {
    store := &SafeMap{data: make(map[string]int)}

    var writers sync.WaitGroup
    for i := 0; i < 10; i++ {
        writers.Add(1)
        go func(id int) {
            defer writers.Done()
            key := fmt.Sprintf("key%d", id)
            store.Set(key, id)
        }(i)
    }
    writers.Wait()

    for i := 0; i < 10; i++ {
        val, found := store.Get(fmt.Sprintf("key%d", i))
        if !found {
            continue
        }
        fmt.Printf("key%d = %d\n", i, val)
    }
}

sync.WaitGroup #

go
package main

import (
    "fmt"
    "sync"
)

// main runs five goroutines that each append a doubled value to a shared
// slice under a mutex, then prints the slice once all of them are done.
func main() {
    var (
        workers sync.WaitGroup
        guard   sync.Mutex
    )
    doubled := make([]int, 0)

    collect := func(n int) {
        defer workers.Done()

        guard.Lock()
        doubled = append(doubled, n*2)
        guard.Unlock()
    }

    for i := 0; i < 5; i++ {
        workers.Add(1)
        go collect(i)
    }

    workers.Wait()
    fmt.Println("结果:", doubled)
}

内存模型保证

  • wg.Done() 调用 happens-before 被它解除阻塞的 wg.Wait() 返回

sync.Once #

go
package main

import (
    "fmt"
    "sync"
)

// Singleton is the type of the lazily created shared instance.
type Singleton struct {
    data string
}

// instance holds the shared Singleton; once ensures it is built a single time.
var (
    instance *Singleton
    once     sync.Once
)

// GetInstance returns the shared Singleton, creating it on the first call.
func GetInstance() *Singleton {
    build := func() {
        instance = &Singleton{data: "initialized"}
    }
    once.Do(build)
    return instance
}

// main calls GetInstance from ten goroutines and prints the pointer each one
// receives.
func main() {
    var callers sync.WaitGroup

    report := func() {
        defer callers.Done()
        inst := GetInstance()
        fmt.Printf("实例地址: %p\n", inst)
    }

    for n := 0; n < 10; n++ {
        callers.Add(1)
        go report()
    }

    callers.Wait()
}

内存模型保证

  • once.Do(f) 中唯一一次 f() 调用的完成 happens-before 任何 once.Do(f) 调用的返回

sync.Cond #

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// Queue is a FIFO of ints whose Get blocks until an item is available.
type Queue struct {
    items []int
    mu    sync.Mutex
    cond  *sync.Cond // built on mu by NewQueue
}

// NewQueue returns an empty Queue with its condition variable bound to the
// queue's own mutex.
func NewQueue() *Queue {
    q := new(Queue)
    q.cond = sync.NewCond(&q.mu)
    return q
}

// Put appends item to the queue and wakes one waiting Get, if any.
func (q *Queue) Put(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()

    q.items = append(q.items, item)
    q.cond.Signal()
}

// Get removes and returns the oldest item, waiting while the queue is empty.
func (q *Queue) Get() int {
    q.mu.Lock()

    // Re-check the condition after every wake-up before consuming.
    for len(q.items) == 0 {
        q.cond.Wait()
    }

    head, rest := q.items[0], q.items[1:]
    q.items = rest

    q.mu.Unlock()
    return head
}

// main starts a producer that puts 1 and 2 with a 100ms pause before each,
// and prints both items as Get returns them.
func main() {
    q := NewQueue()

    go func() {
        for _, item := range []int{1, 2} {
            time.Sleep(100 * time.Millisecond)
            q.Put(item)
        }
    }()

    for i := 0; i < 2; i++ {
        fmt.Println("获取:", q.Get())
    }
}

原子操作 #

sync/atomic包 #

go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// main increments a shared int64 atomically from 1000 goroutines and prints
// the total after all of them have finished.
func main() {
    var (
        total   int64
        pending sync.WaitGroup
    )

    bump := func() {
        defer pending.Done()
        atomic.AddInt64(&total, 1)
    }

    for n := 0; n < 1000; n++ {
        pending.Add(1)
        go bump()
    }

    pending.Wait()
    fmt.Println("计数器:", atomic.LoadInt64(&total))
}

原子操作类型 #

go
package main

import (
    "fmt"
    "sync/atomic"
)

// main walks through the basic int64 atomic operations — add, store, swap and
// compare-and-swap — printing the value after each step.
func main() {
    var n int64

    atomic.AddInt64(&n, 10)
    fmt.Println("加法:", atomic.LoadInt64(&n))

    atomic.StoreInt64(&n, 100)
    fmt.Println("存储:", atomic.LoadInt64(&n))

    previous := atomic.SwapInt64(&n, 200)
    current := atomic.LoadInt64(&n)
    fmt.Printf("交换: 旧值=%d, 新值=%d\n", previous, current)

    ok := atomic.CompareAndSwapInt64(&n, 200, 300)
    fmt.Printf("CAS: 成功=%v, 值=%d\n", ok, atomic.LoadInt64(&n))
}

原子值 #

go
package main

import (
    "fmt"
    "sync/atomic"
)

// main stores a value in an atomic.Value, loads it back, then replaces it and
// loads it again.
//
// Every Store on a given atomic.Value must use the same concrete type as the
// first Store; storing a value of a different type panics. The replacement is
// therefore another string rather than an int.
func main() {
    var value atomic.Value

    value.Store("hello")

    v := value.Load()
    fmt.Println("值:", v)

    // Same concrete type (string) as the first Store.
    value.Store("world")
    v = value.Load()
    fmt.Println("新值:", v)
}

使用原子值存储配置 #

go
package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// Config holds the host and port published through the shared atomic value.
type Config struct {
    Host string
    Port int
}

// config holds the current Config; it is read with Load and replaced with
// Store.
var config atomic.Value

// GetConfig returns the most recently stored Config. If nothing has been
// stored yet it returns the zero Config instead of panicking.
func GetConfig() Config {
    // Load returns nil before the first Store; the comma-ok assertion turns
    // that case into the zero value rather than a failed type assertion.
    cfg, _ := config.Load().(Config)
    return cfg
}

// UpdateConfig atomically replaces the current Config with c.
func UpdateConfig(c Config) {
    config.Store(c)
}

// main stores an initial Config, starts a goroutine that prints the current
// Config every 500ms, swaps in a new Config after one second, and exits one
// second later.
func main() {
    config.Store(Config{Host: "localhost", Port: 8080})

    poll := func() {
        for {
            current := GetConfig()
            fmt.Printf("配置: %s:%d\n", current.Host, current.Port)
            time.Sleep(500 * time.Millisecond)
        }
    }
    go poll()

    time.Sleep(time.Second)
    UpdateConfig(Config{Host: "127.0.0.1", Port: 3000})

    time.Sleep(time.Second)
}

常见陷阱 #

数据竞争 #

go
package main

var x int

// main starts a goroutine that writes x and then reads x itself with no
// synchronization between the two accesses, so the read and the write race.
func main() {
    go func() {
        x = 1
    }()
    
    // Unsynchronized read of x: it may or may not observe the write above.
    if x == 1 {
        println("x is 1")
    }
}

检测:

bash
go run -race main.go

错误的同步 #

go
package main

var done bool
var x int

// wrong assigns x and then sets done, using plain writes with no
// synchronization.
func wrong() {
    x = 1
    done = true
}

// main runs wrong in a goroutine and spins on done before printing x.
// Neither done nor x is accessed under any synchronization here, so the loop
// and the final read both race with the writes in wrong.
func main() {
    go wrong()
    
    // Busy-wait on a plain bool with no synchronization.
    for !done {
    }
    
    println(x)
}

正确做法:

go
package main

import "sync"

// done and x are shared between goroutines; mu is held around every access.
var (
    done bool
    x    int
    mu   sync.Mutex
)

// correct assigns x and sets done while holding mu.
func correct() {
    mu.Lock()
    defer mu.Unlock()

    x = 1
    done = true
}

// main runs correct in a goroutine, polls done under mu until it is set, and
// then prints x while holding mu.
func main() {
    go correct()

    // finished reads done under the lock.
    finished := func() bool {
        mu.Lock()
        defer mu.Unlock()
        return done
    }
    for !finished() {
    }

    mu.Lock()
    defer mu.Unlock()
    println(x)
}

小结 #

同步原语 用途
sync.Mutex 互斥锁
sync.RWMutex 读写锁
sync.WaitGroup 等待一组goroutine
sync.Once 只执行一次
sync.Cond 条件变量
atomic 原子操作

理解Go内存模型是编写正确并发程序的关键。始终使用适当的同步原语来保护共享数据,避免数据竞争。