参考:
https://github.com/josiahcarlson/redis-in-action
maven项目导入依赖
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
通过Redis完成两个平台进行消息发布订阅的功能。
继承JedisPubSub类重载为自己的订阅方法
本例中我们实现一个订阅器RedisSubscriber
,对发布时不进行重载
package com.maple.demo_for_springfox.test_for_jedis.service;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
/**
* @author Chenyufeng
* @title: RedisSubscriber
* @projectName demo_for_spring
* @description: TODO
* @date 2020/8/18 11:01
*/
public class RedisSubscriber extends JedisPubSub {
@Override
public void subscribe(String... channels) {
super.subscribe(channels);
}
@Override
// 当发起订阅的时候会执行如下方法
public void onSubscribe(String channel, int subscribedChannels) {
super.onSubscribe(channel, subscribedChannels);
System.out.println(String.format("订阅消息:频道%s , 频道数 %d", channel, subscribedChannels));
}
@Override
// 当有消息接收的时候会执行如下方法
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
System.out.println(String.format("消费消息:频道%s ,消息:%s", channel, message));
}
}
通过Jedis对象来进行订阅消息的时候,会阻塞等待,因此我们的采用开辟线程的方式来接收消息!
关于阻塞之处:Redis接收数据
Jedis jedis = jedisPool.getResource();
jedis.subscribe(redisSubscriber, "ADD_ORG");
jedis.close();
接收/订阅端线程
class mSub implements Runnable {
private final JedisPool jedisPool;
private String channelRedis;
private RedisSubscriber redisSubscriber;
public mSub(JedisPool jedisPool, String channelRedis, RedisSubscriber subscriber) {
this.jedisPool = jedisPool;
this.channelRedis = channelRedis;
this.redisSubscriber = subscriber;
}
@Override
public void run() {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(redisSubscriber, channelRedis);
} catch (Exception e) {
System.out.println("订阅失败!");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
消息发布
这里需注意,jedis2进行完发布之后需要进行close(),否则会因无法获得资源而阻塞。RedisPool资源定义
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("172.18.21.79");
RedisSubscriber redisSubscriber = new RedisSubscriber();
mSub sub = new TestJedis().new mSub(jedisPool, "ADD_ORG", redisSubscriber);
Thread thread = new Thread(sub);
thread.start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
// System.out.println("mSub线程状态:" + thread.getState());
String msg = scanner.next();
System.out.println("new Message: " + msg);
Jedis jedis2 = jedisPool.getResource();
jedis2.publish("ADD_ORG", msg);
jedis2.close(); //不关闭会导致之后无法获得jedis资源。
}
}
Jedis在订阅数据后客户端的处理流程如下:
在 List<Object> reply = client.getRawObjectMultiBulkReply();
语句将会一直等待发布消息的到来。
private void process(Client client) {
do {
List<Object> reply = client.getRawObjectMultiBulkReply();
final Object firstObj = reply.get(0);
if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj);
}
final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bchannel = (byte[]) reply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bchannel = (byte[]) reply.get(1);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
final byte[] bchannel = (byte[]) reply.get(1);
final byte[] bmesg = (byte[]) reply.get(2);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final byte[] bchannel = (byte[]) reply.get(2);
final byte[] bmesg = (byte[]) reply.get(3);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get(2)).intValue();
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPUnsubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PONG.raw, resp)) {
final byte[] bpattern = (byte[]) reply.get(1);
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
onPong(strpattern);
} else {
throw new JedisException("Unknown message type: " + firstObj);
}
} while (isSubscribed());
/* Invalidate instance since this thread is no longer listening */
this.client = null;
}
一直往源代码内部进入,可以看到阻塞读数据的过程
RedisPool实例化过程
在RedisPool中可以定义资源的参数。默认参数GenericObjectPoolConfig
如下:
/**
* The default value for the {@code maxTotal} configuration attribute.
* @see GenericObjectPool#getMaxTotal()
*/
public static final int DEFAULT_MAX_TOTAL = 8;
/**
* The default value for the {@code maxIdle} configuration attribute.
* @see GenericObjectPool#getMaxIdle()
*/
public static final int DEFAULT_MAX_IDLE = 8;
/**
* The default value for the {@code minIdle} configuration attribute.
* @see GenericObjectPool#getMinIdle()
*/
public static final int DEFAULT_MIN_IDLE = 0;
最多能获得八个资源,如果申请了八个资源都没有close的话会导致阻塞。