码迷,mamicode.com
首页 > 其他好文 > 详细

Lavavel5.5源代码 - RedisQueue是怎么实现

时间:2019-01-25 23:52:44      阅读:307      评论:0      收藏:0      [点我收藏+]

标签:延迟   his   cte   extend   nbsp   ret   cti   add   删除   

队列的基本功能:

  1、立即执行;yes
  2、延迟执行;yes
  3、保证至少执行一次;yes
  4、必须执行且最多执行一次;no

 

用到的数据结构:

  list、Sorted sets 

延迟执行的机制:
  1、先把数据放入SortedSets类型的queues:queue_000:delayed中
  2、在执行pop的时候,执行lua脚本,把SortedSets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中

保证执行成功的机制:
  1、把要执行的数据先放入SortedSets类型的queues:queue_000:reserved中
  2、在执行pop的时候,执行lua脚本,把SortedSets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中
      3、任务执行成功,从SortedSets类型的queues:queue_000:reserved中执行删除预存的数据

  

 

class RedisQueue extends Queue implements QueueContract
{
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $this->getConnection()->rpush(
            $this->getQueue($queue), // list类型的queues:queue_000
            $payload // $payload === "标准化后的数据,进行json格式化"的数据
        );

        return json_decode($payload, true)[‘id‘] ?? null;
    }


    protected function laterRaw($delay, $payload, $queue = null)
    {
        $this->getConnection()->zadd(
            $this->getQueue($queue).‘:delayed‘,  // SortedSets类型的queues:queue_000:delayed
            $this->availableAt($delay), // 延迟执行
            $payload // $payload === "标准化后的数据,进行json格式化"的数据
        );

        return json_decode($payload, true)[‘id‘] ?? null;
    }


    public function pop($queue = null)
    {
        // 执行lua脚本,把SortedSets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中
        // 执行lua脚本,把SortedSets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中
        $this->migrate($prefixed = $this->getQueue($queue));

        // 执行lua脚本,从list类型的queues:queue_000中lpop出数据,attempts加1,然后设定超时时间并放入结构把SortedSets类型的queues:queue_000:reserved 中
        list($job, $reserved) = $this->retrieveNextJob($prefixed);

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }

}

  

Lavavel5.5源代码 - RedisQueue是怎么实现

标签:延迟   his   cte   extend   nbsp   ret   cti   add   删除   

原文地址:https://www.cnblogs.com/xiaoyaogege/p/10322253.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!