任务队列 #
一、队列概述 #
1.1 什么是队列 #
队列允许你将耗时任务推迟处理,提高应用响应速度。
text
队列处理流程
┌─────────────┐
│ 用户请求 │
└──────┬──────┘
│
▼
┌─────────────┐
│ 分发任务 │
│ 到队列 │
└──────┬──────┘
│
▼
┌─────────────┐
│ 立即返回 │
│ 响应 │
└─────────────┘
↓ 异步
┌─────────────┐
│ 队列Worker │
│ 处理任务 │
└─────────────┘
1.2 使用场景 #
| 场景 | 说明 |
|---|---|
| 发送邮件 | 批量邮件发送 |
| 图片处理 | 图片压缩、裁剪 |
| 视频处理 | 视频转码 |
| 数据导入 | 大数据导入 |
| 报告生成 | 生成复杂报表 |
二、配置队列 #
2.1 队列驱动 #
php
// config/queue.php
'default' => env('QUEUE_CONNECTION', 'database'),
'connections' => [
'sync' => [
'driver' => 'sync',
],
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
'after_commit' => true,
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
'after_commit' => true,
],
],
2.2 创建队列表 #
bash
php artisan queue:table
php artisan migrate
三、创建任务 #
3.1 创建Job #
bash
php artisan make:job SendEmail
3.2 任务类 #
php
// app/Jobs/SendEmail.php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;
class SendEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $user;
public function __construct(User $user)
{
$this->user = $user;
}
public function handle()
{
Mail::to($this->user->email)->send(new WelcomeEmail($this->user));
}
}
3.3 任务属性 #
php
class SendEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $tries = 3; // 最大重试次数
public $maxExceptions = 3; // 最大异常次数
public $timeout = 120; // 超时时间(秒)
public $backoff = [10, 30, 60]; // 重试间隔
// 或使用方法
public function retryUntil()
{
return now()->addMinutes(30);
}
public function middleware()
{
return [
new WithoutOverlapping,
new ThrottleRequests(10),
];
}
}
四、分发任务 #
4.1 基本分发 #
php
use App\Jobs\SendEmail;
SendEmail::dispatch($user);
4.2 延迟分发 #
php
// 延迟指定时间
SendEmail::dispatch($user)->delay(now()->addMinutes(10));
// 延迟指定秒数
SendEmail::dispatch($user)->delay(60);
4.3 指定队列 #
php
SendEmail::dispatch($user)->onQueue('emails');
// 在任务类中指定
class SendEmail implements ShouldQueue
{
public $queue = 'emails';
}
4.4 指定连接 #
php
SendEmail::dispatch($user)->onConnection('redis');
4.5 条件分发 #
php
if ($this->user->subscribed()) {
SendEmail::dispatch($this->user);
}
4.6 同步分发 #
php
SendEmail::dispatchSync($user);
五、运行队列Worker #
5.1 启动Worker #
bash
php artisan queue:work
5.2 Worker选项 #
bash
# 指定连接和队列
php artisan queue:work redis --queue=high,default
# 守护进程模式
php artisan queue:work --daemon
# 指定超时和内存
php artisan queue:work --timeout=60 --memory=128
# 指定尝试次数
php artisan queue:work --tries=3
# 延迟重试
php artisan queue:work --backoff=10
# 处理指定数量任务后停止
php artisan queue:work --max-jobs=1000
# 处理指定时间后停止
php artisan queue:work --max-time=3600
5.3 监听队列 #
bash
php artisan queue:listen
5.4 处理单个任务 #
bash
php artisan queue:work --once
六、失败处理 #
6.1 创建失败表 #
bash
php artisan queue:failed-table
php artisan migrate
6.2 任务失败处理 #
php
class SendEmail implements ShouldQueue
{
public function failed(\Throwable $exception)
{
// 记录日志
Log::error('邮件发送失败', [
'user' => $this->user->id,
'error' => $exception->getMessage(),
]);
// 通知管理员
Mail::to('admin@example.com')->send(new JobFailedMail($this->user));
}
}
6.3 重试失败任务 #
bash
# 重试所有失败任务
php artisan queue:retry all
# 重试指定任务
php artisan queue:retry 5
# 重试多个任务
php artisan queue:retry 5 6 7
6.4 删除失败任务 #
bash
# 删除指定任务
php artisan queue:forget 5
# 删除所有失败任务
php artisan queue:flush
七、任务中间件 #
7.1 创建中间件 #
php
// app/Jobs/Middleware/RateLimited.php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
public function handle($job, $next)
{
Redis::throttle('key')
->allow(10)->every(60)
->then(function () use ($job, $next) {
$next($job);
}, function () use ($job) {
$job->release(10);
});
}
}
7.2 使用中间件 #
php
class SendEmail implements ShouldQueue
{
public function middleware()
{
return [
new RateLimited,
];
}
}
7.3 内置中间件 #
php
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\ThrottleRequests;
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
public function middleware()
{
return [
(new WithoutOverlapping)->releaseAfter(60),
new ThrottleRequests(10),
new SkipIfBatchCancelled,
];
}
八、任务批处理 #
8.1 创建批处理表 #
bash
php artisan queue:batches-table
php artisan migrate
8.2 定义批处理任务 #
php
use Illuminate\Bus\Batchable;
class ImportCsv implements ShouldQueue
{
use Batchable;
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
// 处理数据
}
}
8.3 分发批处理 #
php
use Illuminate\Support\Facades\Bus;
use App\Jobs\ImportCsv;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
])->then(function (Batch $batch) {
// 所有任务完成
})->catch(function (Batch $batch, Throwable $e) {
// 任务失败
})->finally(function (Batch $batch) {
// 无论成功失败都执行
})->dispatch();
九、Horizon #
9.1 安装Horizon #
bash
composer require laravel/horizon
php artisan horizon:install
9.2 配置Horizon #
php
// config/horizon.php
'environments' => [
'production' => [
'supervisor-1' => [
'maxProcesses' => 10,
'balanceMaxShift' => 1,
'balanceCooldown' => 3,
],
],
'local' => [
'supervisor-1' => [
'maxProcesses' => 3,
],
],
],
9.3 启动Horizon #
bash
php artisan horizon
十、总结 #
10.1 核心概念 #
| 概念 | 说明 |
|---|---|
| Job | 任务类 |
| Queue | 队列 |
| Worker | 队列处理器 |
| Dispatch | 分发任务 |
| Batch | 批处理 |
10.2 下一步 #
掌握了任务队列后,让我们继续学习 单元测试,了解Laravel测试!
最后更新:2026-03-28