NestJS微服务架构 #

微服务概述 #

NestJS 支持构建微服务架构,并内置了多种传输层实现,包括 TCP、Redis、NATS、RabbitMQ、Kafka 和 gRPC 等。

安装依赖 #

bash
npm install @nestjs/microservices

创建微服务 #

基本微服务 #

typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

/**
 * Creates a standalone microservice from AppModule using the TCP transport
 * bound to localhost:3001, then starts listening for messages.
 */
async function bootstrap() {
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { host: 'localhost', port: 3001 },
  };

  const microservice = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    tcpOptions,
  );
  await microservice.listen();
}
bootstrap();

混合应用 #

typescript
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

/**
 * Creates a hybrid application: the regular application built from AppModule
 * with an additional TCP microservice attached on port 3001.
 */
async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const tcpMicroservice: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { port: 3001 },
  };
  app.connectMicroservice<MicroserviceOptions>(tcpMicroservice);

  // Attached microservices are started first, then the application itself
  // begins listening on port 3000.
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

消息模式 #

请求-响应模式 #

typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';

/**
 * Request-response handlers performing basic arithmetic on a list of numbers.
 */
@Controller()
export class MathController {
  /**
   * Handles the `{ cmd: 'sum' }` pattern.
   *
   * @param data numbers to add together
   * @returns the sum; 0 when the payload is missing, empty, or not an array
   */
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[]): number {
    return MathController.toNumbers(data).reduce((total, n) => total + n, 0);
  }

  /**
   * Handles the `{ cmd: 'multiply' }` pattern.
   *
   * @param data numbers to multiply together
   * @returns the product; 1 when the payload is missing, empty, or not an array
   */
  @MessagePattern({ cmd: 'multiply' })
  multiply(@Payload() data: number[]): number {
    return MathController.toNumbers(data).reduce((product, n) => product * n, 1);
  }

  /**
   * Payloads arrive from other processes, so the static `number[]` type is not
   * enforced at runtime. Anything that is not an array is treated as an empty
   * list instead of failing on `.reduce`.
   */
  private static toNumbers(data: unknown): number[] {
    return Array.isArray(data) ? data : [];
  }
}

事件模式 #

typescript
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

/**
 * Event handlers that log incoming `user_created` and `order_completed` events.
 * Event handlers produce no response for the emitter.
 */
@Controller()
export class UserController {
  /**
   * Logs the payload of a `user_created` event.
   *
   * @param data event payload as sent by the emitter
   */
  @EventPattern('user_created')
  handleUserCreated(@Payload() data: Record<string, unknown>) {
    console.log('User created:', data);
  }

  /**
   * Logs the payload of an `order_completed` event.
   *
   * @param data event payload; typed `unknown` because nothing here relies on
   *   its shape
   */
  @EventPattern('order_completed')
  handleOrderCompleted(@Payload() data: unknown) {
    console.log('Order completed:', data);
  }
}

客户端 #

创建客户端 #

typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

// Client registration for the math microservice: TCP transport on port 3001,
// registered under the name 'MATH_SERVICE'.
const mathServiceClients = ClientsModule.register([
  {
    name: 'MATH_SERVICE',
    transport: Transport.TCP,
    options: { port: 3001 },
  },
]);

/**
 * Module that imports the 'MATH_SERVICE' client registration.
 */
@Module({
  imports: [mathServiceClients],
})
export class AppModule {}

使用客户端 #

typescript
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

/**
 * Client-side wrapper that sends `sum` and `multiply` requests through the
 * client injected under the 'MATH_SERVICE' token.
 *
 * NOTE(review): both public methods are `async` but return the result of
 * `client.send(...)` without awaiting it, so callers get a Promise wrapping
 * whatever `send` returns (presumably an Observable that still has to be
 * subscribed to or converted — confirm against the ClientProxy docs).
 */
@Injectable()
export class AppService {
  constructor(
    @Inject('MATH_SERVICE') private client: ClientProxy,
  ) {}

  /** Sends `data` with the `{ cmd: 'sum' }` pattern. */
  async sum(data: number[]) {
    return this.request('sum', data);
  }

  /** Sends `data` with the `{ cmd: 'multiply' }` pattern. */
  async multiply(data: number[]) {
    return this.request('multiply', data);
  }

  /** Builds the `{ cmd }` pattern and forwards the payload to the client. */
  private request(cmd: string, data: number[]) {
    return this.client.send({ cmd }, data);
  }
}

发送事件 #

typescript
/**
 * Creates users and emits a `user_created` event through the client injected
 * under the 'NOTIFICATION_SERVICE' token.
 *
 * NOTE(review): `usersRepository` and `CreateUserDto` are not declared in this
 * excerpt — presumably provided elsewhere; confirm before reusing the snippet.
 */
@Injectable()
export class UserService {
  constructor(
    @Inject('NOTIFICATION_SERVICE') private client: ClientProxy,
  ) {}

  /**
   * Stores the user, emits `user_created` with the new user's id and email,
   * and returns the stored user. The result of `emit` is not awaited.
   *
   * @param data fields for the user to create
   * @returns the value resolved by `usersRepository.create`
   */
  async createUser(data: CreateUserDto) {
    const user = await this.usersRepository.create(data);

    // Only the id and email are put into the event payload.
    const { id, email } = user;
    this.client.emit('user_created', { id, email });

    return user;
  }
}

传输层 #

TCP #

typescript
// Server side: TCP transport options (host localhost, port 3001)
{
  transport: Transport.TCP,
  options: {
    host: 'localhost',
    port: 3001,
  },
}

// Client side: same host and port as the server above, plus the `name`
// under which the client is registered
{
  name: 'SERVICE_NAME',
  transport: Transport.TCP,
  options: {
    host: 'localhost',
    port: 3001,
  },
}

Redis #

typescript
// Redis transport options: server at localhost:6379 with a password
{
  transport: Transport.REDIS,
  options: {
    host: 'localhost',
    port: 6379,
    // NOTE(review): placeholder credential for the example — presumably
    // supplied from configuration in a real deployment
    password: 'password',
  },
}

NATS #

typescript
// NATS transport options: a single server URL, nats://localhost:4222
{
  transport: Transport.NATS,
  options: {
    // `servers` is an array, so further URLs can be listed here
    servers: ['nats://localhost:4222'],
  },
}

RabbitMQ #

typescript
// RabbitMQ transport options: one broker URL and the queue named 'cats_queue'
{
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      // Queue is configured as non-durable
      // NOTE(review): presumably means it does not survive a broker restart — confirm
      durable: false,
    },
  },
}

Kafka #

typescript
// Kafka transport options: one broker at localhost:9092 and the consumer
// group id 'my-consumer'
{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'my-consumer',
    },
  },
}

gRPC #

定义proto文件:

protobuf
// proto3 definition of the hero package: one service and its two messages.
syntax = "proto3";

package hero;

// HeroService exposes one unary call and one bidirectional-streaming call.
service HeroService {
  // Unary: takes a single HeroById and returns a single Hero.
  rpc FindOne (HeroById) returns (Hero) {}
  // Streaming in both directions: a stream of HeroById in, a stream of Hero out.
  rpc FindMany (stream HeroById) returns (stream Hero) {}
}

// Lookup request carrying the hero id.
message HeroById {
  int32 id = 1;
}

// A hero record: numeric id and name.
message Hero {
  int32 id = 1;
  string name = 2;
}

实现gRPC服务方法(控制器):

typescript
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';

/**
 * gRPC controller that serves the `FindOne` method of `HeroService` from a
 * fixed in-memory list.
 *
 * NOTE(review): `HeroById` and `Hero` are not declared in this excerpt —
 * presumably interfaces mirroring the proto messages; confirm.
 */
@Controller()
export class HeroController {
  /**
   * Handles `HeroService.FindOne`.
   *
   * @param data request carrying the id to look up
   * @returns the hero with that id, or `undefined` when none matches
   */
  @GrpcMethod('HeroService', 'FindOne')
  findOne(data: HeroById): Hero | undefined {
    const items: Hero[] = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    // `find` yields undefined for an unknown id, hence the widened return type.
    return items.find(({ id }) => id === data.id);
  }
}

消息序列化 #

自定义序列化器 #

typescript
import { Serializer, OutgoingEvent } from '@nestjs/microservices';

/**
 * Serializer that stamps every outgoing value with the current time.
 */
export class CustomSerializer implements Serializer {
  /**
   * Copies the own enumerable properties of `value` into a new object and
   * sets `timestamp` to `Date.now()`, replacing any existing `timestamp`.
   *
   * @param value the outgoing value to serialize
   * @returns the shallow copy carrying the numeric `timestamp`
   */
  serialize(value: any): OutgoingEvent {
    const stamped = { ...value };
    stamped.timestamp = Date.now();
    return stamped;
  }
}

自定义反序列化器 #

typescript
import { Deserializer, IncomingResponse } from '@nestjs/microservices';

/**
 * Deserializer that turns the incoming `timestamp` field into a `Date`.
 */
export class CustomDeserializer implements Deserializer {
  /**
   * Copies the own enumerable properties of `value` into a new object and
   * replaces `timestamp` with `new Date(value.timestamp)`.
   *
   * @param value the incoming value to deserialize
   * @returns the shallow copy whose `timestamp` is a `Date`
   */
  deserialize(value: any): IncomingResponse {
    const restored = { ...value };
    restored.timestamp = new Date(value.timestamp);
    return restored;
  }
}

异常处理 #

异常过滤器 #

typescript
import {
  Catch,
  // Aliased: the class below has the same name as this interface, and an
  // import cannot share a name with a local declaration.
  RpcExceptionFilter as NestRpcExceptionFilter,
  ArgumentsHost,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable, throwError } from 'rxjs';

/**
 * Exception filter for `RpcException` that responds with an error object of
 * the shape `{ statusCode, message }`.
 */
@Catch(RpcException)
export class RpcExceptionFilter implements NestRpcExceptionFilter<RpcException> {
  /**
   * @param exception the caught RpcException
   * @param host execution context (unused here)
   * @returns an Observable that errors with `{ statusCode, message }`;
   *   `statusCode` is `undefined` when the wrapped error is a string or an
   *   object without that field
   */
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    // getError() is `string | object`, so narrow before reading a property.
    const error = exception.getError();
    const statusCode =
      typeof error === 'object' && error !== null
        ? (error as { statusCode?: unknown }).statusCode
        : undefined;

    return throwError(() => ({
      statusCode,
      message: exception.message,
    }));
  }
}

抛出RPC异常 #

typescript
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { UsersService } from './users.service';

/**
 * Message handler that looks a user up and reports a missing user as an
 * `RpcException`.
 */
@Controller()
export class UserController {
  // The handler reads `this.usersService`, so it has to be injected here.
  constructor(private readonly usersService: UsersService) {}

  /**
   * Handles the `{ cmd: 'get_user' }` pattern.
   *
   * @param id identifier of the user to fetch
   * @returns the user found by `usersService.findOne`
   * @throws RpcException with message 'User not found' when the lookup is falsy
   */
  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() id: number) {
    // Awaiting keeps the emptiness check correct whether `findOne` is
    // synchronous or returns a Promise (an unresolved Promise is always truthy).
    const user = await this.usersService.findOne(id);

    if (!user) {
      throw new RpcException('User not found');
    }

    return user;
  }
}

健康检查 #

typescript
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';
import { HealthCheckService, HealthCheck } from '@nestjs/terminus';

/**
 * Health endpoint: answers the gRPC `Health.Check` method with the result of
 * `HealthCheckService.check`.
 */
@Controller('health')
export class HealthController {
  constructor(private readonly health: HealthCheckService) {}

  /**
   * Runs the health check with an empty indicator list.
   *
   * @returns whatever `HealthCheckService.check([])` returns
   */
  @GrpcMethod('Health', 'Check')
  @HealthCheck()
  check() {
    const result = this.health.check([]);
    return result;
  }
}

微服务架构示例 #

用户服务 #

typescript
// user.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { UsersService } from './users.service';

/**
 * Message handlers of the user microservice; each one delegates to
 * `UsersService`.
 *
 * NOTE(review): `CreateUserDto` is not imported in this excerpt — presumably
 * declared elsewhere in the project; confirm.
 */
@Controller()
export class UserController {
  constructor(private readonly usersService: UsersService) {}

  /**
   * Handles `{ cmd: 'get_user' }`.
   *
   * @param id identifier of the user to fetch
   * @returns the result of `usersService.findOne(id)`
   */
  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() id: number) {
    return this.usersService.findOne(id);
  }

  /**
   * Handles `{ cmd: 'create_user' }`.
   *
   * @param data fields for the user to create
   * @returns the result of `usersService.create(data)`
   */
  @MessagePattern({ cmd: 'create_user' })
  async createUser(@Payload() data: CreateUserDto) {
    return this.usersService.create(data);
  }

  /**
   * Handles `{ cmd: 'validate_user' }`.
   *
   * @param data the email and password to check
   * @returns the result of `usersService.validate(email, password)`
   */
  @MessagePattern({ cmd: 'validate_user' })
  async validateUser(@Payload() data: { email: string; password: string }) {
    const { email, password } = data;
    return this.usersService.validate(email, password);
  }
}

订单服务 #

typescript
// order.controller.ts
import { Controller, Inject } from '@nestjs/common';
import {
  MessagePattern,
  Payload,
  ClientProxy,
  RpcException,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
import { OrdersService } from './orders.service';

/**
 * Message handlers of the order microservice. Creating an order first checks
 * with the user service (client injected under 'USER_SERVICE') that the user
 * exists.
 *
 * NOTE(review): `CreateOrderDto` is not imported in this excerpt — presumably
 * declared elsewhere in the project; confirm.
 */
@Controller()
export class OrderController {
  constructor(
    private readonly ordersService: OrdersService,
    @Inject('USER_SERVICE') private readonly userClient: ClientProxy,
  ) {}

  /**
   * Handles `{ cmd: 'create_order' }`.
   *
   * @param data order fields, including the `userId` to verify
   * @returns the result of `ordersService.create(data)`
   * @throws RpcException with message 'User not found' when the user service
   *   returns a falsy value or completes without a value
   */
  @MessagePattern({ cmd: 'create_order' })
  async createOrder(@Payload() data: CreateOrderDto) {
    // lastValueFrom replaces the deprecated toPromise(). The default value
    // keeps toPromise()'s behaviour of resolving to undefined (instead of
    // rejecting) when the stream completes empty.
    const user = await lastValueFrom(
      this.userClient.send({ cmd: 'get_user' }, data.userId),
      { defaultValue: undefined },
    );

    if (!user) {
      // RpcException, as in the other handlers of this document, so the
      // failure is reported to the caller as an RPC error.
      throw new RpcException('User not found');
    }

    return this.ordersService.create(data);
  }
}

最佳实践 #

1. 服务发现 #

typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import {
  ClientProxy,
  ClientProxyFactory,
  Transport,
} from '@nestjs/microservices';

/** Name and TCP address of one remote service, as listed by the registry. */
interface ServiceEndpoint {
  name: string;
  host: string;
  port: number;
}

/**
 * Builds one TCP client per service listed in the registry when the module
 * initializes, and hands the clients out by service name.
 */
@Injectable()
export class ServiceDiscovery implements OnModuleInit {
  private readonly services = new Map<string, ClientProxy>();

  /** Registers all services once, at module initialization. */
  async onModuleInit(): Promise<void> {
    await this.registerServices();
  }

  /**
   * Creates a TCP client for every endpoint the registry returns. A later
   * entry with the same name replaces the earlier one.
   */
  private async registerServices(): Promise<void> {
    const endpoints = await this.fetchServicesFromRegistry();

    for (const { name, host, port } of endpoints) {
      const client = ClientProxyFactory.create({
        transport: Transport.TCP,
        options: { host, port },
      });
      this.services.set(name, client);
    }
  }

  /**
   * Placeholder for the registry lookup, which the original snippet called
   * but never defined. Replace the body with a real query (or override it in
   * a subclass); as written it registers no services.
   */
  protected async fetchServicesFromRegistry(): Promise<ServiceEndpoint[]> {
    // Query the service registry here
    return [];
  }

  /**
   * @param name service name as listed by the registry
   * @returns the client for that service, or `undefined` if none is registered
   */
  getService(name: string): ClientProxy | undefined {
    return this.services.get(name);
  }
}

2. 熔断器 #

typescript
import { Injectable } from '@nestjs/common';
// opossum exports the CircuitBreaker class as the module's default export,
// so a named `{ CircuitBreaker }` import is undefined at runtime.
// NOTE(review): the default-import form presumably needs `esModuleInterop`
// in tsconfig — confirm for your setup.
import CircuitBreaker from 'opossum';

/**
 * Wraps a remote call in an opossum circuit breaker.
 */
@Injectable()
export class ResilientService {
  private readonly breaker: CircuitBreaker;

  constructor() {
    // Bound so the breaker invokes callRemoteService with this instance.
    const action = this.callRemoteService.bind(this);

    this.breaker = new CircuitBreaker(action, {
      timeout: 3000, // ms before a single call counts as failed
      errorThresholdPercentage: 50, // failure percentage that opens the circuit
      resetTimeout: 30000, // ms the circuit stays open before a trial call
    });
  }

  /**
   * The protected operation. Left empty in this example.
   *
   * @param data payload for the remote call
   */
  async callRemoteService(data: any) {
    // Call the remote service here
  }

  /**
   * Runs `callRemoteService` through the circuit breaker.
   *
   * @param data payload passed on to `callRemoteService`
   * @returns the promise returned by the breaker's `fire`
   */
  async execute(data: any) {
    return this.breaker.fire(data);
  }
}

3. 重试机制 #

typescript
import { Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

/**
 * Sends a request through the injected client and retries it on failure.
 *
 * NOTE(review): the client is injected by type only, whereas the other
 * services in this document use `@Inject('<TOKEN>')` — presumably a token is
 * needed here too; confirm against the module that provides the client.
 */
@Injectable()
export class RetryService {
  constructor(private readonly client: ClientProxy) {}

  /**
   * Sends `data` with `pattern`, resubscribing up to 3 times after an error.
   *
   * @param pattern message pattern to send
   * @param data payload to send
   * @returns the last value of the response stream, or `undefined` when the
   *   stream completes without a value
   * @throws the last error once every retry has failed (logged first)
   */
  async callWithRetry(pattern: unknown, data: unknown) {
    const response$ = this.client.send(pattern, data).pipe(
      retry(3),
      catchError(error => {
        console.error('All retries failed', error);
        throw error;
      }),
    );

    // lastValueFrom replaces the deprecated toPromise(). The default value
    // keeps toPromise()'s behaviour of resolving to undefined (instead of
    // rejecting) when the stream completes empty.
    return lastValueFrom(response$, { defaultValue: undefined });
  }
}

总结 #

本章学习了NestJS微服务架构:

  • 微服务创建和配置
  • 消息模式(请求-响应、事件)
  • 多种传输层
  • 客户端使用
  • 异常处理
  • 最佳实践

接下来,让我们学习单元测试。

最后更新:2026-03-28