七的博客

RedissonV1.0-Lettuce通信源码分析-连接管理以及发布订阅机制

源码分析

RedissonV1.0-Lettuce通信源码分析-连接管理以及发布订阅机制

1. 连接管理

连接管理主要是跟 Netty 相关的一些代码,涉及到连接的建立、断开、重连、数据的处理等。

1.1 异步Redis连接 RedisAsyncConnection

这是一个实现了异步 Redis 连接的类,在整个 Lettuce 中是最核心的一个类之一。

本身提供了一个线程安全的方式来与 Redis 服务器进行异步通信。

package com.lambdaworks.redis;

import com.lambdaworks.codec.Base16;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.*;
import com.lambdaworks.redis.protocol.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;

import static com.lambdaworks.redis.protocol.CommandKeyword.*;
import static com.lambdaworks.redis.protocol.CommandType.*;


@ChannelHandler.Sharable
public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
    protected BlockingQueue<Command<K, V, ?>> queue;  // 保存命令的阻塞队列
    protected RedisCodec<K, V> codec;   // 编解码器
    protected Channel channel;   // 跟 Redis 通信的 Channel
    protected long timeout;  // Redis命令响应等待的超时时间
    protected TimeUnit unit;   // Redis命令超时时间单位
    protected MultiOutput<K, V> multi;  // 处理 Redis 事务 multi/exec 的输出
    private String password;  // 连接密码
    private int db;   // 选择的 redis 数据库下标
    private boolean closed;  // 是否关闭的标志

   
    public RedisAsyncConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
        this.queue = queue;
        this.codec = codec;
        this.timeout = timeout;
        this.unit = unit;
    }

    
    public void setTimeout(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    // 实现 APPEND 命令,返回的是 Futrue 表明是异步操作
    public Future<Long> append(K key, V value) {
        return dispatch(APPEND, new IntegerOutput<K, V>(codec), key, value);
    }

    // 实现 Redis 的 Auth 命令
    public String auth(String password) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
        // 执行 auth 命令
        Command<K, V, String> cmd = dispatch(AUTH, new StatusOutput<K, V>(codec), args);
        // 阻塞等待 Redis 应答,会有最大的超时时间等待
        String status = await(cmd, timeout, unit);
        // 返回 "OK" 认证成功,会更新 this.password
        if ("OK".equals(status)) this.password = password;
        return status;
    }

    // 实现 BGREWRITEAOF 命令, 用于异步重写AOF文件
    public Future<String> bgrewriteaof() {
        return dispatch(BGREWRITEAOF, new StatusOutput<K, V>(codec));
    }

    // BGSAVE命令,用于在后台保存数据到磁盘
    public Future<String> bgsave() {
        return dispatch(BGSAVE, new StatusOutput<K, V>(codec));
    }

    // 对应 Redis 的 BITCOUNT 命令,用于计算字符串中比特位为 1 的数量
    public Future<Long> bitcount(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        return dispatch(BITCOUNT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> bitcount(K key, long start, long end) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(start).add(end);
        return dispatch(BITCOUNT, new IntegerOutput<K, V>(codec), args);
    }

    // 下面几个都是 BITOP命令    用于执行位操作 
    public Future<Long> bitopAnd(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(AND).addKey(destination).addKeys(keys);
        return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> bitopNot(K destination, K source) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(NOT).addKey(destination).addKey(source);
        return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> bitopOr(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(OR).addKey(destination).addKeys(keys);
        return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> bitopXor(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(XOR).addKey(destination).addKeys(keys);
        return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
    }

    // BLPOP和BRPOP命令   用于阻塞式地从 List 中弹出元素。
    public Future<KeyValue<K, V>> blpop(long timeout, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys).add(timeout);
        return dispatch(BLPOP, new KeyValueOutput<K, V>(codec), args);
    }

    // 
    public Future<KeyValue<K, V>> brpop(long timeout, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys).add(timeout);
        return dispatch(BRPOP, new KeyValueOutput<K, V>(codec), args);
    }

    // BRPOPLPUSH 命令,从 List 尾部弹出元素并推入另一个  List  头部。
    public Future<V> brpoplpush(long timeout, K source, K destination) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(source).addKey(destination).add(timeout);
        return dispatch(BRPOPLPUSH, new ValueOutput<K, V>(codec), args);
    }

    public Future<K> clientGetname() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GETNAME);
        return dispatch(CLIENT, new KeyOutput<K, V>(codec), args);
    }

    // 下面几个都是管理客户端连接的命令
    public Future<String> clientSetname(K name) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SETNAME).addKey(name);
        return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> clientKill(String addr) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(KILL).add(addr);
        return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> clientList() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LIST);
        return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
    }

    public Future<List<String>> configGet(String parameter) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET).add(parameter);
        return dispatch(CONFIG, new StringListOutput<K, V>(codec), args);
    }

    public Future<String> configResetstat() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(RESETSTAT);
        return dispatch(CONFIG, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> configSet(String parameter, String value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SET).add(parameter).add(value);
        return dispatch(CONFIG, new StatusOutput<K, V>(codec), args);
    }

    // DBSIZE命令  返回当前数据库中 key 的数量。
    public Future<Long> dbsize() {
        return dispatch(DBSIZE, new IntegerOutput<K, V>(codec));
    }

    public Future<String> debugObject(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(OBJECT).addKey(key);
        return dispatch(DEBUG, new StatusOutput<K, V>(codec), args);
    }

    // DECR 和 DECRBY 命令   用于对数值进行递减操作。
    public Future<Long> decr(K key) {
        return dispatch(DECR, new IntegerOutput<K, V>(codec), key);
    }

    public Future<Long> decrby(K key, long amount) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
        return dispatch(DECRBY, new IntegerOutput<K, V>(codec), args);
    }

    // 删除一个或多个key
    public Future<Long> del(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(DEL, new IntegerOutput<K, V>(codec), args);
    }

    // 取消一个事务  注意它会清除multi字段
    public Future<String> discard() {
        if (multi != null) {
            multi.cancel();
            multi = null;
        }
        return dispatch(DISCARD, new StatusOutput<K, V>(codec));
    }

    public Future<byte[]> dump(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        return dispatch(DUMP, new ByteArrayOutput<K, V>(codec), args);
    }

    public Future<V> echo(V msg) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addValue(msg);
        return dispatch(ECHO, new ValueOutput<K, V>(codec), args);
    }

    // 用于执行 Redis EVAL和 EVALSHA 命令,用于执行Lua脚本。
    // 这里可以留意下,后面 Redis 会使用这种执行 lua 脚本的方式。

    public <T> Future<T> eval(V script, ScriptOutputType type, K[] keys, V... values) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addValue(script).add(keys.length).addKeys(keys).addValues(values);
        CommandOutput<K, V, T> output = newScriptOutput(codec, type);
        return dispatch(EVAL, output, args);
    }

    public <T> Future<T> evalsha(String digest, ScriptOutputType type, K[] keys, V... values) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(digest).add(keys.length).addKeys(keys).addValues(values);
        CommandOutput<K, V, T> output = newScriptOutput(codec, type);
        return dispatch(EVALSHA, output, args);
    }

    // 检查给定key是否存在
    public Future<Boolean> exists(K key) {
        return dispatch(EXISTS, new BooleanOutput<K, V>(codec), key);
    }

    // 下面几个都是设置各种过期命令,用于设置key的过期时间。
    public Future<Boolean> expire(K key, long seconds) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(seconds);
        return dispatch(EXPIRE, new BooleanOutput<K, V>(codec), args);
    }

    public Future<Boolean> expireat(K key, Date timestamp) {
        return expireat(key, timestamp.getTime() / 1000);
    }

    public Future<Boolean> expireat(K key, long timestamp) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(timestamp);
        return dispatch(EXPIREAT, new BooleanOutput<K, V>(codec), args);
    }

    // 用于执行事务 注意它会清除multi字段。
    public Future<List<Object>> exec() {
        MultiOutput<K, V> multi = this.multi;
        this.multi = null;
        if (multi == null) multi = new MultiOutput<K, V>(codec);
        return dispatch(EXEC, multi);
    }

    // 用于清空所有数据库或当前数据库。
    public Future<String> flushall() throws Exception {
        return dispatch(FLUSHALL, new StatusOutput<K, V>(codec));
    }

    public Future<String> flushdb() throws Exception {
        return dispatch(FLUSHDB, new StatusOutput<K, V>(codec));
    }

    // 下面的方法都是用于获取指定 key 的值。
    public Future<V> get(K key) {
        return dispatch(GET, new ValueOutput<K, V>(codec), key);
    }

    // 用于获取字符串指定位置的 bit 值
    public Future<Long> getbit(K key, long offset) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset);
        return dispatch(GETBIT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<V> getrange(K key, long start, long end) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(end);
        return dispatch(GETRANGE, new ValueOutput<K, V>(codec), args);
    }

    public Future<V> getset(K key, V value) {
        return dispatch(GETSET, new ValueOutput<K, V>(codec), key, value);
    }

    // hash相关的操作方法
    public Future<Long> hdel(K key, K... fields) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKeys(fields);
        return dispatch(HDEL, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Boolean> hexists(K key, K field) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field);
        return dispatch(HEXISTS, new BooleanOutput<K, V>(codec), args);
    }

    public Future<V> hget(K key, K field) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field);
        return dispatch(HGET, new ValueOutput<K, V>(codec), args);
    }

    public Future<Long> hincrby(K key, K field, long amount) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).add(amount);
        return dispatch(HINCRBY, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Double> hincrbyfloat(K key, K field, double amount) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).add(amount);
        return dispatch(HINCRBYFLOAT, new DoubleOutput<K, V>(codec), args);
    }

    public Future<Map<K, V>> hgetall(K key) {
        return dispatch(HGETALL, new MapOutput<K, V>(codec), key);
    }

    public Future<List<K>> hkeys(K key) {
        return dispatch(HKEYS, new KeyListOutput<K, V>(codec), key);
    }

    public Future<Long> hlen(K key) {
        return dispatch(HLEN, new IntegerOutput<K, V>(codec), key);
    }

    public Future<List<V>> hmget(K key, K... fields) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKeys(fields);
        return dispatch(HMGET, new ValueListOutput<K, V>(codec), args);
    }

    public Future<String> hmset(K key, Map<K, V> map) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(map);
        return dispatch(HMSET, new StatusOutput<K, V>(codec), args);
    }

    public Future<Boolean> hset(K key, K field, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).addValue(value);
        return dispatch(HSET, new BooleanOutput<K, V>(codec), args);
    }

    public Future<Boolean> hsetnx(K key, K field, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).addValue(value);
        return dispatch(HSETNX, new BooleanOutput<K, V>(codec), args);
    }

    public Future<List<V>> hvals(K key) {
        return dispatch(HVALS, new ValueListOutput<K, V>(codec), key);
    }

    // 自增相关的方法
    public Future<Long> incr(K key) {
        return dispatch(INCR, new IntegerOutput<K, V>(codec), key);
    }

    public Future<Long> incrby(K key, long amount) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
        return dispatch(INCRBY, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Double> incrbyfloat(K key, double amount) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
        return dispatch(INCRBYFLOAT, new DoubleOutput<K, V>(codec), args);
    }

    // 获取服务器的各种信息
    public Future<String> info() {
        return dispatch(INFO, new StatusOutput<K, V>(codec));
    }

    public Future<String> info(String section) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(section);
        return dispatch(INFO, new StatusOutput<K, V>(codec), args);
    }

    // 用于查找所有符合给定模式的 key
    public Future<List<K>> keys(K pattern) {
       return dispatch(KEYS, new KeyListOutput<K, V>(codec), pattern);
    }

    // 最后一次成功保存到磁盘的时间
    public Future<Date> lastsave() {
        return dispatch(LASTSAVE, new DateOutput<K, V>(codec));
    }

    // List 相关的方法
    public Future<V> lindex(K key, long index) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(index);
        return dispatch(LINDEX, new ValueOutput<K, V>(codec), args);
    }

    public Future<Long> linsert(K key, boolean before, V pivot, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(before ? BEFORE : AFTER).addValue(pivot).addValue(value);
        return dispatch(LINSERT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> llen(K key) {
        return dispatch(LLEN, new IntegerOutput<K, V>(codec), key);
    }

    public Future<V> lpop(K key) {
        return dispatch(LPOP, new ValueOutput<K, V>(codec), key);
    }

    public Future<Long> lpush(K key, V... values) {
        return dispatch(LPUSH, new IntegerOutput<K, V>(codec), key, values);
    }

    public Future<Long> lpushx(K key, V value) {
        return dispatch(LPUSHX, new IntegerOutput<K, V>(codec), key, value);
    }

    public Future<List<V>> lrange(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
        return dispatch(LRANGE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<Long> lrem(K key, long count, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(count).addValue(value);
        return dispatch(LREM, new IntegerOutput<K, V>(codec), args);
    }

    public Future<String> lset(K key, long index, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(index).addValue(value);
        return dispatch(LSET, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> ltrim(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
        return dispatch(LTRIM, new StatusOutput<K, V>(codec), args);
    }

    // 将 key 迁移到另一个 Redis 实例。
    public Future<String> migrate(String host, int port, K key, int db, long timeout) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.add(host).add(port).addKey(key).add(db).add(timeout);
        return dispatch(MIGRATE, new StatusOutput<K, V>(codec), args);
    }

    public Future<List<V>> mget(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(MGET, new ValueListOutput<K, V>(codec), args);
    }

    // 将当前数据库的键移动到另一个数据库
    public Future<Boolean> move(K key, int db) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(db);
        return dispatch(MOVE, new BooleanOutput<K, V>(codec), args);
    }

    public Future<String> multi() {
        Command<K, V, String> cmd = dispatch(MULTI, new StatusOutput<K, V>(codec));
        multi = (multi == null ? new MultiOutput<K, V>(codec) : multi);
        return cmd;
    }

    public Future<String> mset(Map<K, V> map) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(map);
        return dispatch(MSET, new StatusOutput<K, V>(codec), args);
    }

    public Future<Boolean> msetnx(Map<K, V> map) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(map);
        return dispatch(MSETNX, new BooleanOutput<K, V>(codec), args);
    }

    public Future<String> objectEncoding(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(ENCODING).addKey(key);
        return dispatch(OBJECT, new StatusOutput<K, V>(codec), args);
    }

    public Future<Long> objectIdletime(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(IDLETIME).addKey(key);
        return dispatch(OBJECT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> objectRefcount(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(REFCOUNT).addKey(key);
        return dispatch(OBJECT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Boolean> persist(K key) {
        return dispatch(PERSIST, new BooleanOutput<K, V>(codec), key);
    }

    public Future<Boolean> pexpire(K key, long milliseconds) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(milliseconds);
        return dispatch(PEXPIRE, new BooleanOutput<K, V>(codec), args);
    }

    public Future<Boolean> pexpireat(K key, Date timestamp) {
        return pexpireat(key, timestamp.getTime());
    }

    public Future<Boolean> pexpireat(K key, long timestamp) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(timestamp);
        return dispatch(PEXPIREAT, new BooleanOutput<K, V>(codec), args);
    }

    public Future<String> ping() {
        return dispatch(PING, new StatusOutput<K, V>(codec));
    }

    public Future<Long> pttl(K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        return dispatch(PTTL, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> publish(K channel, V message) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(channel).addValue(message);
        return dispatch(PUBLISH, new IntegerOutput<K, V>(codec), args);
    }

    public Future<String> quit() {
        return dispatch(QUIT, new StatusOutput<K, V>(codec));
    }

    public Future<V> randomkey() {
        return dispatch(RANDOMKEY, new ValueOutput<K, V>(codec));
    }

    public Future<String> rename(K key, K newKey) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(newKey);
        return dispatch(RENAME, new StatusOutput<K, V>(codec), args);
    }

    public Future<Boolean> renamenx(K key, K newKey) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(newKey);
        return dispatch(RENAMENX, new BooleanOutput<K, V>(codec), args);
    }

    public Future<String> restore(K key, long ttl, byte[] value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(ttl).add(value);
        return dispatch(RESTORE, new StatusOutput<K, V>(codec), args);
    }

    public Future<V> rpop(K key) {
        return dispatch(RPOP, new ValueOutput<K, V>(codec), key);
    }

    public Future<V> rpoplpush(K source, K destination) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(source).addKey(destination);
        return dispatch(RPOPLPUSH, new ValueOutput<K, V>(codec), args);
    }

    public Future<Long> rpush(K key, V... values) {
        return dispatch(RPUSH, new IntegerOutput<K, V>(codec), key, values);
    }

    public Future<Long> rpushx(K key, V value) {
        return dispatch(RPUSHX, new IntegerOutput<K, V>(codec), key, value);
    }

    public Future<Long> sadd(K key, V... members) {
        return dispatch(SADD, new IntegerOutput<K, V>(codec), key, members);
    }

    public Future<String> save() {
        return dispatch(SAVE, new StatusOutput<K, V>(codec));
    }

    public Future<Long> scard(K key) {
        return dispatch(SCARD, new IntegerOutput<K, V>(codec), key);
    }

    public Future<List<Boolean>> scriptExists(String... digests) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(EXISTS);
        for (String sha : digests) args.add(sha);
        return dispatch(SCRIPT, new BooleanListOutput<K, V>(codec), args);
    }

    public Future<String> scriptFlush() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(FLUSH);
        return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> scriptKill() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(KILL);
        return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> scriptLoad(V script) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LOAD).addValue(script);
        return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
    }

    public Future<Set<V>> sdiff(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(SDIFF, new ValueSetOutput<K, V>(codec), args);
    }

    public Future<Long> sdiffstore(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
        return dispatch(SDIFFSTORE, new IntegerOutput<K, V>(codec), args);
    }

    public String select(int db) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
        Command<K, V, String> cmd = dispatch(SELECT, new StatusOutput<K, V>(codec), args);
        String status = await(cmd, timeout, unit);
        if ("OK".equals(status)) this.db = db;
        return status;
    }

    public Future<String> set(K key, V value) {
        return dispatch(SET, new StatusOutput<K, V>(codec), key, value);
    }

    public Future<Long> setbit(K key, long offset, int value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset).add(value);
        return dispatch(SETBIT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<String> setex(K key, long seconds, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(seconds).addValue(value);
        return dispatch(SETEX, new StatusOutput<K, V>(codec), args);
    }

    public Future<Boolean> setnx(K key, V value) {
        return dispatch(SETNX, new BooleanOutput<K, V>(codec), key, value);
    }

    public Future<Long> setrange(K key, long offset, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset).addValue(value);
        return dispatch(SETRANGE, new IntegerOutput<K, V>(codec), args);
    }

    @Deprecated
    public void shutdown() {
        dispatch(SHUTDOWN, new StatusOutput<K, V>(codec));
    }

    public void shutdown(boolean save) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        dispatch(SHUTDOWN, new StatusOutput<K, V>(codec), save ? args.add(SAVE) : args.add(NOSAVE));
    }

    public Future<Set<V>> sinter(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(SINTER, new ValueSetOutput<K, V>(codec), args);
    }

    public Future<Long> sinterstore(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
        return dispatch(SINTERSTORE, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Boolean> sismember(K key, V member) {
        return dispatch(SISMEMBER, new BooleanOutput<K, V>(codec), key, member);
    }

    public Future<Boolean> smove(K source, K destination, V member) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(source).addKey(destination).addValue(member);
        return dispatch(SMOVE, new BooleanOutput<K, V>(codec), args);
    }

    public Future<String> slaveof(String host, int port) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(host).add(port);
        return dispatch(SLAVEOF, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> slaveofNoOne() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NO).add(ONE);
        return dispatch(SLAVEOF, new StatusOutput<K, V>(codec), args);
    }

    public Future<List<Object>> slowlogGet() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET);
        return dispatch(SLOWLOG, new NestedMultiOutput<K, V>(codec), args);
    }

    public Future<List<Object>> slowlogGet(int count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET).add(count);
        return dispatch(SLOWLOG, new NestedMultiOutput<K, V>(codec), args);
    }

    public Future<Long> slowlogLen() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LEN);
        return dispatch(SLOWLOG, new IntegerOutput<K, V>(codec), args);
    }

    public Future<String> slowlogReset() {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(RESET);
        return dispatch(SLOWLOG, new StatusOutput<K, V>(codec), args);
    }

    public Future<Set<V>> smembers(K key) {
        return dispatch(SMEMBERS, new ValueSetOutput<K, V>(codec), key);
    }

    public Future<List<V>> sort(K key) {
        return dispatch(SORT, new ValueListOutput<K, V>(codec), key);
    }

    public Future<List<V>> sort(K key, SortArgs sortArgs) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        sortArgs.build(args, null);
        return dispatch(SORT, new ValueListOutput<K, V>(codec), args);
    }

    public Future<Long> sortStore(K key, SortArgs sortArgs, K destination) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        sortArgs.build(args, destination);
        return dispatch(SORT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<V> spop(K key) {
        return dispatch(SPOP, new ValueOutput<K, V>(codec), key);
    }

    public Future<V> srandmember(K key) {
        return dispatch(SRANDMEMBER, new ValueOutput<K, V>(codec), key);
    }

    public Future<Set<V>> srandmember(K key, long count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(count);
        return dispatch(SRANDMEMBER, new ValueSetOutput<K, V>(codec), args);
    }

    public Future<Long> srem(K key, V... members) {
        return dispatch(SREM, new IntegerOutput<K, V>(codec), key, members);
    }

    public Future<Set<V>> sunion(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(SUNION, new ValueSetOutput<K, V>(codec), args);
    }

    public Future<Long> sunionstore(K destination, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
        return dispatch(SUNIONSTORE, new IntegerOutput<K, V>(codec), args);
    }

    public Future<String> sync() {
        return dispatch(SYNC, new StatusOutput<K, V>(codec));
    }

    public Future<Long> strlen(K key) {
        return dispatch(STRLEN, new IntegerOutput<K, V>(codec), key);
    }

    public Future<Long> ttl(K key) {
        return dispatch(TTL, new IntegerOutput<K, V>(codec), key);
    }

    public Future<String> type(K key) {
        return dispatch(TYPE, new StatusOutput<K, V>(codec), key);
    }

    public Future<String> watch(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
        return dispatch(WATCH, new StatusOutput<K, V>(codec), args);
    }

    public Future<String> unwatch() {
        return dispatch(UNWATCH, new StatusOutput<K, V>(codec));
    }

    public Future<Long> zadd(K key, double score, V member) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(score).addValue(member);
        return dispatch(ZADD, new IntegerOutput<K, V>(codec), args);
    }

    @SuppressWarnings("unchecked")
    public Future<Long> zadd(K key, Object... scoresAndValues) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        for (int i = 0; i < scoresAndValues.length; i += 2) {
            args.add((Double) scoresAndValues[i]);
            args.addValue((V) scoresAndValues[i + 1]);
        }
        return dispatch(ZADD, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> zcard(K key) {
        return dispatch(ZCARD, new IntegerOutput<K, V>(codec), key);
    }

    public Future<Long> zcount(K key, double min, double max) {
        return zcount(key, string(min), string(max));
    }

    public Future<Long> zcount(K key, String min, String max) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
        return dispatch(ZCOUNT, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Double> zincrby(K key, double amount, K member) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount).addKey(member);
        return dispatch(ZINCRBY, new DoubleOutput<K, V>(codec), args);
    }

    public Future<Long> zinterstore(K destination, K... keys) {
        return zinterstore(destination, new ZStoreArgs(), keys);
    }

    public Future<Long> zinterstore(K destination, ZStoreArgs storeArgs, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).add(keys.length).addKeys(keys);
        storeArgs.build(args);
        return dispatch(ZINTERSTORE, new IntegerOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrange(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
        return dispatch(ZRANGE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrangeWithScores(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(start).add(stop).add(WITHSCORES);
        return dispatch(ZRANGE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrangebyscore(K key, double min, double max) {
        return zrangebyscore(key, string(min), string(max));
    }

    public Future<List<V>> zrangebyscore(K key, String min, String max) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
        return dispatch(ZRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrangebyscore(K key, double min, double max, long offset, long count) {
        return zrangebyscore(key, string(min), string(max), offset, count);
    }

    public Future<List<V>> zrangebyscore(K key, String min, String max, long offset, long count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(min).add(max).add(LIMIT).add(offset).add(count);
        return dispatch(ZRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, double min, double max) {
        return zrangebyscoreWithScores(key, string(min), string(max));
    }

    public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, String min, String max) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(min).add(max).add(WITHSCORES);
        return dispatch(ZRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, double min, double max, long offset, long count) {
        return zrangebyscoreWithScores(key, string(min), string(max), offset, count);
    }

    public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, String min, String max, long offset, long count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(min).add(max).add(WITHSCORES).add(LIMIT).add(offset).add(count);
        return dispatch(ZRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<Long> zrank(K key, V member) {
        return dispatch(ZRANK, new IntegerOutput<K, V>(codec), key, member);
    }

    public Future<Long> zrem(K key, V... members) {
        return dispatch(ZREM, new IntegerOutput<K, V>(codec), key, members);
    }

    public Future<Long> zremrangebyrank(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
        return dispatch(ZREMRANGEBYRANK, new IntegerOutput<K, V>(codec), args);
    }

    public Future<Long> zremrangebyscore(K key, double min, double max) {
        return zremrangebyscore(key, string(min), string(max));
    }

    public Future<Long> zremrangebyscore(K key, String min, String max) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
        return dispatch(ZREMRANGEBYSCORE, new IntegerOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrevrange(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
        return dispatch(ZREVRANGE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrevrangeWithScores(K key, long start, long stop) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(start).add(stop).add(WITHSCORES);
        return dispatch(ZREVRANGE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrevrangebyscore(K key, double max, double min) {
        return zrevrangebyscore(key, string(max), string(min));
    }

    public Future<List<V>> zrevrangebyscore(K key, String max, String min) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(max).add(min);
        return dispatch(ZREVRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<V>> zrevrangebyscore(K key, double max, double min, long offset, long count) {
        return zrevrangebyscore(key, string(max), string(min), offset, count);
    }

    public Future<List<V>> zrevrangebyscore(K key, String max, String min, long offset, long count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(max).add(min).add(LIMIT).add(offset).add(count);
        return dispatch(ZREVRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, double max, double min) {
        return zrevrangebyscoreWithScores(key, string(max), string(min));
    }

    public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, String max, String min) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(max).add(min).add(WITHSCORES);
        return dispatch(ZREVRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, double max, double min, long offset, long count) {
        return zrevrangebyscoreWithScores(key, string(max), string(min), offset, count);
    }

    public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, String max, String min, long offset, long count) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(key).add(max).add(min).add(WITHSCORES).add(LIMIT).add(offset).add(count);
        return dispatch(ZREVRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
    }

    public Future<Long> zrevrank(K key, V member) {
        return dispatch(ZREVRANK, new IntegerOutput<K, V>(codec), key, member);
    }

    public Future<Double> zscore(K key, V member) {
        return dispatch(ZSCORE, new DoubleOutput<K, V>(codec), key, member);
    }

    public Future<Long> zunionstore(K destination, K... keys) {
        return zunionstore(destination, new ZStoreArgs(), keys);
    }

    public Future<Long> zunionstore(K destination, ZStoreArgs storeArgs, K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKey(destination).add(keys.length).addKeys(keys);
        storeArgs.build(args);
        return dispatch(ZUNIONSTORE, new IntegerOutput<K, V>(codec), args);
    }

    // 等待多个Future完成,使用默认的超时时间
    public boolean awaitAll(Future<?>... futures) {
        return awaitAll(timeout, unit, futures);
    }

    // 等待多个Future完成, 总的超时时间由方法指定。 方法可以确保在给定时间内所有 Future 都能完成或超时。
    public boolean awaitAll(long timeout, TimeUnit unit, Future<?>... futures) {
        boolean complete;

        try {
            // 超时时间
            long nanos = unit.toNanos(timeout);
            // 当前时间
            long time  = System.nanoTime();

            // 调用每个 Future 的 get 方法等待完成
            for (Future<?> f : futures) {
                // 如果总的等待时间耗尽,返回 false。
                if (nanos < 0) return false;
                f.get(nanos, TimeUnit.NANOSECONDS);

                // 从总的等待时间里面减掉这次 future 等待的时间
                long now = System.nanoTime();
                nanos -= now - time;
                time   = now;
            }
            // 所有 Future 都在超时前完成,返回true
            complete = true;
        } catch (TimeoutException e) {
            // 超时设置 complete 为 false。
            complete = false;
        } catch (Exception e) {
            // 其他异常就直接抛出自定义异常 RedisCommandInterruptedException。
            throw new RedisCommandInterruptedException(e);
        }

        return complete;
    }

    // 安全关闭 Redis 连接
    public synchronized void close() {
        // 先检查连接是否已经关闭,以及channel是否存在
        if (!closed && channel != null) {
            // 获取看门狗并禁用重连。
            ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class);
            watchdog.setReconnect(false);
            closed = true;
            channel.close();
        }
    }

    // 计算 lua 脚本的 SHA1 摘要
    public String digest(V script) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA1");
            md.update(codec.encodeValue(script));
            return new String(Base16.encode(md.digest(), false));
        } catch (NoSuchAlgorithmException e) {
            throw new RedisException("JVM does not support SHA1");
        }
    }

    // 连接建立时,做身份验证和切换正确的数据库。
    // 重新发送之前连接问题而没有发送的命令。
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        channel = ctx.channel();

        // 临时List来存储命令。
        // 初始容量设置为当前队列大小加2 , 为可能的 AUTH 和切换数据库 命令预留空间。
        List<Command<K, V, ?>> tmp = new ArrayList<Command<K, V, ?>>(queue.size() + 2);

        // 设置了密码的情况下创建一个AUTH命令并添加到临时 List。
        if (password != null) {
            CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
            tmp.add(new Command<K, V, String>(AUTH, new StatusOutput<K, V>(codec), args, false));
        }

        // 选择了非默认数据库 , 创建一个 SELECT 命令并添加到临时 List。
        if (db != 0) {
            CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
            tmp.add(new Command<K, V, String>(SELECT, new StatusOutput<K, V>(codec), args, false));
        }

        // 将原有队列中的所有命令添加到 tmp ,然后清空原队列。
        tmp.addAll(queue);
        queue.clear();

        // 遍历临时 List 中的所有命令
        for (Command<K, V, ?> cmd : tmp) {
            // 如果命令未被取消 , 重新添加到队列中
            if (!cmd.isCancelled()) {
                queue.add(cmd);
                channel.writeAndFlush(cmd);
            }
        }

        // 清空临时列表,释放内存。
        tmp.clear();
    }

    @Override
    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 连接已经关闭的话,循环执行所有待执行的命令
        if (closed) {
            for (Command<K, V, ?> cmd : queue) {
                if (cmd.getOutput() != null) {
                    // 为每个命令设置错误信息并标记为完成。
                    cmd.getOutput().setError("Connection closed");
                }
                cmd.complete();
            }
            queue.clear();
            queue = null;
            channel = null;
        }
    }

    public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output) {
        return dispatch(type, output, (CommandArgs<K, V>) null);
    }

    public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
        return dispatch(type, output, args);
    }

    public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValue(value);
        return dispatch(type, output, args);
    }

    public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V[] values) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValues(values);
        return dispatch(type, output, args);
    }

    // 分发 Redis 命令
    public synchronized <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
        // 创建Command 对象,最后一个入参表示是否在事务中。
        Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null);

        try {
            // 如果在事务中的话,将命令添加到事务。
            if (multi != null) {
                multi.add(cmd);
            }
            // 将命令放入队列
            queue.put(cmd);

            // channel 可用,立即写入并刷新命令。
            if (channel != null) {
                channel.writeAndFlush(cmd);
            }
        } catch (NullPointerException e) {
            throw new RedisException("Connection is closed");
        } catch (InterruptedException e) {
            throw new RedisCommandInterruptedException(e);
        }

        return cmd;
    }

    // 等待命令执行完成然后获取结果
    public <T> T await(Command<K, V, T> cmd, long timeout, TimeUnit unit) {
        if (!cmd.await(timeout, unit)) {
            cmd.cancel(true);
            throw new RedisException("Command timed out");
        }
        CommandOutput<K, V, T> output = cmd.getOutput();
        if (output.hasError()) throw new RedisException(output.getError());
        return output.get();
    }

    // 创建脚本输出类型的 CommandOutput 对象
    @SuppressWarnings("unchecked")
    protected <K, V, T> CommandOutput<K, V, T> newScriptOutput(RedisCodec<K, V> codec, ScriptOutputType type) {
        switch (type) {
            case BOOLEAN: return (CommandOutput<K, V, T>) new BooleanOutput<K, V>(codec);
            case INTEGER: return (CommandOutput<K, V, T>) new IntegerOutput<K, V>(codec);
            case STATUS:  return (CommandOutput<K, V, T>) new StatusOutput<K, V>(codec);
            case MULTI:   return (CommandOutput<K, V, T>) new NestedMultiOutput<K, V>(codec);
            case VALUE:   return (CommandOutput<K, V, T>) new ValueOutput<K, V>(codec);
            default:      throw new RedisException("Unsupported script output type");
        }
    }

    // 将double值转换为字符串
    public String string(double n) {
        if (Double.isInfinite(n)) {
            // 处理无穷大的情况,返回 "+inf" 或 "-inf"
            return (n > 0) ? "+inf" : "-inf";
        }
        return Double.toString(n);
    }
}


1.2 同步Redis连接 RedisConnection

这是个同步的连接,跟 RedisAsyncConnection 套路基本一致,这个类的方法都是直接返回响应值的。 RedisAsyncConnection 的方法返回值都是 Future 。

在实现上内部也还是使用异步连接,它使用的是 await方法来等待异步操作完成,使得调用方式更简单。

1.3 Redis客户端 RedisClient

可扩展的线程安全Redis客户端,支持多个线程共享一个连接,只要避免使用阻塞和事务操作(如BLPOP和MULTI/EXEC)。

package com.lambdaworks.redis;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;


public class RedisClient {
    private EventLoopGroup group;
    private Bootstrap bootstrap;
    private HashedWheelTimer timer;  // 用于处理超时和重连的计时器
    private ChannelGroup channels;  //  管理所有活动连接的通道组
    private long timeout;  //  默认超时时间及其单位
    private TimeUnit unit;

   
    public RedisClient(String host) {
        this(host, 6379);
    }

   
    public RedisClient(String host, int port) {
        InetSocketAddress addr = new InetSocketAddress(host, port);

        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr);

        setDefaultTimeout(60, TimeUnit.SECONDS);

        channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        timer    = new HashedWheelTimer();
        timer.start();
    }

   
    public void setDefaultTimeout(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit    = unit;
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout));
    }

    // 创建同步 Redis 连接
    public RedisConnection<String, String> connect() {
        return connect(new Utf8StringCodec());
    }

    // 创建异步 Redis 连接
    public RedisAsyncConnection<String, String> connectAsync() {
        return connectAsync(new Utf8StringCodec());
    }

    // 创建发布/订阅连接 
    public RedisPubSubConnection<String, String> connectPubSub() {
        return connectPubSub(new Utf8StringCodec());
    }

    // 下面三个方法跟上面一样,只是加了解码器
    public <K, V> RedisConnection<K, V> connect(RedisCodec<K, V> codec) {
        return new RedisConnection<K, V>(connectAsync(codec));
    }

    
    public <K, V> RedisAsyncConnection<K, V> connectAsync(RedisCodec<K, V> codec) {
        BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();

        CommandHandler<K, V> handler = new CommandHandler<K, V>(queue);
        RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(queue, codec, timeout, unit);

        return connect(handler, connection);
    }

    
    public <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
        BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();

        PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(queue, codec);
        RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(queue, codec, timeout, unit);

        return connect(handler, connection);
    }

    // connect 私有方法,上面的方法最终都是调用这个
    private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) {
        try {
            // 创建看门狗用于监控连接状态,处理断线重连
            final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer);

            // 开始创建连接
            ChannelFuture connect = null;
            // TODO use better concurrent workaround
            synchronized (bootstrap) {
                connect = bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // pipeline中添加 看门狗、 redis命令响应处理器handler 和 redis连接处理器 connection

                        // pipeline 顺序很重要
                        // 看门狗放在最前面,这样它可以第一时间检测到连接问题并进行处理。
                        // Redis协议处理放中间,连接处理放最后
                        ch.pipeline().addLast(watchdog, handler, connection);
                    }
                }).connect();
            }
            // 等待连接完成
            connect.sync();
            // 看门狗看起自动重连
            watchdog.setReconnect(true);

            return connection;
        } catch (Throwable e) {
            throw new RedisException("Unable to connect", e);
        }
    }

   // 关闭操作
    public void shutdown() {
        // 关闭所有的 channel 
        for (Channel c : channels) {
            ChannelPipeline pipeline = c.pipeline();
            // 调用连接管理器去关闭连接
            RedisAsyncConnection<?, ?> connection = pipeline.get(RedisAsyncConnection.class);
            connection.close();
        }

        // 等待关闭结束
        ChannelGroupFuture future = channels.close();
        future.awaitUninterruptibly();
        group.shutdownGracefully().syncUninterruptibly();
        // 关闭定时器,重连的任务通过定时器触发
        timer.stop();
    }
}



1.4 看门狗 ConnectionWatchdog

这个类主要就是监控 Netty 的 Channel ,在掉线时进行重新连接。

package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.RedisAsyncConnection;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;


@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask {
    private Bootstrap bootstrap;
    private Channel channel; // 当前活动的 channel
    private ChannelGroup channels;  // channel 群组,用于管理多个 channel
    private Timer timer;  // 定时任务,用于执行重连
    private boolean reconnect;  // 重连标记
    private int attempts;  // 重试连接次数


    public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels, Timer timer) {
        this.bootstrap = bootstrap;
        this.channels  = channels;
        this.timer     = timer;
    }

    public void setReconnect(boolean reconnect) {
        this.reconnect = reconnect;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 添加到 ChannelGroup 中
        channel = ctx.channel();
        channels.add(channel);
        // 重置重连次数
        attempts = 0;
        // 往后传播事件
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 需要重连的话执行重连逻辑
        if (reconnect) {
            // 最多尝试 8次重连,不会无线重试
            if (attempts < 8) attempts++;
            // 每次延长超时的时间,比如第一次2毫秒,第二次4毫秒,第三次8毫秒,以此类推,最多512毫秒。
            int timeout = 2 << attempts;
            timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 发生异常就关闭 channel 
        ctx.channel().close();
    }

    // 执行重连逻辑
    @Override
    public void run(Timeout timeout) throws Exception {
        // 获取老的 channel pipeline 
        ChannelPipeline old = channel.pipeline();
        // 老的 pipeline 里面获取 CommandHandler 以及 RedisAsyncConnection 处理器
        final CommandHandler<?, ?> handler = old.get(CommandHandler.class);
        final RedisAsyncConnection<?, ?> connection = old.get(RedisAsyncConnection.class);

        // 准备重新发起连接
        ChannelFuture connect = null;

        // 同步块保证 bootstrap 的线程安全
        // TODO use better concurrent workaround
        synchronized (bootstrap) {
            // 把之前 pipeline 中的处理器加到新的 bootstrap 里面去 
            // 这样可以保持新旧连接行为一致
            connect = bootstrap.handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(this, handler, connection);
                }
            }).connect();
        }
        // 阻塞等待当前线程,等到连接建立或者建立失败
        connect.sync();
    }
}


2. 发布订阅机制

主要是涉及到 Redis 发布订阅相关处理。

2.1 发布订阅命令Netty处理器 PubSubCommandHandler

用来处理 Redis 发布(Pub)/订阅(Sub) 命令的 Netty Channel 。

支持同时处理常规 Redis 命令和 Pub/Sub 消息。

package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;

import java.util.concurrent.BlockingQueue;

public class PubSubCommandHandler<K, V> extends CommandHandler<K, V> {
    private RedisCodec<K, V> codec;   // Redis 数据编解码
    private PubSubOutput<K, V> output;  // 存储 Pub/Sub 操作的输出

    public PubSubCommandHandler(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec) {
        super(queue);
        this.codec  = codec;
        this.output = new PubSubOutput<K, V>(codec);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        // 处理队列中的常规命令

        //  检查当前是否没有正在处理的 Pub/Sub 消息
        while (output.type() == null && !queue.isEmpty()) {
            // 尝试解码接收到的数据。
            CommandOutput<K, V, ?> output = queue.peek().getOutput();
            // 返回 false说明解码失败,方法直接返回,等待更多数据来处理。
            if (!rsm.decode(buffer, output)) return;
            // 命令从队列中移除,调用回调方法。
            queue.take().complete();

            // PubSubOutput 类型触发 ChannelRead 事件,将结果传递给下一个处理器
            if (output instanceof PubSubOutput) ctx.fireChannelRead(output);
        }

        // 处理 Pub/Sub 消息
        while (rsm.decode(buffer, output)) {
            // 触发 ChannelRead 事件,将解码后的消息传递给下一个处理器
            ctx.fireChannelRead(output);

            // 创建一个新的 PubSubOutput 对象再继续处理下一条消息
            output = new PubSubOutput<K, V>(codec);
        }
    }

}

2.2 发布订阅监听器 RedisPubSubListener

Redis 发布/订阅事件的回调处理接口。

package com.lambdaworks.redis.pubsub;

public interface RedisPubSubListener<K, V> {
    // 处理普通的发布/订阅消息
    void message(K channel, V message);

    // 处理模式匹配的发布/订阅消息
    void message(K pattern, K channel, V message);

    // 处理成功订阅频道的事件
    void subscribed(K channel, long count);

    // 处理成功订阅模式的事件
    void psubscribed(K pattern, long count);

    // 处理取消订阅频道的事件
    void unsubscribed(K channel, long count);

    // 处理取消订阅模式的事件
    void punsubscribed(K pattern, long count);
}


### RedisPubSubAdapter

实现 RedisPubSubListener  接口的适配器给接口所有方法的空实现

```java
package com.lambdaworks.redis.pubsub;

public class RedisPubSubAdapter<K, V> implements RedisPubSubListener<K, V> {
    @Override
    public void message(K channel, V message) {
    }

    @Override
    public void message(K pattern, K channel, V message) {
    }

    @Override
    public void subscribed(K channel, long count) {
    }

    @Override
    public void psubscribed(K pattern, long count) {
    }

    @Override
    public void unsubscribed(K channel, long count) {
    }

    @Override
    public void punsubscribed(K pattern, long count) {
    }
}

2.3 发布订阅连接管理 RedisPubSubConnection

用于管理与 Redis 服务器的发布/订阅(Pub/Sub)连接。

package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import io.netty.channel.ChannelHandlerContext;

import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;

import static com.lambdaworks.redis.protocol.CommandType.*;


public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
    private List<RedisPubSubListener<K, V>> listeners;  // 保存所有注册的监听器
    private Set<K> channels;  // 当前订阅的频道
    private Set<K> patterns;  // 当前订阅的模式

   
    public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
        super(queue, codec, timeout, unit);
        listeners = new CopyOnWriteArrayList<RedisPubSubListener<K, V>>();
        channels  = new HashSet<K>();
        patterns  = new HashSet<K>();
    }

   
    public void addListener(RedisPubSubListener<K, V> listener) {
        listeners.add(listener);
    }

    
    public void removeListener(RedisPubSubListener<K, V> listener) {
        listeners.remove(listener);
    }

    public void psubscribe(K... patterns) {
        dispatch(PSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
    }

    public void punsubscribe(K... patterns) {
        dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
    }

    public void subscribe(K... channels) {
        dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
    }

    public void unsubscribe(K... channels) {
        dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
    }

    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);

        // 下面的逻辑主要是在重连后的逻辑
        // 重新订阅之前的频道
        if (channels.size() > 0) {
            subscribe(toArray(channels));
            channels.clear();
        }
        // 重新订阅之前的模式
        if (patterns.size() > 0) {
            psubscribe(toArray(patterns));
            patterns.clear();
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        PubSubOutput<K, V> output = (PubSubOutput<K, V>) msg;
        for (RedisPubSubListener<K, V> listener : listeners) {
            switch (output.type()) {
                // 普通的发布/订阅消息
                case message:
                    listener.message(output.channel(), output.get());
                    break;
                //  模式匹配的消息
                case pmessage:
                    listener.message(output.pattern(), output.channel(), output.get());
                    break;
                //  模式订阅确认   
                case psubscribe:
                    patterns.add(output.pattern());
                    listener.psubscribed(output.pattern(), output.count());
                    break;
                // 取消模式订阅确认    
                case punsubscribe:
                    patterns.remove(output.pattern());
                    listener.punsubscribed(output.pattern(), output.count());
                    break;
                // 频道订阅确认    
                case subscribe:
                    channels.add(output.channel());
                    listener.subscribed(output.channel(), output.count());
                    break;
                // 取消频道订阅确认    
                case unsubscribe:
                    channels.remove(output.channel());
                    listener.unsubscribed(output.channel(), output.count());
                    break;
            }
        }
    }

    private CommandArgs<K, V> args(K... keys) {
        CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
        args.addKeys(keys);
        return args;
    }

    @SuppressWarnings("unchecked")
    private <T> T[] toArray(Collection<T> c) {
        Class<T> cls = (Class<T>) c.iterator().next().getClass();
        T[] array = (T[]) Array.newInstance(cls, c.size());
        return c.toArray(array);
    }
}

2.4 发布订阅输出 PubSubOutput

用于处理 Redis 发布/订阅(Pub/Sub)操作输出。

补充下发布订阅类型的知识点,发布订阅有下面几种操作类型:

  • message: 普通的发布消息。
  • pmessage: 模式匹配的发布消息。
  • psubscribe: 订阅某个模式。
  • punsubscribe: 取消订阅某个模式。
  • subscribe: 订阅特定频道。
  • unsubscribe: 取消订阅特定频道。
package com.lambdaworks.redis.pubsub;

import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;

import java.nio.ByteBuffer;


public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
    enum Type { message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe }

    private Type type; // 当前处理的Pub/Sub操作类型
    private K channel;
    private K pattern;  // 订阅模式,用于模式订阅
    private long count;  // 订阅/取消订阅操作后的活跃的订阅个数

    public PubSubOutput(RedisCodec<K, V> codec) {
        super(codec, null);
    }

    public Type type() {
        return type;
    }

    public K channel() {
        return channel;
    }

    public K pattern() {
        return pattern;
    }

    public long count() {
        return count;
    }

    @Override
    @SuppressWarnings("fallthrough")
    public void set(ByteBuffer bytes) {
        // 将接收到的字节解码为ASCII并设置为 type。
        if (type == null) {
            // 直接通过Type枚举的 valueOf() 方法通过名称去匹配枚举 
            type = Type.valueOf(decodeAscii(bytes));
            return;
        }

        switch (type) {
            // 模式匹配的发布消息
            case pmessage:
                // pattern为null就会设置pattern并退出。
                if (pattern == null) {
                    pattern = codec.decodeKey(bytes);
                    break;
                }
                // 不为null,继续执行message的逻辑。 这里没有 break 
            // 普通的发布消息        
            case message:
                // channel为null,设置 channel 并退出
                if (channel == null) {
                    channel = codec.decodeKey(bytes);
                    break;
                }
                // 将bytes解码为值并设置为 output
                output = codec.decodeValue(bytes);
                break;
            // 订阅某个模式
            // 取消订阅某个模式
            case psubscribe:
            case punsubscribe:
                // 将bytes解码为key并设置为 pattern
                pattern = codec.decodeKey(bytes);
                break;
            // 订阅特定频道
            // 取消订阅特定频道     
            case subscribe:
            case unsubscribe:
                // 将bytes解码为key并设置为channel
                channel = codec.decodeKey(bytes);
                break;
        }
    }

    @Override
    public void set(long integer) {
        // 订阅/取消订阅操作后更新活跃订阅数
        count = integer;
    }
}

通过几个例子来理解这个类,主要还是要对 Redis 的发布订阅机制有一定的了解。

2.4.1 案例一 频道订阅例子

客户端订阅了一个名为 “my-channel” 的频道。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):

1) "subscribe"
2) "my-channel"
3) (integer) 1

看下 PubSubOutput 的处理流程:

final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);

// 第一次调用 set
output.set(ByteBuffer.wrap("subscribe".getBytes()));
// output.type() == Type.subscribe

// 第二次调用 set
output.set(ByteBuffer.wrap("my-channel".getBytes()));
// output.channel() == "my-channel"

// 第三次调用 set
output.set(1);
// output.count() == 1

System.out.println("Type: " + output.type());   // subscribe
System.out.println("Channel: " + output.channel()); // my-channel
System.out.println("Count: " + output.count());  // 1


2.4.2 案例二 频道接收消息例子

客户端收到了发布到 “my-channel” 的消息 “new channel message”。Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):

1) "message"
2) "my-channel"
3) "new channel message"

看下 PubSubOutput 的处理流程:

final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);

// 第一次调用 set
output.set(ByteBuffer.wrap("message".getBytes()));
//  output.type() == Type.message

// 第二次调用 set
output.set(ByteBuffer.wrap("my-channel".getBytes()));
//  output.channel() == "my-channel"

// 第三次调用 set
output.set(ByteBuffer.wrap("new channel message".getBytes()));
//  output 中的 output 字段被设置为 "new channel message"

System.out.println("Type: " + output.type());   // message
System.out.println("Channel: " + output.channel());  // my-channel
System.out.println("Message: " + output.get());  // new channel message

2.4.3 案例三 模式订阅例子

客户端使用模式 “channel.*” 进行订阅。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):

1) "psubscribe"
2) "channel.*"
3) (integer) 1

看下 PubSubOutput 的处理流程:

final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);

// 第一次调用 set
output.set(ByteBuffer.wrap("psubscribe".getBytes()));
// output.type() == Type.psubscribe

// 第二次调用 set
output.set(ByteBuffer.wrap("channel.*".getBytes()));
// output.pattern() == "channel.*"

// 第三次调用 set
output.set(1);
// output.count() == 1

System.out.println("Type: " + output.type());  // psubscribe
System.out.println("Pattern: " + output.pattern());  // channel.*
System.out.println("Count: " + output.count());  // 1

2.4.4 案例四 接收模式匹配的消息例子

客户端使用模式 “channel.*” 订阅了消息。 然后有消息被发布到 “channel.channel123” 频道。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):

1) "pmessage"
2) "channel.*"
3) "channel.channel123"
4) "channel message content"

可以看到比普通的 message 多了一个具体的子channel 名称。

看下 PubSubOutput 的处理流程:

PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);

// 第一次调用 set
output.set(ByteBuffer.wrap("pmessage".getBytes()));
// 此时 output.type() == Type.pmessage

// 第二次调用 set
output.set(ByteBuffer.wrap("channel.*".getBytes()));
// 此时 output.pattern() == "channel.*"

// 第三次调用 set
output.set(ByteBuffer.wrap("channel.channel123".getBytes()));
// 此时 output.channel() == "channel.channel123"

// 第四次调用 set
output.set(ByteBuffer.wrap("channel message content".getBytes()));
// output 字段被设置为 "channel message content"

System.out.println("Type: " + output.type());   // pmessage
System.out.println("Pattern: " + output.pattern()); // channel.*
System.out.println("Channel: " + output.channel()); // channel.channel123
System.out.println("Message: " + output.get());  // channel message content