七的博客

Redisson2.0源码分析8-发布订阅

源码分析

Redisson2.0源码分析8-发布订阅

发布订阅机制在 Redisson 中使用也是比较频繁的,特别是一些要进行消息通知的场景,可以理解为就是简单的消息队列。

1. 发布订阅处理

发布订阅处理主要是一些消息的流转以及处理,不是实际的 Redisson 工具类运用。

1.1 发布订阅模式消息对象 PubSubMessage

package org.redisson.client.protocol.pubsub;

public class PubSubMessage<V> implements Message {

    private final String channel;  // 消息所属的 channel ,标识消息的来源或目的
    private final V value;   // 任何类型消息的内容

    public PubSubMessage(String channel, V value) {
        super();
        this.channel = channel;
        this.value = value;
    }

    public String getChannel() {
        return channel;
    }

    public V getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message [channel=" + channel + ", value=" + value + "]";
    }

}

1.2 发布订阅消息解码 PubSubMessageDecoder

解码发布/订阅模式中传递的消息。

package org.redisson.client.protocol.pubsub;

import java.io.IOException;
import java.util.List;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;

import io.netty.buffer.ByteBuf;

public class PubSubMessageDecoder implements MultiDecoder<Object> {

    private final Decoder<Object> decoder;  // 解码器

    public PubSubMessageDecoder(Decoder<Object> decoder) {
        super();
        this.decoder = decoder;
    }

    @Override
    public Object decode(ByteBuf buf, State state) throws IOException {
        // 解码 ByteBuf 中的数据
        return decoder.decode(buf, null);
    }

    @Override
    public PubSubMessage<Object> decode(List<Object> parts, State state) {
        // 解码多个对象
        // 从 parts 列表中获取 channel 名称和 消息内容
        // 第一个为 channel 名称,第二个为消息内容
        return new PubSubMessage<Object>(parts.get(1).toString(), parts.get(2));
    }

    @Override
    public boolean isApplicable(int paramNum, State state) {
        return true;
    }

}

1.3 基于通配符的发布订阅模式消息对象 PubSubPatternMessage

package org.redisson.client.protocol.pubsub;

public class PubSubPatternMessage implements Message {

    private final String pattern;  // 匹配的模式
    private final String channel;  // channel 名称
    private final Object value;  // 消息内容

    public PubSubPatternMessage(String pattern, String channel, Object value) {
        super();
        this.pattern = pattern;
        this.channel = channel;
        this.value = value;
    }

    public String getPattern() {
        return pattern;
    }

    public String getChannel() {
        return channel;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "PubSubPatternMessage [pattern=" + pattern + ", channel=" + channel + ", value=" + value + "]";
    }

}

1.4 通配符发布订阅模式消息解码器 PubSubPatternMessageDecoder

解码在发布/订阅模式中基于通配符的消息。

package org.redisson.client.protocol.pubsub;

import java.io.IOException;
import java.util.List;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;

import io.netty.buffer.ByteBuf;

public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {

    private final Decoder<Object> decoder;

    public PubSubPatternMessageDecoder(Decoder<Object> decoder) {
        super();
        this.decoder = decoder;
    }

    @Override
    public Object decode(ByteBuf buf, State state) throws IOException {
        // 将 ByteBuf 解码成对象数据
        return decoder.decode(buf, null);
    }

    @Override
    public PubSubPatternMessage decode(List<Object> parts, State state) {
        // 从 parts 中获取 pattern 、channel 名称 和消息内容
        // 第一个为模式,第二个为 channel 名称,第三个为实际的消息内容
        return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
    }

    @Override
    public boolean isApplicable(int paramNum, State state) {
        return true;
    }

}

1.5 状态消息 PubSubStatusMessage

封装发布/订阅模式中传递的状态消息。

package org.redisson.client.protocol.pubsub;

public class PubSubStatusMessage implements Message {

    private final PubSubType type; // 发布订阅类型枚举
    private final String channel; // channel 名称

    public PubSubStatusMessage(PubSubType type, String channel) {
        super();
        this.type = type;
        this.channel = channel;
    }

    public String getChannel() {
        return channel;
    }

    public PubSubType getType() {
        return type;
    }

    @Override
    public String toString() {
        return "PubSubStatusMessage [type=" + type + ", channels=" + channel + "]";
    }

}

1.6 状态消息解码器 PubSubStatusDecoder

解码在发布/订阅模式中传递的状态消息。

package org.redisson.client.protocol.pubsub;

import java.util.List;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class PubSubStatusDecoder implements MultiDecoder<Object> {

    @Override
    public Object decode(ByteBuf buf, State state) {
        // 直接将 ByteBuf 转换为 UTF-8 编码的字符串
        String status = buf.toString(CharsetUtil.UTF_8);

        // 跳过两个字节回车换行
        buf.skipBytes(2);
        return status;
    }

    @Override
    public PubSubStatusMessage decode(List<Object> parts, State state) {
        // 从 parts 中获取状态类型和 channel 名称
        // 第一个参数为状态类型,第二个参数为 channel 名称
        return new PubSubStatusMessage(PubSubType.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
    }

    @Override
    public boolean isApplicable(int paramNum, State state) {
        return true;
    }

}

1.7 发布订阅模式类型枚举 PubSubType

package org.redisson.client.protocol.pubsub;

public enum PubSubType {

    SUBSCRIBE,   // 订阅一个 channel
    PSUBSCRIBE,  // 订阅一个模式匹配的 channel
    PUNSUBSCRIBE,  // 取消订阅一个 channel
    UNSUBSCRIBE   // 取消订阅一个模式匹配的 channel
}

2. 发布订阅工具类

上面一部分是发布订阅机制的封装, Redisson 对发布订阅提供的工具类主要有下面 2 个:

  • RTopic 普通的发布订阅机制工具类
  • RPatternTopic 基于模式匹配的发布订阅工具类

2.1. 发布/订阅 RTopic

实现发布订阅机制,这一块跟 Redisson V1 基本套路差不多。 实现原理就是通过 Redis 的发布订阅机制实现分布式消息传递,使得消息能够在多个节点之间进行广播和监听。

主要方法有: - 发布消息到 channel 。 - 添加消息监听器。 - 移除消息监听器。

package org.redisson;

import java.util.Collections;
import java.util.List;

import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import org.redisson.core.StatusListener;

import io.netty.util.concurrent.Future;


public class RedissonTopic<M> implements RTopic<M> {

    final CommandExecutor commandExecutor;  // 命令执行器
    private final String name; // topic 的名称

    protected RedissonTopic(CommandExecutor commandExecutor, String name) {
        this.commandExecutor = commandExecutor;
        this.name = name;
    }

    public List<String> getChannelNames() {
        return Collections.singletonList(name);
    }

    // 同步发送消息到 channel 
    @Override
    public long publish(M message) {
        return commandExecutor.get(publishAsync(message));
    }

    // 异步发送消息到 channel 
    @Override
    public Future<Long> publishAsync(M message) {
        // 使用 PUBLISH 命令
        return commandExecutor.writeAsync(name, RedisCommands.PUBLISH, name, message);
    }

    // 添加状态监听器
    @Override
    public int addListener(StatusListener listener) {
        return addListener(new PubSubStatusListener(listener, name));
    };

    // 添加消息监听器
    @Override
    public int addListener(MessageListener<M> listener) {
        PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
        return addListener(pubSubListener);
    }

    // 添加发布订阅监听器
    private int addListener(RedisPubSubListener<M> pubSubListener) {
        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name);
        synchronized (entry) {
            // 如果连接获取才连接
            if (entry.isActive()) {
                entry.addListener(name, pubSubListener);
                return pubSubListener.hashCode();
            }
        }
        // 连接不获取就重新尝试添加
        return addListener(pubSubListener);
    }

    // 移除监听器
    @Override
    public void removeListener(int listenerId) {
        // 没有找到这个监听器直接返回
        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
        if (entry == null) {
            return;
        }
        synchronized (entry) {
            // 链接活跃才尝试移除
            if (entry.isActive()) {
                entry.removeListener(name, listenerId);
                // 如果没有监听器了,就直接取消订阅
                if (!entry.hasListeners(name)) {
                    commandExecutor.getConnectionManager().unsubscribe(name);
                }
                return;
            }
        }

        // 连接不活跃重新尝试
        removeListener(listenerId);
    }

}

2.2. 通配符模式发布订阅工具类 RPatternTopic

基于通配符模式的 Topic 机制实现,跟 Redisson V1 版本几乎也是一样。最大的特点就是通配符,可以订阅符合某个模式的多个 channel ,并接收这些 channel 的消息。

跟上面这个类几乎一模一样,没有很大区别。

package org.redisson;

import java.util.Collections;
import java.util.List;

import org.redisson.client.RedisPubSubListener;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
import org.redisson.core.RPatternTopic;


public class RedissonPatternTopic<M> implements RPatternTopic<M> {

    final CommandExecutor commandExecutor; // 命令执行器
    private final String name;  // topic 名称

    protected RedissonPatternTopic(CommandExecutor commandExecutor, String name) {
        this.commandExecutor = commandExecutor;
        this.name = name;
    }

    @Override
    public int addListener(PatternStatusListener listener) {
        return addListener(new PubSubPatternStatusListener(listener, name));
    };

    @Override
    public int addListener(PatternMessageListener<M> listener) {
        PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
        return addListener(pubSubListener);
    }

    private int addListener(RedisPubSubListener<M> pubSubListener) {
        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name);
        synchronized (entry) {
            // 连接活跃才添加监听器
            if (entry.isActive()) {
                entry.addListener(name, pubSubListener);
                return pubSubListener.hashCode();
            }
        }
        // 连接不活跃就重试
        return addListener(pubSubListener);
    }

    @Override
    public void removeListener(int listenerId) {
        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
        if (entry == null) {
            return;
        }
        synchronized (entry) {
            // 连接活跃才移除监听器            
            if (entry.isActive()) {
                entry.removeListener(name, listenerId);
                if (entry.getListeners(name).isEmpty()) {
                    commandExecutor.getConnectionManager().punsubscribe(name);
                }
                return;
            }
        }

        //  
        removeListener(listenerId);
    }

    // 获取包含 topic 名称的列表
    @Override
    public List<String> getPatternNames() {
        return Collections.singletonList(name);
    }

}