Thinkphp6 think-queue redis封装队列目标类
原创
配置文件 queue.php
// +----------------------------------------------------------------------
use think\facade\Env;
// Queue configuration: one default connection name, the available
// connections, and the failed-job settings.
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'  => 'database',
            'queue' => 'default',
            'table' => 'jobs',
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'defaultredis',
            // Connection details are read from the environment, falling back
            // to the literal defaults given as the second argument.
            'host'       => Env::get('redis.redis_hostname', '127.0.0.1'),
            'port'       => Env::get('redis.port', 6379),
            'password'   => Env::get('redis.redis_password', ''),
            'select'     => Env::get('redis.select', 0),
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
测试文件加入队列PublicController.php
do(ceshiqueue)->job(CeshiJob::class)->name(myqueue)->push();
if($res){
return app(json)->success(加入队列成功!);
}else{
return app(json)->fail(加入队列失败!);
}
}
}
### 队列工具类Queue.php
defaultDo = $this->do;
}
/**
 * Return the shared instance, creating it on first use.
 *
 * The object is cached in the static property self::$instance and built
 * with `new static()`, so the first caller decides the concrete class.
 *
 * @return static
 */
public static function instance()
{
    // The original `is\_null(...)` would resolve to a namespaced function
    // call rather than is_null(); a strict null comparison is equivalent.
    if (self::$instance === null) {
        self::$instance = new static();
    }

    return self::$instance;
}
/**
 * Put the configured job on the message queue.
 *
 * Dispatches through the QueueThink method named by action(), using the
 * arguments built by getValues(). If the first attempt returns a falsy
 * value it is retried once; a second failure is logged. The builder state
 * is reset with clean() after dispatching.
 *
 * @param array|null $data Job arguments; when empty, the data stored on
 *                         this object is used instead (see getValues()).
 * @return mixed Result of the queue call, or the result of setError()
 *               when no job class has been set.
 */
public function push(?array $data = null)
{
    if (!$this->job) {
        return $this->setError('需要执行的队列类必须存在');
    }

    // Build the method name and arguments exactly once. getValues() may
    // append "@<do>" to the job name, so calling it again for the retry and
    // for the log line would dispatch/log a different job name each time.
    $action = $this->action();
    $values = $this->getValues($data);

    $res = QueueThink::{$action}(...$values);
    if (!$res) {
        // Single retry with the identical arguments.
        $res = QueueThink::{$action}(...$values);
        if (!$res) {
            Log::error('加入队列失败,参数:' . json_encode($values));
        }
    }

    $this->clean();

    return $res;
}
/**
 * Set the queue name used for the next dispatch.
 *
 * @param mixed $queueName Queue name, or null to clear it.
 * @return $this
 */
public function name($queueName = null)
{
    $this->queueName = $queueName;

    return $this;
}
/**
 * Reset the per-dispatch state to its defaults.
 *
 * Restores the default task method, clears delay, data, log and queue
 * name, and sets the error count back to 3. The job class itself is not
 * touched here.
 */
public function clean()
{
    // Task method goes back to the value captured in defaultDo.
    $this->do = $this->defaultDo;
    $this->errorCount = 3;

    // Everything else is simply emptied.
    $this->queueName = null;
    $this->log = null;
    $this->data = [];
    $this->secs = 0;
}
/**
 * Choose which queue method to call.
 *
 * @return string 'later' when a delay ($this->secs) is set, otherwise 'push'.
 */
protected function action()
{
    // The names must be string literals; bare `later` / `push` would be
    // read as undefined constants.
    return $this->secs ? 'later' : 'push';
}
/**
 * Build the argument list for the queue call.
 *
 * The payload carries the job data, the task method name, the error count,
 * the log value and the queue name. When the task method differs from the
 * default one, the job name is suffixed with "@<method>".
 *
 * @param array|null $data Job arguments; falls back to $this->data when empty.
 * @return array [secs, job, payload, queueName] when a delay is set,
 *               otherwise [job, payload, queueName].
 */
protected function getValues($data)
{
    $jobData = [
        'data'       => $data ?: $this->data,
        'do'         => $this->do,
        'errorCount' => $this->errorCount,
        'log'        => $this->log,
        // Was filled from $this->log by mistake.
        'queueName'  => $this->queueName,
    ];

    // Work on a local copy so repeated calls do not keep appending the
    // "@<method>" suffix to the stored job name.
    $job = $this->job;
    if ($this->do != $this->defaultDo) {
        $job .= '@' . $this->do;
    }

    if ($this->secs) {
        return [$this->secs, $job, $jobData, $this->queueName];
    }

    return [$job, $jobData, $this->queueName];
}
/**
 * Fluent setter for the properties listed in $this->rules.
 *
 * Calling `->foo($value)` stores the first argument in `$this->foo`. The
 * `data` setter is the exception: it stores the whole argument list.
 *
 * @param string $name      Method name that was called.
 * @param array  $arguments Arguments passed to it.
 * @return $this
 * @throws \RuntimeException When $name is not listed in $this->rules.
 */
public function __call($name, $arguments)
{
    if (!in_array($name, $this->rules, true)) {
        throw new \RuntimeException('Method does not exist ' . __CLASS__ . '->' . $name . '()');
    }

    if ($name === 'data') {
        // Keep every argument for the data setter.
        $this->{$name} = $arguments;
    } else {
        $this->{$name} = $arguments[0] ?? null;
    }

    return $this;
}
}
### 队列任务接口类JobInterface.php
消息队列基类BaseJob.php
fire(...$arguments);
}
/**
 * Run the task described by the queue payload.
 *
 * Reads the method name from `$data['do']` (default 'doJob') and the
 * arguments from `$data['data']` (default empty), then calls that method on
 * this object. The job is deleted when the method does not exist or when
 * anything throws; on a normal return this method does not touch the job.
 *
 * @param Job   $job  The queue job being processed.
 * @param mixed $data Payload; read here as an array with optional
 *                    'do' and 'data' keys.
 */
public function fire(Job $job, $data): void
{
    try {
        $action = $data['do'] ?? 'doJob';   // task method name
        $infoData = $data['data'] ?? [];    // arguments for the task method

        if (method_exists($this, $action)) {
            $this->{$action}(...$infoData);
        } else {
            $job->delete();
        }
    } catch (\Throwable $e) {
        // NOTE(review): the exception is discarded and the job removed, so
        // a failing task leaves no trace here — confirm this is intended.
        $job->delete();
    }
}
/**
 * Print a log value followed by a CRLF.
 *
 * Callables are invoked and their result printed; strings are printed as
 * is; arrays are printed in print_r() format. Any other value is ignored.
 * If printing throws, the exception message is printed instead.
 *
 * @param mixed $log Callable, string or array to print.
 * @return bool True when something was printed from $log, false otherwise.
 */
protected function info($log)
{
    try {
        // NOTE(review): is_callable() is checked first, so a string that
        // names a function is called rather than printed — confirm intended.
        if (is_callable($log)) {
            print_r($log() . "\r\n");

            return true;
        }

        if (is_string($log)) {
            print_r($log . "\r\n");

            return true;
        }

        if (is_array($log)) {
            // Concatenating an array with a string yields "Array" and a
            // warning, so render the array first and add the line break after.
            print_r($log);
            print_r("\r\n");

            return true;
        }
    } catch (\Throwable $e) {
        print_r($e->getMessage());
    }

    return false;
}
/**
 * Hook for handling a failed task.
 *
 * Does nothing in this class.
 *
 * @param mixed $data Payload of the failed task.
 * @param mixed $e    The error that caused the failure.
 */
public function failed($data, $e)
{
    // Intentionally empty.
}
}
队列执行类CeshiJob.php
insert($arr);
if($res){
//删除任务
$job->delete();
}else{
if ($job->attempts() >= $errorCount && $errorCount) {
//删除任务
$job->delete();
} else {
//从新放入队列
$job->release();
}
}
} catch (\Exception $e) {
//删除任务
$job->delete();
}
}
}
执行命令:
php think queue:work --queue myqueue
或者:php think queue:work --queue orderJobQueue --daemon
或者:php think queue:work --queue(执行默认队列)

版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
itfan123


