Redisson2.0源码分析8-发布订阅
Redisson2.0源码分析8-发布订阅
发布订阅机制在 Redisson 中使用也是比较频繁的,特别是一些要进行消息通知的场景,可以理解为就是简单的消息队列。
1. 发布订阅处理
发布订阅处理主要是一些消息的流转以及处理,不是实际的 Redisson 工具类运用。
1.1 发布订阅模式消息对象 PubSubMessage
package org.redisson.client.protocol.pubsub;
public class PubSubMessage<V> implements Message {
private final String channel; // 消息所属的 channel ,标识消息的来源或目的
private final V value; // 任何类型消息的内容
public PubSubMessage(String channel, V value) {
super();
this.channel = channel;
this.value = value;
}
public String getChannel() {
return channel;
}
public V getValue() {
return value;
}
@Override
public String toString() {
return "Message [channel=" + channel + ", value=" + value + "]";
}
}
1.2 发布订阅消息解码 PubSubMessageDecoder
解码发布/订阅模式中传递的消息。
package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class PubSubMessageDecoder implements MultiDecoder<Object> {
private final Decoder<Object> decoder; // 解码器
public PubSubMessageDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
// 解码 ByteBuf 中的数据
return decoder.decode(buf, null);
}
@Override
public PubSubMessage<Object> decode(List<Object> parts, State state) {
// 解码多个对象
// 从 parts 列表中获取 channel 名称和 消息内容
// 第一个为 channel 名称,第二个为消息内容
return new PubSubMessage<Object>(parts.get(1).toString(), parts.get(2));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}
1.3 基于通配符的发布订阅模式消息对象 PubSubPatternMessage
package org.redisson.client.protocol.pubsub;
public class PubSubPatternMessage implements Message {
private final String pattern; // 匹配的模式
private final String channel; // channel 名称
private final Object value; // 消息内容
public PubSubPatternMessage(String pattern, String channel, Object value) {
super();
this.pattern = pattern;
this.channel = channel;
this.value = value;
}
public String getPattern() {
return pattern;
}
public String getChannel() {
return channel;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "PubSubPatternMessage [pattern=" + pattern + ", channel=" + channel + ", value=" + value + "]";
}
}
1.4 通配符发布订阅模式消息解码器 PubSubPatternMessageDecoder
解码在发布/订阅模式中基于通配符的消息。
package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
private final Decoder<Object> decoder;
public PubSubPatternMessageDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
// 将 ByteBuf 解码成对象数据
return decoder.decode(buf, null);
}
@Override
public PubSubPatternMessage decode(List<Object> parts, State state) {
// 从 parts 中获取 pattern 、channel 名称 和消息内容
// 第一个为模式,第二个为 channel 名称,第三个为实际的消息内容
return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}
1.5 状态消息 PubSubStatusMessage
封装发布/订阅模式中传递的状态消息。
package org.redisson.client.protocol.pubsub;
public class PubSubStatusMessage implements Message {
private final PubSubType type; // 发布订阅类型枚举
private final String channel; // channel 名称
public PubSubStatusMessage(PubSubType type, String channel) {
super();
this.type = type;
this.channel = channel;
}
public String getChannel() {
return channel;
}
public PubSubType getType() {
return type;
}
@Override
public String toString() {
return "PubSubStatusMessage [type=" + type + ", channels=" + channel + "]";
}
}
1.6 状态消息解码器 PubSubStatusDecoder
解码在发布/订阅模式中传递的状态消息。
package org.redisson.client.protocol.pubsub;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
// 直接将 ByteBuf 转换为 UTF-8 编码的字符串
String status = buf.toString(CharsetUtil.UTF_8);
// 跳过两个字节回车换行
buf.skipBytes(2);
return status;
}
@Override
public PubSubStatusMessage decode(List<Object> parts, State state) {
// 从 parts 中获取状态类型和 channel 名称
// 第一个参数为状态类型,第二个参数为 channel 名称
return new PubSubStatusMessage(PubSubType.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}
1.7 发布订阅模式类型枚举 PubSubType
package org.redisson.client.protocol.pubsub;
public enum PubSubType {
SUBSCRIBE, // 订阅一个 channel
PSUBSCRIBE, // 订阅一个模式匹配的 channel
PUNSUBSCRIBE, // 取消订阅一个 channel
UNSUBSCRIBE // 取消订阅一个模式匹配的 channel
}
2. 发布订阅工具类
上面一部分是发布订阅机制的封装, Redisson 对发布订阅提供的工具类主要有下面 2 个:
- RTopic 普通的发布订阅机制工具类
- RPatternTopic 基于模式匹配的发布订阅工具类
2.1. 发布/订阅 RTopic
实现发布订阅机制,这一块跟 Redisson V1 基本套路差不多。 实现原理就是通过 Redis 的发布订阅机制实现分布式消息传递,使得消息能够在多个节点之间进行广播和监听。
主要方法有: - 发布消息到 channel 。 - 添加消息监听器。 - 移除消息监听器。
package org.redisson;
import java.util.Collections;
import java.util.List;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import org.redisson.core.StatusListener;
import io.netty.util.concurrent.Future;
public class RedissonTopic<M> implements RTopic<M> {
final CommandExecutor commandExecutor; // 命令执行器
private final String name; // topic 的名称
protected RedissonTopic(CommandExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
}
public List<String> getChannelNames() {
return Collections.singletonList(name);
}
// 同步发送消息到 channel
@Override
public long publish(M message) {
return commandExecutor.get(publishAsync(message));
}
// 异步发送消息到 channel
@Override
public Future<Long> publishAsync(M message) {
// 使用 PUBLISH 命令
return commandExecutor.writeAsync(name, RedisCommands.PUBLISH, name, message);
}
// 添加状态监听器
@Override
public int addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
};
// 添加消息监听器
@Override
public int addListener(MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
return addListener(pubSubListener);
}
// 添加发布订阅监听器
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name);
synchronized (entry) {
// 如果连接获取才连接
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return pubSubListener.hashCode();
}
}
// 连接不获取就重新尝试添加
return addListener(pubSubListener);
}
// 移除监听器
@Override
public void removeListener(int listenerId) {
// 没有找到这个监听器直接返回
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
if (entry == null) {
return;
}
synchronized (entry) {
// 链接活跃才尝试移除
if (entry.isActive()) {
entry.removeListener(name, listenerId);
// 如果没有监听器了,就直接取消订阅
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
}
// 连接不活跃重新尝试
removeListener(listenerId);
}
}
2.2. 通配符模式发布订阅工具类 RPatternTopic
基于通配符模式的 Topic 机制实现,跟 Redisson V1 版本几乎也是一样。最大的特点就是通配符,可以订阅符合某个模式的多个 channel ,并接收这些 channel 的消息。
跟上面这个类几乎一模一样,没有很大区别。
package org.redisson;
import java.util.Collections;
import java.util.List;
import org.redisson.client.RedisPubSubListener;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
import org.redisson.core.RPatternTopic;
public class RedissonPatternTopic<M> implements RPatternTopic<M> {
final CommandExecutor commandExecutor; // 命令执行器
private final String name; // topic 名称
protected RedissonPatternTopic(CommandExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
}
@Override
public int addListener(PatternStatusListener listener) {
return addListener(new PubSubPatternStatusListener(listener, name));
};
@Override
public int addListener(PatternMessageListener<M> listener) {
PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
return addListener(pubSubListener);
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name);
synchronized (entry) {
// 连接活跃才添加监听器
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return pubSubListener.hashCode();
}
}
// 连接不活跃就重试
return addListener(pubSubListener);
}
@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
if (entry == null) {
return;
}
synchronized (entry) {
// 连接活跃才移除监听器
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (entry.getListeners(name).isEmpty()) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
}
//
removeListener(listenerId);
}
// 获取包含 topic 名称的列表
@Override
public List<String> getPatternNames() {
return Collections.singletonList(name);
}
}