七的博客

Redisson2.0源码分析1-开篇以及Redis协议编解码

源码分析

Redisson2.0源码分析1-开篇以及Redis协议编解码

之前简单分析过 Redisson V1 版本的一部分源码,看 changelog 发现 Redisson V2 版本有做过一次比较大的改动。这次就动手看看 V2 版本到底改动了哪些。

老样子,还是拉取 V2 版本发布的版本附近的源码。

1. 拉取指定版本源码分析

查看项目发布日志 changelog ,找到 2.0 版本的更新日志:

Starting from 2.0.0 version Redisson has a new own async and lock-free Redis client under the hood. Thanks to the new architecture pipline (command batches) support has been implemented and a lot of code has gone.

Feature - new RObject methods: move, moveAsync, migrate, migrateAsync Feature - new async interfaces: RAsyncMap, RAtomicLongAsync, RBlockingQueueAsync, RCollectionAsync, RDequeAsync, RExpirableAsync, RHyperLogLogAsync, RListAsync, RObjectAsync, RQueueAsync, RScriptAsync, RSetAsync, RTopicAsync Feature - multiple commands batch (Redis pipelining) support via Redisson.createBatch method Feature - new methods flushall, deleteAsync, delete, deleteByPatternAsync, deleteByPattern, findKeysByPatternAsync, findKeysByPattern added to RedissonClient interface Improvement - closed channel detection speedup

这个版本可能还有些 Bug 没有完全修复,所以又找到了 2.1 版本。

15-Aug-2015 - version 2.1.1 released
Feature - all keys operations extracted to RKeys interface
Feature - RKeys.getKeys, RKeys.getKeysByPattern and RKeys.randomKeymethods added
Feature - RBlockingQueueAsync.drainToAsync method added
Feature - Redis nodes info and ping operations via Redisson.getNodesGroup or Redisson.getClusterNodesGroup now available
Improvement - added sentinel nodes discovery
Fixed - command encoding errors handling
Fixed - cluster empty slot handling
Fixed - connection hangs when there are no slaves in sentinel mode
Fixed - activate master as slave when there are no more available slaves in sentinel mode
Fixed - skip disconnected sentinels during startup
Fixed - slave node discovery in sentinel mode which has been disconnected since start
Deprecated - Redisson methods deleteAsync, delete, deleteByPatternAsync, deleteByPattern, findKeysByPatternAsync, findKeysByPattern. Use same methods with RKeys interface

对应的 Git commit id 是 cd3032ac, 直接回滚到这个版本即可。

git reset --hard cd3032ac

2. 版本变更原因

这个版本的变化主要是有人在 issue 提到 lettuce 源码相关的问题,比如下面几个:

lettuce 太久没有去维护了,导致作者自己也想重新写一套 Redis 连接以及编解码相关的代码,这样更符合 Redisson 自己的用法。

3. V2版本代码结构

回退到指定版本后的代码结构如下:

.
└── org
    └── redisson
        ├── BaseConfig.java   // Redisson 配置类基类
        ├── BaseMasterSlaveServersConfig.java  // 主从配置基类
        ├── ClusterServersConfig.java   // 集群配置基类
        ├── CommandBatchExecutorService.java  // 批量命令操作实现
        ├── CommandExecutor.java   // Redis 命令执行器接口
        ├── CommandExecutorService.java  // Redis 命令执行器实现
        ├── Config.java   // Redisson 配置管理类
        ├── MasterSlaveServersConfig.java  // 主从配置
        ├── PubSubMessageListener.java  // 发布订阅监听器实现
        ├── PubSubPatternMessageListener.java   // // 发布订阅模式消息监听器实现
        ├── PubSubPatternStatusListener.java   // // // 发布订阅模式状态监听器实现
        ├── PubSubStatusListener.java   // 发布订阅状态监听器实现
        ├── RedisNodes.java   // Redis 集群节点
        ├── Redisson.java   // Redisson 核心类
        ├── RedissonAtomicLong.java
        ├── RedissonBatch.java
        ├── RedissonBlockingQueue.java
        ├── RedissonBucket.java
        ├── RedissonClient.java
        ├── RedissonCountDownLatch.java
        ├── RedissonCountDownLatchEntry.java
        ├── RedissonDeque.java
        ├── RedissonExpirable.java
        ├── RedissonHyperLogLog.java
        ├── RedissonKeys.java
        ├── RedissonList.java
        ├── RedissonLock.java
        ├── RedissonLockEntry.java
        ├── RedissonMap.java
        ├── RedissonMapEntry.java
        ├── RedissonObject.java
        ├── RedissonPatternTopic.java
        ├── RedissonQueue.java
        ├── RedissonScript.java
        ├── RedissonSet.java
        ├── RedissonSortedSet.java
        ├── RedissonSubSortedSet.java
        ├── RedissonTopic.java
        ├── SentinelServersConfig.java
        ├── SingleServerConfig.java
        ├── SlotCallback.java
        ├── SyncOperation.java
        ├── client   // 处理与 Redis 的客户端通信
        │   ├── BaseRedisPubSubListener.java
        │   ├── OneShotPubSubListener.java
        │   ├── RedisClient.java
        │   ├── RedisConnection.java
        │   ├── RedisConnectionException.java
        │   ├── RedisEmptySlotException.java
        │   ├── RedisException.java
        │   ├── RedisMovedException.java
        │   ├── RedisPubSubConnection.java
        │   ├── RedisPubSubListener.java
        │   ├── RedisTimeoutException.java
        │   ├── WriteRedisConnectionException.java
        │   ├── codec  // 提供用于数据类型的编码和解码的类
        │   │   ├── Codec.java
        │   │   ├── LongCodec.java
        │   │   └── StringCodec.java
        │   ├── handler  // Redis 命令的编码/解码
        │   │   ├── CommandDecoder.java
        │   │   ├── CommandEncoder.java
        │   │   ├── CommandsListEncoder.java
        │   │   ├── CommandsQueue.java
        │   │   ├── ConnectionWatchdog.java
        │   │   └── State.java
        │   └── protocol   // 处理 Redis 协议、数据处理
        │       ├── CommandData.java
        │       ├── CommandsData.java
        │       ├── Decoder.java
        │       ├── Encoder.java
        │       ├── QueueCommand.java
        │       ├── QueueCommandHolder.java
        │       ├── RedisCommand.java
        │       ├── RedisCommands.java
        │       ├── RedisStrictCommand.java
        │       ├── StringParamsEncoder.java
        │       ├── convertor   // Redis 响应的数据转换
        │       │   ├── BooleanAmountReplayConvertor.java
        │       │   ├── BooleanNumberReplayConvertor.java
        │       │   ├── BooleanReplayConvertor.java
        │       │   ├── Convertor.java
        │       │   ├── EmptyConvertor.java
        │       │   ├── IntegerReplayConvertor.java
        │       │   ├── KeyValueConvertor.java
        │       │   ├── LongReplayConvertor.java
        │       │   ├── NumberConvertor.java
        │       │   ├── SingleConvertor.java
        │       │   ├── TrueReplayConvertor.java
        │       │   └── VoidReplayConvertor.java
        │       ├── decoder   // Redis 响应编码
        │       │   ├── KeyValueMessage.java
        │       │   ├── KeyValueObjectDecoder.java
        │       │   ├── ListScanResult.java
        │       │   ├── ListScanResultReplayDecoder.java
        │       │   ├── MapScanResult.java
        │       │   ├── MapScanResultReplayDecoder.java
        │       │   ├── MultiDecoder.java
        │       │   ├── NestedMultiDecoder.java
        │       │   ├── ObjectListReplayDecoder.java
        │       │   ├── ObjectMapReplayDecoder.java
        │       │   ├── ObjectSetReplayDecoder.java
        │       │   ├── StringDataDecoder.java
        │       │   ├── StringListReplayDecoder.java
        │       │   ├── StringMapDataDecoder.java
        │       │   ├── StringMapReplayDecoder.java
        │       │   ├── StringObjectDecoder.java
        │       │   └── StringReplayDecoder.java
        │       └── pubsub   // 发布订阅相关、包括编解码等
        │           ├── Message.java
        │           ├── PubSubMessage.java
        │           ├── PubSubMessageDecoder.java
        │           ├── PubSubPatternMessage.java
        │           ├── PubSubPatternMessageDecoder.java
        │           ├── PubSubStatusDecoder.java
        │           ├── PubSubStatusMessage.java
        │           └── PubSubType.java
        ├── codec  // 提供不同的序列化策略,例如 JSON, Kryo
        │   ├── JsonJacksonCodec.java
        │   ├── KryoCodec.java
        │   └── SerializationCodec.java
        ├── connection   //  Redis 连接管理
        │   ├── BaseLoadBalancer.java
        │   ├── CRC16.java
        │   ├── ClusterConnectionManager.java
        │   ├── ClusterNodeInfo.java
        │   ├── ClusterPartition.java
        │   ├── ConnectionEntry.java
        │   ├── ConnectionManager.java      // 连接管理,包括集群和主从配置
        │   ├── LoadBalancer.java          // 负载均衡策略接口
        │   ├── MasterSlaveConnectionManager.java
        │   ├── MasterSlaveEntry.java
        │   ├── PubSubConnectionEntry.java
        │   ├── RandomLoadBalancer.java
        │   ├── RedisClientEntry.java
        │   ├── RoundRobinLoadBalancer.java
        │   ├── SentinelConnectionManager.java
        │   ├── SingleConnectionManager.java
        │   ├── SingleEntry.java
        │   ├── SubscribesConnectionEntry.java
        │   └── decoder   // 解码 Redis List 数据
        │       ├── ListDrainToDecoder.java
        │       └── ListFirstObjectDecoder.java
        ├── core  // 分布式数据结构和操作的接口定义
        │   ├── BasePatternStatusListener.java
        │   ├── BaseStatusListener.java
        │   ├── ClusterNode.java
        │   ├── MessageListener.java
        │   ├── Node.java
        │   ├── NodeListener.java
        │   ├── NodesGroup.java
        │   ├── PatternMessageListener.java
        │   ├── PatternStatusListener.java
        │   ├── Predicate.java
        │   ├── RAtomicLong.java
        │   ├── RAtomicLongAsync.java
        │   ├── RBatch.java
        │   ├── RBlockingQueue.java
        │   ├── RBlockingQueueAsync.java
        │   ├── RBucket.java
        │   ├── RBucketAsync.java
        │   ├── RCollectionAsync.java
        │   ├── RCountDownLatch.java
        │   ├── RDeque.java
        │   ├── RDequeAsync.java
        │   ├── RExpirable.java
        │   ├── RExpirableAsync.java
        │   ├── RHyperLogLog.java
        │   ├── RHyperLogLogAsync.java
        │   ├── RKeys.java
        │   ├── RKeysAsync.java
        │   ├── RList.java
        │   ├── RListAsync.java
        │   ├── RLock.java
        │   ├── RMap.java
        │   ├── RMapAsync.java
        │   ├── RObject.java
        │   ├── RObjectAsync.java
        │   ├── RPatternTopic.java
        │   ├── RQueue.java
        │   ├── RQueueAsync.java
        │   ├── RScript.java
        │   ├── RScriptAsync.java
        │   ├── RSet.java
        │   ├── RSetAsync.java
        │   ├── RSortedSet.java
        │   ├── RTopic.java
        │   ├── RTopicAsync.java
        │   └── StatusListener.java
        └── misc  // 杂项,一些工具类
            ├── CompositeIterable.java  
            ├── InfinitySemaphoreLatch.java
            ├── ReclosableLatch.java
            └── URIBuilder.java

14 directories, 182 files

可以看到,之前 Lettuce 的代码以及完全移除了,这块的代码肯定 Redisson 需要完全把类似的机制都实现一遍。

4. 分析思路

根据上面的代码结构以及以往分析 Redisson V1 版本源码的套路,直接按下如下几个步骤进行分析: - Redis 协议编解码。这一块由于 lettuce 的代码被移除了,肯定重新写了这一部分代码。 - Redis 连接的维护管理。 包括单机、集群、主从、哨兵等等。 - Redisson 分布式对象的实现。 这一块算是核心源码。

稍微看了下这个版本的代码多了很多,在一篇里面全部写完将会很长很长,所以将会合理的拆分出多篇来,具体拆分几篇就边写边拆。

5.协议编解码部分源码分析

首先肯定还是先看 Redis 协议编解码这一块,跟之前 Lettuce 很有可能理念会不太一样。总体上应该套路会差不多,因为 Lettuce 本身就是异步才跟 Jedis 有比较明显的差距,所以 Redisson 这一块也会继续保持一致的做法。

5.1 编解码接口定义

5.1.1 Redis数据编解码器接口 Codec

接口主要是定义获取 Redis 数据编解码,提供了针对 Hash 以及其他 Redis 数据结构的编解码。

package org.redisson.client.codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

public interface Codec {

    // 获取 Redis Hash value 解码器
    Decoder<Object> getMapValueDecoder();

    // 获取 Redis Hash value 编码器
    Encoder getMapValueEncoder();

    // 获取 Redis Hash value 解码器
    Decoder<Object> getMapKeyDecoder();

    // 获取 Redis Hash key 编码器
    Encoder getMapKeyEncoder();

    // 获取 value 解码器
    Decoder<Object> getValueDecoder();

    // 获取 value 编码器
    Encoder getValueEncoder();

}

5.1.2 字符串类型编解码实现 StringCodec

这个类主要是提供对字符串数据进行编解码,使用UTF-8字符集来处理字符串的转换。

在之前 lettuce 代码中,存在几乎一模一样的接口实现。 不过 lettuce 是依赖于 JDK 的 ByteBuffer ,Redisson 这个版本是直接使用 Netty 的 ByteBuf。

package org.redisson.client.codec;

import java.io.IOException;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class StringCodec implements Codec {

    public static final StringCodec INSTANCE = new StringCodec();  // 单例模式

    @Override
    public Decoder<Object> getValueDecoder() {
        return new Decoder<Object>() {
            @Override
            public Object decode(ByteBuf buf, State state) {
                // 将 ByteBuf 中的字节数据转换为 UTF-8 编码的字符串并返回
                return buf.toString(CharsetUtil.UTF_8);
            }
        };
    }

    @Override
    public Encoder getValueEncoder() {
        return new Encoder() {
            @Override
            public byte[] encode(Object in) throws IOException {
                // 将输入对象转换为字符串,并以 UTF-8 编码成字节数组形式返回。
                return in.toString().getBytes("UTF-8");
            }
        };
    }

    @Override
    public Decoder<Object> getMapValueDecoder() {
        return getValueDecoder();
    }

    @Override
    public Encoder getMapValueEncoder() {
        return getValueEncoder();
    }

    @Override
    public Decoder<Object> getMapKeyDecoder() {
        return getValueDecoder();
    }

    @Override
    public Encoder getMapKeyEncoder() {
        return getValueEncoder();
    }
}

5.1.3 JSON 编解码实现 JsonJacksonCodec

这个是 JSON 编解码的实现,可以将 Java 对象编码成 JSON , JSON 解码为 Java对象。 使用的 JSON 库是 Jackson ,在使用 Redisson 中也是一种比较常用的编解码器。

package org.redisson.codec;

import java.io.IOException;

import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;

public class JsonJacksonCodec implements Codec {

    private ObjectMapper mapObjectMapper = new ObjectMapper();

    public JsonJacksonCodec() {
        init(mapObjectMapper);
       
        // 这里主要是配置 Jackson 序列化以及反序列化过程中类型信息处理,不用太纠结具体内容
        TypeResolverBuilder<?> mapTyper = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL) {
            public boolean useForType(JavaType t)
            {
                switch (_appliesFor) {
                case NON_CONCRETE_AND_ARRAYS:
                    while (t.isArrayType()) {
                        t = t.getContentType();
                    }
                    // fall through
                case OBJECT_AND_NON_CONCRETE:
                    return (t.getRawClass() == Object.class) || !t.isConcrete();
                case NON_FINAL:
                    while (t.isArrayType()) {
                        t = t.getContentType();
                    }
                    // to fix problem with wrong long to int conversion
                    if (t.getRawClass() == Long.class) {
                        return true;
                    }
                    return !t.isFinal(); // includes Object.class
                default:
                //case JAVA_LANG_OBJECT:
                    return (t.getRawClass() == Object.class);
                }
            }
        };
        mapTyper.init(JsonTypeInfo.Id.CLASS, null); 
        mapTyper.inclusion(JsonTypeInfo.As.PROPERTY);
        mapObjectMapper.setDefaultTyping(mapTyper);
    }
	
    // 配置 ObjectMapper 的序列化、反序列化等特性
    protected void init(ObjectMapper objectMapper) {
        // 序列化忽略 null 值
        objectMapper.setSerializationInclusion(Include.NON_NULL); objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()  
                                            .withFieldVisibility(JsonAutoDetect.Visibility.ANY)   // 字段可见性设置为任意
                                            .withGetterVisibility(JsonAutoDetect.Visibility.NONE)   // Getter方法不可见
                                            .withSetterVisibility(JsonAutoDetect.Visibility.NONE)    // Setter方法不可
                                            .withCreatorVisibility(JsonAutoDetect.Visibility.NONE)); // 构造器不可见
        
        // 反序列化时忽略未知属性
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         // 序列化BigDecimal时不使用科学计数法
        objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
        // 按字母顺序排序属性
        objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
    }

    @Override
    public Decoder<Object> getMapValueDecoder() {
        return new Decoder<Object>() {

            @Override
            public Object decode(ByteBuf buf, State state) throws IOException {
                // 将 JSON 字节流转成 Java 对象
                return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
            }
        };
    }

    @Override
    public Encoder getMapValueEncoder() {
        return new Encoder() {

            @Override
            public byte[] encode(Object in) throws IOException {
                // 将Java对象序列化成 JSON 字节流
                return mapObjectMapper.writeValueAsBytes(in);
            }
        };
    }

    @Override
    public Decoder<Object> getMapKeyDecoder() {
        return getMapValueDecoder();
    }

    @Override
    public Encoder getMapKeyEncoder() {
        return getMapValueEncoder();
    }

    @Override
    public Decoder<Object> getValueDecoder() {
        return getMapValueDecoder();
    }

    @Override
    public Encoder getValueEncoder() {
        return getMapValueEncoder();
    }

}

5.2 Redis命令处理器

Redis命令处理器主要包含如下内容:

  • 将 Java 命令对象转化为 Redis 协议报文。
  • 将 Redis 应答的报文解析成 Java 对象。
  • 处理 pipeline 命令。
  • 命令队列的处理。

这里重点需要注意一个知识点:

Redis 协议保证了命令和响应的顺序性,响应的顺序与发送命令的顺序是一致的。发送命令的时候,会将命令按顺序放入队列中,然后服务器也会按照相同的顺序返回响应。

理解这点对于下面代码的理解很重要,这里可以通过一张图大致了解这几个类的关联关系:

Redisson 命令处理器逻辑

5.2.1 Redis 命令队列 CommandsQueue

负责管理和发送命令队列。

package org.redisson.client.handler;

import java.util.List;
import java.util.Queue;

import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;

public class CommandsQueue extends ChannelDuplexHandler {

    // 用于在 Channel 中存储 QueueCommand 对象
    public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");

    // netty 提供的并发队列实现
    // 这个队列的特点是多生产者,单个消费者
    private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();

    // 发送下一条命令,这里内部会调用这个方法,外部也会调用这个方法
    public void sendNextCommand(ChannelHandlerContext ctx) throws Exception {
        // Channel 的属性中移除已经处理的命令,这样才不会搞混
        ctx.channel().attr(CommandsQueue.REPLAY).remove();
        // 队列中移除当前命令
        queue.poll();
        // 发送数据
        sendData(ctx);
    }
	
    // write 方法负责将命令写入队列
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        //  判断消息是否为 QueueCommand 类型
        if (msg instanceof QueueCommand) {
            QueueCommand data = (QueueCommand) msg;
            // 检查队列中的第一个命令跟当前命令是不是一样,一样就直接发送
            if (queue.peek() != null && queue.peek().getCommand() == data) {
                super.write(ctx, msg, promise);
            } else {
                // 命令不一样就放入队列并发送数据
                queue.add(new QueueCommandHolder(data, promise));
                sendData(ctx);
            }
        } else {
            // 不是 QueueCommand 类型直接发送出去
            super.write(ctx, msg, promise);
        }
    }
	
    
    // 实际负责发送命令的方法
    private void sendData(final ChannelHandlerContext ctx) throws Exception {
        // 获取队列中的第一个命令
        QueueCommandHolder command = queue.peek();
        
        // 命令不在发送中或者没有发送过才处理
        if (command != null && command.getSended().compareAndSet(false, true)) {
            QueueCommand data = command.getCommand();

            // 获取发布/订阅操作
            List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
            if (!pubSubOps.isEmpty()) {
                // 存在发布/订阅操作,调用解码器处理
                for (CommandData<Object, Object> cd : pubSubOps) {
                    for (Object channel : cd.getParams()) {
                        ctx.pipeline().get(CommandDecoder.class).addChannel(channel.toString(), cd);
                    }
                }
            } else {
                // 注意这里!!!
                // 这里将当前命令设置为 channel 的一个属性,这个拿到响应时才知道对应哪条命令。
                ctx.channel().attr(REPLAY).set(data);
            }

            // 天监听器处理操作完成逻辑
            command.getChannelPromise().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // 命令发送不成功,会触发发送下一条命令
                    if (!future.isSuccess()) {
                        // 操作不成功,发送下一个命令
                        sendNextCommand(ctx);
                    }
                }
            });
            // 写入数据
            ctx.channel().writeAndFlush(data, command.getChannelPromise());
        }
    }

}

总结下这个类的逻辑:

  • CommandsQueue 主要是负责维护命令的发送顺序和状态维护。
  • 每次发送完命令后,当前 channel 就会绑定发送的命令。这样拿到 Redis 的响应后,就可以知道发送的命令,根据发送的命令来解码响应数据。
  • 处理完一个命令后,会自动触发下一个命令的发送。这样可以保证命令都是一条一条发出去的,保证解析不会错乱。

5.2.2 Redis命令编码 CommandEncoder

这个类负责将命令数据编码为可以发送到 Redis 服务器的字节格式。编码主要是运用 RESP 协议定义的语法组装报文,然后通过 Netty 发送出去。

编码流程

package org.redisson.client.handler;

import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.StringParamsEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.client.protocol.RedisCommand.ValueType;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;

public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Object>> {

    private final Logger log = LoggerFactory.getLogger(getClass());

    // 目前使用字符串参数编码
    private final Encoder paramsEncoder = new StringParamsEncoder();

    final char ARGS_PREFIX = '*';   // RESP协议定义的数组类型前缀
    final char BYTES_PREFIX = '$';  // RESP协议定义的字符串类型前缀
    final byte[] CRLF = "\r\n".getBytes();  // RESP协议定义的换行符

    @Override
    protected void encode(ChannelHandlerContext ctx, CommandData<Object, Object> msg, ByteBuf out) throws Exception {
        // 先写入 * ,后面跟参数的数量
        out.writeByte(ARGS_PREFIX);

        // 计算 Redis 命令中的参数。
        // 命令本身也算一个参数,比如 GET someKey ,这里参数是2,GET就是第一个参数
        int len = 1 + msg.getParams().length;

        // 存在子命令名参数长度还要加 1
        if (msg.getCommand().getSubName() != null) {
            len++;
        }
        // 写入长度
        out.writeBytes(toChars(len));
        // 写入换行符
        out.writeBytes(CRLF);

        // 写入命令名称
        writeArgument(out, msg.getCommand().getName().getBytes("UTF-8"));
        // 写入子命令名称
        if (msg.getCommand().getSubName() != null) {
            writeArgument(out, msg.getCommand().getSubName().getBytes("UTF-8"));
        }

        // 循环写入参数
        int i = 1;
        for (Object param : msg.getParams()) {
            Encoder encoder = paramsEncoder;
            if (msg.getCommand().getInParamType().size() == 1) {
                // 只有一种参数类型,比如 SET key value
                if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
                    // 直接用简单的字符串编码
                    encoder = msg.getCodec().getValueEncoder();
                } else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
                    // 复杂对象使用自定义的编码
                    // 比如像 HSET myhash field1 fieldValue
                    encoder = encoder(msg, i - msg.getCommand().getInParamIndex());
                }
            } else {
                // 多种参数类型
                int paramNum = i - msg.getCommand().getInParamIndex();
                if (msg.getCommand().getInParamIndex() <= i) {
                    encoder = encoder(msg, paramNum);
                }
            }

            // 写入参数
            writeArgument(out, encoder.encode(param));

            i++;
        }

        if (log.isTraceEnabled()) {
            log.trace("channel: {} message: {}", ctx.channel(), out.toString(CharsetUtil.UTF_8));
        }
    }

    // 内部分发编码的方法
    private Encoder encoder(CommandData<Object, Object> msg, int param) {
        int typeIndex = 0;
        if (msg.getCommand().getInParamType().size() > 1) {
            typeIndex = param;
        }

        // map 类型,奇数是指 value ,偶数是 key
        if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP) {
            // 参数的个数是奇数,说明这个是命令的值
            if (param % 2 != 0) {
                return msg.getCodec().getMapValueEncoder();
            } else {
                 // 参数的个数是奇数,说明这个是命令的 key
                return msg.getCodec().getMapKeyEncoder();
            }
        }
        
        // map key 类型处理
        if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_KEY) {
            return msg.getCodec().getMapKeyEncoder();
        }
        
        // map value 类型处理        
        if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_VALUE) {
            return msg.getCodec().getMapValueEncoder();
        }
        
        // 对象类型处理
        if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.OBJECTS) {
            return msg.getCodec().getValueEncoder();
        }

        // 其他类型暂时不支持处理
        throw new IllegalStateException();
    }

    private void writeArgument(ByteBuf out, byte[] arg) {
        // 写入 Redis 协议报文。 依次写入 长度>参数个数>换行符>参数>换行符
        // 看个例子 $5\r\nhello\r\n
        out.writeByte(BYTES_PREFIX);
        out.writeBytes(toChars(arg.length));
        out.writeBytes(CRLF);
        out.writeBytes(arg);
        out.writeBytes(CRLF);
    }

    // 下面几个数组是用于快速查表将数字转成字符
    final static char[] DigitTens = {'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '1',
            '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '3', '3', '3',
            '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', '5', '5',
            '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', '7',
            '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
            '9', '9', '9', '9', '9', '9', '9', '9', '9', '9',};

    final static char[] DigitOnes = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3',
            '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
            '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1',
            '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
            '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',};

    final static char[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
            'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
            'y', 'z'};

    final static int[] sizeTable = {9, 99, 999, 9999, 99999, 999999, 9999999, 99999999, 999999999,
            Integer.MAX_VALUE};

    // 计算数字的字符长度
    static int stringSize(long x) {
        for (int i = 0;; i++)
            if (x <= sizeTable[i])
                return i + 1;
    }

    // 数字转字符数组
    static void getChars(long i, int index, byte[] buf) {
        long q, r;
        int charPos = index;
        byte sign = 0;

        if (i < 0) {
            sign = '-';
            i = -i;
        }

        // Generate two digits per iteration
        while (i >= 65536) {
            q = i / 100;
            // really: r = i - (q * 100);
            r = i - ((q << 6) + (q << 5) + (q << 2));
            i = q;
            buf[--charPos] = (byte) DigitOnes[(int)r];
            buf[--charPos] = (byte) DigitTens[(int)r];
        }

        // Fall thru to fast mode for smaller numbers
        // assert(i <= 65536, i);
        for (;;) {
            q = (i * 52429) >>> (16 + 3);
            r = i - ((q << 3) + (q << 1)); // r = i-(q*10) ...
            buf[--charPos] = (byte) digits[(int)r];
            i = q;
            if (i == 0)
                break;
        }
        if (sign != 0) {
            buf[--charPos] = sign;
        }
    }

    // 将数字转换为字节数组
    public static byte[] toChars(long i) {
        int size = (i < 0) ? stringSize(-i) + 1 : stringSize(i);
        byte[] buf = new byte[size];
        getChars(i, size, buf);
        return buf;
    }

}


5.2.3 Redis命令解码 CommandDecoder

Redis 协议解码器,负责将从 Redis 服务器接收到的原始字节数据转换为 Java 对象。 在之前 RedissonV1 lettuce 代码中,使用 Redis 状态机这个工具类进行解码的。

总结下这个类的作用:

  • 解析 RESP 协议格式的响应数据。

  • 将字节数据转成 Java 对象。

  • 处理不同类型的 Redis 命令响应,包括简单字符串、错误信息、数值类型、批量字符串类型、数组类型。

  • 处理 Redis 的发布订阅消息。

Redis协议解码流程

package org.redisson.client.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;


public class CommandDecoder extends ReplayingDecoder<State> {

    private final Logger log = LoggerFactory.getLogger(getClass());

    public static final char CR = '\r'; //回车
    public static final char LF = '\n'; // 换行,切割报文使用
    private static final char ZERO = '0';

    // 不需要并发Map,因为响应是陆陆续续到达的 
    // 存储消息解码器的Map,key是 channel 名,value 是对应的解码器
    private final Map<String, MultiDecoder<Object>> messageDecoders = new HashMap<String, MultiDecoder<Object>>();
    // 存储 channel 和对应 CommandData 的 Map
    private final Map<String, CommandData<Object, Object>> channels = PlatformDependent.newConcurrentHashMap();

    public void addChannel(String channel, CommandData<Object, Object> data) {
        channels.put(channel, data);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 获取当前队列命令,命令就是之前发送给 Redis 的命令,netty channel 中绑定了这个命令。
        QueueCommand data = ctx.channel().attr(CommandsQueue.REPLAY).get();
	
        // 如果没有获取到当前队列命令,判断不了数据类型,只能使用一个默认的解码器
        Decoder<Object> currentDecoder = null;
        if (data == null) {
            // data为null,创建一个默认的解码器
            currentDecoder = new Decoder<Object>() {
                @Override
                public Object decode(ByteBuf buf, State state) {
                    // 默认解码器将 ByteBuf 中的数据解码为 UTF-8 字符串
                    return buf.toString(CharsetUtil.UTF_8);
                }
            };
        }

        if (log.isTraceEnabled()) {
            log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
        }
		
        // 如果当前状态为空,则初始化一个新的 State 对象
        if (state() == null) {
            state(new State());
        }
        // 重置解码状态
        state().setDecoderState(null);

        // 根据不同的 data 类型进行解码
        if (data == null) {
              // 如果没有队列命令,直接进行解码
              decode(in, null, null, ctx.channel(), currentDecoder);
            
          // 如果当前命令是单个命令
        } else if (data instanceof CommandData) {
            CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
            try {
                 
                 decode(in, cmd, null, ctx.channel(), currentDecoder);
            } catch (IOException e) {
                cmd.getPromise().setFailure(e);
            }
            
          // 如果当前命令是多个命令 批量命令  
        } else if (data instanceof CommandsData) {
            CommandsData commands = (CommandsData)data;
			
            // 获取当前的命令索引
            int i = state().getIndex();
			
            // 循环处理 ByteBuf 中的数据,直到所有命令都被解码
            while (in.writerIndex() > in.readerIndex()) {
                CommandData<Object, Object> cmd = null;
                try {
                     // 设置检查点,用于恢复解码状态
                    checkpoint();
                    // 设置当前的命令索引
                    state().setIndex(i);
                    // 获取当前命令
                    cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
                     // 解码当前命令
                    decode(in, cmd, null, ctx.channel(), currentDecoder);
                    // 解码成功推进当前命令索引
                    i++;
                } catch (IOException e) {
                    //  设置当前命令的 Promise 为失败状态
                    cmd.getPromise().setFailure(e);
                }
            }
			
             // 如果所有命令都已解码
            if (i == commands.getCommands().size()) {
                Promise<Void> promise = commands.getPromise();
                 // 尝试完成 Promise,表示所有命令成功处理
                if (!promise.trySuccess(null) && promise.isCancelled()) {
                    log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
                }
				
                
                // 发送下一个命令
                ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
            	
                // 重置状态
                state(null);
            } else {
                // 如果还有未处理的命令,恢复解码状态
                checkpoint();
                state().setIndex(i);
            }
            return;
        }
		
        // 通知队列发送下一个命令
        ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx);
		
        // 重置状态
        state(null);
    }

    private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
        // 读取第一个字节,表示 Redis响应类型
        int code = in.readByte();
        if (code == '+') {
            // 简单字符串类型处理
            String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
            in.skipBytes(2); // 跳过两个字节的回车换行 \r\n

            handleResult(data, parts, result, false, channel);
        } else if (code == '-') {
            // 处理错误响应
            String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); 
            in.skipBytes(2);  // 跳过两个字节的回车换行 \r\n

            if (error.startsWith("MOVED")) {
                // 处理MOVED错误, 集群模式下的重定向
                String[] errorParts = error.split(" ");
                int slot = Integer.valueOf(errorParts[1]);
                data.getPromise().setFailure(new RedisMovedException(slot));
            } else if (error.startsWith("(error) ASK")) {
                // 集群模式下的临时重定向错误s
                String[] errorParts = error.split(" ");
                int slot = Integer.valueOf(errorParts[2]);
                data.getPromise().setFailure(new RedisMovedException(slot));
            } else {
                // 处理其他错误
                data.getPromise().setFailure(new RedisException(error + ". channel: " + channel));
            }
        } else if (code == ':') {
            // 整数响应
            String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
            in.skipBytes(2);  // 跳过两个字节的回车换行 \r\n
            Object result = Long.valueOf(status);
            handleResult(data, parts, result, false, channel);
        } else if (code == '$') {
            // 批量字符串响应
            ByteBuf buf = readBytes(in);
            Object result = null;
            if (buf != null) {
                result = decoder(data, parts, currentDecoder).decode(buf, state());
            }
            handleResult(data, parts, result, false, channel);
        } else if (code == '*') {
            // 数组响应
            long size = readLong(in);
            List<Object> respParts = new ArrayList<Object>();
            // 循环处理数组每一个元素
            decodeMulti(in, data, parts, channel, currentDecoder, size, respParts);
        } else {
            throw new IllegalStateException("Can't decode replay " + (char)code);
        }
    }

    private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
            Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts)
                    throws IOException {
        // 循环处理多个响应部分               
        for (int i = respParts.size(); i < size; i++) {
            decode(in, data, respParts, channel, currentDecoder);
        }

        // 用消息解码器处理完整的响应
        Object result = messageDecoder(data, respParts).decode(respParts, state());

        if (result instanceof Message) {
            // PubSub消息特殊处理下
            handleMultiResult(data, null, channel, result);
            // 检查是否还有更多消息
            if (in.writerIndex() > in.readerIndex()) {
                // 如果还有数据,继续解码
                decode(in, data, null, channel, currentDecoder);
            }
        } else {
            handleMultiResult(data, parts, channel, result);
        }
    }

    private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
            Channel channel, Object result) {
        if (data == null) {
            if (result instanceof PubSubStatusMessage) {
                // 处理PubSub状态消息
                String channelName = ((PubSubStatusMessage) result).getChannel();
                CommandData<Object, Object> d = channels.get(channelName);
                if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
                    // / 订阅操作
                    channels.remove(channelName);
                    messageDecoders.put(channelName, d.getMessageDecoder());
                }
                if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
                    // 取消订阅操作
                    channels.remove(channelName);
                    messageDecoders.remove(channelName);
                }
            }
        }

        if (data != null) {
            // 如果有命令数据,处理结果
            handleResult(data, parts, result, true, channel);
        } else {
            // 处理PubSub消息
            RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
            // 根据不同的类型调用回调函数
            if (result instanceof PubSubStatusMessage) {
                pubSubConnection.onMessage((PubSubStatusMessage) result);
            } else if (result instanceof PubSubMessage) {
                pubSubConnection.onMessage((PubSubMessage) result);
            } else {
                pubSubConnection.onMessage((PubSubPatternMessage) result);
            }
        }
    }

    private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
        if (data != null) {
            if (multiResult) {
                // 多个用 convertMulti 方法进行转换
                result = data.getCommand().getConvertor().convertMulti(result);
            } else {
                // 单结果使用 convert 方法进行转换
                result = data.getCommand().getConvertor().convert(result);
            }
        }
        if (parts != null) {
            // parts不为null,将结果添加到parts列表中
            parts.add(result);
        } else {
            if (!data.getPromise().trySuccess(result) && data.getPromise().isCancelled()) {
                log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result);
            }
        }
    }

    private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
        // 如果 data 为 null,是一个发布订阅相关的消息
        if (data == null) {
            // 1. 处理订阅以及取消订阅的命令
            if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) {
                String channelName = (String) parts.get(1);
                // 使用 channels 中存储的命令解码器去解码
                return channels.get(channelName).getCommand().getReplayMultiDecoder();
            } else if (parts.get(0).equals("message")) {
                // 2. 处理普通的发布订阅消息
                String channelName = (String) parts.get(1);
                return messageDecoders.get(channelName);
            } else if (parts.get(0).equals("pmessage")) {
                // 3. 处理模式匹配的发布订阅消息
                String patternName = (String) parts.get(1);
                return messageDecoders.get(patternName);
            }
        }
		
        // 如果识别不了消息类型,就直接用默认的去处理
        return data.getCommand().getReplayMultiDecoder();
    }

    private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
        // 如果 data 为 null,是一个发布订阅相关的消息
        if (data == null) {
            // 处理普通的发布消息
            if (parts.size() == 2 && parts.get(0).equals("message")) {
                String channelName = (String) parts.get(1);
                return messageDecoders.get(channelName);
            }
            
            // 处理模式匹配的发布消息
            if (parts.size() == 3 && parts.get(0).equals("pmessage")) {
                String patternName = (String) parts.get(1);
                return messageDecoders.get(patternName);
            }
           // 如果消息类型判断不了,直接返回当前解码器
            return currentDecoder;
        }
		
        // 处理不是发布订阅消息的情况
        Decoder<Object> decoder = data.getCommand().getReplayDecoder();
        if (parts != null) {
            MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
            if (multiDecoder.isApplicable(parts.size(), state())) {
                decoder = multiDecoder;
            }
        }
        
        
         // 如果还没有找到合适的解码器,根据命令的输出参数类型选择解码器
        if (decoder == null) {
            // 根据命令的输出参数类型选择合适的解码器
            if (data.getCommand().getOutParamType() == ValueType.MAP) {
                // 大小是奇数,就使用 map key 解码器
                //  Redis 的 map 响应是 key-value 交替的
                if (parts.size() % 2 != 0) {
                    decoder = data.getCodec().getMapKeyDecoder();
                } else {
               // 大小是偶数,使用 map value 解码器
                    decoder = data.getCodec().getMapValueDecoder();
                }
            } else if (data.getCommand().getOutParamType() == ValueType.MAP_KEY) {
                // map key 解码器
                decoder = data.getCodec().getMapKeyDecoder();
            } else if (data.getCommand().getOutParamType() == ValueType.MAP_VALUE) {
                // map value 解码器
                decoder = data.getCodec().getMapValueDecoder();
            } else {
                // 使用普通的 value 解码器
                decoder = data.getCodec().getValueDecoder();
            }
        }
        return decoder;
    }

    public ByteBuf readBytes(ByteBuf is) throws IOException {
        // 读取长度
        long l = readLong(is);
        if (l > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "Java only supports arrays up to " + Integer.MAX_VALUE + " in size");
        }
        int size = (int) l;
        if (size == -1) {
            // 大小为-1,表示空值。返回null
            return null;
        }
        // 读取指定大小的数据
        ByteBuf buffer = is.readSlice(size);
        // 读取检查是否存在结束符 \r\n 
        int cr = is.readByte();
        int lf = is.readByte();
        if (cr != CR || lf != LF) {
            throw new IOException("Improper line ending: " + cr + ", " + lf);
        }
        return buffer;
    }

    // 从 ByteBuf 中读取一个长整型数
    public static long readLong(ByteBuf is) throws IOException {
        long size = 0;
        int sign = 1;
        int read = is.readByte();
        // 可能是负数,处理下
        if (read == '-') {
            read = is.readByte();
            sign = -1;
        }
        // 循环读取数字字符,一直读到 \r\n
        do {
            if (read == CR) {
                if (is.readByte() == LF) {
                    break;
                }
            }
            // 将每个数字字符转换为数值再累加
            int value = read - ZERO;
            if (value >= 0 && value < 10) {
                size *= 10;
                size += value;
            } else {
                // 遇到非数字字符,抛出异常
                throw new IOException("Invalid character in integer");
            }
            read = is.readByte();
        } while (true);
        // 返回最终结果,同时还处理了符号位
        return size * sign;
    }

}


5.2.4 Redis List命令编码 CommandsListEncoder

处理集合类型的命令编码,没有很特殊的逻辑,就是循环调用单个命令的编码器即可。

package org.redisson.client.handler;

import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CommandsListEncoder extends MessageToByteEncoder<CommandsData> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception {
        // 遍历每一个 CommandData 对象
        for (CommandData<?, ?> commandData : msg.getCommands()) {
            // 循环调用 netty pipeline 中的 CommandEncoder 实例,并调用 encode 方法
            ctx.pipeline().get(CommandEncoder.class).encode(ctx, (CommandData<Object, Object>)commandData, out);
        }
    }

}