NestJS微服务架构 #
微服务概述 #
NestJS支持构建微服务架构，提供了多种传输层实现，包括TCP、Redis、NATS、RabbitMQ、Kafka、gRPC等。
安装依赖 #
bash
# Add the @nestjs/microservices package to the project
npm install @nestjs/microservices
创建微服务 #
基本微服务 #
typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
/**
 * Entry point for a standalone microservice.
 *
 * Builds the application from `AppModule` as a TCP microservice configured
 * for localhost:3001, then starts listening.
 */
async function bootstrap() {
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { host: 'localhost', port: 3001 },
  };

  const microservice =
    await NestFactory.createMicroservice<MicroserviceOptions>(
      AppModule,
      tcpOptions,
    );

  await microservice.listen();
}

bootstrap();
混合应用 #
typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
/**
 * Entry point for a hybrid application.
 *
 * Creates the regular application from `AppModule`, attaches a TCP
 * microservice on port 3001, starts every attached microservice, and only
 * then begins listening on port 3000.
 */
async function bootstrap() {
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { port: 3001 },
  };

  const app = await NestFactory.create(AppModule);
  app.connectMicroservice<MicroserviceOptions>(tcpOptions);

  // Microservices are started before the application itself listens.
  await app.startAllMicroservices();
  await app.listen(3000);
}

bootstrap();
消息模式 #
请求-响应模式 #
typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
/**
 * Request-response handlers for simple arithmetic over a list of numbers.
 */
@Controller()
export class MathController {
  /**
   * Handles the `{ cmd: 'sum' }` pattern.
   *
   * @param data numbers to add; a falsy payload is treated as an empty list
   * @returns the sum of all entries, or 0 for an empty list
   */
  @MessagePattern({ cmd: 'sum' })
  accumulate(data: number[]): number {
    const values = data || [];
    return values.reduce((total, value) => total + value, 0);
  }

  /**
   * Handles the `{ cmd: 'multiply' }` pattern.
   *
   * @param data numbers to multiply; a falsy payload is treated as an empty list
   * @returns the product of all entries, or 1 for an empty list
   */
  @MessagePattern({ cmd: 'multiply' })
  multiply(@Payload() data: number[]): number {
    const factors = data || [];
    return factors.reduce((product, factor) => product * factor, 1);
  }
}
事件模式 #
typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
/**
 * Event handlers that log the payload of each event they receive.
 */
@Controller()
export class UserController {
  /** Logs the payload of a `user_created` event. */
  @EventPattern('user_created')
  handleUserCreated(data: Record<string, unknown>) {
    this.logEvent('User created:', data);
  }

  /** Logs the payload of an `order_completed` event. */
  @EventPattern('order_completed')
  handleOrderCompleted(@Payload() data: any) {
    this.logEvent('Order completed:', data);
  }

  /** Writes a label followed by the event payload to the console. */
  private logEvent(label: string, payload: unknown) {
    console.log(label, payload);
  }
}
客户端 #
创建客户端 #
typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// Module that registers one microservice client through ClientsModule.register.
// The entry is named 'MATH_SERVICE' and uses the TCP transport on port 3001;
// the same string is the token used with @Inject('MATH_SERVICE') in the client snippet.
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.TCP,
options: { port: 3001 },
},
]),
],
})
export class AppModule {}
使用客户端 #
typescript
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
/**
 * Thin client for the math microservice injected under 'MATH_SERVICE'.
 */
@Injectable()
export class AppService {
  constructor(
    @Inject('MATH_SERVICE') private client: ClientProxy,
  ) {}

  /**
   * Sends `data` with the `{ cmd: 'sum' }` pattern.
   *
   * @returns a promise resolving to whatever `client.send` returns
   *   (NOTE(review): presumably an Observable the caller still has to
   *   subscribe to or convert — confirm against ClientProxy docs)
   */
  async sum(data: number[]) {
    return this.dispatch('sum', data);
  }

  /**
   * Sends `data` with the `{ cmd: 'multiply' }` pattern.
   *
   * @returns a promise resolving to whatever `client.send` returns
   */
  async multiply(data: number[]) {
    return this.dispatch('multiply', data);
  }

  /** Builds the `{ cmd }` pattern and forwards the payload to the client. */
  private dispatch(cmd: string, data: number[]) {
    return this.client.send({ cmd }, data);
  }
}
发送事件 #
typescript
/**
 * Creates users and announces each new user on the notification client.
 *
 * NOTE(review): `usersRepository` is used below but is not declared in the
 * visible constructor — presumably injected elsewhere; confirm.
 */
@Injectable()
export class UserService {
  constructor(
    @Inject('NOTIFICATION_SERVICE') private client: ClientProxy,
  ) {}

  /**
   * Persists a user, emits a `user_created` event carrying the new user's
   * id and email, and returns the created user.
   *
   * @param data fields for the user to create
   * @returns the value produced by `usersRepository.create`
   */
  async createUser(data: CreateUserDto) {
    const user = await this.usersRepository.create(data);

    // Only the id and email are published with the event.
    const { id, email } = user;
    this.client.emit('user_created', { id, email });

    return user;
  }
}
传输层 #
TCP #
typescript
// Server side: TCP transport options (host and port) for the microservice
{
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
}
// Client side: same TCP host and port, with the client entry named 'SERVICE_NAME'
{
name: 'SERVICE_NAME',
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
}
Redis #
typescript
// Redis transport options: server host, port and password.
// NOTE(review): 'password' looks like a placeholder — load the real credential
// from configuration instead of hard-coding it.
{
transport: Transport.REDIS,
options: {
host: 'localhost',
port: 6379,
password: 'password',
},
}
NATS #
typescript
// NATS transport options: `servers` is a list of server URLs (a single local one here).
{
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
}
RabbitMQ #
typescript
// RabbitMQ transport options: broker URLs, the queue name, and queue options.
{
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
// Queue is declared non-durable (presumably not kept across broker restarts — confirm).
durable: false,
},
},
}
Kafka #
typescript
// Kafka transport options: client broker list and the consumer group id.
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-consumer',
},
},
}
gRPC #
定义proto文件:
protobuf
// proto3 schema for the hero service, declared in package `hero`.
syntax = "proto3";
package hero;
// HeroService exposes two RPCs over the messages defined below.
service HeroService {
// Single request, single response: look up one hero by id.
rpc FindOne (HeroById) returns (Hero) {}
// Both the request and the response are streams.
rpc FindMany (stream HeroById) returns (stream Hero) {}
}
// Request message carrying the id of the hero to look up.
message HeroById {
int32 id = 1;
}
// Hero record: numeric id and name.
message Hero {
int32 id = 1;
string name = 2;
}
实现gRPC控制器：
typescript
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';

/**
 * gRPC controller backing the `HeroService` defined in the proto file.
 */
@Controller()
export class HeroController {
  /**
   * Handles `HeroService.FindOne` by searching a fixed in-memory list.
   *
   * @param data request carrying the id of the hero to look up
   * @returns the matching hero, or `undefined` when no hero has that id
   */
  @GrpcMethod('HeroService', 'FindOne')
  findOne(data: HeroById): Hero | undefined {
    const items: Hero[] = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    // `find` yields undefined on a miss, so the return type says so explicitly.
    return items.find(({ id }) => id === data.id);
  }
}
消息序列化 #
自定义序列化器 #
typescript
import { Serializer, OutgoingEvent } from '@nestjs/microservices';
/**
 * Serializer that stamps every outgoing value with the current time.
 */
export class CustomSerializer implements Serializer {
  /**
   * Shallow-copies `value` and sets `timestamp` to `Date.now()`,
   * replacing any `timestamp` the value already had.
   *
   * @param value outgoing value to copy
   * @returns the copy with a numeric `timestamp` (epoch milliseconds)
   */
  serialize(value: any): OutgoingEvent {
    const stamped = { ...value };
    stamped.timestamp = Date.now();
    return stamped;
  }
}
自定义反序列化器 #
typescript
import { Deserializer, IncomingResponse } from '@nestjs/microservices';
/**
 * Deserializer that turns the incoming `timestamp` into a `Date`.
 */
export class CustomDeserializer implements Deserializer {
  /**
   * Shallow-copies `value` and replaces `timestamp` with
   * `new Date(value.timestamp)`.
   *
   * @param value incoming value to copy
   * @returns the copy with `timestamp` converted to a `Date`
   */
  deserialize(value: any): IncomingResponse {
    const revived = { ...value };
    revived.timestamp = new Date(value.timestamp);
    return revived;
  }
}
异常处理 #
异常过滤器 #
typescript
import {
  Catch,
  // Aliased so the exported class below can keep the name RpcExceptionFilter
  // without clashing with this imported interface.
  RpcExceptionFilter as NestRpcExceptionFilter,
  ArgumentsHost,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

/**
 * Filter for `RpcException` that re-emits the failure as an error
 * observable carrying `{ statusCode, message }`.
 */
@Catch(RpcException)
export class RpcExceptionFilter
  implements NestRpcExceptionFilter<RpcException>
{
  /**
   * @param exception the caught RPC exception
   * @param host execution context (unused here)
   * @returns an observable that errors with `{ statusCode, message }`;
   *   `statusCode` is undefined when the wrapped error has none
   */
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    return throwError(() => {
      // getError() may return a plain string, so narrow before reading a field.
      const error = exception.getError();
      const statusCode =
        typeof error === 'object' && error !== null
          ? (error as { statusCode?: unknown }).statusCode
          : undefined;

      return { statusCode, message: exception.message };
    });
  }
}
抛出RPC异常 #
typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, RpcException } from '@nestjs/microservices';
/**
 * Controller showing how to raise an RPC exception from a message handler.
 *
 * NOTE(review): `usersService` is not declared in this snippet — presumably
 * injected through a constructor that is not shown; confirm.
 */
@Controller()
export class UserController {
  /**
   * Handles the `{ cmd: 'get_user' }` pattern.
   *
   * @param id identifier passed to `usersService.findOne`
   * @returns the value returned by `usersService.findOne`
   * @throws RpcException with 'User not found' when the lookup result is falsy
   */
  @MessagePattern({ cmd: 'get_user' })
  getUser(id: number) {
    const found = this.usersService.findOne(id);
    if (found) {
      return found;
    }
    throw new RpcException('User not found');
  }
}
健康检查 #
typescript
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';
import { HealthCheckService, HealthCheck } from '@nestjs/terminus';
/**
 * Health endpoint registered under the 'health' controller path and
 * bound to the gRPC method `Health.Check`.
 */
@Controller('health')
export class HealthController {
  constructor(private health: HealthCheckService) {}

  /**
   * Runs the health check service with an empty indicator list.
   *
   * @returns the result of `HealthCheckService.check`
   */
  @GrpcMethod('Health', 'Check')
  @HealthCheck()
  check() {
    // No indicators are registered in this example.
    const indicators: [] = [];
    return this.health.check(indicators);
  }
}
微服务架构示例 #
用户服务 #
typescript
// user.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { UsersService } from './users.service';
/**
 * Message handlers of the user microservice; each one delegates to
 * `UsersService`.
 */
@Controller()
export class UserController {
  constructor(private usersService: UsersService) {}

  /** Handles `{ cmd: 'get_user' }` by looking the user up by id. */
  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() id: number) {
    const user = this.usersService.findOne(id);
    return user;
  }

  /** Handles `{ cmd: 'create_user' }` by creating a user from the payload. */
  @MessagePattern({ cmd: 'create_user' })
  async createUser(@Payload() data: CreateUserDto) {
    const created = this.usersService.create(data);
    return created;
  }

  /**
   * Handles `{ cmd: 'validate_user' }` by passing the email and password
   * from the payload to `UsersService.validate`.
   */
  @MessagePattern({ cmd: 'validate_user' })
  async validateUser(@Payload() data: { email: string; password: string }) {
    const { email, password } = data;
    return this.usersService.validate(email, password);
  }
}
订单服务 #
typescript
// order.controller.ts
import { Controller, Inject } from '@nestjs/common';
import {
  MessagePattern,
  Payload,
  ClientProxy,
  RpcException,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
import { OrdersService } from './orders.service';

/**
 * Message handlers of the order microservice. Orders are only created
 * after the user microservice confirms that the user exists.
 */
@Controller()
export class OrderController {
  constructor(
    private ordersService: OrdersService,
    @Inject('USER_SERVICE') private userClient: ClientProxy,
  ) {}

  /**
   * Handles `{ cmd: 'create_order' }`.
   *
   * @param data order payload; `data.userId` is sent to the user service
   * @returns the value returned by `OrdersService.create`
   * @throws RpcException with 'User not found' when the user service
   *   answers with a falsy value or with no value at all
   */
  @MessagePattern({ cmd: 'create_order' })
  async createOrder(@Payload() data: CreateOrderDto) {
    // lastValueFrom replaces the deprecated toPromise(); the default value
    // keeps the old "resolve to undefined on an empty stream" behaviour.
    const user = await lastValueFrom(
      this.userClient.send({ cmd: 'get_user' }, data.userId),
      { defaultValue: undefined },
    );

    if (!user) {
      // Same exception type the user lookup handler in this guide throws.
      throw new RpcException('User not found');
    }

    return this.ordersService.create(data);
  }
}
最佳实践 #
1. 服务发现 #
typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
/**
 * Builds one TCP client per service listed by the registry and keeps the
 * clients in a map keyed by service name.
 *
 * NOTE(review): `fetchServicesFromRegistry` is not defined in this
 * snippet — presumably implemented elsewhere; confirm.
 */
@Injectable()
export class ServiceDiscovery implements OnModuleInit {
  private services = new Map<string, any>();

  /** Registers all services when the module is initialised. */
  async onModuleInit() {
    await this.registerServices();
  }

  /** Fetches the service list and stores a TCP client for each entry. */
  private async registerServices() {
    const registry = await this.fetchServicesFromRegistry();

    registry.forEach(({ host, port, name }) => {
      const options = { host, port };
      this.services.set(
        name,
        ClientProxyFactory.create({ transport: Transport.TCP, options }),
      );
    });
  }

  /**
   * @param name service name used as the map key
   * @returns the stored client, or `undefined` when the name is unknown
   */
  getService(name: string) {
    return this.services.get(name);
  }
}
2. 熔断器 #
typescript
import { Injectable } from '@nestjs/common';
// opossum exports the CircuitBreaker class as the module itself, so it is
// imported as the default binding rather than as a named export.
import CircuitBreaker from 'opossum';

/**
 * Wraps a remote call in a circuit breaker.
 */
@Injectable()
export class ResilientService {
  private breaker: CircuitBreaker;

  constructor() {
    // bind(this) keeps `this` pointing at the service when the breaker
    // invokes the method.
    this.breaker = new CircuitBreaker(this.callRemoteService.bind(this), {
      timeout: 3000,
      errorThresholdPercentage: 50,
      resetTimeout: 30000,
    });
  }

  /**
   * Placeholder for the protected remote call.
   *
   * @param data payload for the remote service
   */
  async callRemoteService(data: any) {
    // Call the remote service
  }

  /**
   * Runs `callRemoteService` through the circuit breaker.
   *
   * @param data payload forwarded to `callRemoteService`
   * @returns the result of `breaker.fire`
   */
  async execute(data: any) {
    return this.breaker.fire(data);
  }
}
3. 重试机制 #
typescript
import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

/**
 * Sends a message through the client and retries on failure.
 */
@Injectable()
export class RetryService {
  constructor(private client: ClientProxy) {}

  /**
   * Sends `data` with `pattern`, applying `retry(3)` before giving up.
   *
   * @param pattern message pattern passed to `client.send`
   * @param data payload passed to `client.send`
   * @returns a promise for the last value of the response stream, or
   *   `undefined` when the stream completes without a value
   * @throws the original error, after logging it, once retries are exhausted
   */
  async callWithRetry(pattern: any, data: any) {
    const response$ = this.client.send(pattern, data).pipe(
      retry(3),
      catchError(error => {
        console.error('All retries failed', error);
        throw error;
      }),
    );

    // lastValueFrom replaces the deprecated toPromise(); the default value
    // keeps the old "resolve to undefined on an empty stream" behaviour.
    return lastValueFrom(response$, { defaultValue: undefined });
  }
}
总结 #
本章学习了NestJS微服务架构:
- 微服务创建和配置
- 消息模式(请求-响应、事件)
- 多种传输层
- 客户端使用
- 异常处理
- 最佳实践
接下来，让我们学习单元测试。
最后更新:2026-03-28