Thinkphp6 think-queue redis封装队列目标类

原创
小哥 3年前 (2022-11-03) 阅读数 11 #PHP

配置文件queue.php

  
// +----------------------------------------------------------------------  
use think\facade\Env;

return [  
default     => redis,  
connections => [  
sync     => [  
type => sync,  
],  
database => [  
type => database,  
queue  => default,  
table  => jobs,  
],  
redis    => [  
type     => redis,  
queue      => defaultredis,  
host       => Env::get(redis.redis\_hostname, 127.0.0.1),  
port       => Env::get(redis.port, 6379),  
password   => Env::get(redis.redis\_password, ),  
select     => Env::get(redis.select, 0),  
timeout    => 0,  
persistent => false,  
],  
],  
failed      => [  
type  => none,  
table => failed\_jobs,  
],  

];

测试文件加入队列PublicController.php

do(ceshiqueue)->job(CeshiJob::class)->name(myqueue)->push();  
if($res){  
return app(json)->success(加入队列成功!);  
}else{  
return app(json)->fail(加入队列失败!);  
}  
}

}  

### 队列工具类Queue.php

defaultDo = $this->do;  
}

/**  
* @return static  
*/  
public static function instance()  
{  
if (is\_null(self::$instance)) {  
self::$instance = new static();  
}  
return self::$instance;  
}

/**  
* 放入消息队列  
* @param array|null $data  
* @return mixed  
*/  
public function push(?array $data = null)  
{  
if (!$this->job) {  
return $this->setError(需要执行的队列类必须存在);  
}  
$res = QueueThink::{$this->action()}(...$this->getValues($data));  
if(!$res){  
$res = QueueThink::{$this->action()}(...$this->getValues($data));  
if(!$res){  
Log::error(加入队列失败,参数:.json\_encode($this->getValues($data)));  
}  
}  
$this->clean();  
return $res;  
}

/**  
* 队列名称  
*/  
public function name($queueName = null)  
{  
$this->queueName = $queueName;  
return $this;  
}  

/**  
* 清除数据  
*/  
public function clean()  
{  
$this->secs = 0;  
$this->data = [];  
$this->log = null;  
$this->queueName = null;  
$this->errorCount = 3;  
$this->do = $this->defaultDo;  
}

/**  
* 获取任务方式  
* @return string  
*/  
protected function action()  
{  
return $this->secs ? later : push;  
}

/**  
* 获取参数  
* @param $data  
* @return array  
*/  
protected function getValues($data)  
{  
$jobData[data] = $data ?: $this->data;  
$jobData[do] = $this->do;  
$jobData[errorCount] = $this->errorCount;  
$jobData[log] = $this->log;  
$jobData[queueName] = $this->log;  
if ($this->do != $this->defaultDo) {  
$this->job .= @ . $this->do;  
}  
if ($this->secs) {  
return [$this->secs, $this->job, $jobData, $this->queueName];  
} else {  
return [$this->job, $jobData, $this->queueName];  
}  
}

/**  
* @param $name  
* @param $arguments  
* @return $this  
*/  
public function \_\_call($name, $arguments)  
{  
if (in\_array($name, $this->rules)) {  
if ($name === data) {  
$this->{$name} = $arguments;  
} else {  
$this->{$name} = $arguments[0] ?? null;  
}  
return $this;  
} else {  
throw new \RuntimeException(Method does not exist . \_\_CLASS\_\_ . -> . $name . ());  
}  
}  
}  

### 队列任务接口类JobInterface.php

消息队列基类BaseJob.php

fire(...$arguments);  
}

/**  
* @param Job $job  
* @param $data  
*/  
public function fire(Job $job, $data): void  
{  
try {  
$action = $data[do] ?? doJob;//任务名  
$infoData = $data[data] ?? [];//执行数据  
$errorCount = $data[errorCount] ?? 0;//最大错误次数  
$log = $data[log] ?? null;  
if (method\_exists($this, $action)) {  
$this->{$action}(...$infoData);  
} else {  
$job->delete();  
}  
} catch (\Throwable $e) {  
$job->delete();  
}  
}

/**  
* 打印出成功提示  
* @param $log  
* @return bool  
*/  
protected function info($log)  
{  
try {  
if (is\_callable($log)) {  
print\_r($log() . "\r\n");  
} else if (is\_string($log) || is\_array($log)) {  
print\_r($log . "\r\n");  
}  
} catch (\Throwable $e) {  
print\_r($e->getMessage());  
}  
}

/**  
* 任务失败执行方法  
* @param $data  
* @param $e  
*/  
public function failed($data, $e)  
{

}  
}  

队列执行类CeshiJob.php

insert($arr);  
if($res){  
//删除任务  
$job->delete();  
}else{  
if ($job->attempts() >= $errorCount && $errorCount) {  
//删除任务  
$job->delete();  
} else {  
//从新放入队列  
$job->release();  
}  
}  
} catch (\Exception $e) {  
//删除任务  
$job->delete();  
}  
}  
}  

执行命令:

php think queue:work --queue myqueue

或者php think queue:work --queue orderJobQueue --daemon

或者php think queue:work --queue 执行默认队列
版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除