前言
之前在一些博客上看到过讲如何实现延迟队列,但是平时没用上也没有动手实现过。
在上次面试的时候,面试官也问过我有没有用过延迟队列,最后凭借着记忆讲了下如何用 Redis 的有序集合实现延迟队列,以及有什么缺点。
纸上得来终觉浅,绝知此事要躬行。
这句诗就是本文的主要目的。
原理
主要用到了 Redis 的三个命令,ZADD
、ZREM
和 ZRANGEBYSCORE
。
ZADD key_name score value
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
ZREM key value [value ...]
ZADD
用于向有序集合中添加一条或多条数据,score
作为数据处理时间,value
存放数据。
ZRANGEBYSCORE
用于获取有序集合中指定时间范围内的数据,按照时间戳升序排列。
ZREM
用于将一条或多条数据从有序集合中移除。
举个例子:
# 添加测试数据
127.0.0.1:6379> ZADD test 1 a 2 b 3 c 4 d 5 e
(integer) 5
# 获取 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4
1) "b"
2) "c"
3) "d"
# 获取一条 2 到 4 之间的数据
127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "b"
# 移除数据 b
127.0.0.1:6379> ZREM test b
(integer) 1
127.0.0.1:6379> ZRANGEBYSCORE test 2 4 LIMIT 0 1
1) "c"
在 Redis 2.4 版本前,ZADD、ZREM 每次只能添加/删除一条数据。
实现
用的是 predis/predis 扩展包操作 Redis。
use Predis\Client;
class RedisDelayQueue
{
/**
* @var Client
*/
private $client = null;
/**
* RedisDelayQueue constructor.
* @param Client $client
*/
public function __construct(Client $client = null)
{
$this->client = $client ?: $this->createClient();
}
private function createClient()
{
$params = [
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
];
return new Client($params);
}
/**
* 添加数据
*
* @param $queueName
* @param $data
* @return int
*/
public function push($queueName, $data)
{
$members = [];
$data = isset($data['content']) ? [$data] : $data;
foreach($data as $datum) {
$members[$datum['content']] = $datum['time'];
}
return $this->client->zadd($queueName, $members);
}
/**
* 消费数据
* @param $queueName
* @param \Closure $callback
* @param \Closure $catch
*/
public function consume($queueName, \Closure $callback, \Closure $catch = null)
{
$options = [
'limit' => [
'offset' => 0,
'count' => 1, // 只取一条数据
]
];
while (true) {
// 从集合中获取一条小于当前时间的数据
$result = $this->client->zrangebyscore($queueName, 0, time(), $options);
// 没获取到数据就休息一秒
if (empty($result)) {
sleep(1);
continue;
}
// 将数据从集合中移除
if (!$this->client->zrem($queueName, $result[0])) {
continue;
}
try {
$callback($result[0]);
} catch(\Exception $e) {
if (!$catch instanceof \Closure) {
echo 'ERROR:' . $e->getMessage().PHP_EOL;
continue;
}
$catch($e, $result);
}
}
}
}
因为存在多个进程处理同一个队列的情况,就会出现一条数据被多个进程获取到,所以只有当 ZREM 命令移除数据成功时,才算是真正的获取到了数据。
使用
添加数据
require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';
$queue = new RedisDelayQueue();
// 添加一条数据
$data = [
'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
'time' => time() + 10, // 十秒后处理
];
// 添加多条数据
// $data = [
// [
// 'content' => '投递数据的时间是:'.date('Y-m-d H:i:s'),
// 'time' => time() + 10, // 十秒后处理
// ],
// ];
echo $queue->push('test', $data);
消费数据
require_once './vendor/autoload.php';
require_once './RedisDelayQueue.php';
$queue = new RedisDelayQueue();
$queue->consume('test', function ($value) {
printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
});
// 处理异常情况
$queue->consume('test', function ($value) {
printf("[%s] %s \r\n", date('Y-m-d H:i:s'), $value);
// 假装出现异常
throw new \Exception('数据库连接超时');
}, function ($e, $value) {
printf("发生异常:%s \r\n", $e->getMessage());
printf("获取到的数据:%s \r\n", $value);
});
运行结果
php index.php
[2020-01-19 02:29:47] 投递数据的时间是:2020-01-19 02:29:37
[2020-01-19 02:30:19] 投递数据的时间是:2020-01-19 02:30:09
使用 Lua 脚本进行优化
上面提到了,一条数据可能会被多个进程获取到,然后通过 ZREM
移除数据判断是否抢占数据成功,执行 ZRANGEBYSCORE
和 ZREM
命令会发出两次请求,那些没有抢到数据的进程就相当于这次数据白获取了。
使用 Lua 脚本进行优化,只发出一次请求,在服务端进行数据抢占操作。
Redis 中执行 Lua 脚本的命令是 EVAL
:
EVAL script numkeys key [key ...] arg [arg ...]
- script:Lua 脚本的内容。
- numkeys:命令的个数(用于区分 key 和 arg)。
- key:从 EVAL 的第三个参数开始算起,表示在脚本中所用到的那些 Redis 键(key),在 Lua 中通过全局变量 KEYS 数组访问。
- arg:附加参数,全局变量 ARGV 数组访问。
Redis 版本 >= 2.6.0 才能使用 Lua 脚本。
修改如下:
public function consume($queueName, \Closure $callback, \Closure $catch = null)
{
$script = $this->getLuaScript();
while (true) {
// 使用 eval 执行 Lua 脚本
$result = $this->client->eval($script, 2, $queueName, time());
// 没获取到数据就休息一秒
if (empty($result)) {
sleep(1);
continue;
}
try {
$callback($result);
} catch(\Exception $e) {
if (!$catch instanceof \Closure) {
echo 'ERROR:' . $e->getMessage().PHP_EOL;
continue;
}
$catch($e, $result);
}
}
}
/**
* 获取 lua 脚本
* @return string
*/
public function getLuaScript()
{
return <<<LUA
local result = redis.call('zrangebyscore', KEYS[1], 0, KEYS[2], 'limit', 0, 1);
if (table.getn(result) == 0)
then
return false;
end
if (redis.call('zrem', KEYS[1], result[1]) > 0)
then
return result[1];
else
return false;
end
LUA;
}
最后
需要注意是:基于 Redis 的延迟队列不能 100% 保证可靠性
。
如果在使用 ZREM
命令将数据从集合中移除后,处理数据时发生了异常,那么这条数据就丢失了,也就是缺少了 ACK 机制,所以在使用时候需要进行权衡,或者使用 RabbitMQ 这些专业的消息中间件。
写到这已经3点50了…溜了溜了
2020.01.20 01:28 更新
增加处理异常的回调参数,可自定义处理抛出的异常。