NestJS定时任务 #

定时任务概述 #

NestJS提供了@nestjs/schedule模块,基于cron包(npm上的`cron`,注意不是同名的`node-cron`包)实现定时任务调度功能,支持Cron表达式、固定间隔执行和延迟(一次性)执行。

安装依赖 #

bash
npm install @nestjs/schedule

配置Schedule模块 #

typescript
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';

/**
 * Root module of the application.
 *
 * Pulls in `ScheduleModule.forRoot()` so the scheduling features shown in
 * the rest of this chapter are available application-wide.
 */
@Module({
  imports: [
    ScheduleModule.forRoot(),
  ],
})
export class AppModule {}

Cron任务 #

基本Cron任务 #

typescript
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';

/**
 * Minimal example of a declarative cron job.
 */
@Injectable()
export class TasksService {
  // Logger tagged with the class name so log lines can be traced to this service.
  private readonly logger = new Logger(TasksService.name);

  /**
   * Six-field expression with the seconds field pinned to 45, so the
   * handler fires once per minute when the clock's second reads 45.
   */
  @Cron('45 * * * * *')
  handleCron(): void {
    this.logger.debug('Called every minute at 45 seconds');
  }
}

Cron表达式 #

text
┌───────────── 秒 (0 - 59)
│ ┌───────────── 分钟 (0 - 59)
│ │ ┌───────────── 小时 (0 - 23)
│ │ │ ┌───────────── 日期 (1 - 31)
│ │ │ │ ┌───────────── 月份 (1 - 12)
│ │ │ │ │ ┌───────────── 星期 (0 - 6) (0是周日)
│ │ │ │ │ │
* * * * * *

预定义Cron表达式 #

typescript
import { Cron, CronExpression } from '@nestjs/schedule';

/**
 * Showcases the predefined schedules exposed by the `CronExpression` enum.
 * Each handler body is intentionally empty; only the decorator matters.
 */
@Injectable()
export class TasksService {
  /** Runs once every minute. */
  @Cron(CronExpression.EVERY_MINUTE)
  everyMinute(): void {
    // runs every minute
  }

  /** Runs once every hour. */
  @Cron(CronExpression.EVERY_HOUR)
  everyHour(): void {
    // runs every hour
  }

  /** Runs once a day at midnight. */
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
  everyDayAtMidnight(): void {
    // runs every day at midnight
  }

  /** Runs once a week. */
  @Cron(CronExpression.EVERY_WEEK)
  everyWeek(): void {
    // runs every week
  }

  /** Runs on every weekday. */
  @Cron(CronExpression.EVERY_WEEKDAY)
  everyWeekday(): void {
    // runs every weekday
  }

  /** Runs on every weekend. */
  @Cron(CronExpression.EVERY_WEEKEND)
  everyWeekend(): void {
    // runs every weekend
  }
}

常用Cron表达式 #

| 表达式 | 说明 |
| --- | --- |
| `* * * * * *` | 每秒 |
| `0 * * * * *` | 每分钟 |
| `0 0 * * * *` | 每小时 |
| `0 0 0 * * *` | 每天午夜 |
| `0 0 0 * * 1` | 每周一午夜 |
| `0 0 0 1 * *` | 每月1号午夜 |
| `0 30 9 * * 1-5` | 工作日9:30 |

Cron选项 #

typescript
/**
 * Cron job configured through the options object:
 *  - `name` gives the job the identifier 'weekdayTask';
 *  - `timeZone` evaluates the expression in Asia/Shanghai time.
 */
@Cron('0 0 9 * * 1-5', { name: 'weekdayTask', timeZone: 'Asia/Shanghai' })
handleWeekdayTask() {
  // runs at 09:00 on weekdays (Monday to Friday)
}

Interval任务 #

固定间隔 #

typescript
import { Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';

/**
 * Examples of fixed-interval jobs declared with `@Interval()`.
 */
@Injectable()
export class TasksService {
  /** Anonymous interval: the argument is the period in milliseconds. */
  @Interval(10 * 1000)
  handleInterval(): void {
    // runs once every 10 seconds
  }

  /** Named interval: first argument is the name, second the period in ms. */
  @Interval('taskName', 5 * 1000)
  handleNamedInterval(): void {
    // runs once every 5 seconds, registered under a name
  }
}

Timeout任务 #

延迟执行 #

typescript
import { Injectable } from '@nestjs/common';
import { Timeout } from '@nestjs/schedule';

/**
 * Examples of one-shot delayed jobs declared with `@Timeout()`.
 */
@Injectable()
export class TasksService {
  /** Anonymous timeout: the argument is the delay in milliseconds. */
  @Timeout(5 * 1000)
  handleTimeout(): void {
    // runs once, 5 seconds after the application starts
  }

  /** Named timeout: first argument is the name, second the delay in ms. */
  @Timeout('onceTask', 10 * 1000)
  handleNamedTimeout(): void {
    // runs once, 10 seconds after the application starts, registered under a name
  }
}

动态调度 #

注入SchedulerRegistry #

typescript
import { Injectable } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';

/**
 * Manages cron jobs at runtime through the injected `SchedulerRegistry`
 * instead of through decorators.
 */
@Injectable()
export class TasksService {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  /**
   * Creates a cron job, registers it under `name`, then starts it.
   *
   * @param name     Key under which the job is stored in the registry.
   * @param cronTime Cron expression for the schedule.
   * @param callback Function invoked on every tick.
   */
  addCronJob(name: string, cronTime: string, callback: () => void) {
    const cronJob = new CronJob(cronTime, callback);
    // Register before starting so the job is never running without being tracked.
    this.schedulerRegistry.addCronJob(name, cronJob);
    cronJob.start();
  }

  /** Stops the job registered under `name`; it stays in the registry. */
  stopCronJob(name: string) {
    this.schedulerRegistry.getCronJob(name).stop();
  }

  /** Starts the job registered under `name`. */
  startCronJob(name: string) {
    this.schedulerRegistry.getCronJob(name).start();
  }

  /** Removes the job registered under `name` from the registry. */
  deleteCronJob(name: string) {
    this.schedulerRegistry.deleteCronJob(name);
  }

  /** Prints every registered job's name and next run time to the console. */
  getCronJobs() {
    for (const [jobName, cronJob] of this.schedulerRegistry.getCronJobs()) {
      console.log(`Job: ${jobName}, Next run: ${cronJob.nextDate()}`);
    }
  }
}

动态Interval #

typescript
/**
 * Manages intervals at runtime through the injected `SchedulerRegistry`.
 */
@Injectable()
export class TasksService {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  /**
   * Starts a repeating timer and registers its handle under `name`.
   *
   * @param name         Key under which the interval is stored in the registry.
   * @param milliseconds Period between invocations.
   * @param callback     Function invoked on every tick.
   * @throws Re-throws whatever the registry throws when registration fails
   *         (for example because the name is already taken).
   */
  addInterval(name: string, milliseconds: number, callback: () => void) {
    const interval = setInterval(callback, milliseconds);
    try {
      this.schedulerRegistry.addInterval(name, interval);
    } catch (error) {
      // The timer is already ticking; without this it would keep firing
      // forever with no handle left anywhere to cancel it.
      clearInterval(interval);
      throw error;
    }
  }

  /** Removes the interval registered under `name` from the registry. */
  clearInterval(name: string) {
    this.schedulerRegistry.deleteInterval(name);
  }

  /** Returns whatever the registry reports for its registered intervals. */
  getIntervals() {
    return this.schedulerRegistry.getIntervals();
  }
}

动态Timeout #

typescript
/**
 * Manages one-shot timeouts at runtime through the injected `SchedulerRegistry`.
 */
@Injectable()
export class TasksService {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  /**
   * Schedules a one-shot timer and registers its handle under `name`.
   *
   * @param name         Key under which the timeout is stored in the registry.
   * @param milliseconds Delay before `callback` is invoked.
   * @param callback     Function invoked once the delay elapses.
   * @throws Re-throws whatever the registry throws when registration fails
   *         (for example because the name is already taken).
   */
  addTimeout(name: string, milliseconds: number, callback: () => void) {
    const timeout = setTimeout(callback, milliseconds);
    try {
      this.schedulerRegistry.addTimeout(name, timeout);
    } catch (error) {
      // Cancel the pending timer so a rejected registration does not still
      // fire the callback later with no way to cancel it.
      clearTimeout(timeout);
      throw error;
    }
  }

  /** Removes the timeout registered under `name` from the registry. */
  clearTimeout(name: string) {
    this.schedulerRegistry.deleteTimeout(name);
  }

  /** Returns whatever the registry reports for its registered timeouts. */
  getTimeouts() {
    return this.schedulerRegistry.getTimeouts();
  }
}

实际应用示例 #

数据清理任务 #

typescript
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, LessThan } from 'typeorm';
import { Session } from './entities/session.entity';

/**
 * Scheduled housekeeping that removes expired `Session` rows.
 */
@Injectable()
export class CleanupService {
  private readonly logger = new Logger(CleanupService.name);

  constructor(
    @InjectRepository(Session)
    private sessionRepository: Repository<Session>,
  ) {}

  /**
   * Deletes every session whose `expiresAt` is earlier than the current
   * time and logs how many rows were removed. Scheduled with
   * `CronExpression.EVERY_DAY_AT_3AM`.
   */
  @Cron(CronExpression.EVERY_DAY_AT_3AM)
  async cleanExpiredSessions(): Promise<void> {
    this.logger.log('Starting session cleanup...');

    const { affected } = await this.sessionRepository.delete({
      expiresAt: LessThan(new Date()),
    });

    // `affected` is optional/nullable in TypeORM's DeleteResult (not every
    // driver reports it), so fall back to 0 rather than logging "undefined".
    this.logger.log(`Cleaned ${affected ?? 0} expired sessions`);
  }
}

报表生成任务 #

typescript
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { ReportsService } from '../reports/reports.service';
import { EmailService } from '../email/email.service';

/**
 * Generates reports on daily, weekly and monthly schedules and e-mails
 * each one to 'admin@example.com'.
 */
@Injectable()
export class ReportScheduler {
  private readonly logger = new Logger(ReportScheduler.name);

  constructor(
    private reportsService: ReportsService,
    private emailService: EmailService,
  ) {}

  /** Daily report, scheduled with `CronExpression.EVERY_DAY_AT_MIDNIGHT`. */
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
  async generateDailyReport(): Promise<void> {
    this.logger.log('Generating daily report...');

    const dailyReport = await this.reportsService.generateDailyReport();
    await this.emailService.sendReport('admin@example.com', dailyReport);

    this.logger.log('Daily report sent');
  }

  /** Weekly report; the expression's day-of-week field is 0 (every Sunday). */
  @Cron('0 0 0 * * 0')
  async generateWeeklyReport(): Promise<void> {
    this.logger.log('Generating weekly report...');

    const weeklyReport = await this.reportsService.generateWeeklyReport();
    await this.emailService.sendReport('admin@example.com', weeklyReport);
  }

  /** Monthly report; the expression's day-of-month field is 1 (1st of every month). */
  @Cron('0 0 0 1 * *')
  async generateMonthlyReport(): Promise<void> {
    this.logger.log('Generating monthly report...');

    const monthlyReport = await this.reportsService.generateMonthlyReport();
    await this.emailService.sendReport('admin@example.com', monthlyReport);
  }
}

健康检查任务 #

typescript
import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { HealthService } from '../health/health.service';

/**
 * Polls the health service on a fixed interval and logs a warning
 * whenever the reported status is not 'ok'.
 */
@Injectable()
export class HealthCheckScheduler {
  private readonly logger = new Logger(HealthCheckScheduler.name);

  constructor(private healthService: HealthService) {}

  /** Runs every 30 seconds. */
  @Interval(30000)
  async checkHealth(): Promise<void> {
    const report = await this.healthService.check();

    // Healthy: nothing to log.
    if (report.status === 'ok') {
      return;
    }

    this.logger.warn('Health check failed', report);
  }
}

缓存预热任务 #

typescript
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { CacheService } from '../cache/cache.service';
import { ProductsService } from '../products/products.service';

/**
 * Periodically reloads the full product list into the cache.
 */
@Injectable()
export class CacheWarmupScheduler {
  private readonly logger = new Logger(CacheWarmupScheduler.name);

  constructor(
    private cacheService: CacheService,
    private productsService: ProductsService,
  ) {}

  /**
   * Fetches all products and stores them under the 'products:all' key.
   * Scheduled with `CronExpression.EVERY_HOUR`.
   */
  @Cron(CronExpression.EVERY_HOUR)
  async warmupProductCache(): Promise<void> {
    this.logger.log('Starting cache warmup...');

    const allProducts = await this.productsService.findAll();

    // Third argument is presumably a TTL in seconds (3600 = one hour,
    // matching the schedule) — confirm against CacheService.
    await this.cacheService.set('products:all', allProducts, 3600);

    this.logger.log(`Cached ${allProducts.length} products`);
  }
}

任务管理API #

typescript
import { Controller, Get, Post, Param } from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';

/**
 * HTTP endpoints under `/tasks` for inspecting and controlling the cron
 * jobs held by the `SchedulerRegistry`.
 */
@Controller('tasks')
export class TasksController {
  constructor(private schedulerRegistry: SchedulerRegistry) {}

  /**
   * GET /tasks/cron — lists every registered cron job.
   *
   * @returns One entry per job with its name, next run date and running flag.
   */
  @Get('cron')
  getCronJobs() {
    // Map the registry's Map directly into an array. This keeps the element
    // type inferred, whereas an untyped `[]` filled from inside a callback
    // is implicitly `any[]` and is rejected under strict compiler settings.
    return Array.from(this.schedulerRegistry.getCronJobs(), ([name, job]) => ({
      name,
      nextDate: job.nextDate(),
      running: job.running,
    }));
  }

  /**
   * POST /tasks/cron/:name/stop — stops the named job.
   *
   * @param name Registry name of the job, taken from the route.
   */
  @Post('cron/:name/stop')
  stopCronJob(@Param('name') name: string) {
    const job = this.schedulerRegistry.getCronJob(name);
    job.stop();
    return { message: `Job ${name} stopped` };
  }

  /**
   * POST /tasks/cron/:name/start — starts the named job.
   *
   * @param name Registry name of the job, taken from the route.
   */
  @Post('cron/:name/start')
  startCronJob(@Param('name') name: string) {
    const job = this.schedulerRegistry.getCronJob(name);
    job.start();
    return { message: `Job ${name} started` };
  }
}

最佳实践 #

1. 错误处理 #

typescript
/**
 * Hourly task that catches its own failures and logs them instead of
 * letting the rejection escape the handler.
 */
@Cron(CronExpression.EVERY_HOUR)
async handleTask() {
  try {
    await this.doSomething();
  } catch (error) {
    // Logger.error expects a stack-trace string after the message, so
    // extract it explicitly; a thrown value need not be an Error instance.
    const stack = error instanceof Error ? error.stack : String(error);
    this.logger.error('Task failed', stack);
  }
}

2. 避免重叠执行 #

typescript
/**
 * Demonstrates an in-process guard that prevents a slow run of a task
 * from overlapping with the next scheduled run.
 */
@Injectable()
export class TasksService {
  // Declared here because handleTask() logs through it.
  private readonly logger = new Logger(TasksService.name);

  // True while a run is in progress. This flag lives in this instance only,
  // so it does not coordinate between separate application processes.
  private isRunning = false;

  /**
   * Runs every minute, but skips the tick (with a warning) when the
   * previous run has not finished yet.
   */
  @Cron(CronExpression.EVERY_MINUTE)
  async handleTask() {
    if (this.isRunning) {
      this.logger.warn('Task already running, skipping');
      return;
    }

    this.isRunning = true;
    try {
      await this.doSomething();
    } finally {
      // Always clear the flag, even if the task throws, so one failure
      // cannot block every later run.
      this.isRunning = false;
    }
  }
}

3. 使用分布式锁 #

typescript
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { RedisService } from '../redis/redis.service';

/**
 * Demonstrates guarding a scheduled task with a lock obtained from
 * `RedisService`, so that a run only proceeds when the lock was acquired.
 */
@Injectable()
export class DistributedTaskService {
  constructor(private redisService: RedisService) {}

  /**
   * Fires every 5 minutes. Does nothing when the lock cannot be acquired;
   * otherwise runs the task and releases the lock afterwards.
   */
  @Cron('0 */5 * * * *')
  async handleDistributedTask(): Promise<void> {
    const key = 'task:lock';

    // NOTE(review): 300 is presumably a TTL in seconds (equal to the
    // 5-minute schedule) — confirm against RedisService.acquireLock.
    const gotLock = await this.redisService.acquireLock(key, 300);
    if (!gotLock) {
      return;
    }

    try {
      await this.doSomething();
    } finally {
      // Release in `finally` so a failing task does not keep holding the lock.
      await this.redisService.releaseLock(key);
    }
  }
}

总结 #

本章学习了NestJS定时任务:

  • Cron任务
  • Interval任务
  • Timeout任务
  • 动态调度
  • 实际应用示例
  • 最佳实践

接下来,让我们学习微服务架构。

最后更新:2026-03-28