Memcached Distributed Caching #
1. Distributed Cache Overview #
1.1 Why a Distributed Cache #
```text
Limitations of a single-node cache:
1. Limited memory capacity
   - A single machine has finite RAM
   - It cannot hold large data sets
2. Performance bottleneck
   - A single machine's QPS is capped
   - It cannot absorb high concurrency
3. Single point of failure
   - If the server goes down,
   - the entire cache is lost

Advantages of a distributed cache:
1. Scalable capacity
   - Scale out by adding nodes
   - Capacity grows linearly
2. Scalable performance
   - Request load is spread across nodes
   - Higher aggregate QPS
3. High availability
   - Redundancy across multiple nodes
   - Automatic failover
```
1.2 Memcached Distributed Architecture #
```text
Memcached distributed architecture:

┌─────────────────────────────────────────┐
│           Application server            │
│  ┌─────────────────────────────────┐    │
│  │        Memcached client         │    │
│  │ (consistent hashing, sharding)  │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
  ┌─────────┐ ┌─────────┐ ┌─────────┐
  │ Node 1  │ │ Node 2  │ │ Node 3  │
  │  11211  │ │  11212  │ │  11213  │
  │ Data A  │ │ Data B  │ │ Data C  │
  └─────────┘ └─────────┘ └─────────┘

Characteristics:
1. Client-side sharding
2. Masterless architecture
3. Data spread across nodes
```
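The client-side sharding idea above fits in a few lines of Python: the client hashes the key and routes the request to one node, with no coordination between servers. This is a minimal sketch, not a production client; the CRC32 hash and the loopback addresses are illustrative assumptions.

```python
import zlib

def pick_server(key, servers):
    # Hash the key on the client side and map it to one of the nodes
    index = zlib.crc32(key.encode()) % len(servers)
    return servers[index]

# Placeholder node addresses matching the diagram above
servers = [("127.0.0.1", 11211), ("127.0.0.1", 11212), ("127.0.0.1", 11213)]
host, port = pick_server("user:1001", servers)
# The same key always routes to the same node, which is what makes
# a masterless cluster work without any server-side coordination.
```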
2. Data Sharding Strategies #
2.1 Simple Modulo Sharding #
```text
Simple modulo sharding:

Formula: node = hash(key) % node_count

Example:
  Node list:  [node1, node2, node3]
  Node count: 3

  key1 → hash(key1) = 123 → 123 % 3 = 0 → node1
  key2 → hash(key2) = 457 → 457 % 3 = 1 → node2
  key3 → hash(key3) = 789 → 789 % 3 = 0 → node1
  key4 → hash(key4) = 235 → 235 % 3 = 1 → node2
  key5 → hash(key5) = 566 → 566 % 3 = 2 → node3
  key6 → hash(key6) = 890 → 890 % 3 = 2 → node3

Pros:
- Simple and easy to understand
- Fast to compute

Cons:
- Adding or removing a node remaps most keys
- Data distribution may be uneven
```
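The migration cost of modulo sharding is easy to measure: hash a batch of keys against 3 nodes, then against 4, and count how many keys change nodes. A quick sketch, using CRC32 as a stand-in for the hash function:

```python
import zlib

def node_for(key, node_count):
    # hash(key) % node_count, the modulo sharding formula above
    return zlib.crc32(key.encode()) % node_count

keys = [f"user:{i}" for i in range(10000)]
before = {k: node_for(k, 3) for k in keys}  # 3-node cluster
after = {k: node_for(k, 4) for k in keys}   # one node added
moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved / len(keys):.0%} of keys remapped")
```

For a uniform hash, about 3/4 of keys move when growing from 3 to 4 nodes: a key stays put only when `hash % 3 == hash % 4`, which holds for 3 of every 12 hash values.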
2.2 Consistent Hashing #
```text
Consistent hashing:

How it works:
1. Organize the hash space into a ring (0 ~ 2^32-1)
2. Map each node onto the ring
3. Map each key onto the ring
4. A key is stored on the first node found clockwise from it

Sketch:
               0
               │
   node3 ──────┼────── node1
               │
   node2 ──────┤
               │
            2^32-1

Example:
  Nodes: node1, node2, node3
  Keys:  key1, key2, key3

  key1 → hash(key1) = 100 → first node clockwise is node1
  key2 → hash(key2) = 200 → first node clockwise is node2
  key3 → hash(key3) = 300 → first node clockwise is node3

Pros:
- Adding or removing a node only affects adjacent keys
- Little data has to migrate

Cons:
- Data distribution can still be uneven
- Virtual nodes are needed to fix this
```
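The claim that only adjacent keys are affected can be checked with a bare-bones ring: one point per node (no virtual nodes yet), MD5 as the hash. This is a sketch for illustration, not an optimized implementation.

```python
import hashlib
from bisect import bisect

def h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes):
    # One point per node on the ring, sorted by hash value
    return sorted((h(n), n) for n in nodes)

def lookup(ring, key):
    # First node clockwise from the key's position; % wraps around
    positions = [p for p, _ in ring]
    return ring[bisect(positions, h(key)) % len(ring)][1]

keys = [f"user:{i}" for i in range(10000)]
old_ring = build_ring(["node1", "node2", "node3"])
new_ring = build_ring(["node1", "node2", "node3", "node4"])

moved = [k for k in keys if lookup(old_ring, k) != lookup(new_ring, k)]
# Every key that moved was taken over by the new node; no key is
# shuffled between the three existing nodes.
print(f"{len(moved) / len(keys):.0%} of keys moved, all to node4")
```

Contrast this with the modulo scheme, where adding a node remaps keys across all existing nodes.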
2.3 Virtual Nodes #
```text
Virtual nodes:

How it works:
1. Each physical node maps to many virtual nodes
2. The virtual nodes spread evenly around the hash ring
3. A key maps to a virtual node, which maps back to a physical node

Example:
  Physical nodes: node1, node2, node3
  Virtual nodes:  100 per physical node

  node1 → node1#1, node1#2, ..., node1#100
  node2 → node2#1, node2#2, ..., node2#100
  node3 → node3#1, node3#2, ..., node3#100

Pros:
- More even data distribution
- Better load balance

Sketch:
                  0
                  │
   node1#23 ──────┼────── node2#45
                  │
   node3#67 ──────┼────── node1#89
                  │
               2^32-1
```
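How much virtual nodes help can be measured directly: hash a batch of keys onto a ring built with 1 versus 100 virtual nodes per physical node, and compare the load spread. A sketch under the same MD5-ring assumptions as above; exact ratios depend on the hash values.

```python
import hashlib
from bisect import bisect
from collections import Counter

def h(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes, vnodes):
    # vnodes points per physical node, named "node1#0" .. "node1#99" etc.
    return sorted((h(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))

def lookup(ring, key):
    positions = [p for p, _ in ring]
    return ring[bisect(positions, h(key)) % len(ring)][1]

keys = [f"user:{i}" for i in range(30000)]
spreads = {}
for vnodes in (1, 100):
    ring = build_ring(["node1", "node2", "node3"], vnodes)
    counts = Counter(lookup(ring, k) for k in keys)
    # Ratio of the most-loaded node to the least-loaded node
    spreads[vnodes] = max(counts.values()) / min(counts.values())
    print(f"{vnodes:>3} virtual node(s): max/min load ratio = {spreads[vnodes]:.2f}")
```

With more virtual nodes per physical node, the max/min ratio approaches 1.0 (perfect balance), because each node's share is the sum of many small arcs instead of one large, randomly sized arc.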
3. Implementing Consistent Hashing #
3.1 Python Implementation #
```python
import hashlib
from bisect import bisect


class ConsistentHash:
    def __init__(self, nodes=None, virtual_nodes=100):
        self.virtual_nodes = virtual_nodes
        self.ring = {}         # hash value -> physical node
        self.sorted_keys = []  # sorted hash values (the ring)
        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        for i in range(self.virtual_nodes):
            virtual_node = f"{node}#{i}"
            key = self._hash(virtual_node)
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def remove_node(self, node):
        for i in range(self.virtual_nodes):
            virtual_node = f"{node}#{i}"
            key = self._hash(virtual_node)
            if key in self.ring:
                del self.ring[key]
                self.sorted_keys.remove(key)

    def get_node(self, key):
        if not self.ring:
            return None
        hash_key = self._hash(key)
        # First virtual node clockwise; wrap to the start of the ring
        index = bisect(self.sorted_keys, hash_key)
        if index >= len(self.sorted_keys):
            index = 0
        return self.ring[self.sorted_keys[index]]

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)


# Usage example
nodes = ['node1', 'node2', 'node3']
ch = ConsistentHash(nodes, virtual_nodes=100)
print(ch.get_node('user:1001'))
print(ch.get_node('user:1002'))
print(ch.get_node('user:1003'))
```
3.2 Java Implementation #
```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

public class ConsistentHash {
    private final int virtualNodes;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public ConsistentHash(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            addNode(node);
        }
    }

    public void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            String virtualNode = node + "#" + i;
            long hash = hash(virtualNode);
            ring.put(hash, node);
        }
    }

    public void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            String virtualNode = node + "#" + i;
            long hash = hash(virtualNode);
            ring.remove(hash);
        }
    }

    public String getNode(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        long hash = hash(key);
        // First virtual node clockwise; wrap to the start of the ring
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    private long hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(key.getBytes());
            // Use the first 4 bytes of the MD5 digest as an unsigned 32-bit value
            long hash = 0;
            for (int i = 0; i < 4; i++) {
                hash <<= 8;
                hash |= digest[i] & 0xFF;
            }
            return hash;
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node1", "node2", "node3");
        ConsistentHash ch = new ConsistentHash(nodes, 100);
        System.out.println(ch.getNode("user:1001"));
        System.out.println(ch.getNode("user:1002"));
        System.out.println(ch.getNode("user:1003"));
    }
}
```
4. Cluster Deployment #
4.1 Multiple Instances on One Machine #
```bash
# Start several Memcached instances on different ports
memcached -d -m 512 -p 11211 -u memcache
memcached -d -m 512 -p 11212 -u memcache
memcached -d -m 512 -p 11213 -u memcache

# Check the processes
ps aux | grep memcached
```
4.2 Multi-Machine Deployment #
```text
Multi-machine deployment:

┌─────────────────────────────────────────┐
│              Load balancer              │
│             (Nginx/HAProxy)             │
└─────────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
  ┌─────────┐ ┌─────────┐ ┌─────────┐
  │App      │ │App      │ │App      │
  │ server 1│ │ server 2│ │ server 3│
  └─────────┘ └─────────┘ └─────────┘
        │           │           │
        └───────────┼───────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
  ┌─────────┐ ┌─────────┐ ┌─────────┐
  │Memcached│ │Memcached│ │Memcached│
  │ Server1 │ │ Server2 │ │ Server3 │
  │  11211  │ │  11211  │ │  11211  │
  └─────────┘ └─────────┘ └─────────┘
```
4.3 Client Configuration #
```python
from pymemcache.client.hash import HashClient

# Configure multiple servers
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]

# Create the client
client = HashClient(
    servers,
    use_pooling=True,
    timeout=1,
    ignore_exc=True  # swallow node errors so a dead node looks like a cache miss
)

# Use it
client.set('key', 'value')
value = client.get('key')
```
5. Failover #
5.1 Failure Detection #
```python
from pymemcache.client.base import Client
from pymemcache.client.hash import HashClient
import time


class FailoverClient:
    def __init__(self, servers):
        self.servers = servers
        self.client = HashClient(
            servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )
        self.failed_nodes = set()

    def check_health(self):
        for server in self.servers:
            try:
                # Probe the node with a one-off base Client (which exposes stats())
                test_client = Client(server, timeout=1)
                test_client.stats()
                test_client.close()
                # The node answered: clear it if it was marked failed
                if server in self.failed_nodes:
                    self.failed_nodes.remove(server)
                    print(f"Node {server} recovered")
            except Exception as e:
                if server not in self.failed_nodes:
                    self.failed_nodes.add(server)
                    print(f"Node {server} failed: {e}")

    def get(self, key):
        try:
            return self.client.get(key)
        except Exception as e:
            print(f"Error getting {key}: {e}")
            return None

    def set(self, key, value, expire=3600):
        try:
            self.client.set(key, value, expire=expire)
            return True
        except Exception as e:
            print(f"Error setting {key}: {e}")
            return False


# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]
client = FailoverClient(servers)

# Periodic health check
while True:
    client.check_health()
    time.sleep(10)
```
5.2 Automatic Failover #
```python
from pymemcache.client.base import Client
from pymemcache.client.hash import HashClient
import time
import threading


class AutoFailoverClient:
    def __init__(self, servers, check_interval=10):
        self.all_servers = servers
        self.active_servers = list(servers)
        self.check_interval = check_interval
        self.client = HashClient(
            self.active_servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )
        # Start the background health-check thread
        self.health_thread = threading.Thread(target=self._health_check)
        self.health_thread.daemon = True
        self.health_thread.start()

    def _health_check(self):
        while True:
            for server in self.all_servers:
                try:
                    # Probe with a one-off base Client (which exposes stats())
                    test_client = Client(server, timeout=1)
                    test_client.stats()
                    test_client.close()
                    if server not in self.active_servers:
                        self.active_servers.append(server)
                        self._rebuild_client()
                        print(f"Node {server} added back")
                except Exception as e:
                    if server in self.active_servers:
                        self.active_servers.remove(server)
                        self._rebuild_client()
                        print(f"Node {server} removed: {e}")
            time.sleep(self.check_interval)

    def _rebuild_client(self):
        # Rebuild the hash ring over the currently healthy nodes
        self.client = HashClient(
            self.active_servers,
            use_pooling=True,
            timeout=1,
            ignore_exc=True
        )

    def get(self, key):
        return self.client.get(key)

    def set(self, key, value, expire=3600):
        self.client.set(key, value, expire=expire)


# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]
client = AutoFailoverClient(servers)
```
6. Data Replication #
6.1 Client-Side Dual Writes #
```python
from pymemcache.client.base import Client


class ReplicatedClient:
    def __init__(self, servers):
        self.clients = [Client(server) for server in servers]

    def set(self, key, value, expire=3600):
        # Write to every node
        for client in self.clients:
            try:
                client.set(key, value, expire=expire)
            except Exception as e:
                print(f"Error writing to {client.server}: {e}")

    def get(self, key):
        # Read from the first node that answers
        for client in self.clients:
            try:
                value = client.get(key)
                if value is not None:
                    return value
            except Exception as e:
                print(f"Error reading from {client.server}: {e}")
        return None

    def delete(self, key):
        # Delete from every node
        for client in self.clients:
            try:
                client.delete(key)
            except Exception as e:
                print(f"Error deleting from {client.server}: {e}")


# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211)
]
client = ReplicatedClient(servers)
client.set('key', 'value')
value = client.get('key')
```
6.2 Using a Proxy #
```text
Using Twemproxy:

1. Install Twemproxy
   git clone https://github.com/twitter/twemproxy.git
   cd twemproxy
   autoreconf -fvi
   ./configure
   make
   sudo make install

2. Configuration file (nutcracker.yml)
   memcached:
     listen: 127.0.0.1:22122
     hash: fnv1a_64
     distribution: ketama
     auto_eject_hosts: true
     timeout: 400
     server_retry_timeout: 2000
     server_failure_limit: 2
     servers:
       - 192.168.1.101:11211:1
       - 192.168.1.102:11211:1
       - 192.168.1.103:11211:1

3. Start Twemproxy
   nutcracker -c nutcracker.yml -d

4. Connect from a client
   client = Client(('127.0.0.1', 22122))
```
7. Monitoring and Management #
7.1 Cluster Monitoring #
```python
from pymemcache.client.base import Client
import time


class ClusterMonitor:
    def __init__(self, servers):
        self.servers = servers

    def get_stats(self):
        stats = {}
        for server in self.servers:
            try:
                client = Client(server, timeout=1)
                stats[server] = client.stats()
                client.close()
            except Exception as e:
                stats[server] = {'error': str(e)}
        return stats

    def print_report(self):
        stats = self.get_stats()
        print("=" * 60)
        print(f"Memcached cluster report - {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
        for server, server_stats in stats.items():
            print(f"\nServer: {server}")
            print("-" * 60)
            if 'error' in server_stats:
                print(f"  Status: ERROR - {server_stats['error']}")
                continue
            # pymemcache returns stats keys as bytes
            get_hits = int(server_stats.get(b'get_hits', 0))
            get_misses = int(server_stats.get(b'get_misses', 0))
            bytes_stored = int(server_stats.get(b'bytes', 0))
            limit_maxbytes = int(server_stats.get(b'limit_maxbytes', 1))
            curr_connections = int(server_stats.get(b'curr_connections', 0))
            if get_hits + get_misses > 0:
                hit_rate = get_hits / (get_hits + get_misses) * 100
            else:
                hit_rate = 0
            mem_usage = bytes_stored / limit_maxbytes * 100
            print(f"  Hit rate: {hit_rate:.2f}%")
            print(f"  Memory usage: {mem_usage:.2f}%")
            print(f"  Current connections: {curr_connections}")
            print(f"  Status: OK")
        print("=" * 60)


# Usage
servers = [
    ('192.168.1.101', 11211),
    ('192.168.1.102', 11211),
    ('192.168.1.103', 11211)
]
monitor = ClusterMonitor(servers)
monitor.print_report()
```
8. Summary #
Key points of distributed caching:

| Aspect | Notes |
|---|---|
| Sharding strategy | Consistent hashing, virtual nodes |
| Failover | Health checks, automatic switchover |
| Data replication | Client-side dual writes, proxy layer |
| Monitoring | Cluster stats, alerting |

Best practices:
- Shard with consistent hashing
- Use virtual nodes to balance load
- Implement failure detection and failover
- Monitor cluster health
- Plan capacity up front
Next up: hands-on Memcached case studies!

Last updated: 2026-03-27