java实现Redis消息发布和订阅

redis | 2020-02-07 15:53:18

我再次觉得在简单消息应用的场景使用redis实在很方便,不仅可以使用smove或RPOPLPUSH等命令实现消息不丢失(ack模式),也可以直接使用pop和push来消费和发布消息,更可以使用PUBLISH/SUBSCRIBE发布订阅的模式。

1.Redis发布订阅架构
Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。
发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。

2.Redis发布订阅功能
(1)发送消息
Redis采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。
这里写图片描述
(2)订阅某个频道
Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。

3.代码案例:
3.1首先引入相关依赖

<dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

3.2定义消息发布者类-Publisher

package com.cicc.config.management.subsciber;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class Publisher extends Thread{

    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();   //连接池中取出一个连接
        while (true) {
            try {
                jedis.publish("mychannel", reader.readLine());   //从 mychannel 的频道上推送消息
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.3定义消息订阅类-SubThread

package com.cicc.config.management.subsciber;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SubThread extends Thread {

    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            jedis.subscribe(subscriber, channel);    //通过subscribe 的api去订阅,入参是订阅者和频道名
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

3.4定义订阅消息处理类-Subscriber 继承了JedisPubSub,其中onMessage可以根据业务需求来重写

package com.cicc.config.management.subsciber;

import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {

    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));

    }
}

3.5编写测试类:PSTest

package com.cicc.config.management.test;

import com.cicc.config.management.subsciber.Publisher;
import com.cicc.config.management.subsciber.SubThread;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class PSTest{

    public static void main( String[] args )
    {
        // 连接本地redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379);

        Publisher publisher = new Publisher(jedisPool);    //发布者
        publisher.start();

        SubThread subThread = new SubThread(jedisPool);  //订阅者
        subThread.start();
    }
}

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号