NestJS事件与Emitter #
事件系统概述 #
NestJS通过官方的 @nestjs/event-emitter 包(基于 eventemitter2)提供事件系统,允许应用程序的不同部分通过发布-订阅模式进行通信,实现松耦合的架构。
安装依赖 #
bash
npm install @nestjs/event-emitter
配置事件模块 #
typescript
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
/**
 * Options handed to `EventEmitterModule.forRoot()`.
 * Kept in a named constant so the module decorator stays short.
 */
const eventEmitterOptions = {
  // Enable wildcard patterns in event names.
  wildcard: true,
  // Character that separates the segments (namespaces) of an event name.
  delimiter: '.',
  // Do not emit the internal `newListener` event when a listener is added.
  newListener: false,
  // Do not emit the internal `removeListener` event when a listener is removed.
  removeListener: false,
  // Maximum number of listeners that can be assigned to a single event.
  maxListeners: 10,
  // Include the event name in the memory-leak warning when the limit is exceeded.
  verboseMemoryLeak: true,
  // Keep throwing when an `error` event is emitted and nothing listens for it.
  ignoreErrors: false,
};

/** Root module: registers the event emitter with the options above. */
@Module({
  imports: [EventEmitterModule.forRoot(eventEmitterOptions)],
})
export class AppModule {}
定义事件 #
创建事件类 #
typescript
// Event payloads are plain classes: nothing here needs a marker interface,
// so the snippet does not depend on any package beyond what it already uses.

/**
 * Payload describing a newly created user.
 * All fields are `readonly` so listeners cannot mutate a shared event.
 */
export class UserCreatedEvent {
  constructor(
    /** Identifier of the created user. */
    public readonly userId: number,
    /** E-mail address of the created user. */
    public readonly email: string,
    /** Display name of the created user. */
    public readonly name: string,
  ) {}
}

/**
 * Payload describing a newly created order.
 * All fields are `readonly` so listeners cannot mutate a shared event.
 */
export class OrderCreatedEvent {
  constructor(
    /** Identifier of the created order. */
    public readonly orderId: string,
    /** Identifier of the user the order belongs to. */
    public readonly userId: number,
    /** Order amount. */
    public readonly amount: number,
  ) {}
}
发布事件 #
注入EventEmitter2 #
typescript
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserCreatedEvent } from './events/user-created.event';
/**
 * Creates users and announces each creation on the event emitter.
 *
 * NOTE(review): `usersRepository` and `CreateUserDto` are used below but are
 * neither injected nor imported in this snippet — presumably declared
 * elsewhere; confirm before copying.
 */
@Injectable()
export class UsersService {
  constructor(private eventEmitter: EventEmitter2) {}

  /**
   * Creates the user through the repository, then emits `user.created`
   * carrying a `UserCreatedEvent`. The emit call itself is not awaited.
   *
   * @param createUserDto data for the new user
   * @returns the user returned by the repository
   */
  async create(createUserDto: CreateUserDto) {
    const user = await this.usersRepository.create(createUserDto);

    const userCreated = new UserCreatedEvent(user.id, user.email, user.name);
    this.eventEmitter.emit('user.created', userCreated);

    return user;
  }
}
异步发布事件 #
typescript
// Build the payload first, then await the promise returned by emitAsync
// so the caller continues only after it settles.
const userCreatedEvent = new UserCreatedEvent(user.id, user.email, user.name);
await this.eventEmitter.emitAsync('user.created', userCreatedEvent);
监听事件 #
使用@OnEvent装饰器 #
typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { UserCreatedEvent } from './events/user-created.event';
/** Listens for user registration events in order to send e-mail. */
@Injectable()
export class EmailService {
  /**
   * Invoked for `user.created` events.
   *
   * @param event payload carrying the new user's e-mail address
   */
  @OnEvent('user.created')
  handleUserCreatedEvent(event: UserCreatedEvent) {
    const { email } = event;
    console.log(`Sending welcome email to ${email}`);
    // Send the welcome email here.
  }
}
配置监听器选项 #
typescript
// Listener options for `user.created`; each flag is explained inline.
@OnEvent('user.created', {
  async: true, // invoke the listener asynchronously instead of inline during emit()
  nextTick: false, // false: do not use process.nextTick for the asynchronous invocation
  promisify: true, // wrap the listener in a Promise so emitAsync() can wait for it
  suppressErrors: false, // false: errors thrown by this handler are NOT suppressed
})
async handleUserCreatedEvent(event: UserCreatedEvent) {
  // Waits for the welcome e-mail before the handler resolves.
  await this.sendWelcomeEmail(event.email);
}
通配符监听 #
typescript
/**
 * Logs every event whose name is `user.` followed by exactly one segment
 * (for example `user.created`). Requires the emitter to be configured with
 * `wildcard: true`.
 *
 * @param event payload of whichever event matched; its shape depends on the
 *   emitter side, so it is typed `unknown` rather than `any`
 */
@OnEvent('user.*')
handleAllUserEvents(event: unknown) {
  console.log('User event:', event);
}
/**
 * Logs events named `order.<segment>.completed`, where `*` stands for a
 * single middle segment. Requires the emitter to be configured with
 * `wildcard: true`.
 *
 * @param event payload of whichever event matched; its shape depends on the
 *   emitter side, so it is typed `unknown` rather than `any`
 */
@OnEvent('order.*.completed')
handleOrderCompleted(event: unknown) {
  console.log('Order completed:', event);
}
事件处理示例 #
用户注册事件处理 #
typescript
// events/user-created.event.ts
/**
 * Immutable payload describing a newly created user.
 * Fields are assigned once by the constructor and are `readonly` afterwards.
 */
export class UserCreatedEvent {
  constructor(
    /** Identifier of the created user. */
    public readonly userId: number,
    /** E-mail address of the created user. */
    public readonly email: string,
    /** Display name of the created user. */
    public readonly name: string,
  ) {}
}
// listeners/user-created.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { UserCreatedEvent } from '../events/user-created.event';
import { EmailService } from '../email/email.service';
import { AuditService } from '../audit/audit.service';
/**
 * Handles `user.created`: logs the event, then sends the welcome e-mail and
 * writes an audit entry.
 */
@Injectable()
export class UserCreatedListener {
  private readonly logger = new Logger(UserCreatedListener.name);

  constructor(
    private emailService: EmailService,
    private auditService: AuditService,
  ) {}

  /**
   * Starts both follow-up tasks before awaiting either, so they run
   * concurrently; the handler rejects if either of them rejects.
   *
   * @param event payload of the created user
   */
  @OnEvent('user.created')
  async handleUserCreated(event: UserCreatedEvent) {
    this.logger.log(`User created: ${event.email}`);

    const welcomeEmail = this.emailService.sendWelcomeEmail(
      event.email,
      event.name,
    );
    const auditEntry = this.auditService.log('user.created', event);

    await Promise.all([welcomeEmail, auditEntry]);
  }
}
订单事件处理 #
typescript
// events/order-created.event.ts
/**
 * Immutable payload describing a newly created order.
 *
 * NOTE(review): `OrderItem` is not declared or imported in this snippet —
 * presumably defined elsewhere; confirm.
 */
export class OrderCreatedEvent {
  constructor(
    /** Identifier of the created order. */
    public readonly orderId: string,
    /** Identifier of the user the order belongs to. */
    public readonly userId: number,
    /** Order amount. */
    public readonly amount: number,
    /** Items contained in the order. */
    public readonly items: OrderItem[],
  ) {}
}
// listeners/order.listener.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { OrderCreatedEvent } from '../events/order-created.event';
import { NotificationService } from '../notification/notification.service';
import { InventoryService } from '../inventory/inventory.service';
/** Reacts to order lifecycle events (`order.created`, `order.cancelled`). */
@Injectable()
export class OrderListener {
  constructor(
    private notificationService: NotificationService,
    private inventoryService: InventoryService,
  ) {}

  /**
   * Sends the order confirmation and, once that has completed, reserves the
   * ordered items. The two steps run one after the other, not in parallel.
   *
   * @param event payload of the created order
   */
  @OnEvent('order.created')
  async handleOrderCreated(event: OrderCreatedEvent) {
    await this.notificationService.sendOrderConfirmation(event);
    await this.inventoryService.reserveItems(event.items);
  }

  /**
   * Releases the items of a cancelled order.
   *
   * @param event cancellation payload; only its `items` property is read
   */
  @OnEvent('order.cancelled')
  async handleOrderCancelled(event: any) {
    const { items } = event;
    await this.inventoryService.releaseItems(items);
  }
}
事件优先级 #
typescript
// `@OnEvent` options have no `priority` field. Execution order is controlled
// by `prependListener` (insert at the front of the listener list instead of
// appending) and otherwise by the order in which listeners are registered.

@OnEvent('user.created', { prependListener: true })
handleUserCreatedFirst(event: UserCreatedEvent) {
  // Prepended: placed ahead of the listeners registered so far.
}

@OnEvent('user.created')
handleUserCreated(event: UserCreatedEvent) {
  // Default: appended, so it runs in registration order.
}

@OnEvent('user.created')
handleUserCreatedLast(event: UserCreatedEvent) {
  // NOTE(review): no option forces "last"; this relies on being registered
  // after the other listeners — confirm the registration order in your app.
}
事件拦截器 #
typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
/**
 * Logs the payload of every event that passes through the emitter.
 * The `**` pattern is a multi-level wildcard and only works when the emitter
 * is configured with `wildcard: true`.
 */
@Injectable()
export class EventLoggerInterceptor {
  /**
   * Registered with `prependListener: true` so it is inserted at the front of
   * the listener list rather than appended.
   *
   * @param event payload of whichever event matched; typed `unknown` because
   *   any event shape may arrive here
   */
  @OnEvent('**', { prependListener: true })
  logAllEvents(event: unknown) {
    console.log('Event emitted:', event);
  }
}
条件监听 #
typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
/**
 * Listener that can be switched on and off at runtime.
 * While disabled, `user.created` events are received but ignored.
 */
@Injectable()
export class ConditionalListener {
  /** Whether incoming events are processed; starts enabled. */
  private enabled = true;

  /** Turns event processing on or off. */
  setEnabled(enabled: boolean) {
    this.enabled = enabled;
  }

  /**
   * Handles `user.created` only while the listener is enabled.
   *
   * @param event payload of the created user
   */
  @OnEvent('user.created')
  handleUserCreated(event: UserCreatedEvent) {
    if (this.enabled) {
      // Handle the event here.
    }
  }
}
事件总线服务 #
typescript
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
/**
 * Listener signature accepted by `EventBus`: called with the values passed
 * to `emit`, return value ignored.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload types are event-specific
type EventBusListener = (...args: any[]) => void;

/**
 * Thin facade over `EventEmitter2` exposing emit/subscribe operations with
 * a single payload argument.
 */
@Injectable()
export class EventBus {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  /**
   * Emits `event` with `data`.
   *
   * @returns the emitter's result: whether the event had listeners
   */
  emit(event: string, data: unknown): boolean {
    return this.eventEmitter.emit(event, data);
  }

  /**
   * Emits `event` with `data` and waits for the emitter's promise.
   *
   * @returns the values the listeners resolved with
   */
  async emitAsync(event: string, data: unknown): Promise<unknown[]> {
    return this.eventEmitter.emitAsync(event, data);
  }

  /** Subscribes `listener` to `event`. */
  on(event: string, listener: EventBusListener): void {
    this.eventEmitter.on(event, listener);
  }

  /** Subscribes `listener` to the next occurrence of `event` only. */
  once(event: string, listener: EventBusListener): void {
    this.eventEmitter.once(event, listener);
  }

  /** Unsubscribes `listener` from `event`; pass the same function reference given to `on`/`once`. */
  off(event: string, listener: EventBusListener): void {
    this.eventEmitter.off(event, listener);
  }
}
事件与队列集成 #
typescript
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { QueueService } from '../queue/queue.service';
@Injectable()
export class QueueEventListener {
constructor(private queueService: QueueService) {}
@OnEvent('email.send')
async handleEmailSend(event: any) {
await this.queueService.add('email', {
to: event.to,
subject: event.subject,
body: event.body,
});
}
}
最佳实践 #
1. 命名规范 #
typescript
// Recommended: <domain>.<action>, using the configured '.' delimiter
'user.created'
'order.completed'
'payment.failed'
// Not recommended: no delimiter, so the name has no domain segment
'userCreated'
'onUserCreated'
2. 事件类设计 #
typescript
/**
 * Events should be immutable: every field is `readonly` and is set exactly
 * once, by the constructor.
 */
export class UserCreatedEvent {
  constructor(
    /** Identifier of the created user. */
    public readonly userId: number,
    /** E-mail address of the created user. */
    public readonly email: string,
  ) {}
}
3. 错误处理 #
typescript
// suppressErrors: false — errors thrown by this handler are not suppressed,
// so the rethrow below actually propagates.
@OnEvent('user.created', { suppressErrors: false })
async handleUserCreated(event: UserCreatedEvent) {
  try {
    await this.sendEmail(event.email);
  } catch (error) {
    // Log with context first, then rethrow so the failure is not hidden.
    this.logger.error('Failed to send email', error);
    throw error;
  }
}
4. 避免事件循环触发 #
typescript
// Do not emit the same event from inside its own listener: the emit would
// invoke this handler again, which would emit again, and so on.
@OnEvent('user.created')
handleUserCreated(event: UserCreatedEvent) {
  // Avoid doing this:
  // this.eventEmitter.emit('user.created', event);
}
总结 #
本章学习了NestJS事件系统:
- 事件模块配置
- 事件定义和发布
- 事件监听
- 事件优先级
- 最佳实践
接下来,让我们学习 定时任务。
最后更新:2026-03-28