Distributed Caching with Memcached #

1. Distributed Cache Overview #

1.1 Why a Distributed Cache #

text
Limitations of a single-node cache:

1. Limited memory capacity
   - A single machine has finite RAM
   - It cannot hold a large data set

2. Performance bottleneck
   - A single machine's QPS is capped
   - It cannot absorb high concurrency

3. Single point of failure
   - If the server goes down
   - The entire cache is lost

Advantages of a distributed cache:

1. Scalable capacity
   - Scale out by adding nodes
   - Capacity grows roughly linearly

2. Scalable performance
   - Requests are spread across nodes
   - Aggregate QPS increases

3. High availability
   - Redundancy across multiple nodes
   - Client-side failover

1.2 Memcached's Distributed Architecture #

text
Memcached's distributed architecture:

┌─────────────────────────────────────────┐
│           Application server            │
│  ┌─────────────────────────────────┐   │
│  │        Memcached client         │   │
│  │  (consistent hashing, sharding) │   │
│  └─────────────────────────────────┘   │
└─────────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
   ┌─────────┐ ┌─────────┐ ┌─────────┐
   │ Node 1  │ │ Node 2  │ │ Node 3  │
   │ 11211   │ │ 11212   │ │ 11213   │
   │ Data A  │ │ Data B  │ │ Data C  │
   └─────────┘ └─────────┘ └─────────┘

Key characteristics:
1. Sharding is done by the client
2. Masterless: the nodes are independent and unaware of each other
3. Data is partitioned across the nodes

2. Data Sharding Strategies #

2.1 Simple Modulo Sharding #

text
Simple modulo sharding:

Formula: node = hash(key) % node_count

Example:
Node list: [node1, node2, node3]
Node count: 3

key1 → hash(key1) = 123 → 123 % 3 = 0 → node1
key2 → hash(key2) = 456 → 456 % 3 = 0 → node1
key3 → hash(key3) = 789 → 789 % 3 = 0 → node1
key4 → hash(key4) = 235 → 235 % 3 = 1 → node2
key5 → hash(key5) = 567 → 567 % 3 = 0 → node1
key6 → hash(key6) = 890 → 890 % 3 = 2 → node3

Pros:
- Simple to understand
- Fast to compute

Cons:
- Adding or removing a node remaps most keys, forcing a large data migration
- Distribution can be uneven (above, node1 receives 4 of the 6 keys)
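The migration cost is easy to demonstrate. The following self-contained sketch (the `user:N` key format and MD5 hash are arbitrary illustrative choices, not part of any Memcached API) maps 10,000 keys across 3 nodes, then re-maps them after adding a 4th node, and counts how many keys would have to move:

```python
# Illustrative sketch of modulo sharding and its migration cost.
import hashlib

def node_for(key: str, node_count: int) -> int:
    """node = hash(key) % node_count, using MD5 as a stable hash."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % node_count

keys = [f"user:{i}" for i in range(10_000)]

# Map every key with 3 nodes, then again after adding a 4th node.
before = {k: node_for(k, 3) for k in keys}
after = {k: node_for(k, 4) for k in keys}

moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved / len(keys):.0%} of keys changed nodes")
```

With a good hash, roughly 3/4 of the keys land on a different node after growing from 3 to 4 nodes (in general about (n−1)/n of them move), which is exactly the problem consistent hashing addresses.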

2.2 Consistent Hashing #

text
Consistent hashing:

How it works:
1. Organize the hash space into a ring (0 to 2^32 - 1)
2. Map each node onto the ring
3. Map each key onto the ring
4. A key is stored on the first node found clockwise from its position

Sketch:
                    0
                    │
            node3 ──┼── node1
                    │
            node2 ──┼
                    │
                   2^32-1

Example:
Nodes: node1, node2, node3
Keys: key1, key2, key3

key1 → hash(key1) = 100 → first node clockwise → node1
key2 → hash(key2) = 200 → first node clockwise → node2
key3 → hash(key3) = 300 → first node clockwise → node3

Pros:
- Adding or removing a node only affects its neighbors on the ring
- Little data migration

Cons:
- With few nodes, the distribution can still be very uneven
- Virtual nodes are needed to fix this

2.3 Virtual Nodes #

text
Virtual nodes:

How it works:
1. Each physical node is represented by many virtual nodes
2. The virtual nodes spread out around the hash ring
3. A key maps to a virtual node, which maps back to its physical node

Example:
Physical nodes: node1, node2, node3
Virtual nodes: 100 per physical node

node1 → node1#1, node1#2, ..., node1#100
node2 → node2#1, node2#2, ..., node2#100
node3 → node3#1, node3#2, ..., node3#100

Benefits:
- More even data distribution
- Better load balancing

Sketch:
                    0
                    │
       node1#23 ────┼──── node2#45
                    │
       node3#67 ────┼──── node1#89
                    │
                   2^32-1
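The balancing effect can be checked empirically. This standalone toy (it reuses the `node#i` naming from above and arbitrary `user:N` keys; none of it is a real client API) builds a ring with 1 and then 100 virtual nodes per physical node and measures each node's share of 10,000 keys:

```python
# Toy experiment: how virtual nodes even out the key distribution.
import hashlib
from bisect import bisect

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def shares(vnodes: int) -> dict:
    """Fraction of 10,000 keys each node receives with `vnodes` virtual nodes."""
    nodes = ["node1", "node2", "node3"]
    ring = {_hash(f"{n}#{i}"): n for n in nodes for i in range(vnodes)}
    sorted_keys = sorted(ring)
    counts = dict.fromkeys(nodes, 0)
    for k in range(10_000):
        # First virtual node clockwise from the key, wrapping around.
        i = bisect(sorted_keys, _hash(f"user:{k}")) % len(sorted_keys)
        counts[ring[sorted_keys[i]]] += 1
    return {n: c / 10_000 for n, c in counts.items()}

print("1 vnode/node:   ", shares(1))
print("100 vnodes/node:", shares(100))
```

With a single point per node, the shares are at the mercy of three hash values and can be badly skewed; with 100 virtual nodes, each share typically lands within a few percent of one third.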

3. Implementing Consistent Hashing #

3.1 Python Implementation #

python
import hashlib
from bisect import bisect

class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # hash value -> physical node
        self.sorted_keys = []   # sorted hash values, for binary search
        
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def add_node(self, node):
        # Place virtual_nodes replicas of this node on the ring.
        for i in range(self.virtual_nodes):
            virtual_node = f"{node}#{i}"
            key = self._hash(virtual_node)
            self.ring[key] = node
            self.sorted_keys.append(key)
        
        self.sorted_keys.sort()
    
    def remove_node(self, node):
        # Remove all of this node's virtual nodes from the ring.
        for i in range(self.virtual_nodes):
            virtual_node = f"{node}#{i}"
            key = self._hash(virtual_node)
            
            if key in self.ring:
                del self.ring[key]
                self.sorted_keys.remove(key)
    
    def get_node(self, key):
        if not self.ring:
            return None
        
        # Find the first virtual node clockwise from the key's position,
        # wrapping around to the start of the ring if necessary.
        hash_key = self._hash(key)
        index = bisect(self.sorted_keys, hash_key)
        
        if index >= len(self.sorted_keys):
            index = 0
        
        return self.ring[self.sorted_keys[index]]
    
    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

# Usage example
nodes = ['node1', 'node2', 'node3']
ch = ConsistentHash(nodes, virtual_nodes=100)

print(ch.get_node('user:1001'))
print(ch.get_node('user:1002'))
print(ch.get_node('user:1003'))
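One property worth verifying is that removing a node only remaps the keys that lived on that node. The sketch below re-implements the same ring logic in a compact standalone form (MD5, 100 virtual nodes, `node#i` labels, arbitrary `user:N` keys) so it runs on its own:

```python
# Standalone check: removing a node from a consistent-hash ring only
# remaps the keys that were stored on that node.
import hashlib
from bisect import bisect

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes, vnodes=100):
    ring = {_hash(f"{n}#{i}"): n for n in nodes for i in range(vnodes)}
    return ring, sorted(ring)

def lookup(ring, sorted_keys, key):
    # First virtual node clockwise from the key, wrapping to index 0.
    i = bisect(sorted_keys, _hash(key)) % len(sorted_keys)
    return ring[sorted_keys[i]]

keys = [f"user:{i}" for i in range(10_000)]

ring3, sk3 = build_ring(["node1", "node2", "node3"])
before = {k: lookup(ring3, sk3, k) for k in keys}

ring2, sk2 = build_ring(["node1", "node3"])  # node2 removed
after = {k: lookup(ring2, sk2, k) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
# Every key that moved must have been on the removed node.
print(all(before[k] == "node2" for k in moved), f"{len(moved)} keys moved")
```

Because the surviving nodes' virtual-node hashes are unchanged, keys on node1 and node3 keep resolving to the same positions; only node2's keys fall through to the next node clockwise.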

3.2 Java Implementation #

java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

public class ConsistentHash {
    private final int virtualNodes;
    private final TreeMap<Long, String> ring = new TreeMap<>();
    
    public ConsistentHash(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        
        for (String node : nodes) {
            addNode(node);
        }
    }
    
    public void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            String virtualNode = node + "#" + i;
            long hash = hash(virtualNode);
            ring.put(hash, node);
        }
    }
    
    public void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            String virtualNode = node + "#" + i;
            long hash = hash(virtualNode);
            ring.remove(hash);
        }
    }
    
    public String getNode(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        
        long hash = hash(key);
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
        
        if (entry == null) {
            entry = ring.firstEntry();
        }
        
        return entry.getValue();
    }
    
    private long hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(key.getBytes());
            long hash = 0;
            for (int i = 0; i < 4; i++) {
                hash <<= 8;
                hash |= digest[i] & 0xFF;
            }
            return hash;
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }
    
    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node1", "node2", "node3");
        ConsistentHash ch = new ConsistentHash(nodes, 100);
        
        System.out.println(ch.getNode("user:1001"));
        System.out.println(ch.getNode("user:1002"));
        System.out.println(ch.getNode("user:1003"));
    }
}

4. Cluster Deployment #

4.1 Multiple Instances on One Machine #

bash
# Start several Memcached instances on different ports
memcached -d -m 512 -p 11211 -u memcache
memcached -d -m 512 -p 11212 -u memcache
memcached -d -m 512 -p 11213 -u memcache

# Check the processes
ps aux | grep memcached

4.2 Multi-Machine Deployment #

text
Multi-machine deployment architecture:

┌─────────────────────────────────────────┐
│             Load balancer               │
│            (Nginx/HAProxy)              │
└─────────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
   ┌─────────┐ ┌─────────┐ ┌─────────┐
   │  App1   │ │  App2   │ │  App3   │   (application servers)
   └─────────┘ └─────────┘ └─────────┘
        │           │           │
        └───────────┼───────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
   ┌─────────┐ ┌─────────┐ ┌─────────┐
   │Memcached│ │Memcached│ │Memcached│
   │ Server1 │ │ Server2 │ │ Server3 │
   │ 11211   │ │ 11211   │ │ 11211   │
   └─────────┘ └─────────┘ └─────────┘

4.3 Client Configuration #

python
from pymemcache.client.hash import HashClient

# List the cache servers
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]

# Create a client that shards keys across the servers
client = HashClient(
    servers,
    use_pooling=True,
    timeout=1,
    ignore_exc=True  # treat node errors as cache misses instead of raising
)

# Usage
client.set('key', 'value')
value = client.get('key')

5. Failover #

5.1 Failure Detection #

python
from pymemcache.client.hash import HashClient
import time

class FailoverClient:
    def __init__(self, servers):
        self.servers = servers
        self.client = HashClient(
            servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )
        self.failed_nodes = set()
    
    def check_health(self):
        for server in self.servers:
            try:
                # Probe the node
                test_client = HashClient([server], timeout=1)
                test_client.stats()
                
                # The node answered; mark it recovered if it had failed
                if server in self.failed_nodes:
                    self.failed_nodes.remove(server)
                    print(f"Node {server} recovered")
            except Exception as e:
                if server not in self.failed_nodes:
                    self.failed_nodes.add(server)
                    print(f"Node {server} failed: {e}")
    
    def get(self, key):
        try:
            return self.client.get(key)
        except Exception as e:
            print(f"Error getting {key}: {e}")
            return None
    
    def set(self, key, value, expire=3600):
        try:
            self.client.set(key, value, expire=expire)
            return True
        except Exception as e:
            print(f"Error setting {key}: {e}")
            return False

# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]

client = FailoverClient(servers)

# Periodic health check (a blocking loop; run it in a background thread in practice)
while True:
    client.check_health()
    time.sleep(10)

5.2 Automatic Failover #

python
from pymemcache.client.hash import HashClient
import time
import threading

class AutoFailoverClient:
    def __init__(self, servers, check_interval=10):
        self.all_servers = servers
        self.active_servers = list(servers)
        self.check_interval = check_interval
        self.client = HashClient(
            self.active_servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )
        
        # Start the background health-check thread
        self.health_thread = threading.Thread(target=self._health_check)
        self.health_thread.daemon = True
        self.health_thread.start()
    
    def _health_check(self):
        while True:
            for server in self.all_servers:
                try:
                    test_client = HashClient([server], timeout=1)
                    test_client.stats()
                    
                    if server not in self.active_servers:
                        self.active_servers.append(server)
                        self._rebuild_client()
                        print(f"Node {server} added back")
                except Exception as e:
                    if server in self.active_servers:
                        self.active_servers.remove(server)
                        self._rebuild_client()
                        print(f"Node {server} removed: {e}")
            
            time.sleep(self.check_interval)
    
    def _rebuild_client(self):
        # Rebuild the hash ring over the currently active servers.
        # (Swapping self.client is not synchronized with in-flight
        # requests; production code should guard this with a lock.)
        self.client = HashClient(
            self.active_servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )
    
    def get(self, key):
        return self.client.get(key)
    
    def set(self, key, value, expire=3600):
        self.client.set(key, value, expire=expire)

# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]

client = AutoFailoverClient(servers)

6. Data Replication #

6.1 Client-Side Dual Writes #

python
from pymemcache.client.base import Client

class ReplicatedClient:
    def __init__(self, servers):
        self.clients = [Client(server) for server in servers]
    
    def set(self, key, value, expire=3600):
        # Write to every node
        for client in self.clients:
            try:
                client.set(key, value, expire=expire)
            except Exception as e:
                print(f"Error writing to {client.server}: {e}")
    
    def get(self, key):
        # Read from the first node that returns a value
        for client in self.clients:
            try:
                value = client.get(key)
                if value is not None:
                    return value
            except Exception as e:
                print(f"Error reading from {client.server}: {e}")
        
        return None
    
    def delete(self, key):
        # Delete from every node
        for client in self.clients:
            try:
                client.delete(key)
            except Exception as e:
                print(f"Error deleting from {client.server}: {e}")

# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211)
]

client = ReplicatedClient(servers)
client.set('key', 'value')
value = client.get('key')

6.2 Using a Proxy #

text
Using Twemproxy:

1. Install Twemproxy
   git clone https://github.com/twitter/twemproxy.git
   cd twemproxy
   autoreconf -fvi
   ./configure
   make
   sudo make install

2. Configuration file (nutcracker.yml)
   memcached:
     listen: 127.0.0.1:22122
     hash: fnv1a_64
     distribution: ketama
     auto_eject_hosts: true
     timeout: 400
     server_retry_timeout: 2000
     server_failure_limit: 2
     servers:
       - 192.168.1.101:11211:1
       - 192.168.1.102:11211:1
       - 192.168.1.103:11211:1

3. Start Twemproxy
   nutcracker -c nutcracker.yml -d

4. Connect from the application
   client = Client(('127.0.0.1', 22122))

Note: Twemproxy shards requests (ketama consistent hashing) and can eject
failed hosts, but it does not replicate data by itself; dual writes (6.1)
are still needed if you want copies on multiple nodes.

7. Monitoring and Management #

7.1 Cluster Monitoring #

python
from pymemcache.client.base import Client
import time

class ClusterMonitor:
    def __init__(self, servers):
        self.servers = servers
    
    def get_stats(self):
        stats = {}
        
        for server in self.servers:
            try:
                client = Client(server, timeout=1)
                server_stats = client.stats()
                stats[server] = server_stats
                client.close()
            except Exception as e:
                stats[server] = {'error': str(e)}
        
        return stats
    
    def print_report(self):
        stats = self.get_stats()
        
        print("=" * 60)
        print(f"Memcached cluster report - {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
        
        for server, server_stats in stats.items():
            print(f"\nServer: {server}")
            print("-" * 60)
            
            if 'error' in server_stats:
                print(f"  Status: ERROR - {server_stats['error']}")
                continue
            
            get_hits = int(server_stats.get(b'get_hits', 0))
            get_misses = int(server_stats.get(b'get_misses', 0))
            bytes_stored = int(server_stats.get(b'bytes', 0))
            limit_maxbytes = int(server_stats.get(b'limit_maxbytes', 1))
            curr_connections = int(server_stats.get(b'curr_connections', 0))
            
            if get_hits + get_misses > 0:
                hit_rate = get_hits / (get_hits + get_misses) * 100
            else:
                hit_rate = 0
            
            mem_usage = bytes_stored / limit_maxbytes * 100
            
            print(f"  Hit rate: {hit_rate:.2f}%")
            print(f"  Memory usage: {mem_usage:.2f}%")
            print(f"  Current connections: {curr_connections}")
            print(f"  Status: OK")
        
        print("=" * 60)

# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]

monitor = ClusterMonitor(servers)
monitor.print_report()

8. Summary #

Key points for distributed caching:

| Aspect            | Notes                               |
|-------------------|-------------------------------------|
| Sharding strategy | Consistent hashing, virtual nodes   |
| Failover          | Health checks, automatic switchover |
| Replication       | Client-side dual writes, proxy      |
| Monitoring        | Cluster monitoring, alerting        |

Best practices:

  1. Shard with consistent hashing
  2. Use virtual nodes to balance load
  3. Implement failure detection and failover
  4. Monitor cluster health
  5. Plan capacity ahead of time
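For capacity planning, a back-of-the-envelope estimate is often enough. All numbers below are illustrative assumptions, not recommendations:

```python
# Rough sizing estimate for a Memcached cluster.
import math

item_count = 50_000_000   # expected number of cached items (assumption)
avg_item_bytes = 1_024    # average value size (assumption)
overhead = 1.5            # rough slab/metadata overhead factor (assumption)
per_node_gb = 8           # memory per node, e.g. started with -m 8192

total_gb = item_count * avg_item_bytes * overhead / 2**30
nodes_needed = math.ceil(total_gb / per_node_gb)
print(f"~{total_gb:.1f} GB total -> {nodes_needed} nodes of {per_node_gb} GB")
```

Leave headroom beyond the raw estimate: if one of N nodes dies, its share of requests becomes misses that hit the database, so size the cluster so that N-1 nodes can still hold the hot set.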

Next up: hands-on Memcached case studies!

Last updated: 2026-03-27