Redisson2.0源码分析4-基础连接管理
Redisson2.0源码分析4-基础连接管理
基础连接管理主要是跟 Redis 服务端的节点直接通信的连接维护,这一块的代码是不包含业务逻辑的,只是单纯的提供连接封装提供给上层使用。有点类似于之前 Lettuce 的代码, Redisson 之前是对这块的连接管理进行封装一层,便于 Redisson使用。
Redisson 还有更上层的连接管理,包括主从连接维护、集群连接维护、哨兵连接维护等。这些连接管理则是调用更加底层的基础连接管理来实现的。
1. 底层Redis连接建立 RedisClient
这个类就是一个标准的 Netty 连接配置类,主要是管理与 Redis 服务器的连接。
主要作用:
负责跟指定的 Redis 服务建立连接。支持同步跟异步的连接方式。
关闭连接。
建立连接后的 Netty Channel 以及 Bootstrap 对象将会通过 RedisConnection 对象进行保证后再返回。
发送数据给 Redis 服务器时,Channel pipeline 的执行顺序顺序是:看门狗 > 编码处理器 > 集合编码处理器 > 命令管理队列处理器 > 命令解码。
从 Redis 服务器接受到数据时,Channel pipeline 的执行顺序顺序是: 命令解码 > 命令管理队列处理器 > 集合编码处理器 > 编码处理器 > 看门狗。
这个顺序也是通信应用中比较常见的顺序。
- 看门狗作为第一个处理器,这样可以最早捕获到连接断开等事件,然后进行重连等。
- 两个编码器负责将 Java 对象转成字节数据。
- 命令管理队列负责将命令按照正确的顺序发送出去。
- 命令解码处理器负责将 Redis 服务器返回的字节流解码成 Java 对象。
package org.redisson.client;
import java.net.InetSocketAddress;
import org.redisson.client.handler.CommandDecoder;
import org.redisson.client.handler.CommandEncoder;
import org.redisson.client.handler.CommandsListEncoder;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.handler.ConnectionWatchdog;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
public class RedisClient {
private final Bootstrap bootstrap;
// Redis服务器的地址
private final InetSocketAddress addr;
// channel 组
private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final long timeout;
public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
}
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// 注册 4 个处理器
// 看门狗 > 编码处理器 > 集合编码处理器 > 命令队列 > 命令解码
ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
new CommandEncoder(),
new CommandsListEncoder(),
new CommandsQueue(),
new CommandDecoder());
}
});
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
this.timeout = timeout;
}
public InetSocketAddress getAddr() {
return addr;
}
long getTimeout() {
return timeout;
}
public Bootstrap getBootstrap() {
return bootstrap;
}
// 同步连接
public RedisConnection connect() {
try {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
return new RedisConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
}
}
// 异步连接
public Future<RedisConnection> connectAsync() {
final Promise<RedisConnection> f = bootstrap.group().next().newPromise();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
RedisConnection c = new RedisConnection(RedisClient.this, future.channel());
f.setSuccess(c);
} else {
f.setFailure(future.cause());
}
}
});
// 返回一个Future对象,允许调用方在连接完成后执行后续操作
return f;
}
public RedisPubSubConnection connectPubSub() {
try {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
return new RedisPubSubConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
}
}
public void shutdown() {
shutdownAsync().syncUninterruptibly();
}
public ChannelGroupFuture shutdownAsync() {
return channels.close();
}
@Override
public String toString() {
return "RedisClient [addr=" + addr + "]";
}
}
2. Redis连接封装 RedisConnection
这个类是对 RedisClient 的封装,提供了一些业务上需要调用的方法:
- 同步执行 Redis 命令。
- 异步执行 Redis 命令。
- 向 Redis 发送命令数据,支持发送单个以及批量数据。
- 关闭 Redis 连接。
- 更新当前连接绑定的 Channel,用于重连时的更新。
package org.redisson.client;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisConnection implements RedisCommands {
public static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");
final RedisClient redisClient; // RedisClient 引用
private volatile boolean closed; // 连接是否关闭标志
volatile Channel channel; // 实际跟 Redis 服务器连接 channel
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
this.redisClient = redisClient;
this.channel = channel;
// 将当前连接实例存储在 Channel 的属性中,后续可以使用
channel.attr(CONNECTION).set(this);
}
// 用于重新连接或切换连接更新 chanel 使用
public void updateChannel(Channel channel) {
this.channel = channel;
}
public RedisClient getRedisClient() {
return redisClient;
}
// 等待一个异步操作的完成
public <R> R await(Future<R> cmd) {
// 等待指定超时时间
if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
// 操作没有成功完成,抛出超时异常
Promise<R> promise = (Promise<R>)cmd;
RedisTimeoutException ex = new RedisTimeoutException();
promise.setFailure(ex);
throw ex;
}
if (!cmd.isSuccess()) {
// 命令执行失败抛出异常
if (cmd.cause() instanceof RedisException) {
throw (RedisException) cmd.cause();
}
throw new RedisException("Unexpected exception while processing command", cmd.cause());
}
// 返回操作结果
return cmd.getNow();
}
public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
// 调用 async 方法发送命令,使用 await 方法等待结果
Future<T> r = async(null, command, params);
return await(r);
}
// 向 Redis 服务器发送单个命令数据
public <T, R> ChannelFuture send(CommandData<T, R> data) {
return channel.writeAndFlush(data);
}
// 向 Redis 服务器发送批量命令数据
public ChannelFuture send(CommandsData data) {
return channel.writeAndFlush(data);
}
// 同步执行 Redis 命令
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params);
return await(r);
}
// 异步执行 Redis 命令
public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) {
return async(null, command, params);
}
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
send(new CommandData<T, R>(promise, encoder, command, params));
return promise;
}
public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
return new CommandData<T, R>(promise, encoder, command, params);
}
public void setClosed(boolean reconnect) {
this.closed = reconnect;
}
public boolean isClosed() {
return closed;
}
public ChannelFuture closeAsync() {
setClosed(true);
return channel.close();
}
@Override
public String toString() {
return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]";
}
}
3. 发布订阅连接 RedisPubSubConnection
这个类继承自RedisConnection,扩展了基础连接功能。
这样可以支持Redis的发布/订阅模式,没什么很特殊的地方。 主要添加了几个跟发布订阅相关的方法,比如添加监听器、添加订阅、取消订阅等。
package org.redisson.client;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel;
public class RedisPubSubConnection extends RedisConnection {
// 保存所有的Pub/Sub监听器
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel);
}
// 添加监听器,收到消息的时候,监听器的方法会收到回调
public void addListener(RedisPubSubListener listener) {
listeners.add(listener);
}
// 添加一个一次性监听器,该监听器在接收到一条消息后被被移除,不再在接收后续的消息
public void addOneShotListener(RedisPubSubListener listener) {
listeners.add(new OneShotPubSubListener<Object>(this, listener));
}
// 移除监听器
public void removeListener(RedisPubSubListener<?> listener) {
listeners.remove(listener);
}
// 收到普通的发布订阅状态消息时,通知所有监听器。 这里包括订阅确认这种状态消息。
public void onMessage(PubSubStatusMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onStatus(message.getType(), message.getChannel());
}
}
// 收到普通的发布订阅消息的时候,通知所有监听器
public void onMessage(PubSubMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onMessage(message.getChannel(), message.getValue());
}
}
// 收到普通的模式匹配的发布订阅消息的时候,通知所有监听器
public void onMessage(PubSubPatternMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onPatternMessage(message.getPattern(), message.getChannel(), message.getValue());
}
}
// 订阅指定的 channel
public void subscribe(Codec codec, String ... channel) {
// 异步发送一个 SUBSCRIBE 命令到 Redis 服务器
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
}
// 订阅符合特定模式的 channel
public void psubscribe(Codec codec, String ... channel) {
// 异步发送一个 PSUBSCRIBE 命令到 Redis 服务器
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
}
// 取消订阅指定的 channel
public void unsubscribe(String ... channel) {
// 异步发送一个 UNSUBSCRIBE 命令到 Redis 服务器
async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
}
// 取消订阅符合特定模式的 channel
public void punsubscribe(String ... channel) {
// 异步发送一个 PUNSUBSCRIBE 命令到 Redis 服务器
async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
}
private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
// 立刻通过 netty 的 channel 把消息发送出去
channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
}
}
4. 连接看门狗 ConnectionWatchdog
连接看门狗,主要是监控连接状态并在连接断开时进行重连。 跟 Redisson V1 Lettuce 中的看门狗代码几乎一模一样。
核心的知识点就是重连,重连逻辑是采用递归调用来实现的。重连失败后,也不会立即就重连。 重连间隔时间会根据 attempts 值按指数增加。
每次失败的重连尝试后,等待的时间是 2 << attempts 毫秒。 这是通过位移来实现的。具体的等待时间如下:
- 第一次重连失败后,等待 2^2 = 4 毫秒。
- 第二次重连失败后,等待 2^3 = 8 毫秒。
- 第三次重连失败后,等待 2^4 = 16 毫秒。
- 以此类推。
但是为了防止后面等待的时间越来越长,重试的最大等待时间将会是 2^13 = 8192 毫秒。 这个跟变量 BACKOFF_CAP 有关系,也就是重试了12次以上的话,间隔时间将不会继续变大。
这样可以减少对 Redis 服务器的压力,同时还可以保证中断后可以正常重连恢复。
package org.redisson.client.handler;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GenericFutureListener;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Bootstrap bootstrap; // 重新建立连接需要的 bootstrap 对象
// 管理所有活跃的 channel 。 ChannelGroup 是 Netty 中的工具,可以对一组 Channel 进行批量操作
private final ChannelGroup channels;
private static final int BACKOFF_CAP = 12; //最大重试间隔指数
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
this.bootstrap = bootstrap;
this.channels = channels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接活跃状态时,添加到 ChannelGroup 中
channels.add(ctx.channel());
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get();
// 连接未关闭,尝试重新连接
if (!connection.isClosed()) {
EventLoopGroup group = ctx.channel().eventLoop().parent();
// 尝试进行重连
reconnect(group, connection);
}
ctx.fireChannelInactive();
}
private void reconnect(final EventLoopGroup group, final RedisConnection connection){
// 往 netty EveentLoppGroup 添加延迟执行任务, 会延时 100 毫秒
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, 1);
}
}, 100, TimeUnit.MILLISECONDS);
}
private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
// 连接已关闭,直接返回
if (connection.isClosed()) {
return;
}
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
// 尝试重新连接
bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 连接已关闭,直接返回
if (connection.isClosed()) {
return;
}
// 连接成功,更新连接的 channel
if (future.isSuccess()) {
log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
connection.updateChannel(future.channel());
return;
}
// 连接失败,计划下一次重试任务
// 每次重试都会增加等待时间。 比如 2毫秒、4毫秒、8毫秒。。。。
int timeout = 2 << attempts;
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
5. 连接管理业务接口 ConnectionManager
这个接口定义了一组用于管理 Redis 连接的方法,提供了创建、管理和关闭连接的功能。不过接口定义的也是很奇怪,各种方法混在一起。
这个接口包含: - 客户端管理 - 集群下的 slot 位置计算 - 编解码配置 - 连接管理 - 客户端连接创建 - 发布订阅 - 事件管理
package org.redisson.connection;
import java.util.Collection;
import java.util.NavigableMap;
import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.misc.InfinitySemaphoreLatch;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public interface ConnectionManager {
// 获取所有 Redis 客户端列表
Collection<RedisClientEntry> getClients();
// 异步关闭指定的 Redis 客户端
void shutdownAsync(RedisClient client);
// 计算给定 key 的槽位。 这个应该是集群环境中的。
int calcSlot(String key);
// 获取时间轮实例
HashedWheelTimer getTimer();
// 获取主从服务器配置
MasterSlaveServersConfig getConfig();
// 获取编解码器
Codec getCodec();
// 获取所有主从映射
NavigableMap<Integer, MasterSlaveEntry> getEntries();
// 创建新的 Promise
<R> Promise<R> newPromise();
// 释放读操作连接
void releaseRead(int slot, RedisConnection connection);
// 释放写操作连接
void releaseWrite(int slot, RedisConnection connection);
// 拿到读操作连接
RedisConnection connectionReadOp(int slot);
// 拿到写操作连接
RedisConnection connectionWriteOp(int slot);
// 创建释放读操作连接的监听器
<T> FutureListener<T> createReleaseReadListener(int slot,
RedisConnection conn, Timeout timeout);
// 创建释放写操作连接的监听器
<T> FutureListener<T> createReleaseWriteListener(int slot,
RedisConnection conn, Timeout timeout);
// 创建新的Redis客户端,指定IP、端口和超时时间
RedisClient createClient(String host, int port, int timeout);
// 创建新的Redis客户端,指定 IP 和端口
RedisClient createClient(String host, int port);
// 获取指定 channel 的Pub/Sub连接
PubSubConnectionEntry getEntry(String channelName);
// 订阅指定 channel
PubSubConnectionEntry subscribe(String channelName);
// 模式 channel 订阅
PubSubConnectionEntry psubscribe(String pattern);
// 订阅指定 channel , 添加监听器
<V> void subscribe(RedisPubSubListener<V> listener, String channelName);
// 取消订阅指定 channel
void unsubscribe(String channelName);
// 取消模式 channel订阅
void punsubscribe(String channelName);
// 关闭连接管理器
void shutdown();
// 获取 netty 的 EventLoopGroup
EventLoopGroup getGroup();
// 创建新的定时任务
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
// 拿到信号量锁
InfinitySemaphoreLatch getShutdownLatch();
}