消息中间件有很多,例如 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。这些消息系统都很专业,无论是可靠性,容错性,高性能都有自己独特的特点,那为什么我们还要用redis做消息队列
1.为什么用redis做消息队列
1.1拥有普通消息组件的特性
1.2轻量易用
搭建一个mq需要你学习它,有的mq搭建还不是那么容易,但是redis很方便搭建,用的人多,学习成本也低
1.3 省钱省力省时
一般的公司都有redis方面的应用,例如做缓存,我们就不需要额外的服务器资源来重新搭建mq,对于开发人员来说,学习redis很简单,大多数也用过,开放起来很轻松也很快。
2.redis实现消息确认 ack机制
像RabbitMQ这种消息队列一般都有先消费再commit提交确认消费的机制,而kafka也可以通过提交偏移量来实现ack:Kafka ack消费确认-提交和偏移量 详解,redis则要另辟蹊径,前面分享过 java实现Redis消息发布和订阅,这里主要讲解redis消息确认的案例。
2.1 如何用redis实现消息发布和消费
使用list,set等集合的 pop和push就能快速实现实时的消息消费和发布
例如redis List(列表) 的 LPUSH 和 RPOP
2.2 redis如何实现消息确认 ack
2.2.1 什么是ack
ACK的全称为Acknowledge character,即确认字符,表示接收到的字符无错误。意思就是我把消息处理完了我再告诉mq,我业务逻辑都搞完了,你服务器可以把这条消息删了。这就是为了防止消息丢失,防止业务层消费了消息,而没把活干完。
2.2.2 redis实现ack的原理
- 维护两个队列,假如一个消息队列的key叫做task,另外一个队列的key是doing表示正在处理的任务
- 每次消费task的消息的同时就放到doing队列
- 处理完消息就删除doing对应的消息
- 使用定时任务检查doing队列,检查消息中的时间属性(这个需要产生消息的时候就可以加入时间戳),如果超时就把消息回滚到task队列
2.2.3 redis ack消费确认案例代码
主要使用到新方法RPOPLPUSH,将消息取出并同时放入另外一个队列
java代码 消费案例
//消费消息并放入doing队列
Object msg=redisTemplate.opsForList().rightPopAndLeftPush(taskKey,doingKey);
//处理业务.....
//处理业务删除doing队列消息
redisTemplate.opsForList().remove(doingKey,0,value);
java代码 超时回滚
//查询doing所有的消息
List<Object> list=redisTemplate.opsForList().range(doingKey,0,-1);
for(Object obj:list){
JSONObject contentObj= JSON.parseObject(obj.toString());
long createTime=msgObj.getString("createTime");
//判断是否超时
if(new Date().getTime()-createTime>10000){
//如果超时就把消息放回原队列
//...放回原队列之前你还可以增加其他属性,比如回滚的次数,超过一定次数你可以不回滚
redisTemplate.opsForList().leftPush(taskKey,JSON.toJSONString(contentObj));
}
}
spring boot redis配置
spring.redis.host=192.168.1.111
spring.redis.port=6379
spring.redis.password=
spring.redis.taskKey=taskKey
spring.redis.doingKey=doingKey
到此redis做消息案例已经完美完成,那其他redis岂不是没什么用了?当然不是还是说各自有各自的优点,有各自的场景,redis适合一般的业务系统,而像大数据处理系统,日志收集,就需要类似kafka这种大名鼎鼎的MQ!