七的博客

RedissonV1.0-Lettuce通信源码分析-开篇

源码分析

RedissonV1.0-Lettuce通信源码分析-开篇

最近为了研究 Redisson 的实现原理,专门去下载了 Redisson 的源码。经过对源码的查看发现 Redisson 依赖了 Lettuce 作为通信相关的库,那么大概率 Redisson 很大一部分逻辑会依赖于 Lettuce ,所以还是有必要对 Lettuce 的源码做下分析。

先了解下 Lettuce 是什么,看下作者对 Lettuce 的介绍:

lettuce - A scalable Java Redis client

Lettuce is a scalable thread-safe Redis client providing both synchronous and asyncronous connections. Multiple threads may share one connection provided they avoid blocking and transactional operations such as BLPOP, and MULTI/EXEC. Multiple connections are efficiently managed by the excellent netty NIO framework.

大概翻译下来就是:

Lettuce 是一个可扩展的、线程安全的 Redis 客户端。提供了同步和异步两种连接方式。多个线程可以共享同一个连接,只要它们避免使用阻塞式和事务式操作 (比如 BLPOP 和 MULTI/EXEC)。

由优秀的 Netty NIO 框架高效管理多个连接。

1. 代码拉取

根据 Redisson V1.0 项目的 Readme.md 文件描述, Redisson V1.0 是依赖了 Lettuce 这个库作为客户端的通信部分 (https://github.com/wg/lettuce) 。 但是看 Redisson 代码的话又包含了 Lettuce 的代码,那么就有两种方式阅读源码:

  • 直接下载 Redisson 代码进行分析。
  git clone https://github.com/redisson/redisson.git
  • 下载 Lettuce 代码进行分析。
  git clone https://github.com/wg/lettuce.git 

Lettuce 这个项目从 2014 年就开始不怎么维护了,所以代码两边代码应该是差不太多的,选哪种方式都行。

2. 代码结构

拉取下来的代码结构如下:

.
└── lambdaworks
    ├── codec
    │   └── Base16.java
    └── redis
        ├── KeyValue.java
        ├── RedisAsyncConnection.java  // 异步连接
        ├── RedisClient.java  // Redis 客户端抽象
        ├── RedisCommandInterruptedException.java  // 命令执行异常
        ├── RedisConnection.java  // 同步连接
        ├── RedisException.java  // 公用 Redis 异常
        ├── ScoredValue.java    
        ├── ScriptOutputType.java  // lua 脚本输出
        ├── SortArgs.java    // 构建 sort 命令参数
        ├── ZStoreArgs.java  // 构建 ZUNIONSTORE 和 ZINTERSTORE 命令参数
        ├── codec   // 协议编解码相关
        │   ├── RedisCodec.java   // 编解码接口定义
        │   └── Utf8StringCodec.java  // 默认编解码实现
        ├── output   // Redis 数据输出对象
        │   ├── BooleanListOutput.java
        │   ├── BooleanOutput.java
        │   ├── ByteArrayOutput.java
        │   ├── DateOutput.java
        │   ├── DoubleOutput.java
        │   ├── IntegerOutput.java
        │   ├── KeyListOutput.java
        │   ├── KeyOutput.java
        │   ├── KeyValueOutput.java
        │   ├── MapOutput.java
        │   ├── MultiOutput.java
        │   ├── NestedMultiOutput.java
        │   ├── ScoredValueListOutput.java
        │   ├── StatusOutput.java
        │   ├── StringListOutput.java
        │   ├── ValueListOutput.java
        │   ├── ValueOutput.java
        │   └── ValueSetOutput.java
        ├── protocol   // 协议相关
        │   ├── Charsets.java  // 字符集工具
        │   ├── Command.java  // Redis 命令编码
        │   ├── CommandArgs.java  // Redis 命令参数拼装
        │   ├── CommandHandler.java  // netty redis 命令处理器
        │   ├── CommandKeyword.java  // 命令拼接中关键字枚举
        │   ├── CommandOutput.java  // 命令输出
        │   ├── CommandType.java   // Redis 命令枚举
        │   ├── ConnectionWatchdog.java  // 连接看门狗
        │   └── RedisStateMachine.java  // 状态机,用来解析 Redis 应答数据
        └── pubsub  // 发布订阅相关
            ├── PubSubCommandHandler.java  // netty 发布订阅处理器
            ├── PubSubOutput.java  // 发布订阅输出
            ├── RedisPubSubAdapter.java   // 发布订阅适配器
            ├── RedisPubSubConnection.java   // 发布订阅连接
            └── RedisPubSubListener.java   // 发布订阅监听器

7 directories, 45 files

3. 分析思路

按照上面的源码结构,按照如下顺序进行分析:

  • Redis 数据编解码接口分析。

  • 怎么去解析 Redis 服务端发送过来的数据。

  • 怎么给调用方返回不同类型的 Java 对象。

  • 怎么结合 Netty 去创建连接、关闭连接、断线重连等。

  • 怎么实现同步连接、异步连接。

  • 发布订阅机制怎么实现的。

分成 2 篇文章来分析这些源码:

  • 第一篇分析编解码、数据解析等。

  • 第二篇涉及到 Redis 通信连接管理相关、发布订阅机制的实现。

4. 源码分析

4.1 Redis 数据编解码

4.1.1 编解码接口 RedisCodec

RedisCodec 是一个抽象类,定义 Lettuce 与 Redis 通信数据的编解码。

package com.lambdaworks.redis.codec;

import java.nio.ByteBuffer;

//  <K>表示键的类型。 <V>表示值的类型。
public abstract class RedisCodec<K, V> {
    // 将 Redis 返回的原始字节解码为 key 的类型。
    public abstract K decodeKey(ByteBuffer bytes);

    // 将 Redis 返回的原始字节解码为 value 的类型。
    public abstract V decodeValue(ByteBuffer bytes);

    // 将应用程序中的键编码为字节数组,字节数组发送给 Redis。
    public abstract byte[] encodeKey(K key);

    // 将应用程序中的值编码为字节数组  字节数组再发送给 Redis。
    public abstract byte[] encodeValue(V value);
}

  • encode 方法需要是线程安全的,因为可能会被多个线程同时调用。
  • decode 方法只会被一个线程调用,因此不需要是线程安全的。

4.1.2 编解码实现 Utf8StringCodec

com.lambdaworks.redis.codec.Utf8StringCodec 是一个处理 UTF-8 编码的 Key Value 编码实现。

主要做了两件事:

  • 将ByteBuffer中的字节解码为 String 类型 Key 、String 类型 Value。
  • 将字符串编码成字节数组返回。
package com.lambdaworks.redis.codec;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.*;

import static java.nio.charset.CoderResult.OVERFLOW;

public class Utf8StringCodec extends RedisCodec<String, String> {
    private Charset charset;  // 字符集
    private CharsetDecoder decoder;  // 将字节解码为字符的解码器
    private CharBuffer chars;  // 用于存储解码后字符的缓冲区

    // 初始化一个新实例,使用UTF-8字符集进行字符串的编码和解码
    public Utf8StringCodec() {
        charset = Charset.forName("UTF-8");
        decoder = charset.newDecoder();
        chars   = CharBuffer.allocate(1024);
    }

    @Override
    public String decodeKey(ByteBuffer bytes) {
        return decode(bytes);
    }

    @Override
    public String decodeValue(ByteBuffer bytes) {
        return decode(bytes);
    }

    @Override
    public byte[] encodeKey(String key) {
        return encode(key);
    }

    @Override
    public byte[] encodeValue(String value) {
        return encode(value);
    }

    // 核心的解码逻辑,将ByteBuffer中的字节解码为字符串
    private String decode(ByteBuffer bytes) {
        chars.clear();  // 清空字符缓冲区
        bytes.mark(); // 标记字节缓冲区的当前位置,后续有需要又可以回到这个位置。这是 ByteBuffer中的用法。

        decoder.reset(); // // 重置解码器
        
        // 循环解码,如果溢出的话说明 CharBuffer 容量不够,就会一直扩容,直到所有字节都被成功解码
        while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) {
            // 如果字符缓冲区溢出的话,将其容量扩大一倍。
            // 可以想象下 ArrayList 之类的扩容机制,都有相似的地方。
            chars = CharBuffer.allocate(chars.capacity() * 2);
            
            // / 重置字节缓冲区到之前标记的位置。(这里就是回到一开始 bytes.mark() 标记的那个下标)
            bytes.reset();
        }
        // 调用 flip 方法将 buffer 从写模式切换到读模式。
        // toString() 将 buffer 中的字符转换为字符串。
        return chars.flip().toString();
    }

    // 核心的编码逻辑,就是直接调用 String 的方法将字符串编码为字节数组
    private byte[] encode(String string) {
        return string.getBytes(charset);
    }
}

这里面涉及到几个 java.nio 包下的工具类:

  • ByteBuffer 是用来存储原始的字节数据。原始的字节数据指的是以最基本的二进制形式存在的数据,比如 [72, 101, 108, 108, 111]。
  • CharBuffer 是用来存储解码后的字符。比如上面的原始字节数据解码后为 [ ‘H’ , ‘e’ , ‘l’ , ‘l’ , ‘o’ ]。
  • ByteBuffer 跟 CharBuffer 都是 Buffer 接口的实现类。所以也都有一些基本操作,比如 clear(), mark(), reset(), flip() 等等。

这里需要注意的是,通过 CharBuffer.allocate(1024) 创建 CharBuffer 的时候,默认就是处于写模式。每次调用 chars.clear() 也会将 buffer 重置到写模式。写模式下才可以添加数据,最后调用 chars.flip() 就是切换到读的模式。

上面 decode 代码看着很复杂,但是换个简洁点的写法一下子就看得懂:

    private String decode(ByteBuffer bytes) {
        final Charset charset = StandardCharsets.UTF_8;
        return charset.decode(bytes).toString();
    }

上面写法虽然很简洁,但是对于非常大输入的话,可能不如原方法高效,因为它每次都创建新的 String 对象。

4.2 协议解析相关

4.2.1 字符集工具类 Charsets

这个类的主要用途是提供一个方法将字符串转换为 ASCII 编码的 ByteBuffer。

package com.lambdaworks.redis.protocol;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class Charsets {
    public static final Charset ASCII = Charset.forName("US-ASCII");

    public static ByteBuffer buffer(String s) {
        // 使用 ASCII 字符集将字符串转换为字节数组
        return ByteBuffer.wrap(s.getBytes(ASCII));
    }
}

这个类使用 ASCII 编码,意味着只适用于 ASCII 字符( 0-127 的字符 )。对于非 ASCII 字符,会导致数据丢失或不正确的编码。

4.2.2 Redis 命令类型 CommandType

这个类主要是定义了 Redis 支持的各种命令类型。

package com.lambdaworks.redis.protocol;

public enum CommandType {
    // Connection   连接命令
    AUTH, ECHO, PING, QUIT, SELECT,

    // Server
    BGREWRITEAOF, BGSAVE, CLIENT, CONFIG, DBSIZE, DEBUG, FLUSHALL,
    FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF,
    SLOWLOG, SYNC,

    // Keys
    DEL, DUMP, EXISTS, EXPIRE, EXPIREAT, KEYS, MIGRATE, MOVE, OBJECT, PERSIST,
    PEXPIRE, PEXPIREAT, PTTL, RANDOMKEY, RENAME, RENAMENX, RESTORE, TTL, TYPE,

    // String
    APPEND, GET, GETRANGE, GETSET, MGET, MSET, MSETNX, SET, SETEX, SETNX,
    SETRANGE, STRLEN,

    // Numeric
    DECR, DECRBY, INCR, INCRBY, INCRBYFLOAT,

    // List
    BLPOP, BRPOP, BRPOPLPUSH,
    LINDEX, LINSERT, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM,
    RPOP, RPOPLPUSH, RPUSH, RPUSHX, SORT,

    // Hash
    HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HLEN,
    HMGET, HMSET, HSET, HSETNX, HVALS,

    // Transaction
    DISCARD, EXEC, MULTI, UNWATCH, WATCH,

    // Pub/Sub
    PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE,

    // Sets
    SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER,
    SMEMBERS, SMOVE, SPOP, SRANDMEMBER, SREM, SUNION, SUNIONSTORE,

    // Sorted Set
    ZADD, ZCARD, ZCOUNT, ZINCRBY, ZINTERSTORE, ZRANGE, ZRANGEBYSCORE,
    ZRANK, ZREM, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE,
    ZREVRANGEBYSCORE, ZREVRANK, ZSCORE, ZUNIONSTORE,

    // Scripting
    EVAL, EVALSHA, SCRIPT,

    // Bits
    BITCOUNT, BITOP, GETBIT, SETBIT;

    public byte[] bytes;

    private CommandType() {
        bytes = name().getBytes(Charsets.ASCII);
    }
}

4.2.3 Redis 命令参数编码器 CommandArgs

这个类主要是用来构建 Redis 命令的一个工具类,将各种参数编码为 Redis 协议格式,关于 Redis 协议格式参考之前的文章 【Redis客户端跟服务端通信协议】。

package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.codec.RedisCodec;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Map;

import static java.lang.Math.max;

public class CommandArgs<K, V> {
    // Redis 协议中使用的行结束符
    private static final byte[] CRLF = "\r\n".getBytes(Charsets.ASCII);
    // 编码 key value 
    private RedisCodec<K, V> codec;
    // 保存编码后的 RESP 命令参数
    private ByteBuffer buffer;
    // 参数数量
    private int count;

    public CommandArgs(RedisCodec<K, V> codec) {
        this.codec  = codec;
        this.buffer = ByteBuffer.allocate(32);
    }

    // 返回编码后的 Redis 命令参数。
    public ByteBuffer buffer() {
        // 将 buffer 从写数据的模式切换到读数据的模式。 
        buffer.flip();
        return buffer;
    }

    // 返回添加的参数数量
    public int count() {
        return count;
    }

    // 添加一个 key 到参数 List ,就是往 buffer 中写入。底下几个方法也差不多作用。
    public CommandArgs<K, V> addKey(K key) {
        return write(codec.encodeKey(key));
    }

    public CommandArgs<K, V> addKeys(K... keys) {
        for (K key : keys) {
            addKey(key);
        }
        return this;
    }

    public CommandArgs<K, V> addValue(V value) {
        return write(codec.encodeValue(value));
    }

    public CommandArgs<K, V> addValues(V... values) {
        for (V value : values) {
            addValue(value);
        }
        return this;
    }

    public CommandArgs<K, V> add(Map<K, V> map) {
        if (map.size() > 2) {
            realloc(buffer.capacity() + 16 * map.size());
        }

        for (Map.Entry<K, V> entry : map.entrySet()) {
            write(codec.encodeKey(entry.getKey()));
            write(codec.encodeValue(entry.getValue()));
        }

        return this;
    }

    public CommandArgs<K, V> add(String s) {
        return write(s);
    }

    public CommandArgs<K, V> add(long n) {
        return write(Long.toString(n));
    }

    public CommandArgs<K, V> add(double n) {
        return write(Double.toString(n));
    }

    public CommandArgs<K, V> add(byte[] value) {
        return write(value);
    }

    public CommandArgs<K, V> add(CommandKeyword keyword) {
        return write(keyword.bytes);
    }

    public CommandArgs<K, V> add(CommandType type) {
        return write(type.bytes);
    }

    // 往 ByteBuffer 中写入字节数据
    private CommandArgs<K, V> write(byte[] arg) {
        // 标记当前 buffer 的位置,用来给后面发生 BufferOverflowException 时回到这个位置。
        buffer.mark();

        // buffer 中剩余的空间比传入的参数长度小,就估算一个新的 buffer 长度。
        if (buffer.remaining() < arg.length) {
            // 估计新的 buffer 大小公式 = 当前剩余空间 + arg 的长度 + 10(额外的缓冲)。
            int estimate = buffer.remaining() + arg.length + 10;
            // 最终 buffer 的大小取 【当前容量的两倍】、【上面估计扩容大小】 两个中的较大值,保证空间够避免扩容幅度不够。
            realloc(max(buffer.capacity() * 2, estimate));
        }

        // 下面是实现 Redis 通信报文的拼接
        while (true) {
            try {
                // 先写入$表示接下来是字符串,然后写入长度,再写入实际的参数数据。
                buffer.put((byte) '$');
                write(arg.length);
                buffer.put(CRLF);
                buffer.put(arg);
                buffer.put(CRLF);
                break;
            } catch (BufferOverflowException e) {
                // 发生 buffer 写入容量不够异常,就会将 buffer 的指针切换到一开始标记地方,然后执行扩容
                buffer.reset();
                // 每次尝试将容量翻倍
                realloc(buffer.capacity() * 2);
            }
        }

        // 增加参数计数器值
        count++;
        return this;
    }

    // 将字符串写入 ByteBuffer ,跟上面方法套路差不多
    private CommandArgs<K, V> write(String arg) {
        // 获取要写入字符串的长度,后面会多次用到。
        int length = arg.length();
        
        // 标记当前指针位置,后面可能会用到。
        buffer.mark();

        // buffer 中剩余的空间比传入的参数长度小,就估算一个新的 buffer 长度。 跟上面方法一样。
        if (buffer.remaining() < length) {
            int estimate = buffer.remaining() + length + 10;
            realloc(max(buffer.capacity() * 2, estimate));
        }

        // 下面是实现 Redis 通信报文的拼接
        while (true) {
            try {
                buffer.put((byte) '$');
                write(length);
                buffer.put(CRLF);

                // 循环将字符串中每一个字符地写入 buffer。
                // (byte) arg.charAt(i) 将字符转换为字节,适用于 ASCII 字符。
                for (int i = 0; i < length; i++) {
                    buffer.put((byte) arg.charAt(i));
                }
                buffer.put(CRLF);
                break;
            } catch (BufferOverflowException e) {
                // buffer 写入超出容量异常处理,默认扩容 2 倍。
                buffer.reset();
                realloc(buffer.capacity() * 2);
            }
        }

        count++;
        return this;
    }

    // 数值写入处理
    private void write(long value) {
        // 如果值小于 10,直接转换为对应的 ASCII 码字符然后写入 buffer。
        if (value < 10) {
            // '0' 的 ASCII 值是 48,'0' + value 会得到正确的 ASCII 字符。
            buffer.put((byte) ('0' + value));
            return;
        }

        // 使用 StringBuilder 拼接字符串,优化性能
        StringBuilder sb = new StringBuilder(8);
        while (value > 0) {
            // 每次 %10 就会拿到当前 value 的最后一位数。 
            // 比如传入数值 12345, 第一次循环的时候, digit=5 , next value = 1234
            // 第二次循环的时候, digit=4 , next value = 123
            // 第三次循环的时候, digit=3 , next value = 12
            // 第四次循环的时候, digit=2 , next value = 1
            // 最后一次循环的时候, digit=1 , next value = 1 / 10 = 0
            long digit = value % 10;

            // 往 StringBuilder 拼接的顺序是反着来的,上面 StringBuilder 的内容是 "54321"
            sb.append((char) ('0' + digit));
            value /= 10;
        }

        // 从后向前遍历 StringBuilder,将字符以正确的顺序写入 buffer。
        // 上面 StringBuilder 的内容是 "54321",反过来遍历就会变成 12345
        for (int i = sb.length() - 1; i的内容是 "54321" >= 0; i--) {
            // 直接使用 (byte) sb.charAt(i) 将字符转换为字节,对于数字来说可以这么写
            buffer.put((byte) sb.charAt(i));
        }
    }




    // 重新分配 ByteBuffer 的大小
    private void realloc(int size) {
        // 创建新的 ByteBuffer,容量用传入的大小
        ByteBuffer buffer = ByteBuffer.allocate(size);
        // 将 buffer 从写模式切换到读模式,主要是为了下面的 buffer 复制。
        this.buffer.flip();
        // 将旧的 buffer 中的所有内容复制到新的 buffer 中。
        buffer.put(this.buffer);

        // 记录新 buffer 的当前位置,后续使用 reset() 方法返回到这个位置
        buffer.mark();
        // this.buffer 指向新 buffer , 旧的 buffer 将会被回收掉。
        this.buffer = buffer;
    }
}

这个类中的处理数值写入不知道为什么写这么复杂,直接像下面这么写也应该是可以的:

private void write(long value) {
    String strValue = Long.toString(value);
    for (int i = 0; i < strValue.length(); i++) {
        buffer.put((byte) strValue.charAt(i));
    }
}

4.2.4 Redis命令处理 CommandHandler

看这命令风格,大概可以猜到就是 Netty 的 Channel Handler了。 实际上它确实是用于处理 Redis 命令的 Handler。

package com.lambdaworks.redis.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;

import java.util.concurrent.BlockingQueue;


// @ChannelHandler.Sharable 注解表示 handler 支持被多个 channel 共享。
// 有点像 Spring 中的单例 Bean 注解
@ChannelHandler.Sharable
public class CommandHandler<K, V> extends ChannelDuplexHandler {
    // 存储待处理的 Redis 命令
    protected BlockingQueue<Command<K, V, ?>> queue;
    // 缓存接收到的数据
    protected ByteBuf buffer;
    // 状态机,看起来是用于解析 Redis 协议的工具类
    protected RedisStateMachine<K, V> rsm;

    
    public CommandHandler(BlockingQueue<Command<K, V, ?>> queue) {
        this.queue = queue;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // channel 注册上来时才会创建 buffer 和状态机。
        buffer = ctx.alloc().heapBuffer();
        rsm = new RedisStateMachine<K, V>();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        // channel 注销时释放 buffer 资源。
        buffer.release();
    }

    // 处理 Redis 端应答的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        try {
            // 检查 input 是否有可读的字节。没有的话就直接返回
            if (!input.isReadable()) return;
            // 丢弃 buffer 中已经读取过的字节,读过的数据不清理的话会一直占着内存的。
            buffer.discardReadBytes();
            // 往 buffer 中拼接数据,相当于累计数据,数据可能出现粘包或者拆包的情况
            buffer.writeBytes(input);
            // 尝试解码数据
            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    // 往 Redis 服务端写数据
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 将传入的消息转换为 Command 对象,这依赖于前面已经解码好了为 Command 对象
        Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
        Channel channel = ctx.channel();
        // 分配一个 ButeBuf 准备写数据
        ByteBuf buf = ctx.alloc().heapBuffer();
        // 将 Command 编码成字节放入 ButeBuf 中
        cmd.encode(buf);
        // 将 ByteBuf 写入 Channel ,也就是发给 Redis 
        ctx.write(buf, promise);
    }

    // 尝试将 ByteBuf 解码为
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        // 队列不为空 &&  Redis 状态机工具类能够成功解码 buffer 中的数据。
        while(!queue.isEmpty() && rsm.decode(buffer, queue.peek().getOutput())) {
            Command<K, V, ?> cmd = queue.take();
            // 通知等待的线程等做下一步工作
            cmd.complete();
        }
    }
}

这个类中 Handler 加了 @ChannelHandler.Sharable 注解,同时变量 buffer 等又是共享的,看着会有线程安全问题。

4.2.5 Redis报文解析状态机 RedisStateMachine

这个类用于解码 Redis 服务器的响应报文。 它依据 Redis 的请求协议,将收到的字节流解析为命令输出。

package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.RedisException;
import io.netty.buffer.ByteBuf;

import java.nio.ByteBuffer;
import java.util.LinkedList;

import static com.lambdaworks.redis.protocol.Charsets.buffer;
import static com.lambdaworks.redis.protocol.RedisStateMachine.State.Type.*;

// 根据 http://redis.io/topics/protocol  进行协议解码
public class RedisStateMachine<K, V> {
    // Redis 事务处理中命令被排队的应答回复,通常是事务模式下使用 MULTI 和 EXEC 命令返回的应答。
    private static final ByteBuffer QUEUED = buffer("QUEUED");

    // 数据类型,对应着 Redis 协议描述文档里面的几种类型
    static class State {
        //  简单字符串+、错误信息-、整数:、批量$、多条批量响应*、字节数据
        enum Type { SINGLE, ERROR, INTEGER, BULK, MULTI, BYTES }

        // type 是当前解析的响应类型
        Type type  = null;
        // count 是当前批量响应的计数或字节长度, -1就是空,Redis 协议里面描述的
        int  count = -1;
    }

    // 维护解码过程中的状态堆栈, 栈这种结构可以用处理处理嵌套关系
    // 比如对于命令【MGET key1 key2】,Redis应答的可能是【*2 $5 value1 $5 value2】
    // 这就就是带有嵌套的响应,用栈这种结构可以很好地处理这种嵌套结构。
    private LinkedList<State> stack;

    // 只能通过实例化这个对象来调用方法
    public RedisStateMachine() {
        stack = new LinkedList<State>();
    }

    // 核心解码方法
    // 如果读取了完整的响应,返回 true。
    public boolean decode(ByteBuf buffer, CommandOutput<K, V, ?> output) {
        // length 用来存储长度 、 end 用来存储结束为止
        int length, end;
        ByteBuffer bytes;

        // 添加一个空的状态,让下面的代码至少有一个状态可以处理。
        if (stack.isEmpty()) {
            stack.add(new State());
        }

        if (output == null) {
            return stack.isEmpty();
        }

        // 定义 loop 标签,方便跳出或者停止。 不过在 Java 中这种写法比较少。
        loop:
        // 一直处理到栈为空为止
        while (!stack.isEmpty()) {
            // 拿到栈顶的 State 对象,这里用 peek 是可能会解析失败,元素暂时不能被移除。
            State state = stack.peek();

            // 状态类型为空,尝试读取 Redis 回复的类型并标记读取位置。
            if (state.type == null) {
                // buffer 不可读,就直接跳出循环等待更多数据。
                if (!buffer.isReadable()) break;
                // 获取当前 Redis 回复的类型
                state.type = readReplyType(buffer);
                // 
                buffer.markReaderIndex();
            }

            switch (state.type) {
                // 处理单行回复
                case SINGLE:
                    // 先读取一行数据,如果返回 null 则读取失败,跳出主循环等待更多数据。
                    if ((bytes = readLine(buffer)) == null) break loop;
                    // 如果 Redis 应答的是特殊响应 QUEUED,这种就不处理,因为没啥意思。
                    // 比如说应答的是 +OK\r\n ,那么就可以读取 OK 到输出中。
                    if (!QUEUED.equals(bytes)) {
                        output.set(bytes);
                    }
                    break;
                // 处理错误回复    
                case ERROR:
                    // 同上,没读取到就跳出循环重新等待数据。
                    if ((bytes = readLine(buffer)) == null) break loop;
                    output.setError(bytes);
                    break;
                // 处理整数回复    
                case INTEGER:
                    // 先找到行尾,然后读取整数值。
                    // 比如响应 【:1000\r\n】,会读取到 1000 再设置到返回值中。
                    if ((end = findLineEnd(buffer)) == -1) break loop;
                    output.set(readLong(buffer, buffer.readerIndex(), end));
                    break;
                // 处理批量回复    
                case BULK:
                    // 没有结束就继续等待数据
                    if ((end = findLineEnd(buffer)) == -1) break loop;
                    // 读取数据长度
                    length = (int) readLong(buffer, buffer.readerIndex(), end);

                    // 读取长度,如果是-1,表示null值。
                    if (length == -1) {
                        output.set(null);
                    } else {
                        // 状态改为 BYTES 类型
                        state.type = BYTES;
                        // 设置需要读取的字节数(加2是为了包括\r\n)
                        // 比如响应【$5hello】,读取到长度是5,然后将状态改为BYTES,设置需要读取的字节数为7(5 + 2)。
                        state.count = length + 2;
                        buffer.markReaderIndex();
                        continue loop;
                    }
                    break;
                // 处理多行回复    
                case MULTI:
                    // 意味着数组的元素数量还不能确定
                    if (state.count == -1) {
                        // 查找 \r\n,如果找不到就跳出循环等待更多数据。
                        if ((end = findLineEnd(buffer)) == -1) break loop;
                        // 从 buffer 中读取数组的元素数量
                        length = (int) readLong(buffer, buffer.readerIndex(), end);
                        // 要处理的元素数量就是上面匹配出来的
                        state.count = length;
                        //  保存当前 buffer 指针位置
                        buffer.markReaderIndex();
                    }
                    // 没有元素需要处理,跳出循环
                    if (state.count <= 0) break;
                    // 处理完一个元素后,减少计数
                    state.count--;
                    // 加入栈顶,用于处理下一个数组元素。
                    stack.addFirst(new State());
                    // 开始处理数组中的下一个元素

                    // 比如响应【*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n】
                    // *2 表示有两个元素分别是 【$5\r\nhello\r\n】 和 【$5\r\nworld\r\n】
                    // 这两个会被单独处理,分别加入栈顶进行处理
                    continue loop;
                // 处理字节数据    
                case BYTES:
                    // 读取指定长度的字节数据。要是读取不到足够的字节,跳出循环等待更多的数据。
                    if ((bytes = readBytes(buffer, state.count)) == null) break loop;
                    output.set(bytes);
            }
            
            // 标记读取位置,移除当前状态,
            buffer.markReaderIndex();
            stack.remove();
            // 调用回调方法
            output.complete(stack.size());
        }

        // 栈是为空完成了所有解析,否则就是没有完成解析
        return stack.isEmpty();
    }

    // 查找行尾的换行符 \r\n , 因为Redis协议使用 \r\n 作为行分隔符。
    private int findLineEnd(ByteBuf buffer) {
        // 从 buffer 当前读取位置开始
        int start = buffer.readerIndex();
        
        // 查找下一个'\n'字符 , 然后检查它前面是否是'\r'。
        // writerIndex 表示 buffer 中已写入数据的结尾,超过这个范围是没数据的。
        int index = buffer.indexOf(start, buffer.writerIndex(), (byte) '\n');
        
        // 确保 \n 前面是 \r,如果是,则返回 \n 的位置。
        // -1是没有找到的意思
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }

    // 分析 Redis 响应的类型
    private State.Type readReplyType(ByteBuf buffer) {
        // 读取响应的第一个字节来确定响应类型, 每种类型都有特定的前缀字符
        switch (buffer.readByte()) {
            case '+': return SINGLE;
            case '-': return ERROR;
            case ':': return INTEGER;
            case '$': return BULK;
            case '*': return MULTI;
            default:  throw new RedisException("Invalid first byte");
        }
    }

    // 读取长整型数值
    private long readLong(ByteBuf buffer, int start, int end) {
        long value = 0;
        
        // 处理负数,并通过字符串解析来获取数值。使用减法而不是加法可以避免溢出问题。
        boolean negative = buffer.getByte(start) == '-';
        // 如果是负数就从下一个字符开始处理,跳过 - 号
        int offset = negative ? start + 1 : start;
        while (offset < end - 1) {
            int digit = buffer.getByte(offset++) - '0';
            value = value * 10 - digit;
        }
        // 如果是正数就取反,前面用了减法
        if (!negative) value = -value;
        buffer.readerIndex(end + 1);

        return value;
    }

    // 读取 \r\n 前的数据
    private ByteBuffer readLine(ByteBuf buffer) {
        ByteBuffer bytes = null;
        // 找 \n 的下标
        int end = findLineEnd(buffer);
        // 找到了 \n 的下标就开始读取 start下标到 \r的字节内容,这里面不包括 \r\n
        if (end > -1) {
            int start = buffer.readerIndex();
            bytes = buffer.nioBuffer(start, end - start - 1);

            // 更新指针到 end+1 位置,也就是跳过 \r\n
            buffer.readerIndex(end + 1);
        }
        return bytes;
    }

    // 读取指定字节数,忽略 \r\n
    private ByteBuffer readBytes(ByteBuf buffer, int count) {
        ByteBuffer bytes = null;
        // 确保 buffer 中可读的字节大于想读的字节数
        if (buffer.readableBytes() >= count) {
            // 读取指定字节数,跳过 \r\n
            bytes = buffer.nioBuffer(buffer.readerIndex(), count - 2);

            // 更新指针跳过已经读取过的字节
            buffer.readerIndex(buffer.readerIndex() + count);
        }
        return bytes;
    }
}


4.2.6 Redis命令输出抽象类 CommandOutput

这是一个抽象类,用于表示 Redis 命令的输出。源码中继承这个抽象类的子类有几十个。

package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.codec.RedisCodec;

import java.nio.ByteBuffer;

//K代表 key 类型、V 代表 value 类型 、 T 表示输出的类型
public abstract class CommandOutput<K, V, T> {
    protected RedisCodec<K, V> codec;   // key value 编解码器
    protected T output;      // 保存命令的输出结果
    protected String error;  // 保存错误信息

    public CommandOutput(RedisCodec<K, V> codec, T output) {
        this.codec  = codec;
        this.output = output;
    }

    //  返回命令输出
    public T get() {
        return output;
    }

    // 设置命令输出为字节序列。  默认实现会抛出异常,子类需要根据需要重写。
    public void set(ByteBuffer bytes) {
        throw new IllegalStateException();
    }

    // 设置命令输出为64位整数。
    public void set(long integer) {
        throw new IllegalStateException();
    }

    // 设置错误信息
    public void setError(ByteBuffer error) {
        this.error = decodeAscii(error);
    }

    public void setError(String error) {
        this.error = error;
    }

    // 检查是否有错误信息
    public boolean hasError() {
        return this.error != null;
    }

    // 获取错误信息
    public String getError() {
        return error;
    }

    // 标记命令输出完成
    public void complete(int depth) {
        // nothing to do by default
    }

    // 将 ByteBuffer 解码为 ASCII 字符串,公用的方法,子类也可以用。
    protected String decodeAscii(ByteBuffer bytes) {
        char[] chars = new char[bytes.remaining()];
        for (int i = 0; i < chars.length; i++) {
            chars[i] = (char) bytes.get();
        }
        return new String(chars);
    }
}



4.3 Redis 数据输出对象

com.lambdaworks.redis.output 包下就是一些 Java 对象的输出类,都是 CommandOutput 抽象类的实现。

4.3.1 布尔值列表输出 BooleanListOutput

将 Redis 返回的整数值 List 转换为 boolean 值 List

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.util.ArrayList;
import java.util.List;

public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> {
    public BooleanListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<Boolean>());
    }

    @Override
    public void set(long integer) {
        // 将输入的整数转换为布尔值
        // 如果整数是 1,那么对应的布尔值就是 true ,添加到 List 中。
        // 其他整数对应的布尔值就是 false  ,添加到 List 中。
        output.add((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
    }
}

4.3.2 布尔值输出 BooleanOutput

单个布尔值的输出处理器,比如 EXISTS 命令。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.util.ArrayList;
import java.util.List;

public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> {
    public BooleanListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<Boolean>());
    }

    @Override
    public void set(long integer) {
        // 如果是 1 则设置为 true,否则就是 false。
        output.add((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
    }
}

4.3.3 字节数组输出 ByteArrayOutput

处理 Redis 命令返回的字节数组。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;


public class ByteArrayOutput<K, V> extends CommandOutput<K, V, byte[]> {
    public ByteArrayOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        if (bytes != null) {
            // 创建一个新的字节数组,大小为 ByteBuffer 剩余的字节数。
            output = new byte[bytes.remaining()];
            // 将 ByteBuffer 中的数据复制到新创建的字节数组中去。
            bytes.get(output);
        }
    }
}

4.3.4 日期输出 DateOutput

处理 Redis 命令返回的日期时间输出, 这个输出处理器不处理毫秒级精度。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.util.Date;


public class DateOutput<K, V> extends CommandOutput<K, V, Date> {
    public DateOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(long time) {
        // 将以秒为单位时间戳转换为毫秒,然后创建一个新的 Date 对象。
        //  Java 的 Date 构造函数需要的是毫秒级时间戳。
        output = new Date(time * 1000);
    }
}

4.3.5 双精度浮点数输出 DoubleOutput

处理双精度浮点数的输出。 比如 ZSCORE 等命令输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;

import static java.lang.Double.parseDouble;


public class DoubleOutput<K, V> extends CommandOutput<K, V, Double> {
    public DoubleOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // ByteBuffer 为 null,则输出也是 null。
        // 使用父类的 decodeAscii 方法将字节缓冲区解码为 ASCII 字符串。
        // 再将 ASCII 字符串解析为 Double 值。
        output = (bytes == null) ? null : parseDouble(decodeAscii(bytes));
    }
}

4.3.6 整形数值输出 IntegerOutput

处理 64 位整数输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;


public class IntegerOutput<K, V> extends CommandOutput<K, V, Long> {
    public IntegerOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(long integer) {
        // 直接将输入的长整型值赋给 output
        output = integer;
    }

    @Override
    public void set(ByteBuffer bytes) {
        // 传入的 ByteBuffer  会被直接忽略掉
        // 相当于空值处理
        output = null;
    }
}

4.3.7 键集合输出 KeyListOutput

处理 Redis 命令返回的 key List 输出。 可以支持处理返回多个 KEY 的 Redis 命令。

可能用于处理 KEYS、SCAN命令输出,或者一次性批量获取 Key 或者遍历数据库的场景。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class KeyListOutput<K, V> extends CommandOutput<K, V, List<K>> {
    public KeyListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<K>());
    }

    @Override
    public void set(ByteBuffer bytes) {
        // 利用 codec 解码 ByteBuffer 中的 key
        output.add(codec.decodeKey(bytes));
    }
}

4.3.8 键输出 KeyOutput

处理单个 Key 的输出结果。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;


public class KeyOutput<K, V> extends CommandOutput<K, V, K> {
    public KeyOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // ByteBuffer 为 null,输出也为 null。
        // ByteBuffer 不为 null,利用提供的 codec 解码 ByteBuffer 中的 Key 。
        output = (bytes == null) ? null : codec.decodeKey(bytes);
    }
}

4.3.9 键值输出 KeyValueOutput

处理 key value 输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.KeyValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;

public class KeyValueOutput<K, V> extends CommandOutput<K, V, KeyValue<K, V>> {
    private K key;

    public KeyValueOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        if (bytes != null) {
            // 如果 key 还没有值,将 ByteBuffer 解码为 key 赋值。
            if (key == null) {
                key = codec.decodeKey(bytes);
            } else {
                // 如果 key 已设置过,将 ByteBuffer 解码为 value 。然后创建一个新的 KeyValue 对象输出。
                V value = codec.decodeValue(bytes);
                output = new KeyValue<K, V>(key, value);
            }
        }
    }
}

4.3.10 Map输出对象 MapOutput

处理键值对映射输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;


public class MapOutput<K, V> extends CommandOutput<K, V, Map<K, V>> {
    private K key;

    public MapOutput(RedisCodec<K, V> codec) {
        super(codec, new HashMap<K, V>());
    }

    @Override
    public void set(ByteBuffer bytes) {
        // key 为 null,将 ByteBuffer 解码为 key 并保存。
        if (key == null) {
            key = codec.decodeKey(bytes);
            return;
        }

        // 将 ByteBuffer 解码为 value 并保存。
        V value = (bytes == null) ? null : codec.decodeValue(bytes);
        output.put(key, value);

        // 处理完一对 key value 后,重置 key 为 null,准备处理下一对。
        key = null;
    }
}

4.3.11 事务块输出 MultiOutput

处理 Redis 的 MULTI 事务命令块中所有命令输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.*;

public class MultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
    private Queue<Command<K, V, ?>> queue;

    public MultiOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<Object>());
        queue = new LinkedList<Command<K, V, ?>>();
    }

    // 添加命令到队列去
    public void add(Command<K, V, ?> cmd) {
        queue.add(cmd);
    }

    // 循环调用队列中命令的 complete 方法
    public void cancel() {
        for (Command<K, V, ?> c : queue) {
            c.complete();
        }
    }

    @Override
    public void set(long integer) {
        // 将输入的值设置到当前命令的输出
        queue.peek().getOutput().set(integer);
    }

    @Override
    public void set(ByteBuffer bytes) {
        queue.peek().getOutput().set(bytes);
    }

    // 设置错误信息
    @Override
    public void setError(ByteBuffer error) {
        CommandOutput<K, V, ?> output = queue.isEmpty() ? this : queue.peek().getOutput();
        output.setError(decodeAscii(error));
    }

    // 完成逻辑
    @Override
    public void complete(int depth) {
        if (depth == 1) {
            // 处理单个命令的完成
            Command<K, V, ?> cmd = queue.remove();
            CommandOutput<K, V, ?> o = cmd.getOutput();
            output.add(!o.hasError() ? o.get() : new RedisException(o.getError()));
            cmd.complete();
        } else if (depth == 0 && !queue.isEmpty()) {
            // 处理所有剩余命令的完成
            for (Command<K, V, ?> cmd : queue) {
                cmd.complete();
            }
        }
    }
}

4.3.12 嵌套命令输出 NestedMultiOutput

嵌套的多层命令输出处理器。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.*;


public class NestedMultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
    private LinkedList<List<Object>> stack;  // 管理嵌套命令结构
    private int depth; //  记录当前嵌套深度

    public NestedMultiOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<Object>());
        stack = new LinkedList<List<Object>>();
        depth = 1;
    }

    @Override
    public void set(long integer) {
        output.add(integer);
    }

    @Override
    public void set(ByteBuffer bytes) {
        output.add(bytes == null ? null : codec.decodeValue(bytes));
    }

    @Override
    public void setError(ByteBuffer error) {
        output.add(new RedisException(decodeAscii(error)));
    }

    @Override
    public void complete(int depth) {
        // 当新的深度大于当前深度时,创建新的嵌套层级。
        if (depth > this.depth) {
            Object o = output.remove(output.size() - 1);
            stack.push(output);
            output = new ArrayList<Object>();
            output.add(o);
        // 当新的深度小于当前深度时,完成当前层级并返回上一层。    
        } else if (depth > 0 && depth < this.depth) {
            stack.peek().add(output);
            output = stack.pop();
        }
        // 更新当前深度。
        this.depth = depth;
    }
}

4.3.13 有序集合输出 ScoredValueListOutput

处理有序集合(Sorted Set)的返回结果。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.ScoredValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class ScoredValueListOutput<K, V> extends CommandOutput<K, V, List<ScoredValue<V>>> {
    private V value; // 暂存解码后的值

    public ScoredValueListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<ScoredValue<V>>());
    }

    @Override
    public void set(ByteBuffer bytes) {
        // 如果 value 为 null,解码 ByteBuffer 并存储 value。
        if (value == null) {
            value = codec.decodeValue(bytes);
            return;
        }

        // 解码 score ,创建 ScoredValue 对象并添加到 List 中。
        double score = Double.parseDouble(decodeAscii(bytes));
        output.add(new ScoredValue<V>(score, value));
        value = null;
    }
}

4.3.14 状态输出 StatusOutput

处理 Redis 命令的状态消息输出。比如简单状态响应,如 SET、DEL、EXPIRE 等命令的执行结果。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;

import static com.lambdaworks.redis.protocol.Charsets.buffer;


public class StatusOutput<K, V> extends CommandOutput<K, V, String> {
    private static final ByteBuffer OK = buffer("OK");

    public StatusOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // 如果输入的 ByteBuffer 等于预定义的 OK 常量,则把输出设置为字符串 "OK"。
        // 不等于就解码为 ASCII 字符串
        output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
    }
}

4.3.15 字符串列表输出 StringListOutput

处理字符串 List 结果输出。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class StringListOutput<K, V> extends CommandOutput<K, V, List<String>> {
    public StringListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<String>());
    }

    @Override
    public void set(ByteBuffer bytes) {
        // 输入的 ByteBuffer 为 null,则添加 null 到 List 中。
        // 不为空将 ByteBuffer 解码为 ASCII 字符串并添加到 List 中。
        output.add(bytes == null ? null : decodeAscii(bytes));
    }
}

4.3.16 值列表输出 ValueListOutput

处理命令返回值 List 输出结果。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class ValueListOutput<K, V> extends CommandOutput<K, V, List<V>> {
    public ValueListOutput(RedisCodec<K, V> codec) {
        super(codec, new ArrayList<V>());
    }

    @Override
    public void set(ByteBuffer bytes) {
        // ByteBuffer 为 null,则添加 null 到 List 中。
        // 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
        output.add(bytes == null ? null : codec.decodeValue(bytes));
    }
}

4.3.17 值输出 ValueOutput

单个值命令输出处理。

package com.lambdaworks.redis.output;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;

public class ValueOutput<K, V> extends CommandOutput<K, V, V> {
    public ValueOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // ByteBuffer 为 null,则添加 null 到 List 中。
        // 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
        output = (bytes == null) ? null : codec.decodeValue(bytes);
    }
}

4.3.18 值集合输出 ValueSetOutput

处理返回值集合输出。

package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;


public class ValueOutput<K, V> extends CommandOutput<K, V, V> {
    public ValueOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    @Override
    public void set(ByteBuffer bytes) {
        // ByteBuffer 为 null,则添加 null 到 List 中。
        // 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
        output = (bytes == null) ? null : codec.decodeValue(bytes);
    }
}

4.4 参数组装辅助类

4.4.1 SortArgs

这是一个构建Redis SORT命令的参数 List 的辅助类。


package com.lambdaworks.redis;

import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandKeyword;

import java.util.ArrayList;
import java.util.List;

import static com.lambdaworks.redis.protocol.CommandKeyword.*;
import static com.lambdaworks.redis.protocol.CommandType.GET;


public class SortArgs {
    private String by;  // 排序的 key
    private Long offset, count;  // 分页限制结果
    private List<String> get;  // 字段 List 
    private CommandKeyword order;  // 升序或降序
    private boolean alpha;  // 是否按字母顺序排序

    
    // 内部静态类 Builder , 用于创建和初始化SortArgs对象。
    public static class Builder {
        public static SortArgs by(String pattern) {
            return new SortArgs().by(pattern);
        }

        public static SortArgs limit(long offset, long count) {
            return new SortArgs().limit(offset, count);
        }

        public static SortArgs get(String pattern) {
            return new SortArgs().get(pattern);
        }

        public static SortArgs asc() {
            return new SortArgs().asc();
        }

        public static SortArgs desc() {
            return new SortArgs().desc();
        }

        public static SortArgs alpha() {
            return new SortArgs().alpha();
        }
    }

    public SortArgs by(String pattern) {
        by = pattern;
        return this;
    }

    public SortArgs limit(long offset, long count) {
        this.offset = offset;
        this.count  = count;
        return this;
    }

    public SortArgs get(String pattern) {
        if (get == null) {
            get = new ArrayList<String>();
        }
        get.add(pattern);
        return this;
    }

    public SortArgs asc() {
        order = ASC;
        return this;
    }

    public SortArgs desc() {
        order = DESC;
        return this;
    }

    public SortArgs alpha() {
        alpha = true;
        return this;
    }

    <K, V> void build(CommandArgs<K, V> args, K store) {

        if (by != null) {
            args.add(BY);
            args.add(by);
        }

        if (get != null) {
            for (String pattern : get) {
                args.add(GET);
                args.add(pattern);
            }
        }

        if (offset != null) {
            args.add(LIMIT);
            args.add(offset);
            args.add(count);
        }

        if (order != null) {
            args.add(order);
        }

        if (alpha) {
            args.add(ALPHA);
        }

        if (store != null) {
            args.add(STORE);
            args.addKey(store);
        }
    }
}

这个类逻辑不多,主要是要对 Redis 的 Sort 命令有一点了解就好理解。

比如要写一个 Redis 命令:

有一个存储用户 ID 的 List 【users】,每个用户都有对应的 【user::age】 和 【user::name】 字段。想要根据年龄对用户进行排序,获取他们的 name,并限制结果数量。

SORT users BY user:*:age DESC GET user:*:name LIMIT 0 5 ALPHA

对 【users】 List 进行排序,使用 【user::age】作为排序依据,降序排列,获取每个用户对应的 【user::name】 值,限制结果为前 5 个,对获取的名字使用字母顺序排序

使用 SortArgs 类来构建这个命令:

final SortArgs args = SortArgs.Builder.by("user:*:age")
                                .desc()
                                .get("user:*:name")
                                .limit(0, 5)
                                .alpha();

4.4.2 ZStoreArgs

用于构建 Redis 的 ZUNIONSTORE 和 ZINTERSTORE 命令的参数 List 辅助类。


package com.lambdaworks.redis;

import com.lambdaworks.redis.protocol.CommandArgs;

import java.util.*;

import static com.lambdaworks.redis.protocol.CommandKeyword.*;


public class ZStoreArgs {
    // 三种聚合方法  求和 最小 最大
    private static enum Aggregate { SUM, MIN, MAX }

    private List<Long> weights;
    private Aggregate aggregate;

  
    public static class Builder {
        // 设置输入集合的权重
        public static ZStoreArgs weights(long... weights) {
            return new ZStoreArgs().weights(weights);
        }

        public static ZStoreArgs sum() {
            return new ZStoreArgs().sum();
        }

        public static ZStoreArgs min() {
            return new ZStoreArgs().min();
        }

        public static ZStoreArgs max() {
            return new ZStoreArgs().max();
        }
    }

    // 设置输入集合的权重
    public ZStoreArgs weights(long... weights) {
        this.weights = new ArrayList<Long>(weights.length);
        for (long weight : weights) {
            this.weights.add(weight);
        }
        return this;
    }

    public ZStoreArgs sum() {
        aggregate = Aggregate.SUM;
        return this;
    }

    public ZStoreArgs min() {
        aggregate = Aggregate.MIN;
        return this;
    }

    public ZStoreArgs max() {
        aggregate = Aggregate.MAX;
        return this;
    }

    <K, V> void build(CommandArgs<K, V> args) {
        if (weights != null) {
            args.add(WEIGHTS);
            for (long weight : weights) {
                args.add(weight);
            }
        }

        if (aggregate != null) {
            args.add(AGGREGATE);
            switch (aggregate) {
                case SUM:
                    args.add(SUM);
                    break;
                case MIN:
                    args.add(MIN);
                    break;
                case MAX:
                    args.add(MAX);
                    break;
            }
        }
    }
}