Go内存模型 #
Go内存模型定义了在并发环境下,变量读写的可见性规则。理解内存模型是编写正确并发程序的基础。
happens-before原则 #
基本概念 #
如果操作A “happens-before” 操作B,那么A的结果对B可见。
主要规则 #
- goroutine创建
- 启动goroutine的go语句 happens-before 该goroutine开始执行
go
package main
import "fmt"
var a string
func f() {
fmt.Println(a)
}
func main() {
a = "hello, world"
go f()
}
- goroutine销毁
- goroutine的退出不保证 happens-before 任何操作
go
package main
var a string
func main() {
go func() {
a = "hello"
}()
}
- 通道发送
- 向通道发送 happens-before 相应的接收完成
go
package main
import "fmt"
var a string
func main() {
done := make(chan bool)
go func() {
a = "hello, world"
done <- true
}()
<-done
fmt.Println(a)
}
- 通道关闭
- 关闭通道 happens-before 从该通道接收到零值
go
package main
import "fmt"
var a string
func main() {
done := make(chan bool)
go func() {
a = "hello, world"
close(done)
}()
<-done
fmt.Println(a)
}
- 无缓冲通道接收
- 从无缓冲通道接收 happens-before 发送完成
go
package main
import "fmt"
var a string
func main() {
done := make(chan bool)
go func() {
a = "hello, world"
<-done
}()
done <- true
fmt.Println(a)
}
- 有缓冲通道
- 对于容量为C的通道,第k次接收 happens-before 第k+C次发送完成
sync包同步原语 #
sync.Mutex #
go
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
increment()
}()
}
wg.Wait()
fmt.Println("计数器:", counter)
}
内存模型保证:
- 对于变量x,
x = 1之后调用mu.Lock(),在mu.Unlock()之后x的值对其他goroutine可见
sync.RWMutex #
go
package main
import (
"fmt"
"sync"
)
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func (m *SafeMap) Get(key string) (int, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}
func (m *SafeMap) Set(key string, value int) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = value
}
func main() {
m := &SafeMap{
data: make(map[string]int),
}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
m.Set(fmt.Sprintf("key%d", i), i)
}(i)
}
wg.Wait()
for i := 0; i < 10; i++ {
if val, ok := m.Get(fmt.Sprintf("key%d", i)); ok {
fmt.Printf("key%d = %d\n", i, val)
}
}
}
sync.WaitGroup #
go
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
var mu sync.Mutex
results := make([]int, 0)
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
mu.Lock()
results = append(results, n*2)
mu.Unlock()
}(i)
}
wg.Wait()
fmt.Println("结果:", results)
}
内存模型保证:
wg.Add()happens-beforewg.Wait()返回
sync.Once #
go
package main
import (
"fmt"
"sync"
)
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "initialized"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inst := GetInstance()
fmt.Printf("实例地址: %p\n", inst)
}()
}
wg.Wait()
}
内存模型保证:
once.Do(f)中的f() happens-beforeonce.Do()返回
sync.Cond #
go
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
mu sync.Mutex
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Put(item int) {
q.mu.Lock()
q.items = append(q.items, item)
q.cond.Signal()
q.mu.Unlock()
}
func (q *Queue) Get() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
return item
}
func main() {
q := NewQueue()
go func() {
time.Sleep(100 * time.Millisecond)
q.Put(1)
time.Sleep(100 * time.Millisecond)
q.Put(2)
}()
fmt.Println("获取:", q.Get())
fmt.Println("获取:", q.Get())
}
原子操作 #
sync/atomic包 #
go
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("计数器:", atomic.LoadInt64(&counter))
}
原子操作类型 #
go
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var i int64 = 0
atomic.AddInt64(&i, 10)
fmt.Println("加法:", atomic.LoadInt64(&i))
atomic.StoreInt64(&i, 100)
fmt.Println("存储:", atomic.LoadInt64(&i))
old := atomic.SwapInt64(&i, 200)
fmt.Printf("交换: 旧值=%d, 新值=%d\n", old, atomic.LoadInt64(&i))
swapped := atomic.CompareAndSwapInt64(&i, 200, 300)
fmt.Printf("CAS: 成功=%v, 值=%d\n", swapped, atomic.LoadInt64(&i))
}
原子值 #
go
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var value atomic.Value
value.Store("hello")
v := value.Load()
fmt.Println("值:", v)
value.Store(42)
v = value.Load()
fmt.Println("新值:", v)
}
使用原子值存储配置 #
go
package main
import (
"fmt"
"sync/atomic"
"time"
)
type Config struct {
Host string
Port int
}
var config atomic.Value
func GetConfig() Config {
return config.Load().(Config)
}
func UpdateConfig(c Config) {
config.Store(c)
}
func main() {
config.Store(Config{Host: "localhost", Port: 8080})
go func() {
for {
cfg := GetConfig()
fmt.Printf("配置: %s:%d\n", cfg.Host, cfg.Port)
time.Sleep(500 * time.Millisecond)
}
}()
time.Sleep(time.Second)
UpdateConfig(Config{Host: "127.0.0.1", Port: 3000})
time.Sleep(time.Second)
}
常见陷阱 #
数据竞争 #
go
package main
var x int
func main() {
go func() {
x = 1
}()
if x == 1 {
println("x is 1")
}
}
检测:
bash
go run -race main.go
错误的同步 #
go
package main
var done bool
var x int
func wrong() {
x = 1
done = true
}
func main() {
go wrong()
for !done {
}
println(x)
}
正确做法:
go
package main
import "sync"
var done bool
var x int
var mu sync.Mutex
func correct() {
mu.Lock()
x = 1
done = true
mu.Unlock()
}
func main() {
go correct()
for {
mu.Lock()
d := done
mu.Unlock()
if d {
break
}
}
mu.Lock()
println(x)
mu.Unlock()
}
小结 #
| 同步原语 | 用途 |
|---|---|
sync.Mutex |
互斥锁 |
sync.RWMutex |
读写锁 |
sync.WaitGroup |
等待一组goroutine |
sync.Once |
只执行一次 |
sync.Cond |
条件变量 |
atomic |
原子操作 |
理解Go内存模型是编写正确并发程序的关键。始终使用适当的同步原语来保护共享数据,避免数据竞争。