当创建队列jobs、监听器或订阅服务器以推送到队列中时,您可能会开始认为,一旦分派,队列工作器决定如何处理您的逻辑就完全由您自己决定了。
嗯……并不是说你不能从作业内部与队列工作器交互,但是通常情况下,哪怕你做了,也是没必要的。
这个神奇的骚操作的出现是因为“InteractsWithQueue”这个trait。.当排队作业正在从队列中拉出, 这个 [CallQueuedListener](https://github.com/laravel/framework/blob/5.8/src/Illuminate/Events/CallQueuedListener.php#L90-L104) 会检查它是否在使用 InteractsWithQueue trait, 如果是的话,框架会将底层的“队列jobs”实例注入到内部。
这个 “任务” 实例类似于一个包装了真正的 Job 类的驱动,其中包含队列连接和尝试等信息。
背景
我将以一个转码 Job 为例。 这是一个将广播音频文件转换成192kbps MP3格式的任务。因为这是在自由转码队列中设置的,所以它的作用有限。
检查尝试次数
attempts()是被调用的第一个方法, 顾名思义,它返回尝试次数,一个队列 job总是伴随着一个attempt启动。
此方法旨在与其他方法一起使用 ..., 类似 fail() 或者 release() (delay). 为了便于说明,我们将通知用户第几次重试: 每次我们尝试在空闲队列中转换(转换代码)时,我们都会通知用户我们正在第几次重试,让他可以选择取消将来的转换(转换代码)。
- 1 <?php
- 2 namespace App\Jobs;
- 3 use App\Podcast;
- 4 use Transcoder\Transcoder;
- 5 use Illuminate\Bus\Queueable;
- 6 use Illuminate\Queue\SerializesModels;
- 7 use App\Notifications\PodcastTranscoded;
- 8 use Illuminate\Queue\InteractsWithQueue;
- 9 use Illuminate\Foundation\Bus\Dispatchable;
- 10 use App\Notifications\RetyingPodcastTranscode;
- 11 class TranscodePodcast
- 12 {
- 13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- 14 /**
- 15 * Transcoder Instance
- 16 *
- 17 * @var \App\Podcast
- 18 */
- 19 protected $podcast;
- 20 /**
- 21 * 创建一个新的转码podcast实例。
- 22 *
- 23 * @param \App\Podcast $podcast
- 24 * @return void
- 25 */
- 26 public function __construct(Podcast $podcast)
- 27 {
- 28 $this->podcast = $podcast;
- 29 }
- 30 /**
- 31 * 执行队列job.
- 32 *
- 33 * @param \Transcoder\Transcoder $podcast
- 34 * @return void
- 35 */
- 36 public function handle(Transcoder $transcoder)
- 37 {
- 38 // 告诉用户我们第几次重试
- 39 if ($this->attempts() > 1) {
- 40 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
- 41 }
- 42 $transcoded = $this->transcoder->setFile($event->podcast)
- 43 ->format('mp3')
- 44 ->bitrate(192)
- 45 ->start();
- 46
- 47
- 48 // 将转码podcast与原始podcast关联
- 49 $this->podcast->transcode()->associate($transcoded);
- 50
- 51 // 通知podcast的发布者他的podcast已经准备好了
- 52
- 53 $this->publisher->notify(new PodcastTranscoded($this->podcast));
- 54 }
- 55 }
告诉用户我们将在第几次时重试某些内容,这在逻辑预先失败时很有用,让用户(或开发人员)检查出了什么问题,但当然您可以做更多的事情。
就我个人而言,我喜欢在“Job作业”失败后再这样做,如果还有重试时间,告诉他我们稍后会重试当然,这个例子只是为了举例说明。
删除作业队列 Job
第二个方法就是 delete(). 跟你猜想的一样,您可以从队列中删除当前的“队列 Job”。 当队列 Job或侦听器由于多种原因排队后不应处理时,这将会很方便,例如,考虑一下这个场景:在转码发生之前,上传podcast的发布者由于任何原因(比如TOS冲突)被停用,我们应该不处理podcast。
我们将在前面的示例中添加该代码:
- 1 <?php
- 2 namespace App\Jobs;
- 3 use App\Podcast;
- 4 use Transcoder\Transcoder;
- 5 use Illuminate\Bus\Queueable;
- 6 use Illuminate\Queue\SerializesModels;
- 7 use App\Notifications\PodcastTranscoded;
- 8 use Illuminate\Queue\InteractsWithQueue;
- 9 use Illuminate\Foundation\Bus\Dispatchable;
- 10 use App\Notifications\RetyingPodcastTranscode;
- 11 class TranscodePodcast
- 12 {
- 13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- 14 /**
- 15 * Transcoder Instance
- 16 *
- 17 * @var \App\Podcast
- 18 */
- 19 protected $podcast;
- 20 /**
- 21 * 创建一个新的转码podcast实例。
- 22 *
- 23 * @param \App\Podcast $podcast
- 24 * @return void
- 25 */
- 26 public function __construct(Podcast $podcast)
- 27 {
- 28 $this->podcast = $podcast;
- 29 }
- 30 /**
- 31 * 执行队列 job.
- 32 *
- 33 * @param \Transcoder\Transcoder $podcast
- 34 * @return void
- 35 */
- 36 public function handle(Transcoder $transcoder)
- 37 {
- 38 // 如果发布服务器已被停用,请删除此队列job
- 39 if ($this->podcast->publisher->isDeactivated()) {
- 40 $this->delete();
- 41 }
- 42 // 告诉用户我们第几次重试
- 43 if ($this->attempts() > 1) {
- 44 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
- 45 }
- 46 $transcoded = $this->transcoder->setFile($event->podcast)
- 47 ->format('mp3')
- 48 ->bitrate(192)
- 49 ->start();
- 50
- 51 // 将转码podcast与原始podcast关联
- 52 $this->podcast->transcode()->associate($transcoded);
- 53
- 54 // 通知podcast的发布者他的podcast已经准备好了
- 55 $this->publisher->notify(new PodcastTranscoded($this->podcast));
- 56 }
- 57 }
如果需要删除可能已删除的模型上的作业,则可能需要 设置 [$deleteWhenMissingModels](https://laravel.com/docs/5.8/queues#ignoring-missing-models) 为真 t避免处理不存在的东西。
失败的队列job
当您需要控制人为破坏逻辑时,这非常非常方便, 因为使用空的“return”语句会将“Job”标记为已成功完成。您可以强制使排队的作业失败,希望出现异常,允许处理程序在可能的情况下稍后重试。
这使您在作业失败时可以更好地控制在任何情况下,也可以使用“failed()”方法, 它允许你 失败后执行任何清洁操纵, 比如通知用户或者删除一些东西。
在此示例中,如果由于任何原因(如 CDN 关闭时)无法从存储中检索podcast ,则作业将失败,并引发自定义异常。
- 1 <?php
- 2 namespace App\Jobs;
- 3 use App\Podcast;
- 4 use Transcoder\Transcoder;
- 5 use Illuminate\Bus\Queueable;
- 6 use Illuminate\Queue\SerializesModels;
- 7 use App\Exceptions\PodcastUnretrievable;
- 8 use App\Notifications\PodcastTranscoded;
- 9 use Illuminate\Queue\InteractsWithQueue;
- 10 use Illuminate\Foundation\Bus\Dispatchable;
- 11 use App\Notifications\RetyingPodcastTranscode;
- 12 class TranscodePodcast
- 13 {
- 14 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- 15 /**
- 16 * 转码器实例
- 17 *
- 18 * @var \App\Podcast
- 19 */
- 20 protected $podcast;
- 21 /**
- 22 * 创建一个新的转码Podcast实例。
- 23 *
- 24 * @param \App\Podcast $podcast
- 25 * @return void
- 26 */
- 27 public function __construct(Podcast $podcast)
- 28 {
- 29 $this->podcast = $podcast;
- 30 }
- 31 /**
- 32 * 执行队列 job.
- 33 *
- 34 * @param \Transcoder\Transcoder $podcast
- 35 * @return void
- 36 */
- 37 public function handle(Transcoder $transcoder)
- 38 {
- 39 // 如果发布服务器已被停用,请删除此队列job
- 40 if ($this->podcast->publisher->isDeactivated()) {
- 41 $this->delete();
- 42 }
- 43 //如果podcast不能从storage存储中检索,我们就会失败。
- 44 if ($this->podcast->fileDoesntExists()) {
- 45 $this->fail(new PodcastUnretrievable($this->podcast));
- 46 }
- 47 // 告诉用户我们第几次重试
- 48 if ($this->attempts() > 1) {
- 49 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
- 50 }
- 51
- 52 $transcoded = $this->transcoder->setFile($event->podcast)
- 53 ->format('mp3')
- 54 ->bitrate(192)
- 55 ->start();
- 56
- 57 // 将转码podcast与原始podcast关联
- 58 $this->podcast->transcode()->associate($transcoded);
- 59
- 60 // 通知podcast的发布者他的podcast已经准备好了
- 61 $this->publisher->notify(new PodcastTranscoded($this->podcast));
- 62 }
- 63 }
现在,进入最后的方法。
释放(延迟)队列job
这可能是trait性状的最有用的方法, 因为它可以让你在未来进一步推动这项队列job. 此方法用于 队列job速率限制.
除了速率限制之外,您还可以在某些不可用但希望在不久的将来使用它的情况下使用它同时,避免先发制人地失败。
在最后一个示例中,我们将延迟转码以备稍后使用:如果转码器正在大量使用,我们将延迟转码5分钟直到负载降低。
- 1 <?php
- 2 namespace App\Jobs;
- 3 use App\Podcast;
- 4 use Transcoder\Transcoder;
- 5 use Illuminate\Bus\Queueable;
- 6 use Illuminate\Queue\SerializesModels;
- 7 use App\Exceptions\PodcastUnretrievable;
- 8 use App\Notifications\PodcastTranscoded;
- 9 use Illuminate\Queue\InteractsWithQueue;
- 10 use App\Notifications\TranscoderHighUsage;
- 11 use Illuminate\Foundation\Bus\Dispatchable;
- 12 use App\Notifications\RetyingPodcastTranscode;
- 13 class TranscodePodcast
- 14 {
- 15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
- 16 /**
- 17 * Transcoder Instance
- 18 *
- 19 * @var \App\Podcast
- 20 */
- 21 protected $podcast;
- 22 /**
- 23 * 创建一个新的转码podcast实例。
- 24 *
- 25 * @param \App\Podcast $podcast
- 26 * @return void
- 27 */
- 28 public function __construct(Podcast $podcast)
- 29 {
- 30 $this->podcast = $podcast;
- 31 }
- 32 /**
- 33 * 执行队列job.
- 34 *
- 35 * @param \Transcoder\Transcoder $podcast
- 36 * @return void
- 37 */
- 38 public function handle(Transcoder $transcoder)
- 39 {
- 40 // 如果发布服务器已被停用,请删除此队列job
- 41 if ($this->podcast->publisher->isDeactivated()) {
- 42 $this->delete();
- 43 }
- 44 // 如果podcast不能从storage存储中检索,我们就会失败。
- 45 if ($this->podcast->fileDoesntExists()) {
- 46 $this->fail(new PodcastUnretrievable($this->podcast));
- 47 }
- 48
- 49 // 如果转码器使用率很高,我们将
- 50 // t延迟转码5分钟. 否则我们可能会有拖延转码器进程的危险
- 51 // 它会把所有的转码子进程都记录下来。
- 52 if ($transcoder->getLoad()->isHigh()) {
- 53 $delay = 60 * 5;
- 54 $this->podcast->publisher->notify(new TranscoderHighUsage($this->podcast, $delay));
- 55 $this->release($delay);
- 56 }
- 57 // 告诉用户我们第几次重试
- 58 if ($this->attempts() > 1) {
- 59 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
- 60 }
- 61
- 62 $transcoded = $this->transcoder->setFile($event->podcast)
- 63 ->format('mp3')
- 64 ->bitrate(192)
- 65 ->start();
- 66
- 67 // 将转码podcast与原始podcast关联
- 68 $this->podcast->transcode()->associate($transcoded);
- 69
- 70 // 通知podcast的发布者他的podcast已经准备好了
- 71 $this->publisher->notify(new PodcastTranscoded($this->podcast));
- 72 }
- 73 }
我们可以使用一些特殊方法,例如,获得分配给转码器的一些时隙,如果转码器时隙已满,则延迟作业。 在排队的工作中,你能做的就只有这些了。排队愉快。