Redisson2.0源码分析1-开篇以及Redis协议编解码
Redisson2.0源码分析1-开篇以及Redis协议编解码
之前简单分析过 Redisson V1 版本的一部分源码,看 changelog 发现 Redisson V2 版本有做过一次比较大的改动。这次就动手看看 V2 版本到底改动了哪些。
老样子,还是拉取 V2 版本发布的版本附近的源码。
1. 拉取指定版本源码分析
查看项目发布日志 changelog ,找到 2.0 版本的更新日志:
Starting from 2.0.0 version Redisson has a new own async and lock-free Redis client under the hood. Thanks to the new architecture pipline (command batches) support has been implemented and a lot of code has gone.
Feature - new RObject methods: move, moveAsync, migrate, migrateAsync Feature - new async interfaces: RAsyncMap, RAtomicLongAsync, RBlockingQueueAsync, RCollectionAsync, RDequeAsync, RExpirableAsync, RHyperLogLogAsync, RListAsync, RObjectAsync, RQueueAsync, RScriptAsync, RSetAsync, RTopicAsync Feature - multiple commands batch (Redis pipelining) support via Redisson.createBatch method Feature - new methods flushall, deleteAsync, delete, deleteByPatternAsync, deleteByPattern, findKeysByPatternAsync, findKeysByPattern added to RedissonClient interface Improvement - closed channel detection speedup
这个版本可能还有些 Bug 没有完全修复,所以又找到了 2.1 版本。
15-Aug-2015 - version 2.1.1 released
Feature - all keys operations extracted toRKeys
interface
Feature -RKeys.getKeys
,RKeys.getKeysByPattern
andRKeys.randomKey
methods added
Feature -RBlockingQueueAsync.drainToAsync
method added
Feature - Redis nodes info and ping operations viaRedisson.getNodesGroup
orRedisson.getClusterNodesGroup
now available
Improvement - added sentinel nodes discovery
Fixed - command encoding errors handling
Fixed - cluster empty slot handling
Fixed - connection hangs when there are no slaves in sentinel mode
Fixed - activate master as slave when there are no more available slaves in sentinel mode
Fixed - skip disconnected sentinels during startup
Fixed - slave node discovery in sentinel mode which has been disconnected since start
Deprecated - Redisson methodsdeleteAsync
,delete
,deleteByPatternAsync
,deleteByPattern
,findKeysByPatternAsync
,findKeysByPattern
. Use same methods withRKeys
interface
对应的 Git commit id 是 cd3032ac, 直接回滚到这个版本即可。
git reset --hard cd3032ac
2. 版本变更原因
这个版本的变化主要是有人在 issue 提到 lettuce 源码相关的问题,比如下面几个:
lettuce 移除记录 18febb69 lettuce sources removed. #183 https://github.com/redisson/redisson/issues/183
lettuce 被 fork 到 redis 仓库 https://github.com/redisson/redisson/issues/20
lettuce 太久没有去维护了,导致作者自己也想重新写一套 Redis 连接以及编解码相关的代码,这样更符合 Redisson 自己的用法。
3. V2版本代码结构
回退到指定版本后的代码结构如下:
.
└── org
└── redisson
├── BaseConfig.java // Redisson 配置类基类
├── BaseMasterSlaveServersConfig.java // 主从配置基类
├── ClusterServersConfig.java // 集群配置基类
├── CommandBatchExecutorService.java // 批量命令操作实现
├── CommandExecutor.java // Redis 命令执行器接口
├── CommandExecutorService.java // Redis 命令执行器实现
├── Config.java // Redisson 配置管理类
├── MasterSlaveServersConfig.java // 主从配置
├── PubSubMessageListener.java // 发布订阅监听器实现
├── PubSubPatternMessageListener.java // // 发布订阅模式消息监听器实现
├── PubSubPatternStatusListener.java // // // 发布订阅模式状态监听器实现
├── PubSubStatusListener.java // 发布订阅状态监听器实现
├── RedisNodes.java // Redis 集群节点
├── Redisson.java // Redisson 核心类
├── RedissonAtomicLong.java
├── RedissonBatch.java
├── RedissonBlockingQueue.java
├── RedissonBucket.java
├── RedissonClient.java
├── RedissonCountDownLatch.java
├── RedissonCountDownLatchEntry.java
├── RedissonDeque.java
├── RedissonExpirable.java
├── RedissonHyperLogLog.java
├── RedissonKeys.java
├── RedissonList.java
├── RedissonLock.java
├── RedissonLockEntry.java
├── RedissonMap.java
├── RedissonMapEntry.java
├── RedissonObject.java
├── RedissonPatternTopic.java
├── RedissonQueue.java
├── RedissonScript.java
├── RedissonSet.java
├── RedissonSortedSet.java
├── RedissonSubSortedSet.java
├── RedissonTopic.java
├── SentinelServersConfig.java
├── SingleServerConfig.java
├── SlotCallback.java
├── SyncOperation.java
├── client // 处理与 Redis 的客户端通信
│ ├── BaseRedisPubSubListener.java
│ ├── OneShotPubSubListener.java
│ ├── RedisClient.java
│ ├── RedisConnection.java
│ ├── RedisConnectionException.java
│ ├── RedisEmptySlotException.java
│ ├── RedisException.java
│ ├── RedisMovedException.java
│ ├── RedisPubSubConnection.java
│ ├── RedisPubSubListener.java
│ ├── RedisTimeoutException.java
│ ├── WriteRedisConnectionException.java
│ ├── codec // 提供用于数据类型的编码和解码的类
│ │ ├── Codec.java
│ │ ├── LongCodec.java
│ │ └── StringCodec.java
│ ├── handler // Redis 命令的编码/解码
│ │ ├── CommandDecoder.java
│ │ ├── CommandEncoder.java
│ │ ├── CommandsListEncoder.java
│ │ ├── CommandsQueue.java
│ │ ├── ConnectionWatchdog.java
│ │ └── State.java
│ └── protocol // 处理 Redis 协议、数据处理
│ ├── CommandData.java
│ ├── CommandsData.java
│ ├── Decoder.java
│ ├── Encoder.java
│ ├── QueueCommand.java
│ ├── QueueCommandHolder.java
│ ├── RedisCommand.java
│ ├── RedisCommands.java
│ ├── RedisStrictCommand.java
│ ├── StringParamsEncoder.java
│ ├── convertor // Redis 响应的数据转换
│ │ ├── BooleanAmountReplayConvertor.java
│ │ ├── BooleanNumberReplayConvertor.java
│ │ ├── BooleanReplayConvertor.java
│ │ ├── Convertor.java
│ │ ├── EmptyConvertor.java
│ │ ├── IntegerReplayConvertor.java
│ │ ├── KeyValueConvertor.java
│ │ ├── LongReplayConvertor.java
│ │ ├── NumberConvertor.java
│ │ ├── SingleConvertor.java
│ │ ├── TrueReplayConvertor.java
│ │ └── VoidReplayConvertor.java
│ ├── decoder // Redis 响应编码
│ │ ├── KeyValueMessage.java
│ │ ├── KeyValueObjectDecoder.java
│ │ ├── ListScanResult.java
│ │ ├── ListScanResultReplayDecoder.java
│ │ ├── MapScanResult.java
│ │ ├── MapScanResultReplayDecoder.java
│ │ ├── MultiDecoder.java
│ │ ├── NestedMultiDecoder.java
│ │ ├── ObjectListReplayDecoder.java
│ │ ├── ObjectMapReplayDecoder.java
│ │ ├── ObjectSetReplayDecoder.java
│ │ ├── StringDataDecoder.java
│ │ ├── StringListReplayDecoder.java
│ │ ├── StringMapDataDecoder.java
│ │ ├── StringMapReplayDecoder.java
│ │ ├── StringObjectDecoder.java
│ │ └── StringReplayDecoder.java
│ └── pubsub // 发布订阅相关、包括编解码等
│ ├── Message.java
│ ├── PubSubMessage.java
│ ├── PubSubMessageDecoder.java
│ ├── PubSubPatternMessage.java
│ ├── PubSubPatternMessageDecoder.java
│ ├── PubSubStatusDecoder.java
│ ├── PubSubStatusMessage.java
│ └── PubSubType.java
├── codec // 提供不同的序列化策略,例如 JSON, Kryo
│ ├── JsonJacksonCodec.java
│ ├── KryoCodec.java
│ └── SerializationCodec.java
├── connection // Redis 连接管理
│ ├── BaseLoadBalancer.java
│ ├── CRC16.java
│ ├── ClusterConnectionManager.java
│ ├── ClusterNodeInfo.java
│ ├── ClusterPartition.java
│ ├── ConnectionEntry.java
│ ├── ConnectionManager.java // 连接管理,包括集群和主从配置
│ ├── LoadBalancer.java // 负载均衡策略接口
│ ├── MasterSlaveConnectionManager.java
│ ├── MasterSlaveEntry.java
│ ├── PubSubConnectionEntry.java
│ ├── RandomLoadBalancer.java
│ ├── RedisClientEntry.java
│ ├── RoundRobinLoadBalancer.java
│ ├── SentinelConnectionManager.java
│ ├── SingleConnectionManager.java
│ ├── SingleEntry.java
│ ├── SubscribesConnectionEntry.java
│ └── decoder // 解码 Redis List 数据
│ ├── ListDrainToDecoder.java
│ └── ListFirstObjectDecoder.java
├── core // 分布式数据结构和操作的接口定义
│ ├── BasePatternStatusListener.java
│ ├── BaseStatusListener.java
│ ├── ClusterNode.java
│ ├── MessageListener.java
│ ├── Node.java
│ ├── NodeListener.java
│ ├── NodesGroup.java
│ ├── PatternMessageListener.java
│ ├── PatternStatusListener.java
│ ├── Predicate.java
│ ├── RAtomicLong.java
│ ├── RAtomicLongAsync.java
│ ├── RBatch.java
│ ├── RBlockingQueue.java
│ ├── RBlockingQueueAsync.java
│ ├── RBucket.java
│ ├── RBucketAsync.java
│ ├── RCollectionAsync.java
│ ├── RCountDownLatch.java
│ ├── RDeque.java
│ ├── RDequeAsync.java
│ ├── RExpirable.java
│ ├── RExpirableAsync.java
│ ├── RHyperLogLog.java
│ ├── RHyperLogLogAsync.java
│ ├── RKeys.java
│ ├── RKeysAsync.java
│ ├── RList.java
│ ├── RListAsync.java
│ ├── RLock.java
│ ├── RMap.java
│ ├── RMapAsync.java
│ ├── RObject.java
│ ├── RObjectAsync.java
│ ├── RPatternTopic.java
│ ├── RQueue.java
│ ├── RQueueAsync.java
│ ├── RScript.java
│ ├── RScriptAsync.java
│ ├── RSet.java
│ ├── RSetAsync.java
│ ├── RSortedSet.java
│ ├── RTopic.java
│ ├── RTopicAsync.java
│ └── StatusListener.java
└── misc // 杂项,一些工具类
├── CompositeIterable.java
├── InfinitySemaphoreLatch.java
├── ReclosableLatch.java
└── URIBuilder.java
14 directories, 182 files
可以看到,之前 Lettuce 的代码以及完全移除了,这块的代码肯定 Redisson 需要完全把类似的机制都实现一遍。
4. 分析思路
根据上面的代码结构以及以往分析 Redisson V1 版本源码的套路,直接按下如下几个步骤进行分析: - Redis 协议编解码。这一块由于 lettuce 的代码被移除了,肯定重新写了这一部分代码。 - Redis 连接的维护管理。 包括单机、集群、主从、哨兵等等。 - Redisson 分布式对象的实现。 这一块算是核心源码。
稍微看了下这个版本的代码多了很多,在一篇里面全部写完将会很长很长,所以将会合理的拆分出多篇来,具体拆分几篇就边写边拆。
5.协议编解码部分源码分析
首先肯定还是先看 Redis 协议编解码这一块,跟之前 Lettuce 很有可能理念会不太一样。总体上应该套路会差不多,因为 Lettuce 本身就是异步才跟 Jedis 有比较明显的差距,所以 Redisson 这一块也会继续保持一致的做法。
5.1 编解码接口定义
5.1.1 Redis数据编解码器接口 Codec
接口主要是定义获取 Redis 数据编解码,提供了针对 Hash 以及其他 Redis 数据结构的编解码。
package org.redisson.client.codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
public interface Codec {
// 获取 Redis Hash value 解码器
Decoder<Object> getMapValueDecoder();
// 获取 Redis Hash value 编码器
Encoder getMapValueEncoder();
// 获取 Redis Hash value 解码器
Decoder<Object> getMapKeyDecoder();
// 获取 Redis Hash key 编码器
Encoder getMapKeyEncoder();
// 获取 value 解码器
Decoder<Object> getValueDecoder();
// 获取 value 编码器
Encoder getValueEncoder();
}
5.1.2 字符串类型编解码实现 StringCodec
这个类主要是提供对字符串数据进行编解码,使用UTF-8字符集来处理字符串的转换。
在之前 lettuce 代码中,存在几乎一模一样的接口实现。 不过 lettuce 是依赖于 JDK 的 ByteBuffer ,Redisson 这个版本是直接使用 Netty 的 ByteBuf。
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringCodec implements Codec {
public static final StringCodec INSTANCE = new StringCodec(); // 单例模式
@Override
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
// 将 ByteBuf 中的字节数据转换为 UTF-8 编码的字符串并返回
return buf.toString(CharsetUtil.UTF_8);
}
};
}
@Override
public Encoder getValueEncoder() {
return new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
// 将输入对象转换为字符串,并以 UTF-8 编码成字节数组形式返回。
return in.toString().getBytes("UTF-8");
}
};
}
@Override
public Decoder<Object> getMapValueDecoder() {
return getValueDecoder();
}
@Override
public Encoder getMapValueEncoder() {
return getValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return getValueDecoder();
}
@Override
public Encoder getMapKeyEncoder() {
return getValueEncoder();
}
}
5.1.3 JSON 编解码实现 JsonJacksonCodec
这个是 JSON 编解码的实现,可以将 Java 对象编码成 JSON , JSON 解码为 Java对象。 使用的 JSON 库是 Jackson ,在使用 Redisson 中也是一种比较常用的编解码器。
package org.redisson.codec;
import java.io.IOException;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
public class JsonJacksonCodec implements Codec {
private ObjectMapper mapObjectMapper = new ObjectMapper();
public JsonJacksonCodec() {
init(mapObjectMapper);
// 这里主要是配置 Jackson 序列化以及反序列化过程中类型信息处理,不用太纠结具体内容
TypeResolverBuilder<?> mapTyper = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL) {
public boolean useForType(JavaType t)
{
switch (_appliesFor) {
case NON_CONCRETE_AND_ARRAYS:
while (t.isArrayType()) {
t = t.getContentType();
}
// fall through
case OBJECT_AND_NON_CONCRETE:
return (t.getRawClass() == Object.class) || !t.isConcrete();
case NON_FINAL:
while (t.isArrayType()) {
t = t.getContentType();
}
// to fix problem with wrong long to int conversion
if (t.getRawClass() == Long.class) {
return true;
}
return !t.isFinal(); // includes Object.class
default:
//case JAVA_LANG_OBJECT:
return (t.getRawClass() == Object.class);
}
}
};
mapTyper.init(JsonTypeInfo.Id.CLASS, null);
mapTyper.inclusion(JsonTypeInfo.As.PROPERTY);
mapObjectMapper.setDefaultTyping(mapTyper);
}
// 配置 ObjectMapper 的序列化、反序列化等特性
protected void init(ObjectMapper objectMapper) {
// 序列化忽略 null 值
objectMapper.setSerializationInclusion(Include.NON_NULL); objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY) // 字段可见性设置为任意
.withGetterVisibility(JsonAutoDetect.Visibility.NONE) // Getter方法不可见
.withSetterVisibility(JsonAutoDetect.Visibility.NONE) // Setter方法不可
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); // 构造器不可见
// 反序列化时忽略未知属性
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 序列化BigDecimal时不使用科学计数法
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
// 按字母顺序排序属性
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
// 将 JSON 字节流转成 Java 对象
return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
}
};
}
@Override
public Encoder getMapValueEncoder() {
return new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
// 将Java对象序列化成 JSON 字节流
return mapObjectMapper.writeValueAsBytes(in);
}
};
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return getMapValueDecoder();
}
@Override
public Encoder getMapKeyEncoder() {
return getMapValueEncoder();
}
@Override
public Decoder<Object> getValueDecoder() {
return getMapValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return getMapValueEncoder();
}
}
5.2 Redis命令处理器
Redis命令处理器主要包含如下内容:
- 将 Java 命令对象转化为 Redis 协议报文。
- 将 Redis 应答的报文解析成 Java 对象。
- 处理 pipeline 命令。
- 命令队列的处理。
这里重点需要注意一个知识点:
Redis 协议保证了命令和响应的顺序性,响应的顺序与发送命令的顺序是一致的。发送命令的时候,会将命令按顺序放入队列中,然后服务器也会按照相同的顺序返回响应。
理解这点对于下面代码的理解很重要,这里可以通过一张图大致了解这几个类的关联关系:
5.2.1 Redis 命令队列 CommandsQueue
负责管理和发送命令队列。
package org.redisson.client.handler;
import java.util.List;
import java.util.Queue;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
public class CommandsQueue extends ChannelDuplexHandler {
// 用于在 Channel 中存储 QueueCommand 对象
public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");
// netty 提供的并发队列实现
// 这个队列的特点是多生产者,单个消费者
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();
// 发送下一条命令,这里内部会调用这个方法,外部也会调用这个方法
public void sendNextCommand(ChannelHandlerContext ctx) throws Exception {
// Channel 的属性中移除已经处理的命令,这样才不会搞混
ctx.channel().attr(CommandsQueue.REPLAY).remove();
// 队列中移除当前命令
queue.poll();
// 发送数据
sendData(ctx);
}
// write 方法负责将命令写入队列
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 判断消息是否为 QueueCommand 类型
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
// 检查队列中的第一个命令跟当前命令是不是一样,一样就直接发送
if (queue.peek() != null && queue.peek().getCommand() == data) {
super.write(ctx, msg, promise);
} else {
// 命令不一样就放入队列并发送数据
queue.add(new QueueCommandHolder(data, promise));
sendData(ctx);
}
} else {
// 不是 QueueCommand 类型直接发送出去
super.write(ctx, msg, promise);
}
}
// 实际负责发送命令的方法
private void sendData(final ChannelHandlerContext ctx) throws Exception {
// 获取队列中的第一个命令
QueueCommandHolder command = queue.peek();
// 命令不在发送中或者没有发送过才处理
if (command != null && command.getSended().compareAndSet(false, true)) {
QueueCommand data = command.getCommand();
// 获取发布/订阅操作
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
// 存在发布/订阅操作,调用解码器处理
for (CommandData<Object, Object> cd : pubSubOps) {
for (Object channel : cd.getParams()) {
ctx.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd);
}
}
} else {
// 注意这里!!!
// 这里将当前命令设置为 channel 的一个属性,这个拿到响应时才知道对应哪条命令。
ctx.channel().attr(REPLAY).set(data);
}
// 天监听器处理操作完成逻辑
command.getChannelPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 命令发送不成功,会触发发送下一条命令
if (!future.isSuccess()) {
// 操作不成功,发送下一个命令
sendNextCommand(ctx);
}
}
});
// 写入数据
ctx.channel().writeAndFlush(data, command.getChannelPromise());
}
}
}
总结下这个类的逻辑:
- CommandsQueue 主要是负责维护命令的发送顺序和状态维护。
- 每次发送完命令后,当前 channel 就会绑定发送的命令。这样拿到 Redis 的响应后,就可以知道发送的命令,根据发送的命令来解码响应数据。
- 处理完一个命令后,会自动触发下一个命令的发送。这样可以保证命令都是一条一条发出去的,保证解析不会错乱。
5.2.2 Redis命令编码 CommandEncoder
这个类负责将命令数据编码为可以发送到 Redis 服务器的字节格式。编码主要是运用 RESP 协议定义的语法组装报文,然后通过 Netty 发送出去。
package org.redisson.client.handler;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.StringParamsEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.client.protocol.RedisCommand.ValueType;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Object>> {
private final Logger log = LoggerFactory.getLogger(getClass());
// 目前使用字符串参数编码
private final Encoder paramsEncoder = new StringParamsEncoder();
final char ARGS_PREFIX = '*'; // RESP协议定义的数组类型前缀
final char BYTES_PREFIX = '$'; // RESP协议定义的字符串类型前缀
final byte[] CRLF = "\r\n".getBytes(); // RESP协议定义的换行符
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<Object, Object> msg, ByteBuf out) throws Exception {
// 先写入 * ,后面跟参数的数量
out.writeByte(ARGS_PREFIX);
// 计算 Redis 命令中的参数。
// 命令本身也算一个参数,比如 GET someKey ,这里参数是2,GET就是第一个参数
int len = 1 + msg.getParams().length;
// 存在子命令名参数长度还要加 1
if (msg.getCommand().getSubName() != null) {
len++;
}
// 写入长度
out.writeBytes(toChars(len));
// 写入换行符
out.writeBytes(CRLF);
// 写入命令名称
writeArgument(out, msg.getCommand().getName().getBytes("UTF-8"));
// 写入子命令名称
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8"));
}
// 循环写入参数
int i = 1;
for (Object param : msg.getParams()) {
Encoder encoder = paramsEncoder;
if (msg.getCommand().getInParamType().size() == 1) {
// 只有一种参数类型,比如 SET key value
if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
// 直接用简单的字符串编码
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
// 复杂对象使用自定义的编码
// 比如像 HSET myhash field1 fieldValue
encoder = encoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
// 多种参数类型
int paramNum = i - msg.getCommand().getInParamIndex();
if (msg.getCommand().getInParamIndex() <= i) {
encoder = encoder(msg, paramNum);
}
}
// 写入参数
writeArgument(out, encoder.encode(param));
i++;
}
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8));
}
}
// 内部分发编码的方法
private Encoder encoder(CommandData<Object, Object> msg, int param) {
int typeIndex = 0;
if (msg.getCommand().getInParamType().size() > 1) {
typeIndex = param;
}
// map 类型,奇数是指 value ,偶数是 key
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP) {
// 参数的个数是奇数,说明这个是命令的值
if (param % 2 != 0) {
return msg.getCodec().getMapValueEncoder();
} else {
// 参数的个数是奇数,说明这个是命令的 key
return msg.getCodec().getMapKeyEncoder();
}
}
// map key 类型处理
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_KEY) {
return msg.getCodec().getMapKeyEncoder();
}
// map value 类型处理
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_VALUE) {
return msg.getCodec().getMapValueEncoder();
}
// 对象类型处理
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.OBJECTS) {
return msg.getCodec().getValueEncoder();
}
// 其他类型暂时不支持处理
throw new IllegalStateException();
}
private void writeArgument(ByteBuf out, byte[] arg) {
// 写入 Redis 协议报文。 依次写入 长度>参数个数>换行符>参数>换行符
// 看个例子 $5\r\nhello\r\n
out.writeByte(BYTES_PREFIX);
out.writeBytes(toChars(arg.length));
out.writeBytes(CRLF);
out.writeBytes(arg);
out.writeBytes(CRLF);
}
// 下面几个数组是用于快速查表将数字转成字符
final static char[] DigitTens = {'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '1',
'1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3',
'3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '5', '5',
'5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', '7',
'7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
'9', '9', '9', '9', '9', '9', '9', '9', '9', '9',};
final static char[] DigitOnes = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1',
'2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',};
final static char[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
'y', 'z'};
final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999,
Integer.MAX_VALUE};
// 计算数字的字符长度
static int stringSize(long x) {
for (int i = 0;; i++)
if (x <= sizeTable[i])
return i + 1;
}
// 数字转字符数组
static void getChars(long i, int index, byte[] buf) {
long q, r;
int charPos = index;
byte sign = 0;
if (i < 0) {
sign = '-';
i = -i;
}
// Generate two digits per iteration
while (i >= 65536) {
q = i / 100;
// really: r = i - (q * 100);
r = i - ((q << 6) + (q << 5) + (q << 2));
i = q;
buf[--charPos] = (byte) DigitOnes[(int)r];
buf[--charPos] = (byte) DigitTens[(int)r];
}
// Fall thru to fast mode for smaller numbers
// assert(i <= 65536, i);
for (;;) {
q = (i * 52429) >>> (16 + 3);
r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ...
buf[--charPos] = (byte) digits[(int)r];
i = q;
if (i == 0)
break;
}
if (sign != 0) {
buf[--charPos] = sign;
}
}
// 将数字转换为字节数组
public static byte[] toChars(long i) {
int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
byte[] buf = new byte[size];
getChars(i, size, buf);
return buf;
}
}
5.2.3 Redis命令解码 CommandDecoder
Redis 协议解码器,负责将从 Redis 服务器接收到的原始字节数据转换为 Java 对象。 在之前 RedissonV1 lettuce 代码中,使用 Redis 状态机这个工具类进行解码的。
总结下这个类的作用:
解析 RESP 协议格式的响应数据。
将字节数据转成 Java 对象。
处理不同类型的 Redis 命令响应,包括简单字符串、错误信息、数值类型、批量字符串类型、数组类型。
处理 Redis 的发布订阅消息。
package org.redisson.client.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
public class CommandDecoder extends ReplayingDecoder<State> {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final char CR = '\r'; //回车
public static final char LF = '\n'; // 换行,切割报文使用
private static final char ZERO = '0';
// 不需要并发Map,因为响应是陆陆续续到达的
// 存储消息解码器的Map,key是 channel 名,value 是对应的解码器
private final Map<String, MultiDecoder<Object>> messageDecoders = new HashMap<String, MultiDecoder<Object>>();
// 存储 channel 和对应 CommandData 的 Map
private final Map<String, CommandData<Object, Object>> channels = PlatformDependent.newConcurrentHashMap();
public void addChannel(String channel, CommandData<Object, Object> data) {
channels.put(channel, data);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 获取当前队列命令,命令就是之前发送给 Redis 的命令,netty channel 中绑定了这个命令。
QueueCommand data = ctx.channel().attr(CommandsQueue.REPLAY).get();
// 如果没有获取到当前队列命令,判断不了数据类型,只能使用一个默认的解码器
Decoder<Object> currentDecoder = null;
if (data == null) {
// data为null,创建一个默认的解码器
currentDecoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
// 默认解码器将 ByteBuf 中的数据解码为 UTF-8 字符串
return buf.toString(CharsetUtil.UTF_8);
}
};
}
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
// 如果当前状态为空,则初始化一个新的 State 对象
if (state() == null) {
state(new State());
}
// 重置解码状态
state().setDecoderState(null);
// 根据不同的 data 类型进行解码
if (data == null) {
// 如果没有队列命令,直接进行解码
decode(in, null, null, ctx.channel(), currentDecoder);
// 如果当前命令是单个命令
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
decode(in, cmd, null, ctx.channel(), currentDecoder);
} catch (IOException e) {
cmd.getPromise().setFailure(e);
}
// 如果当前命令是多个命令 批量命令
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;
// 获取当前的命令索引
int i = state().getIndex();
// 循环处理 ByteBuf 中的数据,直到所有命令都被解码
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null;
try {
// 设置检查点,用于恢复解码状态
checkpoint();
// 设置当前的命令索引
state().setIndex(i);
// 获取当前命令
cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
// 解码当前命令
decode(in, cmd, null, ctx.channel(), currentDecoder);
// 解码成功推进当前命令索引
i++;
} catch (IOException e) {
// 设置当前命令的 Promise 为失败状态
cmd.getPromise().setFailure(e);
}
}
// 如果所有命令都已解码
if (i == commands.getCommands().size()) {
Promise<Void> promise = commands.getPromise();
// 尝试完成 Promise,表示所有命令成功处理
if (!promise.trySuccess(null) && promise.isCancelled()) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
}
// 发送下一个命令
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
// 重置状态
state(null);
} else {
// 如果还有未处理的命令,恢复解码状态
checkpoint();
state().setIndex(i);
}
return;
}
// 通知队列发送下一个命令
ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
// 重置状态
state(null);
}
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
// 读取第一个字节,表示 Redis响应类型
int code = in.readByte();
if (code == '+') {
// 简单字符串类型处理
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2); // 跳过两个字节的回车换行 \r\n
handleResult(data, parts, result, false, channel);
} else if (code == '-') {
// 处理错误响应
String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2); // 跳过两个字节的回车换行 \r\n
if (error.startsWith("MOVED")) {
// 处理MOVED错误, 集群模式下的重定向
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
data.getPromise().setFailure(new RedisMovedException(slot));
} else if (error.startsWith("(error) ASK")) {
// 集群模式下的临时重定向错误s
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[2]);
data.getPromise().setFailure(new RedisMovedException(slot));
} else {
// 处理其他错误
data.getPromise().setFailure(new RedisException(error + ". channel: " + channel));
}
} else if (code == ':') {
// 整数响应
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2); // 跳过两个字节的回车换行 \r\n
Object result = Long.valueOf(status);
handleResult(data, parts, result, false, channel);
} else if (code == '$') {
// 批量字符串响应
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf, state());
}
handleResult(data, parts, result, false, channel);
} else if (code == '*') {
// 数组响应
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
// 循环处理数组每一个元素
decodeMulti(in, data, parts, channel, currentDecoder, size, respParts);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}
private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts)
throws IOException {
// 循环处理多个响应部分
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, currentDecoder);
}
// 用消息解码器处理完整的响应
Object result = messageDecoder(data, respParts).decode(respParts, state());
if (result instanceof Message) {
// PubSub消息特殊处理下
handleMultiResult(data, null, channel, result);
// 检查是否还有更多消息
if (in.writerIndex() > in.readerIndex()) {
// 如果还有数据,继续解码
decode(in, data, null, channel, currentDecoder);
}
} else {
handleMultiResult(data, parts, channel, result);
}
}
private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
Channel channel, Object result) {
if (data == null) {
if (result instanceof PubSubStatusMessage) {
// 处理PubSub状态消息
String channelName = ((PubSubStatusMessage) result).getChannel();
CommandData<Object, Object> d = channels.get(channelName);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
// / 订阅操作
channels.remove(channelName);
messageDecoders.put(channelName, d.getMessageDecoder());
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
// 取消订阅操作
channels.remove(channelName);
messageDecoders.remove(channelName);
}
}
}
if (data != null) {
// 如果有命令数据,处理结果
handleResult(data, parts, result, true, channel);
} else {
// 处理PubSub消息
RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
// 根据不同的类型调用回调函数
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
}
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
if (data != null) {
if (multiResult) {
// 多个用 convertMulti 方法进行转换
result = data.getCommand().getConvertor().convertMulti(result);
} else {
// 单结果使用 convert 方法进行转换
result = data.getCommand().getConvertor().convert(result);
}
}
if (parts != null) {
// parts不为null,将结果添加到parts列表中
parts.add(result);
} else {
if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
}
}
}
private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
// 如果 data 为 null,是一个发布订阅相关的消息
if (data == null) {
// 1. 处理订阅以及取消订阅的命令
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) {
String channelName = (String) parts.get(1);
// 使用 channels 中存储的命令解码器去解码
return channels.get(channelName).getCommand().getReplayMultiDecoder();
} else if (parts.get(0).equals("message")) {
// 2. 处理普通的发布订阅消息
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
} else if (parts.get(0).equals("pmessage")) {
// 3. 处理模式匹配的发布订阅消息
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
}
// 如果识别不了消息类型,就直接用默认的去处理
return data.getCommand().getReplayMultiDecoder();
}
private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
// 如果 data 为 null,是一个发布订阅相关的消息
if (data == null) {
// 处理普通的发布消息
if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
}
// 处理模式匹配的发布消息
if (parts.size() == 3 && parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
// 如果消息类型判断不了,直接返回当前解码器
return currentDecoder;
}
// 处理不是发布订阅消息的情况
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
if (multiDecoder.isApplicable(parts.size(), state())) {
decoder = multiDecoder;
}
}
// 如果还没有找到合适的解码器,根据命令的输出参数类型选择解码器
if (decoder == null) {
// 根据命令的输出参数类型选择合适的解码器
if (data.getCommand().getOutParamType() == ValueType.MAP) {
// 大小是奇数,就使用 map key 解码器
// Redis 的 map 响应是 key-value 交替的
if (parts.size() % 2 != 0) {
decoder = data.getCodec().getMapKeyDecoder();
} else {
// 大小是偶数,使用 map value 解码器
decoder = data.getCodec().getMapValueDecoder();
}
} else if (data.getCommand().getOutParamType() == ValueType.MAP_KEY) {
// map key 解码器
decoder = data.getCodec().getMapKeyDecoder();
} else if (data.getCommand().getOutParamType() == ValueType.MAP_VALUE) {
// map value 解码器
decoder = data.getCodec().getMapValueDecoder();
} else {
// 使用普通的 value 解码器
decoder = data.getCodec().getValueDecoder();
}
}
return decoder;
}
public ByteBuf readBytes(ByteBuf is) throws IOException {
// 读取长度
long l = readLong(is);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException(
"Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
}
int size = (int) l;
if (size == -1) {
// 大小为-1,表示空值。返回null
return null;
}
// 读取指定大小的数据
ByteBuf buffer = is.readSlice(size);
// 读取检查是否存在结束符 \r\n
int cr = is.readByte();
int lf = is.readByte();
if (cr != CR || lf != LF) {
throw new IOException("Improper line ending: " + cr + ", " + lf);
}
return buffer;
}
// 从 ByteBuf 中读取一个长整型数
public static long readLong(ByteBuf is) throws IOException {
long size = 0;
int sign = 1;
int read = is.readByte();
// 可能是负数,处理下
if (read == '-') {
read = is.readByte();
sign = -1;
}
// 循环读取数字字符,一直读到 \r\n
do {
if (read == CR) {
if (is.readByte() == LF) {
break;
}
}
// 将每个数字字符转换为数值再累加
int value = read - ZERO;
if (value >= 0 && value < 10) {
size *= 10;
size += value;
} else {
// 遇到非数字字符,抛出异常
throw new IOException("Invalid character in integer");
}
read = is.readByte();
} while (true);
// 返回最终结果,同时还处理了符号位
return size * sign;
}
}
5.2.4 Redis List命令编码 CommandsListEncoder
处理集合类型的命令编码,没有很特殊的逻辑,就是循环调用单个命令的编码器即可。
package org.redisson.client.handler;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class CommandsListEncoder extends MessageToByteEncoder<CommandsData> {
@Override
protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception {
// 遍历每一个 CommandData 对象
for (CommandData<?, ?> commandData : msg.getCommands()) {
// 循环调用 netty pipeline 中的 CommandEncoder 实例,并调用 encode 方法
ctx.pipeline().get(CommandEncoder.class).encode(ctx, (CommandData<Object, Object>)commandData, out);
}
}
}