BaseJobs.php 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2023 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace crmeb\basic;
  12. use crmeb\interfaces\JobInterface;
  13. use think\queue\Job;
  14. /**
  15. * 消息队列基类
  16. * Class BaseJobs
  17. * @package crmeb\basic
  18. */
  19. abstract class BaseJobs implements JobInterface
  20. {
  21. /**
  22. * @param $name
  23. * @param $arguments
  24. */
  25. public function __call($name, $arguments)
  26. {
  27. $this->fire(...$arguments);
  28. }
  29. /**
  30. * 运行消息队列
  31. * @param Job $job
  32. * @param $data
  33. */
  34. public function fire(Job $job, $data): void
  35. {
  36. try {
  37. $action = $data['do'] ?? 'doJob';//任务名
  38. $infoData = $data['data'] ?? [];//执行数据
  39. $errorCount = $data['errorCount'] ?? 0;//最大错误次数
  40. $this->runJob($action, $job, $infoData, $errorCount);
  41. } catch (\Throwable $e) {
  42. $job->delete();
  43. }
  44. }
  45. /**
  46. * 执行队列
  47. * @param string $action
  48. * @param Job $job
  49. * @param array $infoData
  50. * @param int $errorCount
  51. */
  52. protected function runJob(string $action, Job $job, array $infoData, int $errorCount = 3)
  53. {
  54. $action = method_exists($this, $action) ? $action : 'handle';
  55. if (!method_exists($this, $action)) {
  56. $job->delete();
  57. }
  58. if ($this->{$action}(...$infoData)) {
  59. //删除任务
  60. $job->delete();
  61. } else {
  62. if ($job->attempts() >= $errorCount && $errorCount) {
  63. //删除任务
  64. $job->delete();
  65. } else {
  66. //从新放入队列
  67. $job->release();
  68. }
  69. }
  70. }
  71. }