RedissonV1.0-Lettuce通信源码分析-开篇
RedissonV1.0-Lettuce通信源码分析-开篇
最近为了研究 Redisson 的实现原理,专门去下载了 Redisson 的源码。经过对源码的查看发现 Redisson 依赖了 Lettuce 作为通信相关的库,那么大概率 Redisson 很大一部分逻辑会依赖于 Lettuce ,所以还是有必要对 Lettuce 的源码做下分析。
先了解下 Lettuce 是什么,看下作者对 Lettuce 的介绍:
lettuce - A scalable Java Redis client
Lettuce is a scalable thread-safe Redis client providing both synchronous and asyncronous connections. Multiple threads may share one connection provided they avoid blocking and transactional operations such as BLPOP, and MULTI/EXEC. Multiple connections are efficiently managed by the excellent netty NIO framework.
大概翻译下来就是:
Lettuce 是一个可扩展的、线程安全的 Redis 客户端。提供了同步和异步两种连接方式。多个线程可以共享同一个连接,只要它们避免使用阻塞式和事务式操作 (比如 BLPOP 和 MULTI/EXEC)。
由优秀的 Netty NIO 框架高效管理多个连接。
1. 代码拉取
根据 Redisson V1.0 项目的 Readme.md 文件描述, Redisson V1.0 是依赖了 Lettuce 这个库作为客户端的通信部分 (https://github.com/wg/lettuce) 。 但是看 Redisson 代码的话又包含了 Lettuce 的代码,那么就有两种方式阅读源码:
- 直接下载 Redisson 代码进行分析。
git clone https://github.com/redisson/redisson.git
- 下载 Lettuce 代码进行分析。
git clone https://github.com/wg/lettuce.git
Lettuce 这个项目从 2014 年就开始不怎么维护了,所以代码两边代码应该是差不太多的,选哪种方式都行。
2. 代码结构
拉取下来的代码结构如下:
.
└── lambdaworks
├── codec
│ └── Base16.java
└── redis
├── KeyValue.java
├── RedisAsyncConnection.java // 异步连接
├── RedisClient.java // Redis 客户端抽象
├── RedisCommandInterruptedException.java // 命令执行异常
├── RedisConnection.java // 同步连接
├── RedisException.java // 公用 Redis 异常
├── ScoredValue.java
├── ScriptOutputType.java // lua 脚本输出
├── SortArgs.java // 构建 sort 命令参数
├── ZStoreArgs.java // 构建 ZUNIONSTORE 和 ZINTERSTORE 命令参数
├── codec // 协议编解码相关
│ ├── RedisCodec.java // 编解码接口定义
│ └── Utf8StringCodec.java // 默认编解码实现
├── output // Redis 数据输出对象
│ ├── BooleanListOutput.java
│ ├── BooleanOutput.java
│ ├── ByteArrayOutput.java
│ ├── DateOutput.java
│ ├── DoubleOutput.java
│ ├── IntegerOutput.java
│ ├── KeyListOutput.java
│ ├── KeyOutput.java
│ ├── KeyValueOutput.java
│ ├── MapOutput.java
│ ├── MultiOutput.java
│ ├── NestedMultiOutput.java
│ ├── ScoredValueListOutput.java
│ ├── StatusOutput.java
│ ├── StringListOutput.java
│ ├── ValueListOutput.java
│ ├── ValueOutput.java
│ └── ValueSetOutput.java
├── protocol // 协议相关
│ ├── Charsets.java // 字符集工具
│ ├── Command.java // Redis 命令编码
│ ├── CommandArgs.java // Redis 命令参数拼装
│ ├── CommandHandler.java // netty redis 命令处理器
│ ├── CommandKeyword.java // 命令拼接中关键字枚举
│ ├── CommandOutput.java // 命令输出
│ ├── CommandType.java // Redis 命令枚举
│ ├── ConnectionWatchdog.java // 连接看门狗
│ └── RedisStateMachine.java // 状态机,用来解析 Redis 应答数据
└── pubsub // 发布订阅相关
├── PubSubCommandHandler.java // netty 发布订阅处理器
├── PubSubOutput.java // 发布订阅输出
├── RedisPubSubAdapter.java // 发布订阅适配器
├── RedisPubSubConnection.java // 发布订阅连接
└── RedisPubSubListener.java // 发布订阅监听器
7 directories, 45 files
3. 分析思路
按照上面的源码结构,按照如下顺序进行分析:
Redis 数据编解码接口分析。
怎么去解析 Redis 服务端发送过来的数据。
怎么给调用方返回不同类型的 Java 对象。
怎么结合 Netty 去创建连接、关闭连接、断线重连等。
怎么实现同步连接、异步连接。
发布订阅机制怎么实现的。
分成 2 篇文章来分析这些源码:
第一篇分析编解码、数据解析等。
第二篇涉及到 Redis 通信连接管理相关、发布订阅机制的实现。
4. 源码分析
4.1 Redis 数据编解码
4.1.1 编解码接口 RedisCodec
RedisCodec 是一个抽象类,定义 Lettuce 与 Redis 通信数据的编解码。
package com.lambdaworks.redis.codec;
import java.nio.ByteBuffer;
// <K>表示键的类型。 <V>表示值的类型。
public abstract class RedisCodec<K, V> {
// 将 Redis 返回的原始字节解码为 key 的类型。
public abstract K decodeKey(ByteBuffer bytes);
// 将 Redis 返回的原始字节解码为 value 的类型。
public abstract V decodeValue(ByteBuffer bytes);
// 将应用程序中的键编码为字节数组,字节数组发送给 Redis。
public abstract byte[] encodeKey(K key);
// 将应用程序中的值编码为字节数组 字节数组再发送给 Redis。
public abstract byte[] encodeValue(V value);
}
- encode 方法需要是线程安全的,因为可能会被多个线程同时调用。
- decode 方法只会被一个线程调用,因此不需要是线程安全的。
4.1.2 编解码实现 Utf8StringCodec
com.lambdaworks.redis.codec.Utf8StringCodec 是一个处理 UTF-8 编码的 Key Value 编码实现。
主要做了两件事:
- 将ByteBuffer中的字节解码为 String 类型 Key 、String 类型 Value。
- 将字符串编码成字节数组返回。
package com.lambdaworks.redis.codec;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.*;
import static java.nio.charset.CoderResult.OVERFLOW;
public class Utf8StringCodec extends RedisCodec<String, String> {
private Charset charset; // 字符集
private CharsetDecoder decoder; // 将字节解码为字符的解码器
private CharBuffer chars; // 用于存储解码后字符的缓冲区
// 初始化一个新实例,使用UTF-8字符集进行字符串的编码和解码
public Utf8StringCodec() {
charset = Charset.forName("UTF-8");
decoder = charset.newDecoder();
chars = CharBuffer.allocate(1024);
}
@Override
public String decodeKey(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public String decodeValue(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public byte[] encodeKey(String key) {
return encode(key);
}
@Override
public byte[] encodeValue(String value) {
return encode(value);
}
// 核心的解码逻辑,将ByteBuffer中的字节解码为字符串
private String decode(ByteBuffer bytes) {
chars.clear(); // 清空字符缓冲区
bytes.mark(); // 标记字节缓冲区的当前位置,后续有需要又可以回到这个位置。这是 ByteBuffer中的用法。
decoder.reset(); // // 重置解码器
// 循环解码,如果溢出的话说明 CharBuffer 容量不够,就会一直扩容,直到所有字节都被成功解码
while (decoder.decode(bytes, chars, true) == OVERFLOW || decoder.flush(chars) == OVERFLOW) {
// 如果字符缓冲区溢出的话,将其容量扩大一倍。
// 可以想象下 ArrayList 之类的扩容机制,都有相似的地方。
chars = CharBuffer.allocate(chars.capacity() * 2);
// / 重置字节缓冲区到之前标记的位置。(这里就是回到一开始 bytes.mark() 标记的那个下标)
bytes.reset();
}
// 调用 flip 方法将 buffer 从写模式切换到读模式。
// toString() 将 buffer 中的字符转换为字符串。
return chars.flip().toString();
}
// 核心的编码逻辑,就是直接调用 String 的方法将字符串编码为字节数组
private byte[] encode(String string) {
return string.getBytes(charset);
}
}
这里面涉及到几个 java.nio 包下的工具类:
- ByteBuffer 是用来存储原始的字节数据。原始的字节数据指的是以最基本的二进制形式存在的数据,比如 [72, 101, 108, 108, 111]。
- CharBuffer 是用来存储解码后的字符。比如上面的原始字节数据解码后为 [ ‘H’ , ‘e’ , ‘l’ , ‘l’ , ‘o’ ]。
- ByteBuffer 跟 CharBuffer 都是 Buffer 接口的实现类。所以也都有一些基本操作,比如 clear(), mark(), reset(), flip() 等等。
这里需要注意的是,通过 CharBuffer.allocate(1024) 创建 CharBuffer 的时候,默认就是处于写模式。每次调用 chars.clear() 也会将 buffer 重置到写模式。写模式下才可以添加数据,最后调用 chars.flip() 就是切换到读的模式。
上面 decode 代码看着很复杂,但是换个简洁点的写法一下子就看得懂:
private String decode(ByteBuffer bytes) {
final Charset charset = StandardCharsets.UTF_8;
return charset.decode(bytes).toString();
}
上面写法虽然很简洁,但是对于非常大输入的话,可能不如原方法高效,因为它每次都创建新的 String 对象。
4.2 协议解析相关
4.2.1 字符集工具类 Charsets
这个类的主要用途是提供一个方法将字符串转换为 ASCII 编码的 ByteBuffer。
package com.lambdaworks.redis.protocol;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class Charsets {
public static final Charset ASCII = Charset.forName("US-ASCII");
public static ByteBuffer buffer(String s) {
// 使用 ASCII 字符集将字符串转换为字节数组
return ByteBuffer.wrap(s.getBytes(ASCII));
}
}
这个类使用 ASCII 编码,意味着只适用于 ASCII 字符( 0-127 的字符 )。对于非 ASCII 字符,会导致数据丢失或不正确的编码。
4.2.2 Redis 命令类型 CommandType
这个类主要是定义了 Redis 支持的各种命令类型。
package com.lambdaworks.redis.protocol;
public enum CommandType {
// Connection 连接命令
AUTH, ECHO, PING, QUIT, SELECT,
// Server
BGREWRITEAOF, BGSAVE, CLIENT, CONFIG, DBSIZE, DEBUG, FLUSHALL,
FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF,
SLOWLOG, SYNC,
// Keys
DEL, DUMP, EXISTS, EXPIRE, EXPIREAT, KEYS, MIGRATE, MOVE, OBJECT, PERSIST,
PEXPIRE, PEXPIREAT, PTTL, RANDOMKEY, RENAME, RENAMENX, RESTORE, TTL, TYPE,
// String
APPEND, GET, GETRANGE, GETSET, MGET, MSET, MSETNX, SET, SETEX, SETNX,
SETRANGE, STRLEN,
// Numeric
DECR, DECRBY, INCR, INCRBY, INCRBYFLOAT,
// List
BLPOP, BRPOP, BRPOPLPUSH,
LINDEX, LINSERT, LLEN, LPOP, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM,
RPOP, RPOPLPUSH, RPUSH, RPUSHX, SORT,
// Hash
HDEL, HEXISTS, HGET, HGETALL, HINCRBY, HINCRBYFLOAT, HKEYS, HLEN,
HMGET, HMSET, HSET, HSETNX, HVALS,
// Transaction
DISCARD, EXEC, MULTI, UNWATCH, WATCH,
// Pub/Sub
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE,
// Sets
SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER,
SMEMBERS, SMOVE, SPOP, SRANDMEMBER, SREM, SUNION, SUNIONSTORE,
// Sorted Set
ZADD, ZCARD, ZCOUNT, ZINCRBY, ZINTERSTORE, ZRANGE, ZRANGEBYSCORE,
ZRANK, ZREM, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE,
ZREVRANGEBYSCORE, ZREVRANK, ZSCORE, ZUNIONSTORE,
// Scripting
EVAL, EVALSHA, SCRIPT,
// Bits
BITCOUNT, BITOP, GETBIT, SETBIT;
public byte[] bytes;
private CommandType() {
bytes = name().getBytes(Charsets.ASCII);
}
}
4.2.3 Redis 命令参数编码器 CommandArgs
这个类主要是用来构建 Redis 命令的一个工具类,将各种参数编码为 Redis 协议格式,关于 Redis 协议格式参考之前的文章 【Redis客户端跟服务端通信协议】。
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.codec.RedisCodec;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Map;
import static java.lang.Math.max;
public class CommandArgs<K, V> {
// Redis 协议中使用的行结束符
private static final byte[] CRLF = "\r\n".getBytes(Charsets.ASCII);
// 编码 key value
private RedisCodec<K, V> codec;
// 保存编码后的 RESP 命令参数
private ByteBuffer buffer;
// 参数数量
private int count;
public CommandArgs(RedisCodec<K, V> codec) {
this.codec = codec;
this.buffer = ByteBuffer.allocate(32);
}
// 返回编码后的 Redis 命令参数。
public ByteBuffer buffer() {
// 将 buffer 从写数据的模式切换到读数据的模式。
buffer.flip();
return buffer;
}
// 返回添加的参数数量
public int count() {
return count;
}
// 添加一个 key 到参数 List ,就是往 buffer 中写入。底下几个方法也差不多作用。
public CommandArgs<K, V> addKey(K key) {
return write(codec.encodeKey(key));
}
public CommandArgs<K, V> addKeys(K... keys) {
for (K key : keys) {
addKey(key);
}
return this;
}
public CommandArgs<K, V> addValue(V value) {
return write(codec.encodeValue(value));
}
public CommandArgs<K, V> addValues(V... values) {
for (V value : values) {
addValue(value);
}
return this;
}
public CommandArgs<K, V> add(Map<K, V> map) {
if (map.size() > 2) {
realloc(buffer.capacity() + 16 * map.size());
}
for (Map.Entry<K, V> entry : map.entrySet()) {
write(codec.encodeKey(entry.getKey()));
write(codec.encodeValue(entry.getValue()));
}
return this;
}
public CommandArgs<K, V> add(String s) {
return write(s);
}
public CommandArgs<K, V> add(long n) {
return write(Long.toString(n));
}
public CommandArgs<K, V> add(double n) {
return write(Double.toString(n));
}
public CommandArgs<K, V> add(byte[] value) {
return write(value);
}
public CommandArgs<K, V> add(CommandKeyword keyword) {
return write(keyword.bytes);
}
public CommandArgs<K, V> add(CommandType type) {
return write(type.bytes);
}
// 往 ByteBuffer 中写入字节数据
private CommandArgs<K, V> write(byte[] arg) {
// 标记当前 buffer 的位置,用来给后面发生 BufferOverflowException 时回到这个位置。
buffer.mark();
// buffer 中剩余的空间比传入的参数长度小,就估算一个新的 buffer 长度。
if (buffer.remaining() < arg.length) {
// 估计新的 buffer 大小公式 = 当前剩余空间 + arg 的长度 + 10(额外的缓冲)。
int estimate = buffer.remaining() + arg.length + 10;
// 最终 buffer 的大小取 【当前容量的两倍】、【上面估计扩容大小】 两个中的较大值,保证空间够避免扩容幅度不够。
realloc(max(buffer.capacity() * 2, estimate));
}
// 下面是实现 Redis 通信报文的拼接
while (true) {
try {
// 先写入$表示接下来是字符串,然后写入长度,再写入实际的参数数据。
buffer.put((byte) '$');
write(arg.length);
buffer.put(CRLF);
buffer.put(arg);
buffer.put(CRLF);
break;
} catch (BufferOverflowException e) {
// 发生 buffer 写入容量不够异常,就会将 buffer 的指针切换到一开始标记地方,然后执行扩容
buffer.reset();
// 每次尝试将容量翻倍
realloc(buffer.capacity() * 2);
}
}
// 增加参数计数器值
count++;
return this;
}
// 将字符串写入 ByteBuffer ,跟上面方法套路差不多
private CommandArgs<K, V> write(String arg) {
// 获取要写入字符串的长度,后面会多次用到。
int length = arg.length();
// 标记当前指针位置,后面可能会用到。
buffer.mark();
// buffer 中剩余的空间比传入的参数长度小,就估算一个新的 buffer 长度。 跟上面方法一样。
if (buffer.remaining() < length) {
int estimate = buffer.remaining() + length + 10;
realloc(max(buffer.capacity() * 2, estimate));
}
// 下面是实现 Redis 通信报文的拼接
while (true) {
try {
buffer.put((byte) '$');
write(length);
buffer.put(CRLF);
// 循环将字符串中每一个字符地写入 buffer。
// (byte) arg.charAt(i) 将字符转换为字节,适用于 ASCII 字符。
for (int i = 0; i < length; i++) {
buffer.put((byte) arg.charAt(i));
}
buffer.put(CRLF);
break;
} catch (BufferOverflowException e) {
// buffer 写入超出容量异常处理,默认扩容 2 倍。
buffer.reset();
realloc(buffer.capacity() * 2);
}
}
count++;
return this;
}
// 数值写入处理
private void write(long value) {
// 如果值小于 10,直接转换为对应的 ASCII 码字符然后写入 buffer。
if (value < 10) {
// '0' 的 ASCII 值是 48,'0' + value 会得到正确的 ASCII 字符。
buffer.put((byte) ('0' + value));
return;
}
// 使用 StringBuilder 拼接字符串,优化性能
StringBuilder sb = new StringBuilder(8);
while (value > 0) {
// 每次 %10 就会拿到当前 value 的最后一位数。
// 比如传入数值 12345, 第一次循环的时候, digit=5 , next value = 1234
// 第二次循环的时候, digit=4 , next value = 123
// 第三次循环的时候, digit=3 , next value = 12
// 第四次循环的时候, digit=2 , next value = 1
// 最后一次循环的时候, digit=1 , next value = 1 / 10 = 0
long digit = value % 10;
// 往 StringBuilder 拼接的顺序是反着来的,上面 StringBuilder 的内容是 "54321"
sb.append((char) ('0' + digit));
value /= 10;
}
// 从后向前遍历 StringBuilder,将字符以正确的顺序写入 buffer。
// 上面 StringBuilder 的内容是 "54321",反过来遍历就会变成 12345
for (int i = sb.length() - 1; i的内容是 "54321" >= 0; i--) {
// 直接使用 (byte) sb.charAt(i) 将字符转换为字节,对于数字来说可以这么写
buffer.put((byte) sb.charAt(i));
}
}
// 重新分配 ByteBuffer 的大小
private void realloc(int size) {
// 创建新的 ByteBuffer,容量用传入的大小
ByteBuffer buffer = ByteBuffer.allocate(size);
// 将 buffer 从写模式切换到读模式,主要是为了下面的 buffer 复制。
this.buffer.flip();
// 将旧的 buffer 中的所有内容复制到新的 buffer 中。
buffer.put(this.buffer);
// 记录新 buffer 的当前位置,后续使用 reset() 方法返回到这个位置
buffer.mark();
// this.buffer 指向新 buffer , 旧的 buffer 将会被回收掉。
this.buffer = buffer;
}
}
这个类中的处理数值写入不知道为什么写这么复杂,直接像下面这么写也应该是可以的:
private void write(long value) {
String strValue = Long.toString(value);
for (int i = 0; i < strValue.length(); i++) {
buffer.put((byte) strValue.charAt(i));
}
}
4.2.4 Redis命令处理 CommandHandler
看这命令风格,大概可以猜到就是 Netty 的 Channel Handler了。 实际上它确实是用于处理 Redis 命令的 Handler。
package com.lambdaworks.redis.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import java.util.concurrent.BlockingQueue;
// @ChannelHandler.Sharable 注解表示 handler 支持被多个 channel 共享。
// 有点像 Spring 中的单例 Bean 注解
@ChannelHandler.Sharable
public class CommandHandler<K, V> extends ChannelDuplexHandler {
// 存储待处理的 Redis 命令
protected BlockingQueue<Command<K, V, ?>> queue;
// 缓存接收到的数据
protected ByteBuf buffer;
// 状态机,看起来是用于解析 Redis 协议的工具类
protected RedisStateMachine<K, V> rsm;
public CommandHandler(BlockingQueue<Command<K, V, ?>> queue) {
this.queue = queue;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// channel 注册上来时才会创建 buffer 和状态机。
buffer = ctx.alloc().heapBuffer();
rsm = new RedisStateMachine<K, V>();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// channel 注销时释放 buffer 资源。
buffer.release();
}
// 处理 Redis 端应答的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
try {
// 检查 input 是否有可读的字节。没有的话就直接返回
if (!input.isReadable()) return;
// 丢弃 buffer 中已经读取过的字节,读过的数据不清理的话会一直占着内存的。
buffer.discardReadBytes();
// 往 buffer 中拼接数据,相当于累计数据,数据可能出现粘包或者拆包的情况
buffer.writeBytes(input);
// 尝试解码数据
decode(ctx, buffer);
} finally {
input.release();
}
}
// 往 Redis 服务端写数据
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 将传入的消息转换为 Command 对象,这依赖于前面已经解码好了为 Command 对象
Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
Channel channel = ctx.channel();
// 分配一个 ButeBuf 准备写数据
ByteBuf buf = ctx.alloc().heapBuffer();
// 将 Command 编码成字节放入 ButeBuf 中
cmd.encode(buf);
// 将 ByteBuf 写入 Channel ,也就是发给 Redis
ctx.write(buf, promise);
}
// 尝试将 ByteBuf 解码为
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
// 队列不为空 && Redis 状态机工具类能够成功解码 buffer 中的数据。
while(!queue.isEmpty() && rsm.decode(buffer, queue.peek().getOutput())) {
Command<K, V, ?> cmd = queue.take();
// 通知等待的线程等做下一步工作
cmd.complete();
}
}
}
这个类中 Handler 加了 @ChannelHandler.Sharable 注解,同时变量 buffer 等又是共享的,看着会有线程安全问题。
4.2.5 Redis报文解析状态机 RedisStateMachine
这个类用于解码 Redis 服务器的响应报文。 它依据 Redis 的请求协议,将收到的字节流解析为命令输出。
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.RedisException;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import static com.lambdaworks.redis.protocol.Charsets.buffer;
import static com.lambdaworks.redis.protocol.RedisStateMachine.State.Type.*;
// 根据 http://redis.io/topics/protocol 进行协议解码
public class RedisStateMachine<K, V> {
// Redis 事务处理中命令被排队的应答回复,通常是事务模式下使用 MULTI 和 EXEC 命令返回的应答。
private static final ByteBuffer QUEUED = buffer("QUEUED");
// 数据类型,对应着 Redis 协议描述文档里面的几种类型
static class State {
// 简单字符串+、错误信息-、整数:、批量$、多条批量响应*、字节数据
enum Type { SINGLE, ERROR, INTEGER, BULK, MULTI, BYTES }
// type 是当前解析的响应类型
Type type = null;
// count 是当前批量响应的计数或字节长度, -1就是空,Redis 协议里面描述的
int count = -1;
}
// 维护解码过程中的状态堆栈, 栈这种结构可以用处理处理嵌套关系
// 比如对于命令【MGET key1 key2】,Redis应答的可能是【*2 $5 value1 $5 value2】
// 这就就是带有嵌套的响应,用栈这种结构可以很好地处理这种嵌套结构。
private LinkedList<State> stack;
// 只能通过实例化这个对象来调用方法
public RedisStateMachine() {
stack = new LinkedList<State>();
}
// 核心解码方法
// 如果读取了完整的响应,返回 true。
public boolean decode(ByteBuf buffer, CommandOutput<K, V, ?> output) {
// length 用来存储长度 、 end 用来存储结束为止
int length, end;
ByteBuffer bytes;
// 添加一个空的状态,让下面的代码至少有一个状态可以处理。
if (stack.isEmpty()) {
stack.add(new State());
}
if (output == null) {
return stack.isEmpty();
}
// 定义 loop 标签,方便跳出或者停止。 不过在 Java 中这种写法比较少。
loop:
// 一直处理到栈为空为止
while (!stack.isEmpty()) {
// 拿到栈顶的 State 对象,这里用 peek 是可能会解析失败,元素暂时不能被移除。
State state = stack.peek();
// 状态类型为空,尝试读取 Redis 回复的类型并标记读取位置。
if (state.type == null) {
// buffer 不可读,就直接跳出循环等待更多数据。
if (!buffer.isReadable()) break;
// 获取当前 Redis 回复的类型
state.type = readReplyType(buffer);
//
buffer.markReaderIndex();
}
switch (state.type) {
// 处理单行回复
case SINGLE:
// 先读取一行数据,如果返回 null 则读取失败,跳出主循环等待更多数据。
if ((bytes = readLine(buffer)) == null) break loop;
// 如果 Redis 应答的是特殊响应 QUEUED,这种就不处理,因为没啥意思。
// 比如说应答的是 +OK\r\n ,那么就可以读取 OK 到输出中。
if (!QUEUED.equals(bytes)) {
output.set(bytes);
}
break;
// 处理错误回复
case ERROR:
// 同上,没读取到就跳出循环重新等待数据。
if ((bytes = readLine(buffer)) == null) break loop;
output.setError(bytes);
break;
// 处理整数回复
case INTEGER:
// 先找到行尾,然后读取整数值。
// 比如响应 【:1000\r\n】,会读取到 1000 再设置到返回值中。
if ((end = findLineEnd(buffer)) == -1) break loop;
output.set(readLong(buffer, buffer.readerIndex(), end));
break;
// 处理批量回复
case BULK:
// 没有结束就继续等待数据
if ((end = findLineEnd(buffer)) == -1) break loop;
// 读取数据长度
length = (int) readLong(buffer, buffer.readerIndex(), end);
// 读取长度,如果是-1,表示null值。
if (length == -1) {
output.set(null);
} else {
// 状态改为 BYTES 类型
state.type = BYTES;
// 设置需要读取的字节数(加2是为了包括\r\n)
// 比如响应【$5hello】,读取到长度是5,然后将状态改为BYTES,设置需要读取的字节数为7(5 + 2)。
state.count = length + 2;
buffer.markReaderIndex();
continue loop;
}
break;
// 处理多行回复
case MULTI:
// 意味着数组的元素数量还不能确定
if (state.count == -1) {
// 查找 \r\n,如果找不到就跳出循环等待更多数据。
if ((end = findLineEnd(buffer)) == -1) break loop;
// 从 buffer 中读取数组的元素数量
length = (int) readLong(buffer, buffer.readerIndex(), end);
// 要处理的元素数量就是上面匹配出来的
state.count = length;
// 保存当前 buffer 指针位置
buffer.markReaderIndex();
}
// 没有元素需要处理,跳出循环
if (state.count <= 0) break;
// 处理完一个元素后,减少计数
state.count--;
// 加入栈顶,用于处理下一个数组元素。
stack.addFirst(new State());
// 开始处理数组中的下一个元素
// 比如响应【*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n】
// *2 表示有两个元素分别是 【$5\r\nhello\r\n】 和 【$5\r\nworld\r\n】
// 这两个会被单独处理,分别加入栈顶进行处理
continue loop;
// 处理字节数据
case BYTES:
// 读取指定长度的字节数据。要是读取不到足够的字节,跳出循环等待更多的数据。
if ((bytes = readBytes(buffer, state.count)) == null) break loop;
output.set(bytes);
}
// 标记读取位置,移除当前状态,
buffer.markReaderIndex();
stack.remove();
// 调用回调方法
output.complete(stack.size());
}
// 栈是为空完成了所有解析,否则就是没有完成解析
return stack.isEmpty();
}
// 查找行尾的换行符 \r\n , 因为Redis协议使用 \r\n 作为行分隔符。
private int findLineEnd(ByteBuf buffer) {
// 从 buffer 当前读取位置开始
int start = buffer.readerIndex();
// 查找下一个'\n'字符 , 然后检查它前面是否是'\r'。
// writerIndex 表示 buffer 中已写入数据的结尾,超过这个范围是没数据的。
int index = buffer.indexOf(start, buffer.writerIndex(), (byte) '\n');
// 确保 \n 前面是 \r,如果是,则返回 \n 的位置。
// -1是没有找到的意思
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
// 分析 Redis 响应的类型
private State.Type readReplyType(ByteBuf buffer) {
// 读取响应的第一个字节来确定响应类型, 每种类型都有特定的前缀字符
switch (buffer.readByte()) {
case '+': return SINGLE;
case '-': return ERROR;
case ':': return INTEGER;
case '$': return BULK;
case '*': return MULTI;
default: throw new RedisException("Invalid first byte");
}
}
// 读取长整型数值
private long readLong(ByteBuf buffer, int start, int end) {
long value = 0;
// 处理负数,并通过字符串解析来获取数值。使用减法而不是加法可以避免溢出问题。
boolean negative = buffer.getByte(start) == '-';
// 如果是负数就从下一个字符开始处理,跳过 - 号
int offset = negative ? start + 1 : start;
while (offset < end - 1) {
int digit = buffer.getByte(offset++) - '0';
value = value * 10 - digit;
}
// 如果是正数就取反,前面用了减法
if (!negative) value = -value;
buffer.readerIndex(end + 1);
return value;
}
// 读取 \r\n 前的数据
private ByteBuffer readLine(ByteBuf buffer) {
ByteBuffer bytes = null;
// 找 \n 的下标
int end = findLineEnd(buffer);
// 找到了 \n 的下标就开始读取 start下标到 \r的字节内容,这里面不包括 \r\n
if (end > -1) {
int start = buffer.readerIndex();
bytes = buffer.nioBuffer(start, end - start - 1);
// 更新指针到 end+1 位置,也就是跳过 \r\n
buffer.readerIndex(end + 1);
}
return bytes;
}
// 读取指定字节数,忽略 \r\n
private ByteBuffer readBytes(ByteBuf buffer, int count) {
ByteBuffer bytes = null;
// 确保 buffer 中可读的字节大于想读的字节数
if (buffer.readableBytes() >= count) {
// 读取指定字节数,跳过 \r\n
bytes = buffer.nioBuffer(buffer.readerIndex(), count - 2);
// 更新指针跳过已经读取过的字节
buffer.readerIndex(buffer.readerIndex() + count);
}
return bytes;
}
}
4.2.6 Redis命令输出抽象类 CommandOutput
这是一个抽象类,用于表示 Redis 命令的输出。源码中继承这个抽象类的子类有几十个。
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.codec.RedisCodec;
import java.nio.ByteBuffer;
//K代表 key 类型、V 代表 value 类型 、 T 表示输出的类型
public abstract class CommandOutput<K, V, T> {
protected RedisCodec<K, V> codec; // key value 编解码器
protected T output; // 保存命令的输出结果
protected String error; // 保存错误信息
public CommandOutput(RedisCodec<K, V> codec, T output) {
this.codec = codec;
this.output = output;
}
// 返回命令输出
public T get() {
return output;
}
// 设置命令输出为字节序列。 默认实现会抛出异常,子类需要根据需要重写。
public void set(ByteBuffer bytes) {
throw new IllegalStateException();
}
// 设置命令输出为64位整数。
public void set(long integer) {
throw new IllegalStateException();
}
// 设置错误信息
public void setError(ByteBuffer error) {
this.error = decodeAscii(error);
}
public void setError(String error) {
this.error = error;
}
// 检查是否有错误信息
public boolean hasError() {
return this.error != null;
}
// 获取错误信息
public String getError() {
return error;
}
// 标记命令输出完成
public void complete(int depth) {
// nothing to do by default
}
// 将 ByteBuffer 解码为 ASCII 字符串,公用的方法,子类也可以用。
protected String decodeAscii(ByteBuffer bytes) {
char[] chars = new char[bytes.remaining()];
for (int i = 0; i < chars.length; i++) {
chars[i] = (char) bytes.get();
}
return new String(chars);
}
}
4.3 Redis 数据输出对象
com.lambdaworks.redis.output 包下就是一些 Java 对象的输出类,都是 CommandOutput 抽象类的实现。
4.3.1 布尔值列表输出 BooleanListOutput
将 Redis 返回的整数值 List 转换为 boolean 值 List
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.util.ArrayList;
import java.util.List;
public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> {
public BooleanListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Boolean>());
}
@Override
public void set(long integer) {
// 将输入的整数转换为布尔值
// 如果整数是 1,那么对应的布尔值就是 true ,添加到 List 中。
// 其他整数对应的布尔值就是 false ,添加到 List 中。
output.add((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
}
}
4.3.2 布尔值输出 BooleanOutput
单个布尔值的输出处理器,比如 EXISTS 命令。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.util.ArrayList;
import java.util.List;
public class BooleanListOutput<K, V> extends CommandOutput<K, V, List<Boolean>> {
public BooleanListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Boolean>());
}
@Override
public void set(long integer) {
// 如果是 1 则设置为 true,否则就是 false。
output.add((integer == 1) ? Boolean.TRUE : Boolean.FALSE);
}
}
4.3.3 字节数组输出 ByteArrayOutput
处理 Redis 命令返回的字节数组。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class ByteArrayOutput<K, V> extends CommandOutput<K, V, byte[]> {
public ByteArrayOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
if (bytes != null) {
// 创建一个新的字节数组,大小为 ByteBuffer 剩余的字节数。
output = new byte[bytes.remaining()];
// 将 ByteBuffer 中的数据复制到新创建的字节数组中去。
bytes.get(output);
}
}
}
4.3.4 日期输出 DateOutput
处理 Redis 命令返回的日期时间输出, 这个输出处理器不处理毫秒级精度。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.util.Date;
public class DateOutput<K, V> extends CommandOutput<K, V, Date> {
public DateOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(long time) {
// 将以秒为单位时间戳转换为毫秒,然后创建一个新的 Date 对象。
// Java 的 Date 构造函数需要的是毫秒级时间戳。
output = new Date(time * 1000);
}
}
4.3.5 双精度浮点数输出 DoubleOutput
处理双精度浮点数的输出。 比如 ZSCORE 等命令输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import static java.lang.Double.parseDouble;
public class DoubleOutput<K, V> extends CommandOutput<K, V, Double> {
public DoubleOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
// ByteBuffer 为 null,则输出也是 null。
// 使用父类的 decodeAscii 方法将字节缓冲区解码为 ASCII 字符串。
// 再将 ASCII 字符串解析为 Double 值。
output = (bytes == null) ? null : parseDouble(decodeAscii(bytes));
}
}
4.3.6 整形数值输出 IntegerOutput
处理 64 位整数输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class IntegerOutput<K, V> extends CommandOutput<K, V, Long> {
public IntegerOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(long integer) {
// 直接将输入的长整型值赋给 output
output = integer;
}
@Override
public void set(ByteBuffer bytes) {
// 传入的 ByteBuffer 会被直接忽略掉
// 相当于空值处理
output = null;
}
}
4.3.7 键集合输出 KeyListOutput
处理 Redis 命令返回的 key List 输出。 可以支持处理返回多个 KEY 的 Redis 命令。
可能用于处理 KEYS、SCAN命令输出,或者一次性批量获取 Key 或者遍历数据库的场景。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class KeyListOutput<K, V> extends CommandOutput<K, V, List<K>> {
public KeyListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<K>());
}
@Override
public void set(ByteBuffer bytes) {
// 利用 codec 解码 ByteBuffer 中的 key
output.add(codec.decodeKey(bytes));
}
}
4.3.8 键输出 KeyOutput
处理单个 Key 的输出结果。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class KeyOutput<K, V> extends CommandOutput<K, V, K> {
public KeyOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
// ByteBuffer 为 null,输出也为 null。
// ByteBuffer 不为 null,利用提供的 codec 解码 ByteBuffer 中的 Key 。
output = (bytes == null) ? null : codec.decodeKey(bytes);
}
}
4.3.9 键值输出 KeyValueOutput
处理 key value 输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.KeyValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class KeyValueOutput<K, V> extends CommandOutput<K, V, KeyValue<K, V>> {
private K key;
public KeyValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
if (bytes != null) {
// 如果 key 还没有值,将 ByteBuffer 解码为 key 赋值。
if (key == null) {
key = codec.decodeKey(bytes);
} else {
// 如果 key 已设置过,将 ByteBuffer 解码为 value 。然后创建一个新的 KeyValue 对象输出。
V value = codec.decodeValue(bytes);
output = new KeyValue<K, V>(key, value);
}
}
}
}
4.3.10 Map输出对象 MapOutput
处理键值对映射输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class MapOutput<K, V> extends CommandOutput<K, V, Map<K, V>> {
private K key;
public MapOutput(RedisCodec<K, V> codec) {
super(codec, new HashMap<K, V>());
}
@Override
public void set(ByteBuffer bytes) {
// key 为 null,将 ByteBuffer 解码为 key 并保存。
if (key == null) {
key = codec.decodeKey(bytes);
return;
}
// 将 ByteBuffer 解码为 value 并保存。
V value = (bytes == null) ? null : codec.decodeValue(bytes);
output.put(key, value);
// 处理完一对 key value 后,重置 key 为 null,准备处理下一对。
key = null;
}
}
4.3.11 事务块输出 MultiOutput
处理 Redis 的 MULTI 事务命令块中所有命令输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.*;
public class MultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
private Queue<Command<K, V, ?>> queue;
public MultiOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Object>());
queue = new LinkedList<Command<K, V, ?>>();
}
// 添加命令到队列去
public void add(Command<K, V, ?> cmd) {
queue.add(cmd);
}
// 循环调用队列中命令的 complete 方法
public void cancel() {
for (Command<K, V, ?> c : queue) {
c.complete();
}
}
@Override
public void set(long integer) {
// 将输入的值设置到当前命令的输出
queue.peek().getOutput().set(integer);
}
@Override
public void set(ByteBuffer bytes) {
queue.peek().getOutput().set(bytes);
}
// 设置错误信息
@Override
public void setError(ByteBuffer error) {
CommandOutput<K, V, ?> output = queue.isEmpty() ? this : queue.peek().getOutput();
output.setError(decodeAscii(error));
}
// 完成逻辑
@Override
public void complete(int depth) {
if (depth == 1) {
// 处理单个命令的完成
Command<K, V, ?> cmd = queue.remove();
CommandOutput<K, V, ?> o = cmd.getOutput();
output.add(!o.hasError() ? o.get() : new RedisException(o.getError()));
cmd.complete();
} else if (depth == 0 && !queue.isEmpty()) {
// 处理所有剩余命令的完成
for (Command<K, V, ?> cmd : queue) {
cmd.complete();
}
}
}
}
4.3.12 嵌套命令输出 NestedMultiOutput
嵌套的多层命令输出处理器。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.*;
public class NestedMultiOutput<K, V> extends CommandOutput<K, V, List<Object>> {
private LinkedList<List<Object>> stack; // 管理嵌套命令结构
private int depth; // 记录当前嵌套深度
public NestedMultiOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<Object>());
stack = new LinkedList<List<Object>>();
depth = 1;
}
@Override
public void set(long integer) {
output.add(integer);
}
@Override
public void set(ByteBuffer bytes) {
output.add(bytes == null ? null : codec.decodeValue(bytes));
}
@Override
public void setError(ByteBuffer error) {
output.add(new RedisException(decodeAscii(error)));
}
@Override
public void complete(int depth) {
// 当新的深度大于当前深度时,创建新的嵌套层级。
if (depth > this.depth) {
Object o = output.remove(output.size() - 1);
stack.push(output);
output = new ArrayList<Object>();
output.add(o);
// 当新的深度小于当前深度时,完成当前层级并返回上一层。
} else if (depth > 0 && depth < this.depth) {
stack.peek().add(output);
output = stack.pop();
}
// 更新当前深度。
this.depth = depth;
}
}
4.3.13 有序集合输出 ScoredValueListOutput
处理有序集合(Sorted Set)的返回结果。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.ScoredValue;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ScoredValueListOutput<K, V> extends CommandOutput<K, V, List<ScoredValue<V>>> {
private V value; // 暂存解码后的值
public ScoredValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<ScoredValue<V>>());
}
@Override
public void set(ByteBuffer bytes) {
// 如果 value 为 null,解码 ByteBuffer 并存储 value。
if (value == null) {
value = codec.decodeValue(bytes);
return;
}
// 解码 score ,创建 ScoredValue 对象并添加到 List 中。
double score = Double.parseDouble(decodeAscii(bytes));
output.add(new ScoredValue<V>(score, value));
value = null;
}
}
4.3.14 状态输出 StatusOutput
处理 Redis 命令的状态消息输出。比如简单状态响应,如 SET、DEL、EXPIRE 等命令的执行结果。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import static com.lambdaworks.redis.protocol.Charsets.buffer;
public class StatusOutput<K, V> extends CommandOutput<K, V, String> {
private static final ByteBuffer OK = buffer("OK");
public StatusOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
// 如果输入的 ByteBuffer 等于预定义的 OK 常量,则把输出设置为字符串 "OK"。
// 不等于就解码为 ASCII 字符串
output = OK.equals(bytes) ? "OK" : decodeAscii(bytes);
}
}
4.3.15 字符串列表输出 StringListOutput
处理字符串 List 结果输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class StringListOutput<K, V> extends CommandOutput<K, V, List<String>> {
public StringListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<String>());
}
@Override
public void set(ByteBuffer bytes) {
// 输入的 ByteBuffer 为 null,则添加 null 到 List 中。
// 不为空将 ByteBuffer 解码为 ASCII 字符串并添加到 List 中。
output.add(bytes == null ? null : decodeAscii(bytes));
}
}
4.3.16 值列表输出 ValueListOutput
处理命令返回值 List 输出结果。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ValueListOutput<K, V> extends CommandOutput<K, V, List<V>> {
public ValueListOutput(RedisCodec<K, V> codec) {
super(codec, new ArrayList<V>());
}
@Override
public void set(ByteBuffer bytes) {
// ByteBuffer 为 null,则添加 null 到 List 中。
// 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
output.add(bytes == null ? null : codec.decodeValue(bytes));
}
}
4.3.17 值输出 ValueOutput
单个值命令输出处理。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class ValueOutput<K, V> extends CommandOutput<K, V, V> {
public ValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
// ByteBuffer 为 null,则添加 null 到 List 中。
// 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
output = (bytes == null) ? null : codec.decodeValue(bytes);
}
}
4.3.18 值集合输出 ValueSetOutput
处理返回值集合输出。
package com.lambdaworks.redis.output;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class ValueOutput<K, V> extends CommandOutput<K, V, V> {
public ValueOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
@Override
public void set(ByteBuffer bytes) {
// ByteBuffer 为 null,则添加 null 到 List 中。
// 不为空调用 RedisCodec 解码 ByteBuffer ,再添加到 List 中。
output = (bytes == null) ? null : codec.decodeValue(bytes);
}
}
4.4 参数组装辅助类
4.4.1 SortArgs
这是一个构建Redis SORT命令的参数 List 的辅助类。
package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandKeyword;
import java.util.ArrayList;
import java.util.List;
import static com.lambdaworks.redis.protocol.CommandKeyword.*;
import static com.lambdaworks.redis.protocol.CommandType.GET;
public class SortArgs {
private String by; // 排序的 key
private Long offset, count; // 分页限制结果
private List<String> get; // 字段 List
private CommandKeyword order; // 升序或降序
private boolean alpha; // 是否按字母顺序排序
// 内部静态类 Builder , 用于创建和初始化SortArgs对象。
public static class Builder {
public static SortArgs by(String pattern) {
return new SortArgs().by(pattern);
}
public static SortArgs limit(long offset, long count) {
return new SortArgs().limit(offset, count);
}
public static SortArgs get(String pattern) {
return new SortArgs().get(pattern);
}
public static SortArgs asc() {
return new SortArgs().asc();
}
public static SortArgs desc() {
return new SortArgs().desc();
}
public static SortArgs alpha() {
return new SortArgs().alpha();
}
}
public SortArgs by(String pattern) {
by = pattern;
return this;
}
public SortArgs limit(long offset, long count) {
this.offset = offset;
this.count = count;
return this;
}
public SortArgs get(String pattern) {
if (get == null) {
get = new ArrayList<String>();
}
get.add(pattern);
return this;
}
public SortArgs asc() {
order = ASC;
return this;
}
public SortArgs desc() {
order = DESC;
return this;
}
public SortArgs alpha() {
alpha = true;
return this;
}
<K, V> void build(CommandArgs<K, V> args, K store) {
if (by != null) {
args.add(BY);
args.add(by);
}
if (get != null) {
for (String pattern : get) {
args.add(GET);
args.add(pattern);
}
}
if (offset != null) {
args.add(LIMIT);
args.add(offset);
args.add(count);
}
if (order != null) {
args.add(order);
}
if (alpha) {
args.add(ALPHA);
}
if (store != null) {
args.add(STORE);
args.addKey(store);
}
}
}
这个类逻辑不多,主要是要对 Redis 的 Sort 命令有一点了解就好理解。
比如要写一个 Redis 命令:
有一个存储用户 ID 的 List 【users】,每个用户都有对应的 【user:
SORT users BY user:*:age DESC GET user:*:name LIMIT 0 5 ALPHA
对 【users】 List 进行排序,使用 【user::age】作为排序依据,降序排列,获取每个用户对应的 【user::name】 值,限制结果为前 5 个,对获取的名字使用字母顺序排序
使用 SortArgs 类来构建这个命令:
final SortArgs args = SortArgs.Builder.by("user:*:age")
.desc()
.get("user:*:name")
.limit(0, 5)
.alpha();
4.4.2 ZStoreArgs
用于构建 Redis 的 ZUNIONSTORE 和 ZINTERSTORE 命令的参数 List 辅助类。
package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.CommandArgs;
import java.util.*;
import static com.lambdaworks.redis.protocol.CommandKeyword.*;
public class ZStoreArgs {
// 三种聚合方法 求和 最小 最大
private static enum Aggregate { SUM, MIN, MAX }
private List<Long> weights;
private Aggregate aggregate;
public static class Builder {
// 设置输入集合的权重
public static ZStoreArgs weights(long... weights) {
return new ZStoreArgs().weights(weights);
}
public static ZStoreArgs sum() {
return new ZStoreArgs().sum();
}
public static ZStoreArgs min() {
return new ZStoreArgs().min();
}
public static ZStoreArgs max() {
return new ZStoreArgs().max();
}
}
// 设置输入集合的权重
public ZStoreArgs weights(long... weights) {
this.weights = new ArrayList<Long>(weights.length);
for (long weight : weights) {
this.weights.add(weight);
}
return this;
}
public ZStoreArgs sum() {
aggregate = Aggregate.SUM;
return this;
}
public ZStoreArgs min() {
aggregate = Aggregate.MIN;
return this;
}
public ZStoreArgs max() {
aggregate = Aggregate.MAX;
return this;
}
<K, V> void build(CommandArgs<K, V> args) {
if (weights != null) {
args.add(WEIGHTS);
for (long weight : weights) {
args.add(weight);
}
}
if (aggregate != null) {
args.add(AGGREGATE);
switch (aggregate) {
case SUM:
args.add(SUM);
break;
case MIN:
args.add(MIN);
break;
case MAX:
args.add(MAX);
break;
}
}
}
}