任务队列 #

一、队列概述 #

1.1 什么是队列 #

队列允许你将耗时任务推迟处理,提高应用响应速度。

text
队列处理流程
┌─────────────┐
│   用户请求   │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  分发任务    │
│  到队列      │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  立即返回    │
│  响应        │
└─────────────┘

       ↓ 异步

┌─────────────┐
│  队列Worker  │
│  处理任务    │
└─────────────┘

1.2 使用场景 #

场景 说明
发送邮件 批量邮件发送
图片处理 图片压缩、裁剪
视频处理 视频转码
数据导入 大数据导入
报告生成 生成复杂报表

二、配置队列 #

2.1 队列驱动 #

php
// config/queue.php
'default' => env('QUEUE_CONNECTION', 'database'),

'connections' => [
    'sync' => [
        'driver' => 'sync',
    ],
    
    'database' => [
        'driver' => 'database',
        'table' => 'jobs',
        'queue' => 'default',
        'retry_after' => 90,
        'after_commit' => true,
    ],
    
    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' => 90,
        'block_for' => null,
        'after_commit' => true,
    ],
],

2.2 创建队列表 #

bash
php artisan queue:table
php artisan migrate

三、创建任务 #

3.1 创建Job #

bash
php artisan make:job SendEmail

3.2 任务类 #

php
// app/Jobs/SendEmail.php
namespace App\Jobs;

use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;

class SendEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $user;

    public function __construct(User $user)
    {
        $this->user = $user;
    }

    public function handle()
    {
        Mail::to($this->user->email)->send(new WelcomeEmail($this->user));
    }
}

3.3 任务属性 #

php
class SendEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = 3;           // 最大重试次数
    public $maxExceptions = 3;   // 最大异常次数
    public $timeout = 120;       // 超时时间(秒)
    public $backoff = [10, 30, 60]; // 重试间隔
    
    // 或使用方法
    public function retryUntil()
    {
        return now()->addMinutes(30);
    }
    
    public function middleware()
    {
        return [
            new WithoutOverlapping,
            new ThrottleRequests(10),
        ];
    }
}

四、分发任务 #

4.1 基本分发 #

php
use App\Jobs\SendEmail;

SendEmail::dispatch($user);

4.2 延迟分发 #

php
// 延迟指定时间
SendEmail::dispatch($user)->delay(now()->addMinutes(10));

// 延迟指定秒数
SendEmail::dispatch($user)->delay(60);

4.3 指定队列 #

php
SendEmail::dispatch($user)->onQueue('emails');

// 在任务类中指定
class SendEmail implements ShouldQueue
{
    public $queue = 'emails';
}

4.4 指定连接 #

php
SendEmail::dispatch($user)->onConnection('redis');

4.5 条件分发 #

php
if ($this->user->subscribed()) {
    SendEmail::dispatch($this->user);
}

4.6 同步分发 #

php
SendEmail::dispatchSync($user);

五、运行队列Worker #

5.1 启动Worker #

bash
php artisan queue:work

5.2 Worker选项 #

bash
# 指定连接和队列
php artisan queue:work redis --queue=high,default

# 守护进程模式
php artisan queue:work --daemon

# 指定超时和内存
php artisan queue:work --timeout=60 --memory=128

# 指定尝试次数
php artisan queue:work --tries=3

# 延迟重试
php artisan queue:work --backoff=10

# 处理指定数量任务后停止
php artisan queue:work --max-jobs=1000

# 处理指定时间后停止
php artisan queue:work --max-time=3600

5.3 监听队列 #

bash
php artisan queue:listen

5.4 处理单个任务 #

bash
php artisan queue:work --once

六、失败处理 #

6.1 创建失败表 #

bash
php artisan queue:failed-table
php artisan migrate

6.2 任务失败处理 #

php
class SendEmail implements ShouldQueue
{
    public function failed(\Throwable $exception)
    {
        // 记录日志
        Log::error('邮件发送失败', [
            'user' => $this->user->id,
            'error' => $exception->getMessage(),
        ]);
        
        // 通知管理员
        Mail::to('admin@example.com')->send(new JobFailedMail($this->user));
    }
}

6.3 重试失败任务 #

bash
# 重试所有失败任务
php artisan queue:retry all

# 重试指定任务
php artisan queue:retry 5

# 重试多个任务
php artisan queue:retry 5 6 7

6.4 删除失败任务 #

bash
# 删除指定任务
php artisan queue:forget 5

# 删除所有失败任务
php artisan queue:flush

七、任务中间件 #

7.1 创建中间件 #

php
// app/Jobs/Middleware/RateLimited.php
namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    public function handle($job, $next)
    {
        Redis::throttle('key')
            ->allow(10)->every(60)
            ->then(function () use ($job, $next) {
                $next($job);
            }, function () use ($job) {
                $job->release(10);
            });
    }
}

7.2 使用中间件 #

php
class SendEmail implements ShouldQueue
{
    public function middleware()
    {
        return [
            new RateLimited,
        ];
    }
}

7.3 内置中间件 #

php
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\ThrottleRequests;
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

public function middleware()
{
    return [
        (new WithoutOverlapping)->releaseAfter(60),
        new ThrottleRequests(10),
        new SkipIfBatchCancelled,
    ];
}

八、任务批处理 #

8.1 创建批处理表 #

bash
php artisan queue:batches-table
php artisan migrate

8.2 定义批处理任务 #

php
use Illuminate\Bus\Batchable;

class ImportCsv implements ShouldQueue
{
    use Batchable;

    public function handle()
    {
        if ($this->batch()->cancelled()) {
            return;
        }

        // 处理数据
    }
}

8.3 分发批处理 #

php
use Illuminate\Support\Facades\Bus;
use App\Jobs\ImportCsv;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
])->then(function (Batch $batch) {
    // 所有任务完成
})->catch(function (Batch $batch, Throwable $e) {
    // 任务失败
})->finally(function (Batch $batch) {
    // 无论成功失败都执行
})->dispatch();

九、Horizon #

9.1 安装Horizon #

bash
composer require laravel/horizon
php artisan horizon:install

9.2 配置Horizon #

php
// config/horizon.php
'environments' => [
    'production' => [
        'supervisor-1' => [
            'maxProcesses' => 10,
            'balanceMaxShift' => 1,
            'balanceCooldown' => 3,
        ],
    ],
    
    'local' => [
        'supervisor-1' => [
            'maxProcesses' => 3,
        ],
    ],
],

9.3 启动Horizon #

bash
php artisan horizon

十、总结 #

10.1 核心概念 #

概念 说明
Job 任务类
Queue 队列
Worker 队列处理器
Dispatch 分发任务
Batch 批处理

10.2 下一步 #

掌握了任务队列后,让我们继续学习 单元测试,了解Laravel测试!

最后更新:2026-03-28