Memcached实战案例 #
一、Session存储 #
1.1 场景说明 #
text
Session存储需求:
1. 分布式环境
- 多台应用服务器
- Session需要共享
2. 高可用
- 服务器宕机
- Session不丢失
3. 高性能
- 快速读写
- 低延迟
Memcached优势:
- 高性能读写
- 分布式存储
- 自动过期
1.2 Python实现 #
python
import json
import time
from pymemcache.client.base import Client
class SessionManager:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.session_prefix = "session:"
self.default_expire = 1800 # 30分钟
def create_session(self, session_id, user_data):
key = f"{self.session_prefix}{session_id}"
session_data = {
"user_id": user_data.get("user_id"),
"username": user_data.get("username"),
"login_time": int(time.time()),
"last_activity": int(time.time()),
"ip": user_data.get("ip")
}
self.client.set(
key,
json.dumps(session_data),
expire=self.default_expire
)
return session_id
def get_session(self, session_id):
key = f"{self.session_prefix}{session_id}"
data = self.client.get(key)
if data is None:
return None
session = json.loads(data)
# 更新最后活动时间
session["last_activity"] = int(time.time())
self.client.set(
key,
json.dumps(session),
expire=self.default_expire
)
return session
def update_session(self, session_id, updates):
key = f"{self.session_prefix}{session_id}"
data = self.client.get(key)
if data is None:
return False
session = json.loads(data)
session.update(updates)
session["last_activity"] = int(time.time())
self.client.set(
key,
json.dumps(session),
expire=self.default_expire
)
return True
def delete_session(self, session_id):
key = f"{self.session_prefix}{session_id}"
self.client.delete(key)
def extend_session(self, session_id, expire=None):
key = f"{self.session_prefix}{session_id}"
if expire is None:
expire = self.default_expire
# 使用touch更新过期时间
self.client.touch(key, expire)
# 使用示例
session_manager = SessionManager(('localhost', 11211))
# 创建Session
session_id = "abc123"
user_data = {
"user_id": 1001,
"username": "john",
"ip": "192.168.1.100"
}
session_manager.create_session(session_id, user_data)
# 获取Session
session = session_manager.get_session(session_id)
print(session)
# 更新Session
session_manager.update_session(session_id, {"page": "dashboard"})
# 延长Session
session_manager.extend_session(session_id)
# 删除Session
session_manager.delete_session(session_id)
1.3 Java实现 #
java
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
public class SessionManager {
private MemcachedClient client;
private ObjectMapper objectMapper;
private String sessionPrefix = "session:";
private int defaultExpire = 1800;
public SessionManager(String servers) throws Exception {
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(servers)
);
this.client = builder.build();
this.objectMapper = new ObjectMapper();
}
public String createSession(String sessionId, Map<String, Object> userData) throws Exception {
String key = sessionPrefix + sessionId;
Map<String, Object> sessionData = new HashMap<>();
sessionData.put("user_id", userData.get("user_id"));
sessionData.put("username", userData.get("username"));
sessionData.put("login_time", System.currentTimeMillis() / 1000);
sessionData.put("last_activity", System.currentTimeMillis() / 1000);
sessionData.put("ip", userData.get("ip"));
client.set(key, defaultExpire, objectMapper.writeValueAsString(sessionData));
return sessionId;
}
public Map<String, Object> getSession(String sessionId) throws Exception {
String key = sessionPrefix + sessionId;
String data = client.get(key);
if (data == null) {
return null;
}
Map<String, Object> session = objectMapper.readValue(data, Map.class);
session.put("last_activity", System.currentTimeMillis() / 1000);
client.set(key, defaultExpire, objectMapper.writeValueAsString(session));
return session;
}
public void updateSession(String sessionId, Map<String, Object> updates) throws Exception {
String key = sessionPrefix + sessionId;
String data = client.get(key);
if (data != null) {
Map<String, Object> session = objectMapper.readValue(data, Map.class);
session.putAll(updates);
session.put("last_activity", System.currentTimeMillis() / 1000);
client.set(key, defaultExpire, objectMapper.writeValueAsString(session));
}
}
public void deleteSession(String sessionId) throws Exception {
String key = sessionPrefix + sessionId;
client.delete(key);
}
public void shutdown() throws Exception {
client.shutdown();
}
public static void main(String[] args) throws Exception {
SessionManager manager = new SessionManager("localhost:11211");
// 创建Session
Map<String, Object> userData = new HashMap<>();
userData.put("user_id", 1001);
userData.put("username", "john");
userData.put("ip", "192.168.1.100");
String sessionId = "abc123";
manager.createSession(sessionId, userData);
// 获取Session
Map<String, Object> session = manager.getSession(sessionId);
System.out.println(session);
// 删除Session
manager.deleteSession(sessionId);
manager.shutdown();
}
}
二、页面缓存 #
2.1 场景说明 #
text
页面缓存需求:
1. 减少数据库查询
- 缓存页面内容
- 减少后端压力
2. 提高响应速度
- 直接返回缓存
- 降低延迟
3. 降低服务器负载
- 减少计算
- 节省资源
适用场景:
- 静态页面
- 不经常变化的页面
- 高访问量的页面
2.2 Python实现 #
python
import json
import hashlib
from functools import wraps
from pymemcache.client.base import Client
class PageCache:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.cache_prefix = "page:"
def cache_key(self, path, params=None):
key = path
if params:
params_str = json.dumps(params, sort_keys=True)
key += ":" + hashlib.md5(params_str.encode()).hexdigest()
return f"{self.cache_prefix}{key}"
def get(self, path, params=None):
key = self.cache_key(path, params)
return self.client.get(key)
def set(self, path, content, expire=3600, params=None):
key = self.cache_key(path, params)
self.client.set(key, content, expire=expire)
def delete(self, path, params=None):
key = self.cache_key(path, params)
self.client.delete(key)
def cached(self, expire=3600):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
path = args[0] if args else ""
params = kwargs if kwargs else None
# 尝试获取缓存
cached_content = self.get(path, params)
if cached_content is not None:
return cached_content
# 执行函数
content = func(*args, **kwargs)
# 缓存结果
if content is not None:
self.set(path, content, expire=expire, params=params)
return content
return wrapper
return decorator
# 使用示例
page_cache = PageCache(('localhost', 11211))
# 装饰器方式
@page_cache.cached(expire=3600)
def render_page(path):
print(f"Rendering page: {path}")
return f"<html><body>Page content for {path}</body></html>"
# 手动方式
def get_product_page(product_id):
cache_key = f"/product/{product_id}"
# 尝试获取缓存
cached = page_cache.get(cache_key)
if cached:
return cached
# 渲染页面
content = f"<html><body>Product {product_id}</body></html>"
# 缓存页面
page_cache.set(cache_key, content, expire=3600)
return content
# 使用
print(render_page("/home"))
print(render_page("/home")) # 第二次从缓存获取
print(get_product_page(1001))
2.3 PHP实现 #
php
<?php
class PageCache {
private $memcached;
private $cachePrefix = "page:";
public function __construct($servers) {
$this->memcached = new Memcached();
foreach ($servers as $server) {
$this->memcached->addServer($server['host'], $server['port']);
}
}
private function cacheKey($path, $params = []) {
$key = $path;
if (!empty($params)) {
ksort($params);
$key .= ":" . md5(json_encode($params));
}
return $this->cachePrefix . $key;
}
public function get($path, $params = []) {
$key = $this->cacheKey($path, $params);
return $this->memcached->get($key);
}
public function set($path, $content, $expire = 3600, $params = []) {
$key = $this->cacheKey($path, $params);
return $this->memcached->set($key, $content, $expire);
}
public function delete($path, $params = []) {
$key = $this->cacheKey($path, $params);
return $this->memcached->delete($key);
}
public function cached($expire = 3600) {
return function($path, $callback) use ($expire) {
$cached = $this->get($path);
if ($cached !== false) {
return $cached;
}
$content = $callback();
$this->set($path, $content, $expire);
return $content;
};
}
}
// 使用示例
$cache = new PageCache([
['host' => 'localhost', 'port' => 11211]
]);
// 手动方式
function getProductPage($productId) {
global $cache;
$cacheKey = "/product/{$productId}";
$cached = $cache->get($cacheKey);
if ($cached !== false) {
return $cached;
}
$content = "<html><body>Product {$productId}</body></html>";
$cache->set($cacheKey, $content, 3600);
return $content;
}
// 闭包方式
$cached = $cache->cached(3600);
$content = $cached('/home', function() {
return "<html><body>Home Page</body></html>";
});
echo $content;
?>
三、数据库缓存 #
3.1 场景说明 #
text
数据库缓存需求:
1. 减少数据库查询
- 缓存查询结果
- 降低数据库负载
2. 提高查询速度
- 内存读取
- 毫秒级响应
3. 缓解热点问题
- 缓存热点数据
- 分散数据库压力
适用场景:
- 用户信息查询
- 商品信息查询
- 配置信息查询
- 统计数据查询
3.2 Python实现 #
python
import json
import hashlib
from functools import wraps
from pymemcache.client.base import Client
class DatabaseCache:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.cache_prefix = "db:"
def cache_key(self, table, query):
query_str = json.dumps(query, sort_keys=True)
query_hash = hashlib.md5(query_str.encode()).hexdigest()
return f"{self.cache_prefix}{table}:{query_hash}"
def get(self, table, query):
key = self.cache_key(table, query)
data = self.client.get(key)
if data is not None:
return json.loads(data)
return None
def set(self, table, query, data, expire=3600):
key = self.cache_key(table, query)
self.client.set(key, json.dumps(data), expire=expire)
def delete(self, table, query):
key = self.cache_key(table, query)
self.client.delete(key)
def delete_pattern(self, table):
# Memcached不支持模式删除
# 需要维护键列表或使用命名空间
pass
def cached_query(self, table, expire=3600):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
query = kwargs if kwargs else {"args": args}
# 尝试获取缓存
cached = self.get(table, query)
if cached is not None:
return cached
# 执行查询
result = func(*args, **kwargs)
# 缓存结果
if result is not None:
self.set(table, query, result, expire=expire)
return result
return wrapper
return decorator
# 使用示例
db_cache = DatabaseCache(('localhost', 11211))
# 装饰器方式
@db_cache.cached_query(table="users", expire=3600)
def get_user_by_id(user_id):
print(f"Querying database for user {user_id}")
# 模拟数据库查询
return {
"id": user_id,
"name": "John",
"email": "john@example.com"
}
# 手动方式
def get_product(product_id):
query = {"id": product_id}
# 尝试获取缓存
cached = db_cache.get("products", query)
if cached is not None:
return cached
# 查询数据库
product = query_database(product_id)
# 缓存结果
if product is not None:
db_cache.set("products", query, product, expire=3600)
return product
def query_database(product_id):
print(f"Querying database for product {product_id}")
return {
"id": product_id,
"name": "iPhone",
"price": 999
}
# 使用
print(get_user_by_id(1001))
print(get_user_by_id(1001)) # 第二次从缓存获取
print(get_product(2001))
3.3 缓存更新策略 #
python
import json
from pymemcache.client.base import Client
class CacheAside:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
def get(self, key, db_query_func, expire=3600):
# 1. 先查缓存
data = self.client.get(key)
if data is not None:
return json.loads(data)
# 2. 缓存未命中,查数据库
data = db_query_func()
# 3. 写入缓存
if data is not None:
self.client.set(key, json.dumps(data), expire=expire)
return data
def set(self, key, data, expire=3600):
# 1. 写入缓存
self.client.set(key, json.dumps(data), expire=expire)
# 2. 写入数据库(由调用方处理)
def delete(self, key):
# 1. 删除缓存
self.client.delete(key)
# 2. 删除数据库(由调用方处理)
class WriteThrough:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
def get(self, key, db_query_func, expire=3600):
# 先查缓存
data = self.client.get(key)
if data is not None:
return json.loads(data)
# 缓存未命中,查数据库
data = db_query_func()
# 写入缓存
if data is not None:
self.client.set(key, json.dumps(data), expire=expire)
return data
def set(self, key, data, db_write_func, expire=3600):
# 1. 写入数据库
db_write_func(data)
# 2. 写入缓存
self.client.set(key, json.dumps(data), expire=expire)
class WriteBehind:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.write_queue = []
def get(self, key, db_query_func, expire=3600):
data = self.client.get(key)
if data is not None:
return json.loads(data)
data = db_query_func()
if data is not None:
self.client.set(key, json.dumps(data), expire=expire)
return data
def set(self, key, data, expire=3600):
# 1. 写入缓存
self.client.set(key, json.dumps(data), expire=expire)
# 2. 加入写队列(异步写入数据库)
self.write_queue.append((key, data))
def flush(self, db_write_func):
# 批量写入数据库
for key, data in self.write_queue:
db_write_func(key, data)
self.write_queue.clear()
# 使用示例
cache_aside = CacheAside(('localhost', 11211))
# Cache-Aside模式
def get_user(user_id):
key = f"user:{user_id}"
def query_db():
print(f"Querying database for user {user_id}")
return {"id": user_id, "name": "John"}
return cache_aside.get(key, query_db, expire=3600)
user = get_user(1001)
print(user)
四、API缓存 #
4.1 场景说明 #
text
API缓存需求:
1. 减少后端压力
- 缓存API响应
- 降低服务负载
2. 提高响应速度
- 快速返回缓存
- 改善用户体验
3. 降低成本
- 减少API调用
- 节省费用
适用场景:
- 天气API
- 汇率API
- 新闻API
- 第三方API
4.2 Python实现 #
python
import json
import hashlib
import requests
from pymemcache.client.base import Client
class APICache:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.cache_prefix = "api:"
def cache_key(self, url, params=None):
key = url
if params:
params_str = json.dumps(params, sort_keys=True)
key += ":" + hashlib.md5(params_str.encode()).hexdigest()
return f"{self.cache_prefix}{key}"
def get(self, url, params=None, expire=3600, headers=None):
key = self.cache_key(url, params)
# 尝试获取缓存
cached = self.client.get(key)
if cached is not None:
return json.loads(cached)
# 发起请求
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
data = response.json()
# 缓存结果
self.client.set(key, json.dumps(data), expire=expire)
return data
return None
def invalidate(self, url, params=None):
key = self.cache_key(url, params)
self.client.delete(key)
# 使用示例
api_cache = APICache(('localhost', 11211))
# 天气API缓存
def get_weather(city):
url = "https://api.openweathermap.org/data/2.5/weather"
params = {
"q": city,
"appid": "your_api_key"
}
return api_cache.get(url, params=params, expire=1800)
# 汇率API缓存
def get_exchange_rate(from_currency, to_currency):
url = f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
data = api_cache.get(url, expire=3600)
if data:
return data["rates"].get(to_currency)
return None
# 使用
weather = get_weather("Beijing")
print(weather)
rate = get_exchange_rate("USD", "CNY")
print(rate)
五、限流 #
5.1 场景说明 #
text
限流需求:
1. 防止滥用
- 限制请求频率
- 保护系统资源
2. 公平分配
- 限制单用户频率
- 保证服务质量
3. 成本控制
- 限制API调用
- 控制费用
实现方式:
- 计数器限流
- 滑动窗口限流
- 令牌桶限流
5.2 计数器限流 #
python
import time
from pymemcache.client.base import Client
class RateLimiter:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.prefix = "rate:"
def is_allowed(self, key, limit, period):
cache_key = f"{self.prefix}{key}"
# 获取当前计数
count = self.client.get(cache_key)
if count is None:
# 初始化计数器
self.client.set(cache_key, "1", expire=period)
return True
count = int(count)
if count >= limit:
return False
# 递增计数器
self.client.incr(cache_key, 1)
return True
def reset(self, key):
cache_key = f"{self.prefix}{key}"
self.client.delete(cache_key)
# 使用示例
rate_limiter = RateLimiter(('localhost', 11211))
def check_rate_limit(user_id, limit=100, period=60):
key = f"user:{user_id}"
if not rate_limiter.is_allowed(key, limit, period):
return False, f"Rate limit exceeded. Max {limit} requests per {period} seconds."
return True, "OK"
# 检查限流
allowed, message = check_rate_limit(1001, limit=10, period=60)
print(f"Allowed: {allowed}, Message: {message}")
5.3 滑动窗口限流 #
python
import time
import json
from pymemcache.client.base import Client
class SlidingWindowRateLimiter:
def __init__(self, memcached_servers):
self.client = Client(memcached_servers)
self.prefix = "sliding:"
def is_allowed(self, key, limit, period):
cache_key = f"{self.prefix}{key}"
now = time.time()
# 获取窗口数据
data = self.client.get(cache_key)
if data is None:
# 初始化窗口
window = [now]
self.client.set(cache_key, json.dumps(window), expire=period)
return True
window = json.loads(data)
# 移除过期的请求
window = [t for t in window if now - t < period]
# 检查是否超过限制
if len(window) >= limit:
# 更新窗口
self.client.set(cache_key, json.dumps(window), expire=period)
return False
# 添加当前请求
window.append(now)
self.client.set(cache_key, json.dumps(window), expire=period)
return True
# 使用示例
limiter = SlidingWindowRateLimiter(('localhost', 11211))
def api_request(user_id):
key = f"api:{user_id}"
if not limiter.is_allowed(key, limit=10, period=60):
return {"error": "Rate limit exceeded"}
return {"data": "success"}
# 测试
for i in range(15):
result = api_request(1001)
print(f"Request {i+1}: {result}")
六、总结 #
实战案例要点:
| 场景 | 说明 | 过期时间 |
|---|---|---|
| Session存储 | 分布式会话 | 30分钟 |
| 页面缓存 | 减少渲染 | 1小时 |
| 数据库缓存 | 减少查询 | 1小时 |
| API缓存 | 减少调用 | 根据API特性 |
| 限流 | 保护系统 | 根据限流周期 |
最佳实践:
- 合理设置过期时间
- 实现缓存更新策略
- 处理缓存穿透、雪崩
- 监控缓存命中率
- 优化键命名规范
下一步,让我们学习Memcached的最佳实践!
最后更新:2026-03-27