NestJS事件与Emitter #

事件系统概述 #

NestJS提供了内置的事件系统,允许应用程序的不同部分通过发布-订阅模式进行通信,实现松耦合的架构。

安装依赖 #

bash
npm install @nestjs/event-emitter

配置事件模块 #

typescript
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
  imports: [
    EventEmitterModule.forRoot({
      wildcard: true,
      delimiter: '.',
      newListener: false,
      removeListener: false,
      maxListeners: 10,
      verboseMemoryLeak: true,
      ignoreErrors: false,
    }),
  ],
})
export class AppModule {}

定义事件 #

创建事件类 #

typescript
import { IEvent } from '@nestjs/cqrs';

export class UserCreatedEvent implements IEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
    public readonly name: string,
  ) {}
}

export class OrderCreatedEvent implements IEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: number,
    public readonly amount: number,
  ) {}
}

发布事件 #

注入EventEmitter2 #

typescript
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserCreatedEvent } from './events/user-created.event';

@Injectable()
export class UsersService {
  constructor(private eventEmitter: EventEmitter2) {}

  async create(createUserDto: CreateUserDto) {
    const user = await this.usersRepository.create(createUserDto);

    this.eventEmitter.emit(
      'user.created',
      new UserCreatedEvent(user.id, user.email, user.name),
    );

    return user;
  }
}

异步发布事件 #

typescript
await this.eventEmitter.emitAsync(
  'user.created',
  new UserCreatedEvent(user.id, user.email, user.name),
);

监听事件 #

使用@OnEvent装饰器 #

typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { UserCreatedEvent } from './events/user-created.event';

@Injectable()
export class EmailService {
  @OnEvent('user.created')
  handleUserCreatedEvent(event: UserCreatedEvent) {
    console.log(`Sending welcome email to ${event.email}`);
    // 发送欢迎邮件
  }
}

配置监听器选项 #

typescript
@OnEvent('user.created', {
  async: true,              // 异步处理
  nextTick: false,          // 使用nextTick
  promisify: true,          // 返回Promise
  suppressErrors: false,    // 抑制错误
})
async handleUserCreatedEvent(event: UserCreatedEvent) {
  await this.sendWelcomeEmail(event.email);
}

通配符监听 #

typescript
@OnEvent('user.*')
handleAllUserEvents(event: any) {
  console.log('User event:', event);
}

@OnEvent('order.*.completed')
handleOrderCompleted(event: any) {
  console.log('Order completed:', event);
}

事件处理示例 #

用户注册事件处理 #

typescript
// events/user-created.event.ts
export class UserCreatedEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
    public readonly name: string,
  ) {}
}

// listeners/user-created.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { UserCreatedEvent } from '../events/user-created.event';
import { EmailService } from '../email/email.service';
import { AuditService } from '../audit/audit.service';

@Injectable()
export class UserCreatedListener {
  private readonly logger = new Logger(UserCreatedListener.name);

  constructor(
    private emailService: EmailService,
    private auditService: AuditService,
  ) {}

  @OnEvent('user.created')
  async handleUserCreated(event: UserCreatedEvent) {
    this.logger.log(`User created: ${event.email}`);

    await Promise.all([
      this.emailService.sendWelcomeEmail(event.email, event.name),
      this.auditService.log('user.created', event),
    ]);
  }
}

订单事件处理 #

typescript
// events/order-created.event.ts
export class OrderCreatedEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: number,
    public readonly amount: number,
    public readonly items: OrderItem[],
  ) {}
}

// listeners/order.listener.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderCreatedEvent } from '../events/order-created.event';
import { NotificationService } from '../notification/notification.service';
import { InventoryService } from '../inventory/inventory.service';

@Injectable()
export class OrderListener {
  constructor(
    private notificationService: NotificationService,
    private inventoryService: InventoryService,
  ) {}

  @OnEvent('order.created')
  async handleOrderCreated(event: OrderCreatedEvent) {
    await this.notificationService.sendOrderConfirmation(event);
    await this.inventoryService.reserveItems(event.items);
  }

  @OnEvent('order.cancelled')
  async handleOrderCancelled(event: any) {
    await this.inventoryService.releaseItems(event.items);
  }
}

事件优先级 #

typescript
@OnEvent('user.created', { priority: 1 })  // 高优先级
handleUserCreatedFirst(event: UserCreatedEvent) {
  // 最先执行
}

@OnEvent('user.created', { priority: 0 })  // 默认优先级
handleUserCreated(event: UserCreatedEvent) {
  // 按顺序执行
}

@OnEvent('user.created', { priority: -1 })  // 低优先级
handleUserCreatedLast(event: UserCreatedEvent) {
  // 最后执行
}

事件拦截器 #

typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

@Injectable()
export class EventLoggerInterceptor {
  @OnEvent('**', { prependListener: true })
  logAllEvents(event: any) {
    console.log('Event emitted:', event);
  }
}

条件监听 #

typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

@Injectable()
export class ConditionalListener {
  private enabled = true;

  setEnabled(enabled: boolean) {
    this.enabled = enabled;
  }

  @OnEvent('user.created')
  handleUserCreated(event: UserCreatedEvent) {
    if (!this.enabled) {
      return;
    }
    // 处理事件
  }
}

事件总线服务 #

typescript
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class EventBus {
  constructor(private eventEmitter: EventEmitter2) {}

  emit(event: string, data: any) {
    this.eventEmitter.emit(event, data);
  }

  async emitAsync(event: string, data: any) {
    await this.eventEmitter.emitAsync(event, data);
  }

  on(event: string, listener: Function) {
    this.eventEmitter.on(event, listener as any);
  }

  once(event: string, listener: Function) {
    this.eventEmitter.once(event, listener as any);
  }

  off(event: string, listener: Function) {
    this.eventEmitter.off(event, listener as any);
  }
}

事件与队列集成 #

typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { QueueService } from '../queue/queue.service';

@Injectable()
export class QueueEventListener {
  constructor(private queueService: QueueService) {}

  @OnEvent('email.send')
  async handleEmailSend(event: any) {
    await this.queueService.add('email', {
      to: event.to,
      subject: event.subject,
      body: event.body,
    });
  }
}

最佳实践 #

1. 命名规范 #

typescript
// 推荐:领域.动作
'user.created'
'order.completed'
'payment.failed'

// 不推荐
'userCreated'
'onUserCreated'

2. 事件类设计 #

typescript
// 事件应该是不可变的
export class UserCreatedEvent {
  constructor(
    public readonly userId: number,
    public readonly email: string,
  ) {}
}

3. 错误处理 #

typescript
@OnEvent('user.created', { suppressErrors: false })
async handleUserCreated(event: UserCreatedEvent) {
  try {
    await this.sendEmail(event.email);
  } catch (error) {
    this.logger.error('Failed to send email', error);
    throw error;
  }
}

4. 避免循环依赖 #

typescript
// 不要在事件监听器中发布相同的事件
@OnEvent('user.created')
handleUserCreated(event: UserCreatedEvent) {
  // 避免这样做
  // this.eventEmitter.emit('user.created', event);
}

总结 #

本章学习了NestJS事件系统:

  • 事件模块配置
  • 事件定义和发布
  • 事件监听
  • 事件优先级
  • 最佳实践

接下来,让我们学习 定时任务

最后更新:2026-03-28