之所以使用redis做消息队列,就是因为使用 pop和push简单,但需要考虑消息丢失和重复消费的问题
1.redis防止消息丢失
在前面一篇文章(redis实现消息消费确认(ack机制)),实现了redis使用RPOPLPUSH 来实现消费确认,来防止消息丢失。
RPOPLPUSH 在一个原子时间内,执行以下两个动作:
将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。
1.1 我们首先在task队列插入消息之前给消息放入日期属性(一般消息使用json字符串)
1.2 我们只需要使用RPOPLPUSH 将消费到的消息同时放入一个doing队列
1.3 处理完成久删除doing队列中的消息
1.4 定时扫描doing队列中的消息,日期超过一定时间 就重新把消息插入到task队列
问题:这会导致一个小问题,那就是当消息很多,积压时间较长,就会多次执行1.4的步骤,导致重复消费。
2.redis解决重复消费
上面消息的时间属性是消息的创建时间,而不是消息消费的时间,实际上,我们判断消息是否重复,需要使用消息消费的时间,这样才能正常判断超时,正常回滚到task队列。
解决方法:
2.1在消息中放入uuid
2.2再使用一个tasktime map队列来记录消费消息的消费时间,key是消息的uuid,value是消费的时间。