我再次觉得在简单消息应用的场景使用redis实在很方便,不仅可以使用smove或RPOPLPUSH等命令实现消息不丢失(ack模式),也可以直接使用pop和push来消费和发布消息,更可以使用PUBLISH/SUBSCRIBE发布订阅的模式。
1.Redis发布订阅架构
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
2.Redis发布订阅功能
(1)发送消息
Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
这里写图片描述
(2)订阅某个频道
Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。
3.代码案例:
3.1首先引入相关依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
3.2定义消息发布者类-Publisher
package com.cicc.config.management.subsciber;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class Publisher extends Thread{
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
while (true) {
try {
jedis.publish("mychannel", reader.readLine()); //从 mychannel 的频道上推送消息
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3.3定义消息订阅类-SubThread
package com.cicc.config.management.subsciber;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource(); //取出一个连接
jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
3.4定义订阅消息处理类-Subscriber 继承了JedisPubSub,其中onMessage可以根据业务需求来重写
package com.cicc.config.management.subsciber;
import redis.clients.jedis.JedisPubSub;
public class Subscriber extends JedisPubSub {
public Subscriber(){}
@Override
public void onMessage(String channel, String message) { //收到消息会调用
System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
}
@Override
public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
3.5编写测试类:PSTest
package com.cicc.config.management.test;
import com.cicc.config.management.subsciber.Publisher;
import com.cicc.config.management.subsciber.SubThread;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class PSTest{
public static void main( String[] args )
{
// 连接本地redis服务端
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);
Publisher publisher = new Publisher(jedisPool); //发布者
publisher.start();
SubThread subThread = new SubThread(jedisPool); //订阅者
subThread.start();
}
}