数据库集成 #

一、数据库概述 #

Fastify 支持多种数据库，本章将介绍主流数据库的集成方法。

1.1 支持的数据库 #

| 数据库 | 插件 | 说明 |
| --- | --- | --- |
| MongoDB | @fastify/mongodb | NoSQL文档数据库 |
| PostgreSQL | @fastify/postgres | 关系型数据库 |
| MySQL | mysql2/promise | 关系型数据库 |
| Redis | @fastify/redis | 缓存数据库 |
| SQLite | better-sqlite3 | 轻量级数据库 |

1.2 项目结构 #

text
database-api/
├── src/
│   ├── app.js
│   ├── server.js
│   ├── plugins/
│   │   ├── mongodb.js
│   │   ├── postgres.js
│   │   ├── mysql.js
│   │   └── redis.js
│   ├── models/
│   │   ├── user.model.js
│   │   └── product.model.js
│   ├── repositories/
│   │   ├── user.repository.js
│   │   └── product.repository.js
│   └── routes/
│       └── api.js
├── package.json
└── .env

二、MongoDB集成 #

2.1 安装依赖 #

bash
npm install @fastify/mongodb

2.2 MongoDB插件 #

src/plugins/mongodb.js

javascript
import fp from 'fastify-plugin'
import mongodb from '@fastify/mongodb'

/**
 * Fastify plugin that registers @fastify/mongodb and exposes the selected
 * database handle as `fastify.db`.
 *
 * @param {object} fastify - Fastify instance the plugin is attached to.
 * @param {object} opts - Plugin options.
 * @param {string} opts.url - Connection URL forwarded to @fastify/mongodb.
 * @param {string} opts.database - Database name forwarded to @fastify/mongodb.
 * @returns {Promise<void>}
 */
async function mongodbPlugin(fastify, opts) {
  const { url, database } = opts
  await fastify.register(mongodb, { url, database })

  // Shortcut so callers can use `fastify.db` instead of `fastify.mongo.db`.
  const { db } = fastify.mongo
  fastify.decorate('db', db)

  // NOTE(review): @fastify/mongodb may already close the client it creates
  // when the server shuts down; confirm whether this extra hook is needed.
  async function closeMongoClient(instance) {
    await instance.mongo.client.close()
  }

  fastify.addHook('onClose', closeMongoClient)
}

export default fp(mongodbPlugin, {
  name: 'mongodb'
})

2.3 MongoDB Repository #

src/repositories/user.repository.js

javascript
import { ObjectId } from 'mongodb'

/**
 * Data-access layer for the `users` MongoDB collection.
 *
 * Every method delegates to the collection obtained from the `db` handle
 * passed to the constructor.
 */
class UserRepository {
  /**
   * @param {object} db - Database handle exposing `collection(name)`.
   */
  constructor(db) {
    this.collection = db.collection('users')
  }

  /**
   * Return one page of users together with the total match count.
   *
   * `page` and `limit` are coerced to integers because values taken straight
   * from a querystring are strings, and the cursor's `limit()`/`skip()` expect
   * numbers. An unparsable value falls back to the default; a page below 1 is
   * treated as page 1 so the computed skip is never negative.
   *
   * @param {object} [query={}] - MongoDB filter.
   * @param {object} [options={}]
   * @param {number|string} [options.page=1] - 1-based page number.
   * @param {number|string} [options.limit=10] - Page size.
   * @param {object} [options.sort={ createdAt: -1 }] - Sort specification.
   * @returns {Promise<{items: object[], total: number, page: number, limit: number}>}
   */
  async findAll(query = {}, options = {}) {
    const { page = 1, limit = 10, sort = { createdAt: -1 } } = options

    const parsedPage = Number.parseInt(page, 10)
    const parsedLimit = Number.parseInt(limit, 10)
    const pageNumber = Number.isInteger(parsedPage) && parsedPage > 0 ? parsedPage : 1
    const pageSize = Number.isInteger(parsedLimit) ? parsedLimit : 10
    const skip = (pageNumber - 1) * pageSize

    // The page query and the count are independent, so run them concurrently.
    const [items, total] = await Promise.all([
      this.collection.find(query).sort(sort).skip(skip).limit(pageSize).toArray(),
      this.collection.countDocuments(query)
    ])

    return { items, total, page: pageNumber, limit: pageSize }
  }

  /**
   * Look up a single user by `_id`.
   *
   * @param {string|object} id - Value passed to the `ObjectId` constructor.
   * @returns {Promise<object|null>} Result of `findOne`.
   */
  async findById(id) {
    // NOTE(review): `new ObjectId(id)` presumably throws on malformed ids;
    // callers may want to validate ids before calling.
    return this.collection.findOne({ _id: new ObjectId(id) })
  }

  /**
   * Look up a single user by an arbitrary filter.
   *
   * @param {object} query - MongoDB filter.
   * @returns {Promise<object|null>} Result of `findOne`.
   */
  async findOne(query) {
    return this.collection.findOne(query)
  }

  /**
   * Insert a user and return the stored document.
   *
   * `createdAt` and `updatedAt` are stamped from one clock reading so they
   * are equal on a freshly created document.
   *
   * @param {object} data - Fields of the new user.
   * @returns {Promise<object|null>} The document re-read by its inserted id.
   */
  async create(data) {
    const now = new Date()
    const doc = {
      ...data,
      createdAt: now,
      updatedAt: now
    }

    const result = await this.collection.insertOne(doc)
    return this.findById(result.insertedId)
  }

  /**
   * Apply a partial update and return the re-read document.
   *
   * @param {string|object} id - User id.
   * @param {object} data - Fields to `$set`; `updatedAt` is always refreshed.
   * @returns {Promise<object|null>} Result of `findById(id)` after the update.
   */
  async update(id, data) {
    await this.collection.updateOne(
      { _id: new ObjectId(id) },
      {
        $set: {
          ...data,
          updatedAt: new Date()
        }
      }
    )

    return this.findById(id)
  }

  /**
   * Delete a user by id.
   *
   * @param {string|object} id - User id.
   * @returns {Promise<boolean>} True when `deletedCount` is greater than zero.
   */
  async delete(id) {
    const result = await this.collection.deleteOne({
      _id: new ObjectId(id)
    })
    return result.deletedCount > 0
  }

  /**
   * Delete every user matching a filter.
   *
   * @param {object} query - MongoDB filter.
   * @returns {Promise<number>} The reported `deletedCount`.
   */
  async deleteMany(query) {
    const result = await this.collection.deleteMany(query)
    return result.deletedCount
  }

  /**
   * Run an aggregation pipeline and collect the results into an array.
   *
   * @param {object[]} pipeline - Aggregation stages.
   * @returns {Promise<object[]>}
   */
  async aggregate(pipeline) {
    return this.collection.aggregate(pipeline).toArray()
  }
}

export default UserRepository

2.4 使用示例 #

javascript
// GET /users: paginated listing backed by the user repository.
// NOTE(review): `page`/`limit` presumably arrive as strings unless a route
// schema coerces them; confirm the repository accepts that.
fastify.get('/users', async (request, reply) => {
  const { query } = request
  const pagination = { page: query.page, limit: query.limit }
  return fastify.repositories.user.findAll({}, pagination)
})

// POST /users: create a user from the request body and answer 201.
fastify.post('/users', async (request, reply) => {
  const { body } = request
  const created = await fastify.repositories.user.create(body)
  reply.code(201)
  return created
})

三、PostgreSQL集成 #

3.1 安装依赖 #

bash
npm install @fastify/postgres pg

3.2 PostgreSQL插件 #

src/plugins/postgres.js

javascript
import fp from 'fastify-plugin'
import postgres from '@fastify/postgres'

/**
 * Fastify plugin that registers @fastify/postgres with the given connection
 * string, making the `fastify.pg` decorator available.
 *
 * No `onClose` hook is added here. @fastify/postgres ends its own pool when
 * the server closes, and the `fastify.pg` decorator exposes `connect`, `pool`,
 * `Client`, `query` and `transact` but no `end()`, so calling
 * `instance.pg.end()` would fail during shutdown.
 *
 * @param {object} fastify - Fastify instance the plugin is attached to.
 * @param {object} opts - Plugin options.
 * @param {string} opts.connectionString - PostgreSQL connection string.
 * @returns {Promise<void>}
 */
async function postgresPlugin(fastify, opts) {
  await fastify.register(postgres, {
    connectionString: opts.connectionString
  })
}

export default fp(postgresPlugin, {
  name: 'postgres'
})

3.3 PostgreSQL Repository #

src/repositories/product.repository.js

javascript
// Matches a plain, unquoted SQL identifier: letters, digits and underscores,
// not starting with a digit.
const SQL_IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/

/**
 * Return `name` unchanged when it is a plain SQL identifier, otherwise throw.
 * Column names cannot be sent as bind parameters, so anything interpolated
 * into SQL text is validated here first.
 *
 * @param {string} name - Candidate column name.
 * @returns {string} The validated name.
 * @throws {Error} When `name` is not a plain identifier.
 */
function assertSqlIdentifier(name) {
  if (typeof name !== 'string' || !SQL_IDENTIFIER_PATTERN.test(name)) {
    throw new Error(`Invalid SQL identifier: ${String(name)}`)
  }
  return name
}

/**
 * Coerce a pagination value to an integer that is at least `min`.
 *
 * @param {number|string} value - Raw value (possibly a querystring string).
 * @param {number} fallback - Used when `value` is unparsable or below `min`.
 * @param {number} min - Smallest accepted value.
 * @returns {number}
 */
function toPaginationInteger(value, fallback, min) {
  const parsed = Number.parseInt(value, 10)
  return Number.isInteger(parsed) && parsed >= min ? parsed : fallback
}

/**
 * Data-access layer for the `products` table.
 */
class ProductRepository {
  /**
   * @param {object} pg - Handle exposing `query(sql, params)` and `connect()`.
   */
  constructor(pg) {
    this.pg = pg
  }

  /**
   * Return one page of products plus the total row count.
   *
   * `sortBy` and `sortOrder` end up in the SQL text, so they are validated
   * rather than interpolated as given.
   *
   * @param {object} [options={}]
   * @param {number|string} [options.page=1] - 1-based page number.
   * @param {number|string} [options.limit=10] - Page size.
   * @param {string} [options.sortBy='created_at'] - Column to order by.
   * @param {string} [options.sortOrder='DESC'] - 'ASC' or 'DESC', any case.
   * @returns {Promise<{items: object[], total: number, page: number, limit: number}>}
   * @throws {Error} When `sortBy` or `sortOrder` is not acceptable.
   */
  async findAll(options = {}) {
    const { page = 1, limit = 10, sortBy = 'created_at', sortOrder = 'DESC' } = options

    const column = assertSqlIdentifier(sortBy)
    const direction = String(sortOrder).toUpperCase()
    if (direction !== 'ASC' && direction !== 'DESC') {
      throw new Error(`Invalid sort order: ${String(sortOrder)}`)
    }

    const pageNumber = toPaginationInteger(page, 1, 1)
    const pageSize = toPaginationInteger(limit, 10, 0)
    const offset = (pageNumber - 1) * pageSize

    // The count and the page query are independent, so issue both at once.
    const [countResult, result] = await Promise.all([
      this.pg.query('SELECT COUNT(*) FROM products'),
      this.pg.query(
        `SELECT * FROM products ORDER BY ${column} ${direction} LIMIT $1 OFFSET $2`,
        [pageSize, offset]
      )
    ])

    return {
      items: result.rows,
      // The test mock and this parse both treat COUNT(*) as a string value.
      total: Number.parseInt(countResult.rows[0].count, 10),
      page: pageNumber,
      limit: pageSize
    }
  }

  /**
   * Fetch a single product by primary key.
   *
   * @param {number|string} id
   * @returns {Promise<object|undefined>} First row, or undefined when none matched.
   */
  async findById(id) {
    const result = await this.pg.query(
      'SELECT * FROM products WHERE id = $1',
      [id]
    )
    return result.rows[0]
  }

  /**
   * Insert a product and return the stored row.
   *
   * @param {object} data
   * @param {string} data.name
   * @param {number} data.price
   * @param {string} data.description
   * @returns {Promise<object>} Row produced by `RETURNING *`.
   */
  async create(data) {
    const result = await this.pg.query(
      `INSERT INTO products (name, price, description, created_at, updated_at)
       VALUES ($1, $2, $3, NOW(), NOW())
       RETURNING *`,
      [data.name, data.price, data.description]
    )
    return result.rows[0]
  }

  /**
   * Update the given columns of a product and return the updated row.
   *
   * Keys of `data` become column names in the SQL text, so each one must be
   * a plain identifier; values are always sent as bind parameters.
   *
   * @param {number|string} id
   * @param {object} data - Column/value pairs to set; `updated_at` is always refreshed.
   * @returns {Promise<object|undefined>} Updated row, or undefined when no row matched.
   * @throws {Error} When a key of `data` is not a plain identifier.
   */
  async update(id, data) {
    const values = [id]
    const assignments = Object.entries(data).map(([key, value]) => {
      values.push(value)
      // $1 is the id, so the first data value is bound as $2.
      return `${assertSqlIdentifier(key)} = $${values.length}`
    })

    assignments.push('updated_at = NOW()')

    const result = await this.pg.query(
      `UPDATE products SET ${assignments.join(', ')} WHERE id = $1 RETURNING *`,
      values
    )

    return result.rows[0]
  }

  /**
   * Delete a product by primary key.
   *
   * @param {number|string} id
   * @returns {Promise<boolean>} True when at least one row was removed.
   */
  async delete(id) {
    const result = await this.pg.query(
      'DELETE FROM products WHERE id = $1',
      [id]
    )
    return result.rowCount > 0
  }

  /**
   * Run `callback(client)` inside BEGIN/COMMIT on a dedicated client.
   *
   * On failure the transaction is rolled back and the original error is
   * rethrown. If the ROLLBACK itself fails, that error is passed to
   * `client.release()` so the pool can discard the client, and the original
   * error still reaches the caller.
   *
   * @param {function(object): Promise<*>} callback - Receives the transaction client.
   * @returns {Promise<*>} Whatever `callback` resolves to.
   */
  async transaction(callback) {
    const client = await this.pg.connect()
    let releaseError

    try {
      await client.query('BEGIN')
      const result = await callback(client)
      await client.query('COMMIT')
      return result
    } catch (err) {
      try {
        await client.query('ROLLBACK')
      } catch (rollbackErr) {
        releaseError = rollbackErr
      }
      throw err
    } finally {
      client.release(releaseError)
    }
  }
}

export default ProductRepository

3.4 事务示例 #

javascript
// POST /orders: create an order, its items, and decrement product stock in
// one transaction; any failure inside the callback rolls everything back.
fastify.post('/orders', async (request, reply) => {
  const INSERT_ORDER = 'INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING *'
  const INSERT_ITEM = 'INSERT INTO order_items (order_id, product_id, quantity) VALUES ($1, $2, $3)'
  const DECREMENT_STOCK = 'UPDATE products SET stock = stock - $1 WHERE id = $2'

  const placeOrder = async (client) => {
    const inserted = await client.query(INSERT_ORDER, [request.user.id, request.body.total])
    const [createdOrder] = inserted.rows

    // Items are written one after another on the same transaction client.
    for (const { productId, quantity } of request.body.items) {
      await client.query(INSERT_ITEM, [createdOrder.id, productId, quantity])

      // NOTE(review): nothing here stops `stock` from going negative;
      // confirm whether a constraint or an earlier check prevents overselling.
      await client.query(DECREMENT_STOCK, [quantity, productId])
    }

    return createdOrder
  }

  const order = await fastify.repositories.product.transaction(placeOrder)

  reply.code(201)
  return order
})

四、MySQL集成 #

4.1 安装依赖 #

bash
npm install mysql2

4.2 MySQL插件 #

src/plugins/mysql.js

javascript
import fp from 'fastify-plugin'
import mysql from 'mysql2/promise'

/**
 * Fastify plugin that creates a mysql2 connection pool and exposes it as
 * `fastify.mysql`. The pool is ended when the Fastify instance closes.
 *
 * @param {object} fastify - Fastify instance the plugin is attached to.
 * @param {object} opts - Connection settings.
 * @param {string} opts.host
 * @param {number} opts.port
 * @param {string} opts.user
 * @param {string} opts.password
 * @param {string} opts.database
 * @returns {Promise<void>}
 */
async function mysqlPlugin(fastify, opts) {
  const { host, port, user, password, database } = opts

  const connectionPool = mysql.createPool({
    host,
    port,
    user,
    password,
    database,
    // Fixed pool settings used by this plugin.
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0
  })

  fastify.decorate('mysql', connectionPool)

  // Release all pooled connections on shutdown.
  fastify.addHook('onClose', async (instance) => {
    await connectionPool.end()
  })
}

export default fp(mysqlPlugin, {
  name: 'mysql'
})

4.3 MySQL使用 #

javascript
// GET /users: return every row of the users table.
fastify.get('/users', async (request, reply) => {
  const [users] = await fastify.mysql.query('SELECT * FROM users')
  return users
})

// GET /users/:id: return one user, or a 404 payload when no row matches.
fastify.get('/users/:id', async (request, reply) => {
  const { id } = request.params
  const sql = 'SELECT * FROM users WHERE id = ?'
  // The id is passed as a placeholder value, never concatenated into the SQL.
  const [matches] = await fastify.mysql.query(sql, [id])

  if (matches.length > 0) {
    return matches[0]
  }

  reply.code(404)
  return { error: 'Not Found' }
})

五、Redis集成 #

5.1 安装依赖 #

bash
npm install @fastify/redis

5.2 Redis插件 #

src/plugins/redis.js

javascript
import fp from 'fastify-plugin'
import redis from '@fastify/redis'

/**
 * Fastify plugin that registers @fastify/redis and adds a small JSON cache
 * helper as `fastify.cache`.
 *
 * @param {object} fastify - Fastify instance the plugin is attached to.
 * @param {object} opts - Redis connection settings.
 * @param {string} opts.host
 * @param {number} opts.port
 * @param {string} [opts.password]
 * @returns {Promise<void>}
 */
async function redisPlugin(fastify, opts) {
  await fastify.register(redis, {
    host: opts.host,
    port: opts.port,
    password: opts.password
  })

  // Held in a local so getOrSet keeps working even if it is called detached
  // from `fastify.cache` (no reliance on `this`).
  const cache = {
    /**
     * Read and JSON-decode a cached value.
     * @param {string} key
     * @returns {Promise<*>} Decoded value, or null when nothing is stored.
     */
    async get(key) {
      const data = await fastify.redis.get(key)
      return data ? JSON.parse(data) : null
    },

    /**
     * JSON-encode and store a value with an expiry.
     * @param {string} key
     * @param {*} value - Must be JSON-serialisable.
     * @param {number} [ttl=3600] - Expiry passed with the 'EX' option.
     * @returns {Promise<void>}
     */
    async set(key, value, ttl = 3600) {
      await fastify.redis.set(key, JSON.stringify(value), 'EX', ttl)
    },

    /**
     * Remove a key from the cache.
     * @param {string} key
     * @returns {Promise<void>}
     */
    async del(key) {
      await fastify.redis.del(key)
    },

    /**
     * Return the cached value for `key`, or compute it with `fn` and cache it.
     *
     * Only null counts as a miss, so legitimately cached falsy values
     * (0, false, '') are served from the cache instead of being recomputed.
     * A null/undefined result from `fn` is returned but not stored: it could
     * never be read back as a hit.
     *
     * @param {string} key
     * @param {function(): Promise<*>} fn - Producer called on a cache miss.
     * @param {number} [ttl=3600] - Expiry for a newly stored value.
     * @returns {Promise<*>}
     */
    async getOrSet(key, fn, ttl = 3600) {
      const cached = await cache.get(key)

      if (cached !== null) {
        return cached
      }

      const data = await fn()
      if (data !== null && data !== undefined) {
        await cache.set(key, data, ttl)
      }
      return data
    }
  }

  fastify.decorate('cache', cache)
}

export default fp(redisPlugin, {
  name: 'redis'
})

5.3 缓存使用 #

javascript
// GET /users/:id: serve the user from the cache, loading it from the
// repository on a miss (cached for 3600 as passed to getOrSet).
fastify.get('/users/:id', async (request, reply) => {
  const { id } = request.params
  const loadUser = async () => fastify.repositories.user.findById(id)

  const user = await fastify.cache.getOrSet(`user:${id}`, loadUser, 3600)

  if (user) {
    return user
  }

  reply.code(404)
  return { error: 'Not Found' }
})

// PUT /users/:id: update the user, then drop its cache entry so the next
// read reloads it.
fastify.put('/users/:id', async (request, reply) => {
  const { params, body } = request
  const updated = await fastify.repositories.user.update(params.id, body)

  await fastify.cache.del(`user:${params.id}`)

  return updated
})

六、ORM集成 #

6.1 Prisma集成 #

安装

bash
npm install prisma @prisma/client
npx prisma init

prisma/schema.prisma

prisma
// Database connection: PostgreSQL, with the connection URL read from the
// DATABASE_URL environment variable.
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// Generates the JavaScript client imported as @prisma/client.
generator client {
  provider = "prisma-client-js"
}

// Application user; `email` is unique and `id` defaults to a generated UUID.
model User {
  id        String   @id @default(uuid())
  email     String   @unique
  name      String
  // Defaults to the creation time.
  createdAt DateTime @default(now())
  // Marked @updatedAt; presumably refreshed by Prisma on every update.
  updatedAt DateTime @updatedAt
}

Prisma插件

javascript
import fp from 'fastify-plugin'
import { PrismaClient } from '@prisma/client'

/**
 * Fastify plugin that creates a PrismaClient, exposes it as `fastify.prisma`
 * and disconnects it when the Fastify instance closes.
 *
 * @param {object} fastify - Fastify instance the plugin is attached to.
 * @param {object} opts - Plugin options (not read by this plugin).
 * @returns {Promise<void>}
 */
async function prismaPlugin(fastify, opts) {
  const client = new PrismaClient()
  fastify.decorate('prisma', client)

  // Disconnect through the instance decorator when the server shuts down.
  async function disconnectPrisma(instance) {
    await instance.prisma.$disconnect()
  }

  fastify.addHook('onClose', disconnectPrisma)
}

export default fp(prismaPlugin, {
  name: 'prisma'
})

使用Prisma

javascript
// GET /users: return every user via Prisma.
fastify.get('/users', async (request, reply) => {
  const allUsers = await fastify.prisma.user.findMany()
  return allUsers
})

// POST /users: create a user from the request body and answer 201.
fastify.post('/users', async (request, reply) => {
  const { body: data } = request
  const created = await fastify.prisma.user.create({ data })
  reply.code(201)
  return created
})

七、数据库迁移 #

7.1 PostgreSQL迁移 #

使用 node-pg-migrate：

bash
npm install node-pg-migrate

migrations/001_create_users.js

javascript
exports.up = (pgm) => {
  pgm.createTable('users', {
    id: 'id',
    email: { type: 'varchar(255)', notNull: true, unique: true },
    name: { type: 'varchar(255)', notNull: true },
    created_at: { type: 'timestamp', notNull: true, default: pgm.func('NOW()') },
    updated_at: { type: 'timestamp', notNull: true, default: pgm.func('NOW()') }
  })
  
  pgm.createIndex('users', 'email')
}

exports.down = (pgm) => {
  pgm.dropTable('users')
}

八、最佳实践 #

8.1 连接池配置 #

javascript
// Example connection-pool sizing and timeout settings.
// NOTE(review): nothing visible in this document consumes this object, and
// the key names are not the options passed to the pools created above;
// confirm which pool library it is meant for before reusing it.
const poolConfig = {
  // Lower and upper bound on the number of pooled connections.
  min: 5,
  max: 20,
  // Timeouts; the "Millis" suffix suggests milliseconds (30000 = 30 s).
  acquireTimeoutMillis: 30000,
  idleTimeoutMillis: 30000
}

8.2 错误处理 #

javascript
// Global error handler: translate two database error codes into HTTP
// responses and pass every other error through unchanged.
fastify.setErrorHandler((error, request, reply) => {
  switch (error.code) {
    // Reported as a duplicate entry, answered with 409.
    case '23505':
      reply.code(409)
      return { error: 'Conflict', message: 'Duplicate entry' }

    // Reported as a foreign key violation, answered with 400.
    case '23503':
      reply.code(400)
      return { error: 'Bad Request', message: 'Foreign key violation' }

    default:
      reply.send(error)
  }
})

8.3 健康检查 #

javascript
// GET /health: run one check per backing store and answer 200 only when
// every check returns a truthy value, otherwise 503.
// NOTE(review): checkMongoDB/checkPostgres/checkRedis are not defined in this
// document; they are assumed to resolve to a truthy/falsy status.
fastify.get('/health', async (request, reply) => {
  // Checks run one after another, in this order.
  const mongoStatus = await checkMongoDB(fastify.mongo)
  const postgresStatus = await checkPostgres(fastify.pg)
  const redisStatus = await checkRedis(fastify.redis)

  const checks = {
    mongodb: mongoStatus,
    postgres: postgresStatus,
    redis: redisStatus
  }

  const healthy = !Object.values(checks).some((status) => !status)

  if (healthy) {
    reply.code(200)
  } else {
    reply.code(503)
  }

  return { checks, healthy }
})

九、总结 #

本章我们学习了:

  1. MongoDB集成:插件、Repository、使用示例
  2. PostgreSQL集成:插件、Repository、事务
  3. MySQL集成:插件、使用方法
  4. Redis集成:缓存插件、缓存使用
  5. ORM集成:Prisma配置和使用
  6. 数据库迁移:迁移脚本编写
  7. 最佳实践:连接池、错误处理、健康检查

接下来让我们学习部署上线!

最后更新:2026-03-28