Posts /

Redis学习(三)

18 Aug 2020

Redis学习(三)


实战(二)

参考:

redis in action 学习笔记系列

redis in action

https://github.com/josiahcarlson/redis-in-action

Jedis

maven项目导入依赖

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.1</version>
</dependency>

发布订阅模式实践

通过Redis完成两个平台进行消息发布订阅的功能。

继承JedisPubSub类,重写(override)其中的回调方法,实现自己的订阅处理逻辑

本例中我们实现一个订阅器RedisSubscriber,只重写订阅和接收消息相关的回调方法,对发布消息的过程不进行重写

package com.maple.demo_for_springfox.test_for_jedis.service;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

/**
 * Pub/sub listener that prints subscription confirmations and received
 * messages to standard output.
 *
 * @author Chenyufeng
 * @title: RedisSubscriber
 * @projectName demo_for_spring
 * @date 2020/8/18 11:01
 */
public class RedisSubscriber extends JedisPubSub {

    /**
     * Forwards the channel list to the parent implementation unchanged.
     *
     * @param channels channels to subscribe to
     */
    @Override
    public void subscribe(String... channels) {
        super.subscribe(channels);
    }

    /**
     * Callback executed when a subscription is initiated; prints the channel
     * name and the subscribed-channel count.
     *
     * @param channel            channel that was subscribed
     * @param subscribedChannels subscribed-channel count passed in by the caller
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        super.onSubscribe(channel, subscribedChannels);
        final String line = String.format("订阅消息:频道%s , 频道数 %d", channel, subscribedChannels);
        System.out.println(line);
    }

    /**
     * Callback executed when a message is received; prints the channel name
     * and the message body.
     *
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void onMessage(String channel, String message) {
        super.onMessage(channel, message);
        final String line = String.format("消费消息:频道%s ,消息:%s", channel, message);
        System.out.println(line);
    }
}

启用订阅端线程

通过Jedis对象来进行订阅消息的时候,调用线程会阻塞等待,因此我们采用单独开辟线程的方式来接收消息!

关于阻塞之处:Redis接收数据

// subscribe() is the blocking call discussed in this section. try-with-resources
// guarantees the connection is closed (returned to the pool) even if subscribe() throws.
try (Jedis jedis = jedisPool.getResource()) {
    jedis.subscribe(redisSubscriber, "ADD_ORG");
}

接收/订阅端线程

    /**
     * Runnable that borrows a connection from the pool and subscribes the given
     * listener to a single channel. Intended to be run on its own thread because
     * the subscribe call blocks while waiting for messages.
     */
    class mSub implements Runnable {
        private final JedisPool jedisPool;
        private final String channelRedis;
        private final RedisSubscriber redisSubscriber;

        /**
         * @param jedisPool    pool the subscribing connection is borrowed from
         * @param channelRedis name of the channel to subscribe to
         * @param subscriber   listener that receives the subscription callbacks
         */
        public mSub(JedisPool jedisPool, String channelRedis, RedisSubscriber subscriber) {
            this.jedisPool = jedisPool;
            this.channelRedis = channelRedis;
            this.redisSubscriber = subscriber;
        }

        /**
         * Subscribes to the configured channel. The borrowed connection is always
         * closed when the subscription ends or fails.
         */
        @Override
        public void run() {
            // try-with-resources replaces the manual null-check/finally and also
            // routes a failure in close() through the same error handling.
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.subscribe(redisSubscriber, channelRedis);
            } catch (Exception e) {
                System.out.println("订阅失败!");
                // Report the cause instead of discarding it, so failures can be diagnosed.
                System.err.println("Subscription to channel " + channelRedis + " failed: " + e);
            }
        }
    }

消息发布

这里需注意,jedis2进行完发布之后需要进行close(),把连接归还给连接池,否则之后会因无法获得资源而阻塞。参见下文:JedisPool资源定义(资源池配置)

    /**
     * Demo entry point: starts a subscriber thread on the channel, then publishes
     * every token read from standard input to that same channel.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Single definition of the channel name shared by subscriber and publisher.
        final String channel = "ADD_ORG";
        JedisPool jedisPool = new JedisPool("172.18.21.79");

        RedisSubscriber redisSubscriber = new RedisSubscriber();

        // Subscribing blocks, so it runs on its own thread.
        mSub sub = new TestJedis().new mSub(jedisPool, channel, redisSubscriber);
        Thread thread = new Thread(sub);
        thread.start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();

            System.out.println("new Message: " + msg);

            // Always return the connection to the pool, even if publish() throws;
            // otherwise later getResource() calls can no longer obtain a connection.
            try (Jedis jedis2 = jedisPool.getResource()) {
                jedis2.publish(channel, msg);
            }
        }
    }

附录:

客户端处理接收消息过程:

Jedis在订阅数据后客户端的处理流程如下:

List<Object> reply = client.getRawObjectMultiBulkReply();语句将会一直等待发布消息的到来。

/**
 * Reads replies from the given client in a loop and dispatches each one to the
 * matching callback, chosen by comparing the first reply element against the
 * known message-type markers. The loop runs until {@code isSubscribed()}
 * returns false.
 *
 * @param client connection the replies are read from
 * @throws JedisException if the first reply element is not a byte array or
 *         does not match any known message type
 */
private void process(Client client) {

    do {
      // Fetch the next reply; per the surrounding article this call waits until one arrives.
      List<Object> reply = client.getRawObjectMultiBulkReply();
      final Object firstObj = reply.get(0);
      // The first element identifies the message type and must be raw bytes.
      if (!(firstObj instanceof byte[])) {
        throw new JedisException("Unknown message type: " + firstObj);
      }
      final byte[] resp = (byte[]) firstObj;
      if (Arrays.equals(SUBSCRIBE.raw, resp)) {
        // subscribe: element 1 = channel, element 2 = count stored in subscribedChannels
        subscribedChannels = ((Long) reply.get(2)).intValue();
        final byte[] bchannel = (byte[]) reply.get(1);
        final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
        onSubscribe(strchannel, subscribedChannels);
      } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
        // unsubscribe: element 1 = channel, element 2 = count stored in subscribedChannels
        subscribedChannels = ((Long) reply.get(2)).intValue();
        final byte[] bchannel = (byte[]) reply.get(1);
        final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
        onUnsubscribe(strchannel, subscribedChannels);
      } else if (Arrays.equals(MESSAGE.raw, resp)) {
        // message: element 1 = channel, element 2 = message body
        final byte[] bchannel = (byte[]) reply.get(1);
        final byte[] bmesg = (byte[]) reply.get(2);
        final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
        final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
        onMessage(strchannel, strmesg);
      } else if (Arrays.equals(PMESSAGE.raw, resp)) {
        // pattern message: element 1 = pattern, element 2 = channel, element 3 = message body
        final byte[] bpattern = (byte[]) reply.get(1);
        final byte[] bchannel = (byte[]) reply.get(2);
        final byte[] bmesg = (byte[]) reply.get(3);
        final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
        final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
        final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
        onPMessage(strpattern, strchannel, strmesg);
      } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
        // pattern subscribe: element 1 = pattern, element 2 = count stored in subscribedChannels
        subscribedChannels = ((Long) reply.get(2)).intValue();
        final byte[] bpattern = (byte[]) reply.get(1);
        final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
        onPSubscribe(strpattern, subscribedChannels);
      } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
        // pattern unsubscribe: element 1 = pattern, element 2 = count stored in subscribedChannels
        subscribedChannels = ((Long) reply.get(2)).intValue();
        final byte[] bpattern = (byte[]) reply.get(1);
        final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
        onPUnsubscribe(strpattern, subscribedChannels);
      } else if (Arrays.equals(PONG.raw, resp)) {
        // pong: element 1 is decoded and handed to onPong
        final byte[] bpattern = (byte[]) reply.get(1);
        final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
        onPong(strpattern);
      } else {
        // Byte array that matches none of the known type markers.
        throw new JedisException("Unknown message type: " + firstObj);
      }
    } while (isSubscribed());

    /* Invalidate instance since this thread is no longer listening */
    this.client = null;
  }

一直往源代码内部进入,可以看到阻塞读数据的过程

1597732365444

资源池配置:

JedisPool实例化过程

1597732998397

在JedisPool中可以通过GenericObjectPoolConfig定义资源池的参数。GenericObjectPoolConfig的默认参数如下:

    /**
     * The default value ({@code 8}) for the {@code maxTotal} configuration attribute.
     * NOTE(review): presumably the cap on objects the pool hands out at the same
     * time, as the surrounding article describes; confirm against GenericObjectPool.
     * @see GenericObjectPool#getMaxTotal()
     */
    public static final int DEFAULT_MAX_TOTAL = 8;

    /**
     * The default value ({@code 8}) for the {@code maxIdle} configuration attribute.
     * NOTE(review): presumably the cap on idle objects kept in the pool; confirm
     * against GenericObjectPool.
     * @see GenericObjectPool#getMaxIdle()
     */
    public static final int DEFAULT_MAX_IDLE = 8;

    /**
     * The default value ({@code 0}) for the {@code minIdle} configuration attribute.
     * NOTE(review): presumably the minimum number of idle objects the pool tries
     * to keep; confirm against GenericObjectPool.
     * @see GenericObjectPool#getMinIdle()
     */
    public static final int DEFAULT_MIN_IDLE = 0;

默认配置下(maxTotal = 8),连接池最多同时借出八个资源;如果已申请的八个资源都没有close(归还到连接池),再次调用getResource()就会阻塞。