Memcached实战案例 #

一、Session存储 #

1.1 场景说明 #

text
Session存储需求:

1. 分布式环境
   - 多台应用服务器
   - Session需要共享

2. 高可用
   - 服务器宕机
   - Session不丢失

3. 高性能
   - 快速读写
   - 低延迟

Memcached优势:
- 高性能读写
- 分布式存储
- 自动过期

1.2 Python实现 #

python
import json
import time
from pymemcache.client.base import Client

class SessionManager:
    """Keep user sessions in Memcached as JSON documents.

    Every session is stored under ``session:<session_id>`` and written with
    ``default_expire`` seconds of lifetime; reading or updating a session
    rewrites it, which renews that lifetime.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client.

        Args:
            memcached_servers: Server address, handed unchanged to
                ``pymemcache.client.base.Client``.
        """
        self.client = Client(memcached_servers)
        self.session_prefix = "session:"
        self.default_expire = 1800  # 30 minutes

    def _key(self, session_id):
        """Return the Memcached key under which ``session_id`` is stored."""
        return f"{self.session_prefix}{session_id}"

    def _store(self, key, session):
        """Serialize ``session`` to JSON and write it with the default expiry."""
        self.client.set(
            key,
            json.dumps(session),
            expire=self.default_expire
        )

    def create_session(self, session_id, user_data):
        """Create (or overwrite) the session ``session_id``.

        Args:
            session_id: Identifier chosen by the caller.
            user_data: Mapping; ``user_id``, ``username`` and ``ip`` are
                copied into the session (missing entries become ``None``).

        Returns:
            The ``session_id`` that was passed in.
        """
        # Read the clock once so both timestamps are guaranteed to be equal
        # for a freshly created session.
        now = int(time.time())
        session_data = {
            "user_id": user_data.get("user_id"),
            "username": user_data.get("username"),
            "login_time": now,
            "last_activity": now,
            "ip": user_data.get("ip")
        }

        self._store(self._key(session_id), session_data)

        return session_id

    def get_session(self, session_id):
        """Return the session dict, or ``None`` when it does not exist.

        A successful read refreshes ``last_activity`` and writes the session
        back, which also renews its expiry.
        """
        key = self._key(session_id)
        data = self.client.get(key)

        if data is None:
            return None

        session = json.loads(data)

        # Record the access and renew the expiry.
        session["last_activity"] = int(time.time())
        self._store(key, session)

        return session

    def update_session(self, session_id, updates):
        """Merge ``updates`` into an existing session.

        Returns:
            ``True`` when the session existed and was rewritten, ``False``
            when there was nothing stored under ``session_id``.
        """
        key = self._key(session_id)
        data = self.client.get(key)

        if data is None:
            return False

        session = json.loads(data)
        session.update(updates)
        # Assigned after the merge so ``updates`` cannot override it.
        session["last_activity"] = int(time.time())

        self._store(key, session)

        return True

    def delete_session(self, session_id):
        """Remove the session from Memcached."""
        self.client.delete(self._key(session_id))

    def extend_session(self, session_id, expire=None):
        """Renew the session's expiry without rewriting its content.

        Args:
            session_id: Session to extend.
            expire: New lifetime in seconds; ``None`` means
                ``default_expire``.
        """
        if expire is None:
            expire = self.default_expire

        # ``touch`` only changes the expiry; the stored value is untouched.
        self.client.touch(self._key(session_id), expire)

# Usage example: talks to the Memcached server at localhost:11211.
session_manager = SessionManager(("localhost", 11211))

# Create a session
session_id = "abc123"
user_data = dict(user_id=1001, username="john", ip="192.168.1.100")
session_manager.create_session(session_id, user_data)

# Read the session back
session = session_manager.get_session(session_id)
print(session)

# Merge an extra field into the session
session_manager.update_session(session_id, {"page": "dashboard"})

# Renew the expiry
session_manager.extend_session(session_id)

# Remove the session
session_manager.delete_session(session_id)

1.3 Java实现 #

java
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

/**
 * Stores user sessions in Memcached as JSON strings.
 *
 * <p>Each session lives under the key {@code "session:" + sessionId} and is
 * written with {@code defaultExpire} as its expiry. Reading or updating a
 * session rewrites it, which renews that expiry.
 */
public class SessionManager {
    private final MemcachedClient client;
    private final ObjectMapper objectMapper;
    private final String sessionPrefix = "session:";
    private final int defaultExpire = 1800; // 30 minutes

    /**
     * Builds the Memcached client.
     *
     * @param servers server list in the form accepted by
     *                {@code AddrUtil.getAddresses}, e.g. {@code "localhost:11211"}
     * @throws Exception if the client cannot be built
     */
    public SessionManager(String servers) throws Exception {
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(
            AddrUtil.getAddresses(servers)
        );
        this.client = builder.build();
        this.objectMapper = new ObjectMapper();
    }

    /** Returns the Memcached key under which {@code sessionId} is stored. */
    private String keyFor(String sessionId) {
        return sessionPrefix + sessionId;
    }

    /** Returns the current time as whole seconds since the epoch. */
    private static long nowSeconds() {
        return System.currentTimeMillis() / 1000;
    }

    /** Reads and parses the session stored under {@code key}; null when absent. */
    @SuppressWarnings("unchecked")
    private Map<String, Object> load(String key) throws Exception {
        String data = client.get(key);
        if (data == null) {
            return null;
        }
        // Map.class is a raw type; the JSON written by save() is always an
        // object with string keys, so the unchecked conversion is safe here.
        return objectMapper.readValue(data, Map.class);
    }

    /** Serializes {@code session} to JSON and writes it with the default expiry. */
    private void save(String key, Map<String, Object> session) throws Exception {
        client.set(key, defaultExpire, objectMapper.writeValueAsString(session));
    }

    /**
     * Creates (or overwrites) a session.
     *
     * @param sessionId identifier chosen by the caller
     * @param userData  source of the {@code user_id}, {@code username} and
     *                  {@code ip} entries copied into the session
     * @return the {@code sessionId} that was passed in
     */
    public String createSession(String sessionId, Map<String, Object> userData) throws Exception {
        // Read the clock once so both timestamps are guaranteed to be equal.
        long now = nowSeconds();

        Map<String, Object> sessionData = new HashMap<>();
        sessionData.put("user_id", userData.get("user_id"));
        sessionData.put("username", userData.get("username"));
        sessionData.put("login_time", now);
        sessionData.put("last_activity", now);
        sessionData.put("ip", userData.get("ip"));

        save(keyFor(sessionId), sessionData);

        return sessionId;
    }

    /**
     * Returns the session, refreshing {@code last_activity} and the expiry.
     *
     * @return the session map, or {@code null} when no session is stored
     */
    public Map<String, Object> getSession(String sessionId) throws Exception {
        String key = keyFor(sessionId);
        Map<String, Object> session = load(key);

        if (session == null) {
            return null;
        }

        session.put("last_activity", nowSeconds());
        save(key, session);

        return session;
    }

    /**
     * Merges {@code updates} into an existing session; does nothing when the
     * session does not exist.
     */
    public void updateSession(String sessionId, Map<String, Object> updates) throws Exception {
        String key = keyFor(sessionId);
        Map<String, Object> session = load(key);

        if (session != null) {
            session.putAll(updates);
            // Assigned after the merge so updates cannot override it.
            session.put("last_activity", nowSeconds());

            save(key, session);
        }
    }

    /** Removes the session from Memcached. */
    public void deleteSession(String sessionId) throws Exception {
        client.delete(keyFor(sessionId));
    }

    /** Shuts the underlying Memcached client down. */
    public void shutdown() throws Exception {
        client.shutdown();
    }

    /** Demo: create, read and delete one session against localhost:11211. */
    public static void main(String[] args) throws Exception {
        SessionManager manager = new SessionManager("localhost:11211");

        // Create a session
        Map<String, Object> userData = new HashMap<>();
        userData.put("user_id", 1001);
        userData.put("username", "john");
        userData.put("ip", "192.168.1.100");

        String sessionId = "abc123";
        manager.createSession(sessionId, userData);

        // Read the session back
        Map<String, Object> session = manager.getSession(sessionId);
        System.out.println(session);

        // Remove the session
        manager.deleteSession(sessionId);

        manager.shutdown();
    }
}

二、页面缓存 #

2.1 场景说明 #

text
页面缓存需求:

1. 减少数据库查询
   - 缓存页面内容
   - 减少后端压力

2. 提高响应速度
   - 直接返回缓存
   - 降低延迟

3. 降低服务器负载
   - 减少计算
   - 节省资源

适用场景:
- 静态页面
- 不经常变化的页面
- 高访问量的页面

2.2 Python实现 #

python
import json
import hashlib
from functools import wraps
from pymemcache.client.base import Client

class PageCache:
    """Cache rendered pages (text) in Memcached under ``page:<path>[:<hash>]``.

    Page content is stored as UTF-8 bytes and handed back as ``str``, so a
    cache hit returns the same type as a fresh render. (The default
    pymemcache client returns raw ``bytes`` and encodes ``str`` values as
    ASCII, which would otherwise make hits and misses differ and reject
    non-ASCII pages.)
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client.

        Args:
            memcached_servers: Server address, handed unchanged to
                ``pymemcache.client.base.Client``.
        """
        self.client = Client(memcached_servers)
        self.cache_prefix = "page:"

    def cache_key(self, path, params=None):
        """Build the cache key for ``path`` and optional request ``params``.

        Non-empty ``params`` are serialized as JSON with sorted keys and
        appended as an MD5 hex digest, so key order does not matter.
        """
        key = path
        if params:
            params_str = json.dumps(params, sort_keys=True)
            key += ":" + hashlib.md5(params_str.encode()).hexdigest()

        return f"{self.cache_prefix}{key}"

    def get(self, path, params=None):
        """Return the cached page as ``str``, or ``None`` on a miss."""
        raw = self.client.get(self.cache_key(path, params))
        if isinstance(raw, bytes):
            return raw.decode("utf-8")
        # ``None`` (miss) or a value the client has already deserialized.
        return raw

    def set(self, path, content, expire=3600, params=None):
        """Store ``content`` for ``path``.

        Args:
            path: Page path used to build the key.
            content: Page text; ``str`` is encoded as UTF-8 before storing,
                any other value is passed to the client unchanged.
            expire: Lifetime in seconds.
            params: Optional request parameters that are part of the key.
        """
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.client.set(self.cache_key(path, params), content, expire=expire)

    def delete(self, path, params=None):
        """Remove the cached page for ``path`` / ``params``."""
        self.client.delete(self.cache_key(path, params))

    def cached(self, expire=3600):
        """Return a decorator that caches the decorated function's result.

        The first positional argument of the call is used as the page path
        and the keyword arguments as ``params``; any further positional
        arguments do not take part in the key. A ``None`` result is not
        cached.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Derive the cache key from the call.
                path = args[0] if args else ""
                params = kwargs if kwargs else None

                # Serve from the cache when possible.
                cached_content = self.get(path, params)
                if cached_content is not None:
                    return cached_content

                # Miss: run the real function.
                content = func(*args, **kwargs)

                # Remember the result for subsequent calls.
                if content is not None:
                    self.set(path, content, expire=expire, params=params)

                return content

            return wrapper

        return decorator

# Usage example: talks to the Memcached server at localhost:11211.
page_cache = PageCache(('localhost', 11211))

# Decorator style
@page_cache.cached(expire=3600)
def render_page(path):
    """Render the page for ``path``; the print shows when a real render runs."""
    print(f"Rendering page: {path}")
    return f"<html><body>Page content for {path}</body></html>"

# Manual style
def get_product_page(product_id):
    """Return the product page, rendering and caching it on a miss."""
    cache_key = f"/product/{product_id}"

    # Try the cache first. Compare against None so that an empty cached page
    # still counts as a hit instead of being re-rendered every time.
    cached = page_cache.get(cache_key)
    if cached is not None:
        return cached

    # Render the page
    content = f"<html><body>Product {product_id}</body></html>"

    # Cache the page
    page_cache.set(cache_key, content, expire=3600)

    return content

# Run the examples
print(render_page("/home"))
print(render_page("/home"))  # the second call is served from the cache
print(get_product_page(1001))

2.3 PHP实现 #

php
<?php
/**
 * Caches rendered pages in Memcached under "page:" . $path, with an MD5 of
 * the JSON-encoded, key-sorted parameters appended when parameters are given.
 */
class PageCache {
    private $memcached;
    private $cachePrefix = "page:";

    /**
     * @param array $servers List of ['host' => ..., 'port' => ...] entries,
     *                       each registered with Memcached::addServer().
     */
    public function __construct($servers) {
        $this->memcached = new Memcached();
        foreach ($servers as $node) {
            $this->memcached->addServer($node['host'], $node['port']);
        }
    }

    /**
     * Builds the cache key for a path and optional parameters.
     */
    private function cacheKey($path, $params = []) {
        if (empty($params)) {
            return $this->cachePrefix . $path;
        }

        // Sort by key so the same parameters in a different order hash equally.
        ksort($params);
        return $this->cachePrefix . $path . ":" . md5(json_encode($params));
    }

    /**
     * Returns whatever Memcached::get() yields for the page key.
     */
    public function get($path, $params = []) {
        return $this->memcached->get($this->cacheKey($path, $params));
    }

    /**
     * Stores $content for the page key; returns the result of Memcached::set().
     */
    public function set($path, $content, $expire = 3600, $params = []) {
        return $this->memcached->set($this->cacheKey($path, $params), $content, $expire);
    }

    /**
     * Deletes the page key; returns the result of Memcached::delete().
     */
    public function delete($path, $params = []) {
        return $this->memcached->delete($this->cacheKey($path, $params));
    }

    /**
     * Returns a closure ($path, $callback) that serves $path from the cache
     * and, when get() yields false, calls $callback and caches its result.
     */
    public function cached($expire = 3600) {
        return function($path, $callback) use ($expire) {
            $hit = $this->get($path);
            if ($hit === false) {
                $fresh = $callback();
                $this->set($path, $fresh, $expire);

                return $fresh;
            }

            return $hit;
        };
    }
}

// Usage example: one Memcached server at localhost:11211.
$cache = new PageCache([
    ['host' => 'localhost', 'port' => 11211],
]);

// Manual style: look the page up, render and store it when get() yields false.
function getProductPage($productId) {
    global $cache;

    $cacheKey = "/product/{$productId}";
    $cached = $cache->get($cacheKey);

    if ($cached === false) {
        $content = "<html><body>Product {$productId}</body></html>";
        $cache->set($cacheKey, $content, 3600);

        return $content;
    }

    return $cached;
}

// Closure style: the callback only runs when the page is not cached.
$cached = $cache->cached(3600);
$content = $cached('/home', function() {
    return "<html><body>Home Page</body></html>";
});

echo $content;
?>

三、数据库缓存 #

3.1 场景说明 #

text
数据库缓存需求:

1. 减少数据库查询
   - 缓存查询结果
   - 降低数据库负载

2. 提高查询速度
   - 内存读取
   - 毫秒级响应

3. 缓解热点问题
   - 缓存热点数据
   - 分散数据库压力

适用场景:
- 用户信息查询
- 商品信息查询
- 配置信息查询
- 统计数据查询

3.2 Python实现 #

python
import json
import hashlib
from functools import wraps
from pymemcache.client.base import Client

class DatabaseCache:
    """Cache database query results in Memcached as JSON.

    Keys have the form ``db:<table>:<md5 of the JSON-encoded query>``.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client.

        Args:
            memcached_servers: Server address, handed unchanged to
                ``pymemcache.client.base.Client``.
        """
        self.client = Client(memcached_servers)
        self.cache_prefix = "db:"

    def cache_key(self, table, query):
        """Build the cache key for ``query`` against ``table``.

        ``query`` must be JSON-serializable; it is dumped with sorted keys so
        that dict ordering does not change the key.
        """
        query_str = json.dumps(query, sort_keys=True)
        query_hash = hashlib.md5(query_str.encode()).hexdigest()
        return f"{self.cache_prefix}{table}:{query_hash}"

    def get(self, table, query):
        """Return the cached, JSON-decoded result or ``None`` on a miss."""
        key = self.cache_key(table, query)
        data = self.client.get(key)

        if data is not None:
            return json.loads(data)

        return None

    def set(self, table, query, data, expire=3600):
        """Store ``data`` (JSON-serializable) for ``query`` for ``expire`` seconds."""
        key = self.cache_key(table, query)
        self.client.set(key, json.dumps(data), expire=expire)

    def delete(self, table, query):
        """Remove the cached result of one specific ``query``."""
        key = self.cache_key(table, query)
        self.client.delete(key)

    def delete_pattern(self, table):
        """Placeholder: currently does nothing.

        Memcached has no pattern delete; invalidating every entry of a table
        would require tracking the keys or using a namespace scheme.
        """

    def cached_query(self, table, expire=3600):
        """Return a decorator that caches the decorated function's result.

        The cache key is derived from the call arguments: keyword-only calls
        use the kwargs dict, positional-only calls use ``{"args": args}``,
        and mixed calls use both so that calls differing only in their
        positional arguments never share an entry. A ``None`` result is not
        cached.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if args and kwargs:
                    # Both kinds present: every argument must be part of the
                    # key, otherwise different calls would collide.
                    query = {"args": args, "kwargs": kwargs}
                elif kwargs:
                    query = kwargs
                else:
                    query = {"args": args}

                # Serve from the cache when possible.
                cached = self.get(table, query)
                if cached is not None:
                    return cached

                # Miss: run the real query.
                result = func(*args, **kwargs)

                # Remember the result for subsequent calls.
                if result is not None:
                    self.set(table, query, result, expire=expire)

                return result

            return wrapper

        return decorator

# Usage example: talks to the Memcached server at localhost:11211.
db_cache = DatabaseCache(("localhost", 11211))


# Decorator style
@db_cache.cached_query(table="users", expire=3600)
def get_user_by_id(user_id):
    """Pretend to load a user; the print shows when the database is hit."""
    print(f"Querying database for user {user_id}")
    # Stand-in for a real database query.
    return {"id": user_id, "name": "John", "email": "john@example.com"}


# Manual style
def get_product(product_id):
    """Return the product, consulting the cache before the database."""
    query = {"id": product_id}

    product = db_cache.get("products", query)
    if product is None:
        # Miss: load from the database and cache a non-empty result.
        product = query_database(product_id)
        if product is not None:
            db_cache.set("products", query, product, expire=3600)

    return product


def query_database(product_id):
    """Stand-in for a real product lookup."""
    print(f"Querying database for product {product_id}")
    return {"id": product_id, "name": "iPhone", "price": 999}


# Run the examples
print(get_user_by_id(1001))
print(get_user_by_id(1001))  # the second call is served from the cache
print(get_product(2001))

3.3 缓存更新策略 #

python
import json
from pymemcache.client.base import Client

class CacheAside:
    """Cache-aside helper: the cache is consulted first, the caller owns the database."""

    def __init__(self, memcached_servers):
        """Create the Memcached client from ``memcached_servers``."""
        self.client = Client(memcached_servers)

    def get(self, key, db_query_func, expire=3600):
        """Return the value for ``key``, loading it via ``db_query_func`` on a miss.

        A non-``None`` result of ``db_query_func`` is stored as JSON for
        ``expire`` seconds before being returned.
        """
        cached = self.client.get(key)
        if cached is None:
            # Miss: ask the database and remember a non-empty answer.
            fresh = db_query_func()
            if fresh is not None:
                self.client.set(key, json.dumps(fresh), expire=expire)
            return fresh

        return json.loads(cached)

    def set(self, key, data, expire=3600):
        """Write ``data`` to the cache only; the database write is the caller's job."""
        serialized = json.dumps(data)
        self.client.set(key, serialized, expire=expire)

    def delete(self, key):
        """Drop ``key`` from the cache only; the database delete is the caller's job."""
        self.client.delete(key)

class WriteThrough:
    """Write-through helper: every write goes to the database, then to the cache."""

    def __init__(self, memcached_servers):
        """Create the Memcached client from ``memcached_servers``."""
        self.client = Client(memcached_servers)

    def get(self, key, db_query_func, expire=3600):
        """Return the value for ``key``, loading it via ``db_query_func`` on a miss.

        A non-``None`` result of ``db_query_func`` is stored as JSON for
        ``expire`` seconds before being returned.
        """
        cached = self.client.get(key)
        if cached is None:
            # Miss: ask the database and remember a non-empty answer.
            fresh = db_query_func()
            if fresh is not None:
                self.client.set(key, json.dumps(fresh), expire=expire)
            return fresh

        return json.loads(cached)

    def set(self, key, data, db_write_func, expire=3600):
        """Persist ``data`` with ``db_write_func(data)``, then cache it.

        The database write runs first, so the cache is only touched when
        ``db_write_func`` returns without raising.
        """
        db_write_func(data)
        serialized = json.dumps(data)
        self.client.set(key, serialized, expire=expire)

class WriteBehind:
    """Write-behind helper: writes hit the cache at once and the database later.

    Pending database writes are kept in ``write_queue`` (in process memory)
    until ``flush`` is called.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client and an empty write queue."""
        self.client = Client(memcached_servers)
        self.write_queue = []

    def get(self, key, db_query_func, expire=3600):
        """Return the value for ``key``, loading it via ``db_query_func`` on a miss.

        A non-``None`` result of ``db_query_func`` is stored as JSON for
        ``expire`` seconds before being returned.
        """
        data = self.client.get(key)

        if data is not None:
            return json.loads(data)

        data = db_query_func()

        if data is not None:
            self.client.set(key, json.dumps(data), expire=expire)

        return data

    def set(self, key, data, expire=3600):
        """Cache ``data`` immediately and queue it for a later database write."""
        self.client.set(key, json.dumps(data), expire=expire)
        self.write_queue.append((key, data))

    def flush(self, db_write_func):
        """Write every queued entry with ``db_write_func(key, data)``.

        Entries leave the queue only once they have been written. When
        ``db_write_func`` raises, the exception propagates and the failing
        entry plus everything after it stay queued, so a retry neither loses
        nor repeats writes.
        """
        written = 0
        try:
            for key, data in self.write_queue:
                db_write_func(key, data)
                written += 1
        finally:
            # Drop exactly the entries that reached the database.
            del self.write_queue[:written]

# Usage example: talks to the Memcached server at localhost:11211.
cache_aside = CacheAside(("localhost", 11211))


# Cache-aside pattern
def get_user(user_id):
    """Return the user, querying the (simulated) database only on a cache miss."""
    def query_db():
        print(f"Querying database for user {user_id}")
        return {"id": user_id, "name": "John"}

    return cache_aside.get(f"user:{user_id}", query_db, expire=3600)


user = get_user(1001)
print(user)

四、API缓存 #

4.1 场景说明 #

text
API缓存需求:

1. 减少后端压力
   - 缓存API响应
   - 降低服务负载

2. 提高响应速度
   - 快速返回缓存
   - 改善用户体验

3. 降低成本
   - 减少API调用
   - 节省费用

适用场景:
- 天气API
- 汇率API
- 新闻API
- 第三方API

4.2 Python实现 #

python
import json
import hashlib
import requests
from pymemcache.client.base import Client

class APICache:
    """Cache JSON responses of HTTP GET requests in Memcached.

    Keys have the form ``api:<url>`` with an MD5 of the JSON-encoded,
    key-sorted ``params`` appended when parameters are given.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client from ``memcached_servers``."""
        self.client = Client(memcached_servers)
        self.cache_prefix = "api:"

    def cache_key(self, url, params=None):
        """Build the cache key for ``url`` and optional query ``params``."""
        key = url
        if params:
            params_str = json.dumps(params, sort_keys=True)
            key += ":" + hashlib.md5(params_str.encode()).hexdigest()

        return f"{self.cache_prefix}{key}"

    def get(self, url, params=None, expire=3600, headers=None, timeout=10):
        """Return the JSON body for ``url``, from the cache when available.

        Args:
            url: Endpoint to request.
            params: Query parameters; they are part of the cache key.
            expire: Lifetime of the cached response in seconds.
            headers: Extra request headers (not part of the cache key).
            timeout: Seconds to wait for the HTTP request, passed to
                ``requests.get``. Without a timeout a stalled server would
                block the caller indefinitely.

        Returns:
            The decoded JSON body, or ``None`` when the response status is
            not 200 (such responses are not cached).
        """
        key = self.cache_key(url, params)

        # Serve from the cache when possible.
        cached = self.client.get(key)
        if cached is not None:
            return json.loads(cached)

        # Miss: perform the request.
        response = requests.get(
            url, params=params, headers=headers, timeout=timeout
        )

        if response.status_code != 200:
            return None

        data = response.json()

        # Remember the response for subsequent calls.
        self.client.set(key, json.dumps(data), expire=expire)

        return data

    def invalidate(self, url, params=None):
        """Remove the cached response for ``url`` / ``params``."""
        key = self.cache_key(url, params)
        self.client.delete(key)

# Usage example: talks to the Memcached server at localhost:11211.
api_cache = APICache(("localhost", 11211))


# Weather API, cached for 30 minutes
def get_weather(city):
    """Return the weather API response for ``city``."""
    return api_cache.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": city, "appid": "your_api_key"},
        expire=1800,
    )


# Exchange-rate API, cached for one hour
def get_exchange_rate(from_currency, to_currency):
    """Return the rate from ``from_currency`` to ``to_currency``, or ``None``."""
    url = f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
    data = api_cache.get(url, expire=3600)

    return data["rates"].get(to_currency) if data else None


# Run the examples
weather = get_weather("Beijing")
print(weather)

rate = get_exchange_rate("USD", "CNY")
print(rate)

五、限流 #

5.1 场景说明 #

text
限流需求:

1. 防止滥用
   - 限制请求频率
   - 保护系统资源

2. 公平分配
   - 限制单用户频率
   - 保证服务质量

3. 成本控制
   - 限制API调用
   - 控制费用

实现方式:
- 计数器限流
- 滑动窗口限流
- 令牌桶限流

5.2 计数器限流 #

python
import time
from pymemcache.client.base import Client

class RateLimiter:
    """Fixed-window request counter backed by Memcached.

    The counter for a key is created by the first request of a window and
    expires ``period`` seconds later; increments do not extend that expiry.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client from ``memcached_servers``."""
        self.client = Client(memcached_servers)
        self.prefix = "rate:"

    def is_allowed(self, key, limit, period):
        """Count one request for ``key`` and report whether it is within ``limit``.

        Args:
            key: Identifier of the caller being limited.
            limit: Maximum number of allowed requests per window; a value
                of zero or less rejects every request.
            period: Window length in seconds.

        Returns:
            ``True`` when this request is among the first ``limit`` requests
            of the current window, ``False`` otherwise.
        """
        cache_key = f"{self.prefix}{key}"

        # ``add`` stores the value only when the key is absent, so exactly
        # one of several concurrent first requests opens the window.
        if self.client.add(cache_key, "1", expire=period, noreply=False):
            count = 1
        else:
            # ``incr`` is atomic on the server and returns the new value,
            # so concurrent requests cannot slip past the limit between a
            # separate read and write.
            count = self.client.incr(cache_key, 1, noreply=False)
            if count is None:
                # The counter expired between ``add`` and ``incr``: this
                # request opens a new window.
                self.client.add(cache_key, "1", expire=period, noreply=False)
                count = 1

        return count <= limit

    def reset(self, key):
        """Delete the counter for ``key``, starting a fresh window."""
        cache_key = f"{self.prefix}{key}"
        self.client.delete(cache_key)

# Usage example: talks to the Memcached server at localhost:11211.
rate_limiter = RateLimiter(("localhost", 11211))


def check_rate_limit(user_id, limit=100, period=60):
    """Return ``(allowed, message)`` for one request made by ``user_id``."""
    if rate_limiter.is_allowed(f"user:{user_id}", limit, period):
        return True, "OK"

    return False, f"Rate limit exceeded. Max {limit} requests per {period} seconds."


# Check the limit for one request
allowed, message = check_rate_limit(1001, limit=10, period=60)
print(f"Allowed: {allowed}, Message: {message}")

5.3 滑动窗口限流 #

python
import time
import json
from pymemcache.client.base import Client

class SlidingWindowRateLimiter:
    """Sliding-window rate limiter storing request timestamps in Memcached.

    The timestamps of the accepted requests of each key are kept as a JSON
    list. The list is read, pruned and written back in separate steps, so
    concurrent requests for the same key can overwrite each other's update.
    """

    def __init__(self, memcached_servers):
        """Create the Memcached client from ``memcached_servers``."""
        self.client = Client(memcached_servers)
        self.prefix = "sliding:"

    def is_allowed(self, key, limit, period):
        """Report whether a request for ``key`` fits into the sliding window.

        Args:
            key: Identifier of the caller being limited.
            limit: Maximum number of accepted requests within any ``period``
                seconds; a value of zero or less rejects every request,
                including the very first one.
            period: Window length in seconds.

        Returns:
            ``True`` when the request was accepted (and recorded), ``False``
            when the window already holds ``limit`` requests.
        """
        cache_key = f"{self.prefix}{key}"
        now = time.time()

        data = self.client.get(cache_key)
        timestamps = json.loads(data) if data is not None else []

        # Keep only the requests that are still inside the window.
        window = [t for t in timestamps if now - t < period]

        # The same limit check applies whether or not a window existed, so
        # the first request is not let through unconditionally.
        allowed = len(window) < limit
        if allowed:
            window.append(now)

        # Persist the pruned (and possibly extended) window.
        self.client.set(cache_key, json.dumps(window), expire=period)

        return allowed

# Usage example: talks to the Memcached server at localhost:11211.
limiter = SlidingWindowRateLimiter(("localhost", 11211))


def api_request(user_id):
    """Return a success payload, or an error payload when rate limited."""
    allowed = limiter.is_allowed(f"api:{user_id}", limit=10, period=60)
    return {"data": "success"} if allowed else {"error": "Rate limit exceeded"}


# Demo: send 15 requests against a limit of 10 per 60 seconds.
for i in range(15):
    result = api_request(1001)
    print(f"Request {i+1}: {result}")

六、总结 #

实战案例要点:

| 场景 | 说明 | 过期时间 |
| --- | --- | --- |
| Session存储 | 分布式会话 | 30分钟 |
| 页面缓存 | 减少渲染 | 1小时 |
| 数据库缓存 | 减少查询 | 1小时 |
| API缓存 | 减少调用 | 根据API特性 |
| 限流 | 保护系统 | 根据限流周期 |

最佳实践:

  1. 合理设置过期时间
  2. 实现缓存更新策略
  3. 处理缓存穿透、雪崩
  4. 监控缓存命中率
  5. 优化键命名规范

下一步,让我们学习Memcached的最佳实践!

最后更新:2026-03-27