RedissonV1.0-Lettuce通信源码分析-连接管理以及发布订阅机制
RedissonV1.0-Lettuce通信源码分析-连接管理以及发布订阅机制
1. 连接管理
连接管理主要是跟 Netty 相关的一些代码,涉及到连接的建立、断开、重连、数据的处理等。
1.1 异步Redis连接 RedisAsyncConnection
这是一个实现了异步 Redis 连接的类,在整个 Lettuce 中是最核心的一个类之一。
本身提供了一个线程安全的方式来与 Redis 服务器进行异步通信。
package com.lambdaworks.redis;
import com.lambdaworks.codec.Base16;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.*;
import com.lambdaworks.redis.protocol.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
import static com.lambdaworks.redis.protocol.CommandKeyword.*;
import static com.lambdaworks.redis.protocol.CommandType.*;
@ChannelHandler.Sharable
public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
protected BlockingQueue<Command<K, V, ?>> queue; // 保存命令的阻塞队列
protected RedisCodec<K, V> codec; // 编解码器
protected Channel channel; // 跟 Redis 通信的 Channel
protected long timeout; // Redis命令响应等待的超时时间
protected TimeUnit unit; // Redis命令超时时间单位
protected MultiOutput<K, V> multi; // 处理 Redis 事务 multi/exec 的输出
private String password; // 连接密码
private int db; // 选择的 redis 数据库下标
private boolean closed; // 是否关闭的标志
public RedisAsyncConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
this.queue = queue;
this.codec = codec;
this.timeout = timeout;
this.unit = unit;
}
public void setTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
}
// 实现 APPEND 命令,返回的是 Futrue 表明是异步操作
public Future<Long> append(K key, V value) {
return dispatch(APPEND, new IntegerOutput<K, V>(codec), key, value);
}
// 实现 Redis 的 Auth 命令
public String auth(String password) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
// 执行 auth 命令
Command<K, V, String> cmd = dispatch(AUTH, new StatusOutput<K, V>(codec), args);
// 阻塞等待 Redis 应答,会有最大的超时时间等待
String status = await(cmd, timeout, unit);
// 返回 "OK" 认证成功,会更新 this.password
if ("OK".equals(status)) this.password = password;
return status;
}
// 实现 BGREWRITEAOF 命令, 用于异步重写AOF文件
public Future<String> bgrewriteaof() {
return dispatch(BGREWRITEAOF, new StatusOutput<K, V>(codec));
}
// BGSAVE命令,用于在后台保存数据到磁盘
public Future<String> bgsave() {
return dispatch(BGSAVE, new StatusOutput<K, V>(codec));
}
// 对应 Redis 的 BITCOUNT 命令,用于计算字符串中比特位为 1 的数量
public Future<Long> bitcount(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return dispatch(BITCOUNT, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> bitcount(K key, long start, long end) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(start).add(end);
return dispatch(BITCOUNT, new IntegerOutput<K, V>(codec), args);
}
// 下面几个都是 BITOP命令 用于执行位操作
public Future<Long> bitopAnd(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(AND).addKey(destination).addKeys(keys);
return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> bitopNot(K destination, K source) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(NOT).addKey(destination).addKey(source);
return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> bitopOr(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(OR).addKey(destination).addKeys(keys);
return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> bitopXor(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(XOR).addKey(destination).addKeys(keys);
return dispatch(BITOP, new IntegerOutput<K, V>(codec), args);
}
// BLPOP和BRPOP命令 用于阻塞式地从 List 中弹出元素。
public Future<KeyValue<K, V>> blpop(long timeout, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys).add(timeout);
return dispatch(BLPOP, new KeyValueOutput<K, V>(codec), args);
}
//
public Future<KeyValue<K, V>> brpop(long timeout, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys).add(timeout);
return dispatch(BRPOP, new KeyValueOutput<K, V>(codec), args);
}
// BRPOPLPUSH 命令,从 List 尾部弹出元素并推入另一个 List 头部。
public Future<V> brpoplpush(long timeout, K source, K destination) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(source).addKey(destination).add(timeout);
return dispatch(BRPOPLPUSH, new ValueOutput<K, V>(codec), args);
}
public Future<K> clientGetname() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GETNAME);
return dispatch(CLIENT, new KeyOutput<K, V>(codec), args);
}
// 下面几个都是管理客户端连接的命令
public Future<String> clientSetname(K name) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SETNAME).addKey(name);
return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
}
public Future<String> clientKill(String addr) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(KILL).add(addr);
return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
}
public Future<String> clientList() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LIST);
return dispatch(CLIENT, new StatusOutput<K, V>(codec), args);
}
public Future<List<String>> configGet(String parameter) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET).add(parameter);
return dispatch(CONFIG, new StringListOutput<K, V>(codec), args);
}
public Future<String> configResetstat() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(RESETSTAT);
return dispatch(CONFIG, new StatusOutput<K, V>(codec), args);
}
public Future<String> configSet(String parameter, String value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(SET).add(parameter).add(value);
return dispatch(CONFIG, new StatusOutput<K, V>(codec), args);
}
// DBSIZE命令 返回当前数据库中 key 的数量。
public Future<Long> dbsize() {
return dispatch(DBSIZE, new IntegerOutput<K, V>(codec));
}
public Future<String> debugObject(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(OBJECT).addKey(key);
return dispatch(DEBUG, new StatusOutput<K, V>(codec), args);
}
// DECR 和 DECRBY 命令 用于对数值进行递减操作。
public Future<Long> decr(K key) {
return dispatch(DECR, new IntegerOutput<K, V>(codec), key);
}
public Future<Long> decrby(K key, long amount) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
return dispatch(DECRBY, new IntegerOutput<K, V>(codec), args);
}
// 删除一个或多个key
public Future<Long> del(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(DEL, new IntegerOutput<K, V>(codec), args);
}
// 取消一个事务 注意它会清除multi字段
public Future<String> discard() {
if (multi != null) {
multi.cancel();
multi = null;
}
return dispatch(DISCARD, new StatusOutput<K, V>(codec));
}
public Future<byte[]> dump(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return dispatch(DUMP, new ByteArrayOutput<K, V>(codec), args);
}
public Future<V> echo(V msg) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addValue(msg);
return dispatch(ECHO, new ValueOutput<K, V>(codec), args);
}
// 用于执行 Redis EVAL和 EVALSHA 命令,用于执行Lua脚本。
// 这里可以留意下,后面 Redis 会使用这种执行 lua 脚本的方式。
public <T> Future<T> eval(V script, ScriptOutputType type, K[] keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addValue(script).add(keys.length).addKeys(keys).addValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVAL, output, args);
}
public <T> Future<T> evalsha(String digest, ScriptOutputType type, K[] keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(digest).add(keys.length).addKeys(keys).addValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVALSHA, output, args);
}
// 检查给定key是否存在
public Future<Boolean> exists(K key) {
return dispatch(EXISTS, new BooleanOutput<K, V>(codec), key);
}
// 下面几个都是设置各种过期命令,用于设置key的过期时间。
public Future<Boolean> expire(K key, long seconds) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(seconds);
return dispatch(EXPIRE, new BooleanOutput<K, V>(codec), args);
}
public Future<Boolean> expireat(K key, Date timestamp) {
return expireat(key, timestamp.getTime() / 1000);
}
public Future<Boolean> expireat(K key, long timestamp) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(timestamp);
return dispatch(EXPIREAT, new BooleanOutput<K, V>(codec), args);
}
// 用于执行事务 注意它会清除multi字段。
public Future<List<Object>> exec() {
MultiOutput<K, V> multi = this.multi;
this.multi = null;
if (multi == null) multi = new MultiOutput<K, V>(codec);
return dispatch(EXEC, multi);
}
// 用于清空所有数据库或当前数据库。
public Future<String> flushall() throws Exception {
return dispatch(FLUSHALL, new StatusOutput<K, V>(codec));
}
public Future<String> flushdb() throws Exception {
return dispatch(FLUSHDB, new StatusOutput<K, V>(codec));
}
// 下面的方法都是用于获取指定 key 的值。
public Future<V> get(K key) {
return dispatch(GET, new ValueOutput<K, V>(codec), key);
}
// 用于获取字符串指定位置的 bit 值
public Future<Long> getbit(K key, long offset) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset);
return dispatch(GETBIT, new IntegerOutput<K, V>(codec), args);
}
public Future<V> getrange(K key, long start, long end) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(end);
return dispatch(GETRANGE, new ValueOutput<K, V>(codec), args);
}
public Future<V> getset(K key, V value) {
return dispatch(GETSET, new ValueOutput<K, V>(codec), key, value);
}
// hash相关的操作方法
public Future<Long> hdel(K key, K... fields) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKeys(fields);
return dispatch(HDEL, new IntegerOutput<K, V>(codec), args);
}
public Future<Boolean> hexists(K key, K field) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field);
return dispatch(HEXISTS, new BooleanOutput<K, V>(codec), args);
}
public Future<V> hget(K key, K field) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field);
return dispatch(HGET, new ValueOutput<K, V>(codec), args);
}
public Future<Long> hincrby(K key, K field, long amount) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).add(amount);
return dispatch(HINCRBY, new IntegerOutput<K, V>(codec), args);
}
public Future<Double> hincrbyfloat(K key, K field, double amount) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).add(amount);
return dispatch(HINCRBYFLOAT, new DoubleOutput<K, V>(codec), args);
}
public Future<Map<K, V>> hgetall(K key) {
return dispatch(HGETALL, new MapOutput<K, V>(codec), key);
}
public Future<List<K>> hkeys(K key) {
return dispatch(HKEYS, new KeyListOutput<K, V>(codec), key);
}
public Future<Long> hlen(K key) {
return dispatch(HLEN, new IntegerOutput<K, V>(codec), key);
}
public Future<List<V>> hmget(K key, K... fields) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKeys(fields);
return dispatch(HMGET, new ValueListOutput<K, V>(codec), args);
}
public Future<String> hmset(K key, Map<K, V> map) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(map);
return dispatch(HMSET, new StatusOutput<K, V>(codec), args);
}
public Future<Boolean> hset(K key, K field, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).addValue(value);
return dispatch(HSET, new BooleanOutput<K, V>(codec), args);
}
public Future<Boolean> hsetnx(K key, K field, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(field).addValue(value);
return dispatch(HSETNX, new BooleanOutput<K, V>(codec), args);
}
public Future<List<V>> hvals(K key) {
return dispatch(HVALS, new ValueListOutput<K, V>(codec), key);
}
// 自增相关的方法
public Future<Long> incr(K key) {
return dispatch(INCR, new IntegerOutput<K, V>(codec), key);
}
public Future<Long> incrby(K key, long amount) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
return dispatch(INCRBY, new IntegerOutput<K, V>(codec), args);
}
public Future<Double> incrbyfloat(K key, double amount) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount);
return dispatch(INCRBYFLOAT, new DoubleOutput<K, V>(codec), args);
}
// 获取服务器的各种信息
public Future<String> info() {
return dispatch(INFO, new StatusOutput<K, V>(codec));
}
public Future<String> info(String section) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(section);
return dispatch(INFO, new StatusOutput<K, V>(codec), args);
}
// 用于查找所有符合给定模式的 key
public Future<List<K>> keys(K pattern) {
return dispatch(KEYS, new KeyListOutput<K, V>(codec), pattern);
}
// 最后一次成功保存到磁盘的时间
public Future<Date> lastsave() {
return dispatch(LASTSAVE, new DateOutput<K, V>(codec));
}
// List 相关的方法
public Future<V> lindex(K key, long index) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(index);
return dispatch(LINDEX, new ValueOutput<K, V>(codec), args);
}
public Future<Long> linsert(K key, boolean before, V pivot, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(before ? BEFORE : AFTER).addValue(pivot).addValue(value);
return dispatch(LINSERT, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> llen(K key) {
return dispatch(LLEN, new IntegerOutput<K, V>(codec), key);
}
public Future<V> lpop(K key) {
return dispatch(LPOP, new ValueOutput<K, V>(codec), key);
}
public Future<Long> lpush(K key, V... values) {
return dispatch(LPUSH, new IntegerOutput<K, V>(codec), key, values);
}
public Future<Long> lpushx(K key, V value) {
return dispatch(LPUSHX, new IntegerOutput<K, V>(codec), key, value);
}
public Future<List<V>> lrange(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
return dispatch(LRANGE, new ValueListOutput<K, V>(codec), args);
}
public Future<Long> lrem(K key, long count, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(count).addValue(value);
return dispatch(LREM, new IntegerOutput<K, V>(codec), args);
}
public Future<String> lset(K key, long index, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(index).addValue(value);
return dispatch(LSET, new StatusOutput<K, V>(codec), args);
}
public Future<String> ltrim(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
return dispatch(LTRIM, new StatusOutput<K, V>(codec), args);
}
// 将 key 迁移到另一个 Redis 实例。
public Future<String> migrate(String host, int port, K key, int db, long timeout) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(host).add(port).addKey(key).add(db).add(timeout);
return dispatch(MIGRATE, new StatusOutput<K, V>(codec), args);
}
public Future<List<V>> mget(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(MGET, new ValueListOutput<K, V>(codec), args);
}
// 将当前数据库的键移动到另一个数据库
public Future<Boolean> move(K key, int db) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(db);
return dispatch(MOVE, new BooleanOutput<K, V>(codec), args);
}
public Future<String> multi() {
Command<K, V, String> cmd = dispatch(MULTI, new StatusOutput<K, V>(codec));
multi = (multi == null ? new MultiOutput<K, V>(codec) : multi);
return cmd;
}
public Future<String> mset(Map<K, V> map) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(map);
return dispatch(MSET, new StatusOutput<K, V>(codec), args);
}
public Future<Boolean> msetnx(Map<K, V> map) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(map);
return dispatch(MSETNX, new BooleanOutput<K, V>(codec), args);
}
public Future<String> objectEncoding(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(ENCODING).addKey(key);
return dispatch(OBJECT, new StatusOutput<K, V>(codec), args);
}
public Future<Long> objectIdletime(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(IDLETIME).addKey(key);
return dispatch(OBJECT, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> objectRefcount(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(REFCOUNT).addKey(key);
return dispatch(OBJECT, new IntegerOutput<K, V>(codec), args);
}
public Future<Boolean> persist(K key) {
return dispatch(PERSIST, new BooleanOutput<K, V>(codec), key);
}
public Future<Boolean> pexpire(K key, long milliseconds) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(milliseconds);
return dispatch(PEXPIRE, new BooleanOutput<K, V>(codec), args);
}
public Future<Boolean> pexpireat(K key, Date timestamp) {
return pexpireat(key, timestamp.getTime());
}
public Future<Boolean> pexpireat(K key, long timestamp) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(timestamp);
return dispatch(PEXPIREAT, new BooleanOutput<K, V>(codec), args);
}
public Future<String> ping() {
return dispatch(PING, new StatusOutput<K, V>(codec));
}
public Future<Long> pttl(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return dispatch(PTTL, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> publish(K channel, V message) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(channel).addValue(message);
return dispatch(PUBLISH, new IntegerOutput<K, V>(codec), args);
}
public Future<String> quit() {
return dispatch(QUIT, new StatusOutput<K, V>(codec));
}
public Future<V> randomkey() {
return dispatch(RANDOMKEY, new ValueOutput<K, V>(codec));
}
public Future<String> rename(K key, K newKey) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(newKey);
return dispatch(RENAME, new StatusOutput<K, V>(codec), args);
}
public Future<Boolean> renamenx(K key, K newKey) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addKey(newKey);
return dispatch(RENAMENX, new BooleanOutput<K, V>(codec), args);
}
public Future<String> restore(K key, long ttl, byte[] value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(ttl).add(value);
return dispatch(RESTORE, new StatusOutput<K, V>(codec), args);
}
public Future<V> rpop(K key) {
return dispatch(RPOP, new ValueOutput<K, V>(codec), key);
}
public Future<V> rpoplpush(K source, K destination) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(source).addKey(destination);
return dispatch(RPOPLPUSH, new ValueOutput<K, V>(codec), args);
}
public Future<Long> rpush(K key, V... values) {
return dispatch(RPUSH, new IntegerOutput<K, V>(codec), key, values);
}
public Future<Long> rpushx(K key, V value) {
return dispatch(RPUSHX, new IntegerOutput<K, V>(codec), key, value);
}
public Future<Long> sadd(K key, V... members) {
return dispatch(SADD, new IntegerOutput<K, V>(codec), key, members);
}
public Future<String> save() {
return dispatch(SAVE, new StatusOutput<K, V>(codec));
}
public Future<Long> scard(K key) {
return dispatch(SCARD, new IntegerOutput<K, V>(codec), key);
}
public Future<List<Boolean>> scriptExists(String... digests) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(EXISTS);
for (String sha : digests) args.add(sha);
return dispatch(SCRIPT, new BooleanListOutput<K, V>(codec), args);
}
public Future<String> scriptFlush() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(FLUSH);
return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
}
public Future<String> scriptKill() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(KILL);
return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
}
public Future<String> scriptLoad(V script) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LOAD).addValue(script);
return dispatch(SCRIPT, new StatusOutput<K, V>(codec), args);
}
public Future<Set<V>> sdiff(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(SDIFF, new ValueSetOutput<K, V>(codec), args);
}
public Future<Long> sdiffstore(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
return dispatch(SDIFFSTORE, new IntegerOutput<K, V>(codec), args);
}
public String select(int db) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
Command<K, V, String> cmd = dispatch(SELECT, new StatusOutput<K, V>(codec), args);
String status = await(cmd, timeout, unit);
if ("OK".equals(status)) this.db = db;
return status;
}
public Future<String> set(K key, V value) {
return dispatch(SET, new StatusOutput<K, V>(codec), key, value);
}
public Future<Long> setbit(K key, long offset, int value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset).add(value);
return dispatch(SETBIT, new IntegerOutput<K, V>(codec), args);
}
public Future<String> setex(K key, long seconds, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(seconds).addValue(value);
return dispatch(SETEX, new StatusOutput<K, V>(codec), args);
}
public Future<Boolean> setnx(K key, V value) {
return dispatch(SETNX, new BooleanOutput<K, V>(codec), key, value);
}
public Future<Long> setrange(K key, long offset, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(offset).addValue(value);
return dispatch(SETRANGE, new IntegerOutput<K, V>(codec), args);
}
@Deprecated
public void shutdown() {
dispatch(SHUTDOWN, new StatusOutput<K, V>(codec));
}
public void shutdown(boolean save) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
dispatch(SHUTDOWN, new StatusOutput<K, V>(codec), save ? args.add(SAVE) : args.add(NOSAVE));
}
public Future<Set<V>> sinter(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(SINTER, new ValueSetOutput<K, V>(codec), args);
}
public Future<Long> sinterstore(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
return dispatch(SINTERSTORE, new IntegerOutput<K, V>(codec), args);
}
public Future<Boolean> sismember(K key, V member) {
return dispatch(SISMEMBER, new BooleanOutput<K, V>(codec), key, member);
}
public Future<Boolean> smove(K source, K destination, V member) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(source).addKey(destination).addValue(member);
return dispatch(SMOVE, new BooleanOutput<K, V>(codec), args);
}
public Future<String> slaveof(String host, int port) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(host).add(port);
return dispatch(SLAVEOF, new StatusOutput<K, V>(codec), args);
}
public Future<String> slaveofNoOne() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(NO).add(ONE);
return dispatch(SLAVEOF, new StatusOutput<K, V>(codec), args);
}
public Future<List<Object>> slowlogGet() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET);
return dispatch(SLOWLOG, new NestedMultiOutput<K, V>(codec), args);
}
public Future<List<Object>> slowlogGet(int count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(GET).add(count);
return dispatch(SLOWLOG, new NestedMultiOutput<K, V>(codec), args);
}
public Future<Long> slowlogLen() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(LEN);
return dispatch(SLOWLOG, new IntegerOutput<K, V>(codec), args);
}
public Future<String> slowlogReset() {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(RESET);
return dispatch(SLOWLOG, new StatusOutput<K, V>(codec), args);
}
public Future<Set<V>> smembers(K key) {
return dispatch(SMEMBERS, new ValueSetOutput<K, V>(codec), key);
}
public Future<List<V>> sort(K key) {
return dispatch(SORT, new ValueListOutput<K, V>(codec), key);
}
public Future<List<V>> sort(K key, SortArgs sortArgs) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
sortArgs.build(args, null);
return dispatch(SORT, new ValueListOutput<K, V>(codec), args);
}
public Future<Long> sortStore(K key, SortArgs sortArgs, K destination) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
sortArgs.build(args, destination);
return dispatch(SORT, new IntegerOutput<K, V>(codec), args);
}
public Future<V> spop(K key) {
return dispatch(SPOP, new ValueOutput<K, V>(codec), key);
}
public Future<V> srandmember(K key) {
return dispatch(SRANDMEMBER, new ValueOutput<K, V>(codec), key);
}
public Future<Set<V>> srandmember(K key, long count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(count);
return dispatch(SRANDMEMBER, new ValueSetOutput<K, V>(codec), args);
}
public Future<Long> srem(K key, V... members) {
return dispatch(SREM, new IntegerOutput<K, V>(codec), key, members);
}
public Future<Set<V>> sunion(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(SUNION, new ValueSetOutput<K, V>(codec), args);
}
public Future<Long> sunionstore(K destination, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).addKeys(keys);
return dispatch(SUNIONSTORE, new IntegerOutput<K, V>(codec), args);
}
public Future<String> sync() {
return dispatch(SYNC, new StatusOutput<K, V>(codec));
}
public Future<Long> strlen(K key) {
return dispatch(STRLEN, new IntegerOutput<K, V>(codec), key);
}
public Future<Long> ttl(K key) {
return dispatch(TTL, new IntegerOutput<K, V>(codec), key);
}
public Future<String> type(K key) {
return dispatch(TYPE, new StatusOutput<K, V>(codec), key);
}
public Future<String> watch(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return dispatch(WATCH, new StatusOutput<K, V>(codec), args);
}
public Future<String> unwatch() {
return dispatch(UNWATCH, new StatusOutput<K, V>(codec));
}
public Future<Long> zadd(K key, double score, V member) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(score).addValue(member);
return dispatch(ZADD, new IntegerOutput<K, V>(codec), args);
}
@SuppressWarnings("unchecked")
public Future<Long> zadd(K key, Object... scoresAndValues) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
for (int i = 0; i < scoresAndValues.length; i += 2) {
args.add((Double) scoresAndValues[i]);
args.addValue((V) scoresAndValues[i + 1]);
}
return dispatch(ZADD, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> zcard(K key) {
return dispatch(ZCARD, new IntegerOutput<K, V>(codec), key);
}
public Future<Long> zcount(K key, double min, double max) {
return zcount(key, string(min), string(max));
}
public Future<Long> zcount(K key, String min, String max) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
return dispatch(ZCOUNT, new IntegerOutput<K, V>(codec), args);
}
public Future<Double> zincrby(K key, double amount, K member) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(amount).addKey(member);
return dispatch(ZINCRBY, new DoubleOutput<K, V>(codec), args);
}
public Future<Long> zinterstore(K destination, K... keys) {
return zinterstore(destination, new ZStoreArgs(), keys);
}
public Future<Long> zinterstore(K destination, ZStoreArgs storeArgs, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(destination).add(keys.length).addKeys(keys);
storeArgs.build(args);
return dispatch(ZINTERSTORE, new IntegerOutput<K, V>(codec), args);
}
public Future<List<V>> zrange(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
return dispatch(ZRANGE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrangeWithScores(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(start).add(stop).add(WITHSCORES);
return dispatch(ZRANGE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<List<V>> zrangebyscore(K key, double min, double max) {
return zrangebyscore(key, string(min), string(max));
}
public Future<List<V>> zrangebyscore(K key, String min, String max) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
return dispatch(ZRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<V>> zrangebyscore(K key, double min, double max, long offset, long count) {
return zrangebyscore(key, string(min), string(max), offset, count);
}
public Future<List<V>> zrangebyscore(K key, String min, String max, long offset, long count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(min).add(max).add(LIMIT).add(offset).add(count);
return dispatch(ZRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, double min, double max) {
return zrangebyscoreWithScores(key, string(min), string(max));
}
public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, String min, String max) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(min).add(max).add(WITHSCORES);
return dispatch(ZRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, double min, double max, long offset, long count) {
return zrangebyscoreWithScores(key, string(min), string(max), offset, count);
}
public Future<List<ScoredValue<V>>> zrangebyscoreWithScores(K key, String min, String max, long offset, long count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(min).add(max).add(WITHSCORES).add(LIMIT).add(offset).add(count);
return dispatch(ZRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<Long> zrank(K key, V member) {
return dispatch(ZRANK, new IntegerOutput<K, V>(codec), key, member);
}
public Future<Long> zrem(K key, V... members) {
return dispatch(ZREM, new IntegerOutput<K, V>(codec), key, members);
}
public Future<Long> zremrangebyrank(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
return dispatch(ZREMRANGEBYRANK, new IntegerOutput<K, V>(codec), args);
}
public Future<Long> zremrangebyscore(K key, double min, double max) {
return zremrangebyscore(key, string(min), string(max));
}
public Future<Long> zremrangebyscore(K key, String min, String max) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(min).add(max);
return dispatch(ZREMRANGEBYSCORE, new IntegerOutput<K, V>(codec), args);
}
public Future<List<V>> zrevrange(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(start).add(stop);
return dispatch(ZREVRANGE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrevrangeWithScores(K key, long start, long stop) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(start).add(stop).add(WITHSCORES);
return dispatch(ZREVRANGE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<List<V>> zrevrangebyscore(K key, double max, double min) {
return zrevrangebyscore(key, string(max), string(min));
}
public Future<List<V>> zrevrangebyscore(K key, String max, String min) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(max).add(min);
return dispatch(ZREVRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<V>> zrevrangebyscore(K key, double max, double min, long offset, long count) {
return zrevrangebyscore(key, string(max), string(min), offset, count);
}
public Future<List<V>> zrevrangebyscore(K key, String max, String min, long offset, long count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(max).add(min).add(LIMIT).add(offset).add(count);
return dispatch(ZREVRANGEBYSCORE, new ValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, double max, double min) {
return zrevrangebyscoreWithScores(key, string(max), string(min));
}
public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, String max, String min) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(max).add(min).add(WITHSCORES);
return dispatch(ZREVRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, double max, double min, long offset, long count) {
return zrevrangebyscoreWithScores(key, string(max), string(min), offset, count);
}
public Future<List<ScoredValue<V>>> zrevrangebyscoreWithScores(K key, String max, String min, long offset, long count) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(key).add(max).add(min).add(WITHSCORES).add(LIMIT).add(offset).add(count);
return dispatch(ZREVRANGEBYSCORE, new ScoredValueListOutput<K, V>(codec), args);
}
public Future<Long> zrevrank(K key, V member) {
return dispatch(ZREVRANK, new IntegerOutput<K, V>(codec), key, member);
}
public Future<Double> zscore(K key, V member) {
return dispatch(ZSCORE, new DoubleOutput<K, V>(codec), key, member);
}
public Future<Long> zunionstore(K destination, K... keys) {
return zunionstore(destination, new ZStoreArgs(), keys);
}
public Future<Long> zunionstore(K destination, ZStoreArgs storeArgs, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKey(destination).add(keys.length).addKeys(keys);
storeArgs.build(args);
return dispatch(ZUNIONSTORE, new IntegerOutput<K, V>(codec), args);
}
// 等待多个Future完成,使用默认的超时时间
public boolean awaitAll(Future<?>... futures) {
return awaitAll(timeout, unit, futures);
}
// 等待多个Future完成, 总的超时时间由方法指定。 方法可以确保在给定时间内所有 Future 都能完成或超时。
public boolean awaitAll(long timeout, TimeUnit unit, Future<?>... futures) {
boolean complete;
try {
// 超时时间
long nanos = unit.toNanos(timeout);
// 当前时间
long time = System.nanoTime();
// 调用每个 Future 的 get 方法等待完成
for (Future<?> f : futures) {
// 如果总的等待时间耗尽,返回 false。
if (nanos < 0) return false;
f.get(nanos, TimeUnit.NANOSECONDS);
// 从总的等待时间里面减掉这次 future 等待的时间
long now = System.nanoTime();
nanos -= now - time;
time = now;
}
// 所有 Future 都在超时前完成,返回true
complete = true;
} catch (TimeoutException e) {
// 超时设置 complete 为 false。
complete = false;
} catch (Exception e) {
// 其他异常就直接抛出自定义异常 RedisCommandInterruptedException。
throw new RedisCommandInterruptedException(e);
}
return complete;
}
// 安全关闭 Redis 连接
public synchronized void close() {
// 先检查连接是否已经关闭,以及channel是否存在
if (!closed && channel != null) {
// 获取看门狗并禁用重连。
ConnectionWatchdog watchdog = channel.pipeline().get(ConnectionWatchdog.class);
watchdog.setReconnect(false);
closed = true;
channel.close();
}
}
// 计算 lua 脚本的 SHA1 摘要
public String digest(V script) {
try {
MessageDigest md = MessageDigest.getInstance("SHA1");
md.update(codec.encodeValue(script));
return new String(Base16.encode(md.digest(), false));
} catch (NoSuchAlgorithmException e) {
throw new RedisException("JVM does not support SHA1");
}
}
// 连接建立时,做身份验证和切换正确的数据库。
// 重新发送之前连接问题而没有发送的命令。
@Override
public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
// 临时List来存储命令。
// 初始容量设置为当前队列大小加2 , 为可能的 AUTH 和切换数据库 命令预留空间。
List<Command<K, V, ?>> tmp = new ArrayList<Command<K, V, ?>>(queue.size() + 2);
// 设置了密码的情况下创建一个AUTH命令并添加到临时 List。
if (password != null) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(password);
tmp.add(new Command<K, V, String>(AUTH, new StatusOutput<K, V>(codec), args, false));
}
// 选择了非默认数据库 , 创建一个 SELECT 命令并添加到临时 List。
if (db != 0) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add(db);
tmp.add(new Command<K, V, String>(SELECT, new StatusOutput<K, V>(codec), args, false));
}
// 将原有队列中的所有命令添加到 tmp ,然后清空原队列。
tmp.addAll(queue);
queue.clear();
// 遍历临时 List 中的所有命令
for (Command<K, V, ?> cmd : tmp) {
// 如果命令未被取消 , 重新添加到队列中
if (!cmd.isCancelled()) {
queue.add(cmd);
channel.writeAndFlush(cmd);
}
}
// 清空临时列表,释放内存。
tmp.clear();
}
@Override
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接已经关闭的话,循环执行所有待执行的命令
if (closed) {
for (Command<K, V, ?> cmd : queue) {
if (cmd.getOutput() != null) {
// 为每个命令设置错误信息并标记为完成。
cmd.getOutput().setError("Connection closed");
}
cmd.complete();
}
queue.clear();
queue = null;
channel = null;
}
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output) {
return dispatch(type, output, (CommandArgs<K, V>) null);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return dispatch(type, output, args);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V value) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValue(value);
return dispatch(type, output, args);
}
public <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, K key, V[] values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).addValues(values);
return dispatch(type, output, args);
}
// 分发 Redis 命令
public synchronized <T> Command<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
// 创建Command 对象,最后一个入参表示是否在事务中。
Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null);
try {
// 如果在事务中的话,将命令添加到事务。
if (multi != null) {
multi.add(cmd);
}
// 将命令放入队列
queue.put(cmd);
// channel 可用,立即写入并刷新命令。
if (channel != null) {
channel.writeAndFlush(cmd);
}
} catch (NullPointerException e) {
throw new RedisException("Connection is closed");
} catch (InterruptedException e) {
throw new RedisCommandInterruptedException(e);
}
return cmd;
}
// 等待命令执行完成然后获取结果
public <T> T await(Command<K, V, T> cmd, long timeout, TimeUnit unit) {
if (!cmd.await(timeout, unit)) {
cmd.cancel(true);
throw new RedisException("Command timed out");
}
CommandOutput<K, V, T> output = cmd.getOutput();
if (output.hasError()) throw new RedisException(output.getError());
return output.get();
}
// 创建脚本输出类型的 CommandOutput 对象
@SuppressWarnings("unchecked")
protected <K, V, T> CommandOutput<K, V, T> newScriptOutput(RedisCodec<K, V> codec, ScriptOutputType type) {
switch (type) {
case BOOLEAN: return (CommandOutput<K, V, T>) new BooleanOutput<K, V>(codec);
case INTEGER: return (CommandOutput<K, V, T>) new IntegerOutput<K, V>(codec);
case STATUS: return (CommandOutput<K, V, T>) new StatusOutput<K, V>(codec);
case MULTI: return (CommandOutput<K, V, T>) new NestedMultiOutput<K, V>(codec);
case VALUE: return (CommandOutput<K, V, T>) new ValueOutput<K, V>(codec);
default: throw new RedisException("Unsupported script output type");
}
}
// 将double值转换为字符串
public String string(double n) {
if (Double.isInfinite(n)) {
// 处理无穷大的情况,返回 "+inf" 或 "-inf"
return (n > 0) ? "+inf" : "-inf";
}
return Double.toString(n);
}
}
1.2 同步Redis连接 RedisConnection
这是个同步的连接,跟 RedisAsyncConnection 套路基本一致,这个类的方法都是直接返回响应值的。 RedisAsyncConnection 的方法返回值都是 Future 。
在实现上内部也还是使用异步连接,它使用的是 await方法来等待异步操作完成,使得调用方式更简单。
1.3 Redis客户端 RedisClient
可扩展的线程安全Redis客户端,支持多个线程共享一个连接,只要避免使用阻塞和事务操作(如BLPOP和MULTI/EXEC)。
package com.lambdaworks.redis;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class RedisClient {
private EventLoopGroup group;
private Bootstrap bootstrap;
private HashedWheelTimer timer; // 用于处理超时和重连的计时器
private ChannelGroup channels; // 管理所有活动连接的通道组
private long timeout; // 默认超时时间及其单位
private TimeUnit unit;
public RedisClient(String host) {
this(host, 6379);
}
public RedisClient(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
group = new NioEventLoopGroup();
bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr);
setDefaultTimeout(60, TimeUnit.SECONDS);
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
timer = new HashedWheelTimer();
timer.start();
}
public void setDefaultTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) unit.toMillis(timeout));
}
// 创建同步 Redis 连接
public RedisConnection<String, String> connect() {
return connect(new Utf8StringCodec());
}
// 创建异步 Redis 连接
public RedisAsyncConnection<String, String> connectAsync() {
return connectAsync(new Utf8StringCodec());
}
// 创建发布/订阅连接
public RedisPubSubConnection<String, String> connectPubSub() {
return connectPubSub(new Utf8StringCodec());
}
// 下面三个方法跟上面一样,只是加了解码器
public <K, V> RedisConnection<K, V> connect(RedisCodec<K, V> codec) {
return new RedisConnection<K, V>(connectAsync(codec));
}
public <K, V> RedisAsyncConnection<K, V> connectAsync(RedisCodec<K, V> codec) {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
CommandHandler<K, V> handler = new CommandHandler<K, V>(queue);
RedisAsyncConnection<K, V> connection = new RedisAsyncConnection<K, V>(queue, codec, timeout, unit);
return connect(handler, connection);
}
public <K, V> RedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
BlockingQueue<Command<K, V, ?>> queue = new LinkedBlockingQueue<Command<K, V, ?>>();
PubSubCommandHandler<K, V> handler = new PubSubCommandHandler<K, V>(queue, codec);
RedisPubSubConnection<K, V> connection = new RedisPubSubConnection<K, V>(queue, codec, timeout, unit);
return connect(handler, connection);
}
// connect 私有方法,上面的方法最终都是调用这个
private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) {
try {
// 创建看门狗用于监控连接状态,处理断线重连
final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer);
// 开始创建连接
ChannelFuture connect = null;
// TODO use better concurrent workaround
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// pipeline中添加 看门狗、 redis命令响应处理器handler 和 redis连接处理器 connection
// pipeline 顺序很重要
// 看门狗放在最前面,这样它可以第一时间检测到连接问题并进行处理。
// Redis协议处理放中间,连接处理放最后
ch.pipeline().addLast(watchdog, handler, connection);
}
}).connect();
}
// 等待连接完成
connect.sync();
// 看门狗看起自动重连
watchdog.setReconnect(true);
return connection;
} catch (Throwable e) {
throw new RedisException("Unable to connect", e);
}
}
// 关闭操作
public void shutdown() {
// 关闭所有的 channel
for (Channel c : channels) {
ChannelPipeline pipeline = c.pipeline();
// 调用连接管理器去关闭连接
RedisAsyncConnection<?, ?> connection = pipeline.get(RedisAsyncConnection.class);
connection.close();
}
// 等待关闭结束
ChannelGroupFuture future = channels.close();
future.awaitUninterruptibly();
group.shutdownGracefully().syncUninterruptibly();
// 关闭定时器,重连的任务通过定时器触发
timer.stop();
}
}
1.4 看门狗 ConnectionWatchdog
这个类主要就是监控 Netty 的 Channel ,在掉线时进行重新连接。
package com.lambdaworks.redis.protocol;
import com.lambdaworks.redis.RedisAsyncConnection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask {
private Bootstrap bootstrap;
private Channel channel; // 当前活动的 channel
private ChannelGroup channels; // channel 群组,用于管理多个 channel
private Timer timer; // 定时任务,用于执行重连
private boolean reconnect; // 重连标记
private int attempts; // 重试连接次数
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels, Timer timer) {
this.bootstrap = bootstrap;
this.channels = channels;
this.timer = timer;
}
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 添加到 ChannelGroup 中
channel = ctx.channel();
channels.add(channel);
// 重置重连次数
attempts = 0;
// 往后传播事件
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 需要重连的话执行重连逻辑
if (reconnect) {
// 最多尝试 8次重连,不会无线重试
if (attempts < 8) attempts++;
// 每次延长超时的时间,比如第一次2毫秒,第二次4毫秒,第三次8毫秒,以此类推,最多512毫秒。
int timeout = 2 << attempts;
timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
}
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常就关闭 channel
ctx.channel().close();
}
// 执行重连逻辑
@Override
public void run(Timeout timeout) throws Exception {
// 获取老的 channel pipeline
ChannelPipeline old = channel.pipeline();
// 老的 pipeline 里面获取 CommandHandler 以及 RedisAsyncConnection 处理器
final CommandHandler<?, ?> handler = old.get(CommandHandler.class);
final RedisAsyncConnection<?, ?> connection = old.get(RedisAsyncConnection.class);
// 准备重新发起连接
ChannelFuture connect = null;
// 同步块保证 bootstrap 的线程安全
// TODO use better concurrent workaround
synchronized (bootstrap) {
// 把之前 pipeline 中的处理器加到新的 bootstrap 里面去
// 这样可以保持新旧连接行为一致
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(this, handler, connection);
}
}).connect();
}
// 阻塞等待当前线程,等到连接建立或者建立失败
connect.sync();
}
}
2. 发布订阅机制
主要是涉及到 Redis 发布订阅相关处理。
2.1 发布订阅命令Netty处理器 PubSubCommandHandler
用来处理 Redis 发布(Pub)/订阅(Sub) 命令的 Netty Channel 。
支持同时处理常规 Redis 命令和 Pub/Sub 消息。
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import java.util.concurrent.BlockingQueue;
public class PubSubCommandHandler<K, V> extends CommandHandler<K, V> {
private RedisCodec<K, V> codec; // Redis 数据编解码
private PubSubOutput<K, V> output; // 存储 Pub/Sub 操作的输出
public PubSubCommandHandler(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec) {
super(queue);
this.codec = codec;
this.output = new PubSubOutput<K, V>(codec);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
// 处理队列中的常规命令
// 检查当前是否没有正在处理的 Pub/Sub 消息
while (output.type() == null && !queue.isEmpty()) {
// 尝试解码接收到的数据。
CommandOutput<K, V, ?> output = queue.peek().getOutput();
// 返回 false说明解码失败,方法直接返回,等待更多数据来处理。
if (!rsm.decode(buffer, output)) return;
// 命令从队列中移除,调用回调方法。
queue.take().complete();
// PubSubOutput 类型触发 ChannelRead 事件,将结果传递给下一个处理器
if (output instanceof PubSubOutput) ctx.fireChannelRead(output);
}
// 处理 Pub/Sub 消息
while (rsm.decode(buffer, output)) {
// 触发 ChannelRead 事件,将解码后的消息传递给下一个处理器
ctx.fireChannelRead(output);
// 创建一个新的 PubSubOutput 对象再继续处理下一条消息
output = new PubSubOutput<K, V>(codec);
}
}
}
2.2 发布订阅监听器 RedisPubSubListener
Redis 发布/订阅事件的回调处理接口。
package com.lambdaworks.redis.pubsub;
public interface RedisPubSubListener<K, V> {
// 处理普通的发布/订阅消息
void message(K channel, V message);
// 处理模式匹配的发布/订阅消息
void message(K pattern, K channel, V message);
// 处理成功订阅频道的事件
void subscribed(K channel, long count);
// 处理成功订阅模式的事件
void psubscribed(K pattern, long count);
// 处理取消订阅频道的事件
void unsubscribed(K channel, long count);
// 处理取消订阅模式的事件
void punsubscribed(K pattern, long count);
}
### RedisPubSubAdapter
实现 RedisPubSubListener 接口的适配器,给接口所有方法的空实现。
```java
package com.lambdaworks.redis.pubsub;
public class RedisPubSubAdapter<K, V> implements RedisPubSubListener<K, V> {
@Override
public void message(K channel, V message) {
}
@Override
public void message(K pattern, K channel, V message) {
}
@Override
public void subscribed(K channel, long count) {
}
@Override
public void psubscribed(K pattern, long count) {
}
@Override
public void unsubscribed(K channel, long count) {
}
@Override
public void punsubscribed(K pattern, long count) {
}
}
2.3 发布订阅连接管理 RedisPubSubConnection
用于管理与 Redis 服务器的发布/订阅(Pub/Sub)连接。
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;
import static com.lambdaworks.redis.protocol.CommandType.*;
public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
private List<RedisPubSubListener<K, V>> listeners; // 保存所有注册的监听器
private Set<K> channels; // 当前订阅的频道
private Set<K> patterns; // 当前订阅的模式
public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
super(queue, codec, timeout, unit);
listeners = new CopyOnWriteArrayList<RedisPubSubListener<K, V>>();
channels = new HashSet<K>();
patterns = new HashSet<K>();
}
public void addListener(RedisPubSubListener<K, V> listener) {
listeners.add(listener);
}
public void removeListener(RedisPubSubListener<K, V> listener) {
listeners.remove(listener);
}
public void psubscribe(K... patterns) {
dispatch(PSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void punsubscribe(K... patterns) {
dispatch(PUNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(patterns));
}
public void subscribe(K... channels) {
dispatch(SUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
public void unsubscribe(K... channels) {
dispatch(UNSUBSCRIBE, new PubSubOutput<K, V>(codec), args(channels));
}
@Override
public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// 下面的逻辑主要是在重连后的逻辑
// 重新订阅之前的频道
if (channels.size() > 0) {
subscribe(toArray(channels));
channels.clear();
}
// 重新订阅之前的模式
if (patterns.size() > 0) {
psubscribe(toArray(patterns));
patterns.clear();
}
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PubSubOutput<K, V> output = (PubSubOutput<K, V>) msg;
for (RedisPubSubListener<K, V> listener : listeners) {
switch (output.type()) {
// 普通的发布/订阅消息
case message:
listener.message(output.channel(), output.get());
break;
// 模式匹配的消息
case pmessage:
listener.message(output.pattern(), output.channel(), output.get());
break;
// 模式订阅确认
case psubscribe:
patterns.add(output.pattern());
listener.psubscribed(output.pattern(), output.count());
break;
// 取消模式订阅确认
case punsubscribe:
patterns.remove(output.pattern());
listener.punsubscribed(output.pattern(), output.count());
break;
// 频道订阅确认
case subscribe:
channels.add(output.channel());
listener.subscribed(output.channel(), output.count());
break;
// 取消频道订阅确认
case unsubscribe:
channels.remove(output.channel());
listener.unsubscribed(output.channel(), output.count());
break;
}
}
}
private CommandArgs<K, V> args(K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.addKeys(keys);
return args;
}
@SuppressWarnings("unchecked")
private <T> T[] toArray(Collection<T> c) {
Class<T> cls = (Class<T>) c.iterator().next().getClass();
T[] array = (T[]) Array.newInstance(cls, c.size());
return c.toArray(array);
}
}
2.4 发布订阅输出 PubSubOutput
用于处理 Redis 发布/订阅(Pub/Sub)操作输出。
补充下发布订阅类型的知识点,发布订阅有下面几种操作类型:
- message: 普通的发布消息。
- pmessage: 模式匹配的发布消息。
- psubscribe: 订阅某个模式。
- punsubscribe: 取消订阅某个模式。
- subscribe: 订阅特定频道。
- unsubscribe: 取消订阅特定频道。
package com.lambdaworks.redis.pubsub;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
import java.nio.ByteBuffer;
public class PubSubOutput<K, V> extends CommandOutput<K, V, V> {
enum Type { message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe }
private Type type; // 当前处理的Pub/Sub操作类型
private K channel;
private K pattern; // 订阅模式,用于模式订阅
private long count; // 订阅/取消订阅操作后的活跃的订阅个数
public PubSubOutput(RedisCodec<K, V> codec) {
super(codec, null);
}
public Type type() {
return type;
}
public K channel() {
return channel;
}
public K pattern() {
return pattern;
}
public long count() {
return count;
}
@Override
@SuppressWarnings("fallthrough")
public void set(ByteBuffer bytes) {
// 将接收到的字节解码为ASCII并设置为 type。
if (type == null) {
// 直接通过Type枚举的 valueOf() 方法通过名称去匹配枚举
type = Type.valueOf(decodeAscii(bytes));
return;
}
switch (type) {
// 模式匹配的发布消息
case pmessage:
// pattern为null就会设置pattern并退出。
if (pattern == null) {
pattern = codec.decodeKey(bytes);
break;
}
// 不为null,继续执行message的逻辑。 这里没有 break
// 普通的发布消息
case message:
// channel为null,设置 channel 并退出
if (channel == null) {
channel = codec.decodeKey(bytes);
break;
}
// 将bytes解码为值并设置为 output
output = codec.decodeValue(bytes);
break;
// 订阅某个模式
// 取消订阅某个模式
case psubscribe:
case punsubscribe:
// 将bytes解码为key并设置为 pattern
pattern = codec.decodeKey(bytes);
break;
// 订阅特定频道
// 取消订阅特定频道
case subscribe:
case unsubscribe:
// 将bytes解码为key并设置为channel
channel = codec.decodeKey(bytes);
break;
}
}
@Override
public void set(long integer) {
// 订阅/取消订阅操作后更新活跃订阅数
count = integer;
}
}
通过几个例子来理解这个类,主要还是要对 Redis 的发布订阅机制有一定的了解。
2.4.1 案例一 频道订阅例子
客户端订阅了一个名为 “my-channel” 的频道。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):
1) "subscribe"
2) "my-channel"
3) (integer) 1
看下 PubSubOutput 的处理流程:
final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);
// 第一次调用 set
output.set(ByteBuffer.wrap("subscribe".getBytes()));
// output.type() == Type.subscribe
// 第二次调用 set
output.set(ByteBuffer.wrap("my-channel".getBytes()));
// output.channel() == "my-channel"
// 第三次调用 set
output.set(1);
// output.count() == 1
System.out.println("Type: " + output.type()); // subscribe
System.out.println("Channel: " + output.channel()); // my-channel
System.out.println("Count: " + output.count()); // 1
2.4.2 案例二 频道接收消息例子
客户端收到了发布到 “my-channel” 的消息 “new channel message”。Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):
1) "message"
2) "my-channel"
3) "new channel message"
看下 PubSubOutput 的处理流程:
final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);
// 第一次调用 set
output.set(ByteBuffer.wrap("message".getBytes()));
// output.type() == Type.message
// 第二次调用 set
output.set(ByteBuffer.wrap("my-channel".getBytes()));
// output.channel() == "my-channel"
// 第三次调用 set
output.set(ByteBuffer.wrap("new channel message".getBytes()));
// output 中的 output 字段被设置为 "new channel message"
System.out.println("Type: " + output.type()); // message
System.out.println("Channel: " + output.channel()); // my-channel
System.out.println("Message: " + output.get()); // new channel message
2.4.3 案例三 模式订阅例子
客户端使用模式 “channel.*” 进行订阅。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):
1) "psubscribe"
2) "channel.*"
3) (integer) 1
看下 PubSubOutput 的处理流程:
final PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);
// 第一次调用 set
output.set(ByteBuffer.wrap("psubscribe".getBytes()));
// output.type() == Type.psubscribe
// 第二次调用 set
output.set(ByteBuffer.wrap("channel.*".getBytes()));
// output.pattern() == "channel.*"
// 第三次调用 set
output.set(1);
// output.count() == 1
System.out.println("Type: " + output.type()); // psubscribe
System.out.println("Pattern: " + output.pattern()); // channel.*
System.out.println("Count: " + output.count()); // 1
2.4.4 案例四 接收模式匹配的消息例子
客户端使用模式 “channel.*” 订阅了消息。 然后有消息被发布到 “channel.channel123” 频道。 Redis 会发送下面这条消息给客户端 (为了可视化做了下拆分):
1) "pmessage"
2) "channel.*"
3) "channel.channel123"
4) "channel message content"
可以看到比普通的 message 多了一个具体的子channel 名称。
看下 PubSubOutput 的处理流程:
PubSubOutput<String, String> output = new PubSubOutput<>(stringCodec);
// 第一次调用 set
output.set(ByteBuffer.wrap("pmessage".getBytes()));
// 此时 output.type() == Type.pmessage
// 第二次调用 set
output.set(ByteBuffer.wrap("channel.*".getBytes()));
// 此时 output.pattern() == "channel.*"
// 第三次调用 set
output.set(ByteBuffer.wrap("channel.channel123".getBytes()));
// 此时 output.channel() == "channel.channel123"
// 第四次调用 set
output.set(ByteBuffer.wrap("channel message content".getBytes()));
// output 字段被设置为 "channel message content"
System.out.println("Type: " + output.type()); // pmessage
System.out.println("Pattern: " + output.pattern()); // channel.*
System.out.println("Channel: " + output.channel()); // channel.channel123
System.out.println("Message: " + output.get()); // channel message content