Node.js后端 #
安装 #
bash
# Pin the v4 client: the snippets in this guide call client.initIndex(),
# which is not available in algoliasearch v5 and later.
npm install algoliasearch@4
初始化 #
基本初始化 #
javascript
// Load the Algolia client factory (CommonJS).
const algoliasearch = require('algoliasearch');
// 'APP_ID' and 'ADMIN_KEY' are placeholders to be replaced with real credentials.
// NOTE(review): an admin key should only ever live in backend code — confirm it
// is never shipped to a browser.
const client = algoliasearch('APP_ID', 'ADMIN_KEY');
// Handle for the 'products' index, reused by the later snippets.
const index = client.initIndex('products');
环境变量配置 #
javascript
// Run dotenv first so process.env is populated before the credentials are read.
require('dotenv').config();
// NOTE(review): `algoliasearch` is not required in this snippet; it assumes
// `const algoliasearch = require('algoliasearch')` is already in scope.
const client = algoliasearch(
process.env.ALGOLIA_APP_ID,
process.env.ALGOLIA_ADMIN_KEY
);
数据索引服务 #
索引服务类 #
javascript
/**
 * Small wrapper around one Algolia index that stores product records.
 */
class IndexingService {
  /**
   * @param {object} client - Algolia client; must expose `initIndex(name)`.
   * @param {string} indexName - Name of the index this service operates on.
   */
  constructor(client, indexName) {
    this.client = client;
    this.index = client.initIndex(indexName);
  }

  /**
   * Convert products into index records, save them, and wait until the
   * indexing tasks have completed.
   *
   * @param {Array<object>} products - Products with `id`, `name`,
   *   `description`, `price`, `brand`, `category`, `imageUrl`, `stock`,
   *   `rating` and a `createdAt` value exposing `getTime()`.
   * @returns {Promise<Array>} The objectIDs reported by `saveObjects`.
   */
  async indexProducts(products) {
    const objects = products.map((product) => ({
      objectID: product.id,
      name: product.name,
      description: product.description,
      price: product.price,
      brand: product.brand,
      category: product.category,
      image: product.imageUrl,
      stock: product.stock,
      rating: product.rating,
      createdAt: product.createdAt.getTime(),
    }));

    // The v4 client splits the upload into chunks and resolves with `taskIDs`
    // (plural), so there is no single `taskID` to pass to waitTask(). The
    // promise's own `.wait()` waits for every chunk instead.
    const { objectIDs } = await this.index.saveObjects(objects).wait();
    return objectIDs;
  }

  /**
   * Partially update one record; `updates` is spread after `objectID`.
   * Does not wait for the indexing task to finish.
   *
   * @param {string} productId - objectID of the record to update.
   * @param {object} updates - Attributes to merge into the record.
   * @returns {Promise<object>} The client's response.
   */
  async updateProduct(productId, updates) {
    return this.index.partialUpdateObject({
      objectID: productId,
      ...updates,
    });
  }

  /**
   * Delete one record by objectID. Does not wait for the task to finish.
   *
   * @param {string} productId - objectID of the record to delete.
   * @returns {Promise<object>} The client's response.
   */
  async deleteProduct(productId) {
    return this.index.deleteObject(productId);
  }

  /**
   * Remove every record from the index via `clearObjects()`.
   *
   * @returns {Promise<object>} The client's response.
   */
  async clearIndex() {
    return this.index.clearObjects();
  }
}
批量索引 #
分批处理 #
javascript
/**
 * Save `objects` to `index` in sequential batches, waiting for each batch to
 * be indexed before sending the next one.
 *
 * @param {object} index - Algolia index exposing `saveObjects`.
 * @param {Array<object>} objects - Records to save.
 * @param {number} [batchSize=1000] - Maximum records per batch; must be > 0.
 * @returns {Promise<void>}
 * @throws {RangeError} If `batchSize` is not greater than zero.
 */
async function batchIndex(index, objects, batchSize = 1000) {
  // A zero, negative or NaN step would never advance the loop below.
  if (!(batchSize > 0)) {
    throw new RangeError('batchSize must be a positive number');
  }

  for (let start = 0; start < objects.length; start += batchSize) {
    const batch = objects.slice(start, start + batchSize);
    // The v4 client resolves with `taskIDs` (plural), not `taskID`, so use the
    // promise's `.wait()` rather than waitTask().
    await index.saveObjects(batch).wait();
    console.log(`Indexed ${batch.length} objects`);
  }
}
使用replaceAllObjects #
javascript
/**
 * Replace the entire content of `index` with `objects`.
 *
 * @param {object} index - Algolia index exposing `replaceAllObjects`.
 * @param {Array<object>} objects - The complete new set of records.
 * @returns {Promise<void>}
 */
async function replaceAllObjects(index, objects) {
  // With `safe: true` the v4 client waits for each step itself, and the
  // response carries `taskIDs` (plural). The previous
  // `waitTask(taskID)` call therefore received `undefined`.
  await index.replaceAllObjects(objects, {
    safe: true,
  });
}
数据同步 #
数据库同步 #
javascript
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

/**
 * Full sync: read every active product row and push it to Algolia through
 * batchIndex(). The pooled connection is released even when a step fails.
 *
 * @returns {Promise<void>}
 */
async function syncProductsToAlgolia() {
  const db = await pool.connect();
  try {
    const { rows } = await db.query('SELECT * FROM products WHERE active = true');

    const records = [];
    for (const row of rows) {
      records.push({
        objectID: row.id.toString(),
        name: row.name,
        description: row.description,
        price: parseFloat(row.price),
        brand: row.brand,
        category: row.category,
        stock: row.stock,
        rating: parseFloat(row.rating),
        createdAt: row.created_at.getTime(),
      });
    }

    await batchIndex(index, records);
    console.log(`Synced ${records.length} products`);
  } finally {
    db.release();
  }
}
增量同步 #
javascript
/**
 * Incremental sync: save every product whose `updated_at` is later than
 * `lastSyncTime`.
 *
 * @param {Date} lastSyncTime - Watermark returned by the previous run.
 * @returns {Promise<{count: number, lastSyncTime: Date}>} Number of rows
 *   synced and the watermark to pass to the next run.
 */
async function syncUpdatedProducts(lastSyncTime) {
  // Take the next watermark BEFORE querying. Taking it after the query would
  // skip any row updated between the query and the timestamp on the next run.
  // NOTE(review): this compares the app clock with DB timestamps — confirm the
  // two are reasonably in sync.
  const syncStartedAt = new Date();

  const dbClient = await pool.connect();
  try {
    // NOTE(review): unlike the full sync, this query has no `active = true`
    // condition, so deactivated rows are re-indexed — confirm that is intended.
    const result = await dbClient.query(
      'SELECT * FROM products WHERE updated_at > $1',
      [lastSyncTime]
    );
    const products = result.rows.map(formatProduct);
    if (products.length > 0) {
      await index.saveObjects(products);
    }
    return {
      count: products.length,
      lastSyncTime: syncStartedAt,
    };
  } finally {
    dbClient.release();
  }
}
API端点 #
Express集成 #
javascript
const express = require('express');
const router = express.Router();

// Search endpoint: GET /search?q=...&page=...&filters=...
// Responds with { success: true, data } on success, 400 for an invalid page
// number, and 500 with the error message when the search itself fails.
router.get('/search', async (req, res) => {
  try {
    const { q = '', page = 0, filters } = req.query;

    // Query-string values are strings; parse with an explicit radix and reject
    // garbage up front instead of sending NaN to the search API.
    const pageNumber = Number.parseInt(page, 10);
    if (Number.isNaN(pageNumber) || pageNumber < 0) {
      res.status(400).json({
        success: false,
        error: 'page must be a non-negative integer',
      });
      return;
    }

    const results = await index.search(q, {
      page: pageNumber,
      hitsPerPage: 20,
      // NOTE(review): the caller-supplied filter expression is forwarded
      // verbatim — confirm clients are allowed to filter on any attribute.
      filters: filters || undefined,
      attributesToHighlight: ['name', 'description'],
    });

    res.json({
      success: true,
      data: results,
    });
  } catch (error) {
    res.status(500).json({
      success: false,
      error: error.message,
    });
  }
});
// Indexing endpoint: POST /index with a JSON body of { products: [...] }.
// Responds with the saved objectIDs, 400 when `products` is not an array, and
// 500 with the error message when saving fails.
// NOTE(review): no authentication middleware is attached to this route here —
// confirm it is protected where the router is mounted.
router.post('/index', async (req, res) => {
  try {
    // req.body is undefined when no body parser ran; treat that as bad input.
    const products = req.body ? req.body.products : undefined;
    if (!Array.isArray(products)) {
      res.status(400).json({
        success: false,
        error: 'products must be an array',
      });
      return;
    }

    const { objectIDs } = await index.saveObjects(products);
    res.json({
      success: true,
      objectIDs,
    });
  } catch (error) {
    res.status(500).json({
      success: false,
      error: error.message,
    });
  }
});

module.exports = router;
Webhook处理 #
数据变更Webhook #
javascript
// Webhook for product changes: POST /webhook/product-updated with { id, action }.
// action === 'delete' removes the record; any other action reloads the product
// from the database and saves it to the index.
// NOTE(review): the request is not authenticated or signature-checked here —
// confirm the sender is verified elsewhere.
router.post('/webhook/product-updated', async (req, res) => {
  try {
    const { id, action } = req.body || {};

    // A payload without an id used to fail with a TypeError and a 500.
    if (id === undefined || id === null) {
      res.status(400).json({ success: false });
      return;
    }

    if (action === 'delete') {
      await index.deleteObject(String(id));
    } else {
      const product = await getProductFromDB(id);
      // presumably getProductFromDB yields a falsy value for an unknown id —
      // verify against its implementation.
      if (!product) {
        res.status(404).json({ success: false });
        return;
      }
      await index.saveObject(formatProduct(product));
    }

    res.json({ success: true });
  } catch (error) {
    console.error('Webhook error:', error);
    res.status(500).json({ success: false });
  }
});
安全API密钥 #
生成受限密钥 #
javascript
/**
 * Derive a secured API key from the parent key 'SEARCH_ONLY_KEY', limited to
 * the 'products' index and expiring one hour after the call.
 *
 * @param {string} userId - Passed to the client as `userToken`.
 * @param {string} filters - Filter expression embedded in the key.
 * @returns {*} Whatever `client.generateSecuredApiKey` returns.
 */
function generateUserKey(userId, filters) {
  const ONE_HOUR_IN_SECONDS = 3600;
  const nowInSeconds = Math.floor(Date.now() / 1000);

  const restrictions = {
    filters,
    userToken: userId,
    validUntil: nowInSeconds + ONE_HOUR_IN_SECONDS,
    restrictIndices: ['products'],
  };

  return client.generateSecuredApiKey('SEARCH_ONLY_KEY', restrictions);
}
// API endpoint: GET /search-key returns { apiKey } — a secured key whose
// filter restricts results to the authenticated user's records.
router.get('/search-key', authenticate, (req, res) => {
  // Without this try/catch, a throw inside an async handler surfaced as an
  // unhandled rejection and the request never received a response. The error
  // shape matches the other routes.
  try {
    const userId = req.user.id;
    const filters = `user_id:${userId}`;
    const key = generateUserKey(userId, filters);
    res.json({ apiKey: key });
  } catch (error) {
    res.status(500).json({
      success: false,
      error: error.message,
    });
  }
});
定时任务 #
使用node-cron #
javascript
const cron = require('node-cron');
// Daily full sync, scheduled with the cron expression '0 0 * * *'.
// Failures are logged and never propagate to the scheduler.
async function runDailySync() {
  console.log('Starting daily sync...');
  try {
    await syncProductsToAlgolia();
    console.log('Daily sync completed');
  } catch (error) {
    console.error('Sync failed:', error);
  }
}

cron.schedule('0 0 * * *', runDailySync);
// Hourly incremental sync, scheduled with the cron expression '0 * * * *'.
cron.schedule('0 * * * *', async () => {
  try {
    // Read the watermark inside the try: a failure here is then logged like
    // any other sync error instead of escaping as an unhandled rejection.
    // `await` is a no-op for a plain value and also covers an async helper.
    const lastSyncTime = await getLastSyncTime();
    const result = await syncUpdatedProducts(lastSyncTime);
    await updateLastSyncTime(result.lastSyncTime);
    console.log(`Incremental sync: ${result.count} products`);
  } catch (error) {
    console.error('Incremental sync failed:', error);
  }
});
错误处理 #
重试机制 #
javascript
/**
 * Run `fn`, retrying with exponential backoff when it fails.
 *
 * Errors whose `status` is in the 4xx range are treated as client errors and
 * rethrown immediately. The delay before retry k (0-based) is
 * `baseDelayMs * 2^k`; no delay is added after the final attempt.
 *
 * @param {Function} fn - Function to call; may return a promise.
 * @param {number} [maxRetries=3] - Total number of attempts; must be >= 1.
 * @param {number} [baseDelayMs=1000] - Delay before the first retry, in ms.
 * @returns {Promise<*>} The value `fn` resolves with.
 * @throws {RangeError} If `maxRetries` is less than 1.
 * @throws The last error raised by `fn` once attempts are exhausted.
 */
async function withRetry(fn, maxRetries = 3, baseDelayMs = 1000) {
  // With zero attempts the loop never ran and `undefined` was thrown.
  if (!(maxRetries >= 1)) {
    throw new RangeError('maxRetries must be at least 1');
  }

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const status = error ? error.status : undefined;
      // Client errors are not retried.
      const isClientError = status >= 400 && status < 500;
      const isLastAttempt = attempt + 1 >= maxRetries;
      // Rethrow right away on the last attempt: sleeping first only delays
      // the failure.
      if (isClientError || isLastAttempt) {
        throw error;
      }
      await new Promise((resolve) =>
        setTimeout(resolve, baseDelayMs * Math.pow(2, attempt))
      );
    }
  }
}
// Usage: wrap the call in a function so each attempt issues a fresh request.
// NOTE(review): a bare top-level `await` is only valid in an ES module; in a
// CommonJS file (this guide uses `require`) run this inside an async function.
await withRetry(() => index.saveObjects(products));
完整示例 #
javascript
// Dependencies for the complete example.
const algoliasearch = require('algoliasearch');
const express = require('express');
const { Pool } = require('pg');
// Run dotenv before any process.env value below is read.
require('dotenv').config();
const app = express();
// Parse JSON request bodies so handlers can read req.body.
app.use(express.json());
// Algolia client built from ALGOLIA_APP_ID / ALGOLIA_ADMIN_KEY.
const client = algoliasearch(
process.env.ALGOLIA_APP_ID,
process.env.ALGOLIA_ADMIN_KEY
);
// Handle for the 'products' index used by both routes.
const index = client.initIndex('products');
// PostgreSQL connection pool configured from DATABASE_URL.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
// Wrap a facet value in double quotes for use inside an Algolia `filters`
// string, backslash-escaping embedded backslashes and double quotes.
// NOTE(review): confirm the escaping rules against the Algolia filters
// syntax reference.
function quoteFilterValue(value) {
  const escaped = String(value).replace(/\\/g, '\\\\').replace(/"/g, '\\"');
  return `"${escaped}"`;
}

// Search API: GET /api/search?q=&page=&brand=&category=&minPrice=&maxPrice=
// Responds with { success: true, data }, 400 for an invalid page or price
// bound, and 500 with the error message when the search fails.
app.get('/api/search', async (req, res) => {
  try {
    const { q = '', page = 0, brand, category, minPrice, maxPrice } = req.query;

    const pageNumber = Number.parseInt(page, 10);
    if (Number.isNaN(pageNumber) || pageNumber < 0) {
      res.status(400).json({ success: false, error: 'page must be a non-negative integer' });
      return;
    }

    const filters = [];
    // Quote user-supplied facet values: unquoted, a value containing spaces or
    // filter keywords breaks the expression or injects extra conditions.
    if (brand) filters.push(`brand:${quoteFilterValue(brand)}`);
    if (category) filters.push(`category:${quoteFilterValue(category)}`);

    // Only finite numbers may reach the numeric comparisons.
    const priceBounds = [
      ['minPrice', '>=', minPrice],
      ['maxPrice', '<=', maxPrice],
    ];
    for (const [paramName, operator, rawValue] of priceBounds) {
      if (!rawValue) continue;
      const bound = Number(rawValue);
      if (!Number.isFinite(bound)) {
        res.status(400).json({ success: false, error: `${paramName} must be a number` });
        return;
      }
      filters.push(`price ${operator} ${bound}`);
    }

    const results = await index.search(q, {
      page: pageNumber,
      hitsPerPage: 20,
      filters: filters.join(' AND ') || undefined,
      facets: ['brand', 'category'],
    });
    res.json({ success: true, data: results });
  } catch (error) {
    res.status(500).json({ success: false, error: error.message });
  }
});
// Sync API: POST /api/sync replaces the index content with all active products.
// Responds with { success: true, count } or 500 with the error message.
// NOTE(review): no authentication is applied to this route here — confirm it
// is protected before exposing it.
app.post('/api/sync', async (req, res) => {
  try {
    const dbClient = await pool.connect();
    let rows;
    try {
      ({ rows } = await dbClient.query('SELECT * FROM products WHERE active = true'));
    } finally {
      // Release in `finally`: a failing query used to leak the pooled
      // connection. Releasing here also frees it before the Algolia upload.
      dbClient.release();
    }

    const products = rows.map((row) => ({
      objectID: row.id.toString(),
      name: row.name,
      price: parseFloat(row.price),
      brand: row.brand,
      category: row.category,
    }));
    await index.replaceAllObjects(products);
    res.json({ success: true, count: products.length });
  } catch (error) {
    res.status(500).json({ success: false, error: error.message });
  }
});
// Listen on PORT when it is set, falling back to 3000 as before.
const port = process.env.PORT || 3000;
app.listen(port, () => {
  console.log(`Server running on port ${port}`);
});
总结 #
Node.js后端要点:
| 要点 | 说明 |
|---|---|
| 初始化 | algoliasearch(APP_ID, ADMIN_KEY) |
| 索引 | saveObjects, partialUpdateObject |
| 同步 | 全量同步、增量同步 |
| API | Express集成 |
| 安全 | generateSecuredApiKey |
| 定时 | node-cron定时同步 |
接下来,让我们学习 Python客户端。
最后更新:2026-03-28