七的博客

Redisson2.0源码分析4-基础连接管理

源码分析

Redisson2.0源码分析4-基础连接管理

基础连接管理主要是跟 Redis 服务端的节点直接通信的连接维护,这一块的代码是不包含业务逻辑的,只是单纯的提供连接封装提供给上层使用。有点类似于之前 Lettuce 的代码, Redisson 之前是对这块的连接管理进行封装一层,便于 Redisson使用。

Redisson 还有更上层的连接管理,包括主从连接维护、集群连接维护、哨兵连接维护等。这些连接管理则是调用更加底层的基础连接管理来实现的。

1. 底层Redis连接建立 RedisClient

这个类就是一个标准的 Netty 连接配置类,主要是管理与 Redis 服务器的连接。

主要作用:

  • 负责跟指定的 Redis 服务建立连接。支持同步跟异步的连接方式。

  • 关闭连接。

建立连接后的 Netty Channel 以及 Bootstrap 对象将会通过 RedisConnection 对象进行保证后再返回。

发送数据给 Redis 服务器时,Channel pipeline 的执行顺序顺序是:看门狗 > 编码处理器 > 集合编码处理器 > 命令管理队列处理器 > 命令解码。

从 Redis 服务器接受到数据时,Channel pipeline 的执行顺序顺序是: 命令解码 > 命令管理队列处理器 > 集合编码处理器 > 编码处理器 > 看门狗。

这个顺序也是通信应用中比较常见的顺序。

  • 看门狗作为第一个处理器,这样可以最早捕获到连接断开等事件,然后进行重连等。
  • 两个编码器负责将 Java 对象转成字节数据。
  • 命令管理队列负责将命令按照正确的顺序发送出去。
  • 命令解码处理器负责将 Redis 服务器返回的字节流解码成 Java 对象。
package org.redisson.client;

import java.net.InetSocketAddress;

import org.redisson.client.handler.CommandDecoder;
import org.redisson.client.handler.CommandEncoder;
import org.redisson.client.handler.CommandsListEncoder;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.handler.ConnectionWatchdog;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;

public class RedisClient {

    private final Bootstrap bootstrap;
    // Redis服务器的地址
    private final InetSocketAddress addr; 
    
    // channel 组
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    private final long timeout;

    public RedisClient(String host, int port) {
        this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60*1000);
    }

    public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
        addr = new InetSocketAddress(host, port);
        bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                // 注册 4 个处理器
                // 看门狗 > 编码处理器 > 集合编码处理器 > 命令队列 > 命令解码
                ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
                                        new CommandEncoder(),
                                        new CommandsListEncoder(),
                                        new CommandsQueue(),
                                        new CommandDecoder());
            }
        });

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
        this.timeout = timeout;
    }

    public InetSocketAddress getAddr() {
        return addr;
    }

    long getTimeout() {
        return timeout;
    }

    public Bootstrap getBootstrap() {
        return bootstrap;
    }

    // 同步连接
    public RedisConnection connect() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("unable to connect", e);
        }
    }

    //  异步连接
    public Future<RedisConnection> connectAsync() {
        final Promise<RedisConnection> f = bootstrap.group().next().newPromise();
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    RedisConnection c = new RedisConnection(RedisClient.this, future.channel());
                    f.setSuccess(c);
                } else {
                    f.setFailure(future.cause());
                }
            }
        });
        // 返回一个Future对象,允许调用方在连接完成后执行后续操作
        return f;
    }

    public RedisPubSubConnection connectPubSub() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisPubSubConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("unable to connect", e);
        }
    }

    public void shutdown() {
        shutdownAsync().syncUninterruptibly();
    }

    public ChannelGroupFuture shutdownAsync() {
        return channels.close();
    }

    @Override
    public String toString() {
        return "RedisClient [addr=" + addr + "]";
    }

}


2. Redis连接封装 RedisConnection

这个类是对 RedisClient 的封装,提供了一些业务上需要调用的方法:

  • 同步执行 Redis 命令。
  • 异步执行 Redis 命令。
  • 向 Redis 发送命令数据,支持发送单个以及批量数据。
  • 关闭 Redis 连接。
  • 更新当前连接绑定的 Channel,用于重连时的更新。
package org.redisson.client;

import java.util.concurrent.TimeUnit;

import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

public class RedisConnection implements RedisCommands {

    public static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");

    final RedisClient redisClient;  // RedisClient 引用

    private volatile boolean closed;  // 连接是否关闭标志
    volatile Channel channel;  // 实际跟 Redis 服务器连接 channel 

    public RedisConnection(RedisClient redisClient, Channel channel) {
        super();
        this.redisClient = redisClient;
        this.channel = channel;

        // 将当前连接实例存储在 Channel 的属性中,后续可以使用
        channel.attr(CONNECTION).set(this);
    }

    // 用于重新连接或切换连接更新 chanel 使用
    public void updateChannel(Channel channel) {
        this.channel = channel;
    }

    public RedisClient getRedisClient() {
        return redisClient;
    }

    // 等待一个异步操作的完成
    public <R> R await(Future<R> cmd) {
        // 等待指定超时时间
        if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
            // 操作没有成功完成,抛出超时异常
            Promise<R> promise = (Promise<R>)cmd;
            RedisTimeoutException ex = new RedisTimeoutException();
            promise.setFailure(ex);
            throw ex;
        }
        if (!cmd.isSuccess()) {
            // 命令执行失败抛出异常
            if (cmd.cause() instanceof RedisException) {
                throw (RedisException) cmd.cause();
            }
            throw new RedisException("Unexpected exception while processing command", cmd.cause());
        }
        // 返回操作结果
        return cmd.getNow();
    }

    public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
        // 调用 async 方法发送命令,使用 await 方法等待结果
        Future<T> r = async(null, command, params);
        return await(r);
    }

    // 向 Redis 服务器发送单个命令数据
    public <T, R> ChannelFuture send(CommandData<T, R> data) {
        return channel.writeAndFlush(data);
    }
	
    // 向 Redis 服务器发送批量命令数据
    public ChannelFuture send(CommandsData data) {
        return channel.writeAndFlush(data);
    }
	
    // 同步执行 Redis 命令
    public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
        Future<R> r = async(encoder, command, params);
        return await(r);
    }
	
    // 异步执行 Redis 命令
    public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) {
        return async(null, command, params);
    }

    public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
        Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
        send(new CommandData<T, R>(promise, encoder, command, params));
        return promise;
    }

    public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object ... params) {
        Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
        return new CommandData<T, R>(promise, encoder, command, params);
    }

    public void setClosed(boolean reconnect) {
        this.closed = reconnect;
    }

    public boolean isClosed() {
        return closed;
    }

    public ChannelFuture closeAsync() {
        setClosed(true);
        return channel.close();
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]";
    }

}

3. 发布订阅连接 RedisPubSubConnection

这个类继承自RedisConnection,扩展了基础连接功能。

这样可以支持Redis的发布/订阅模式,没什么很特殊的地方。 主要添加了几个跟发布订阅相关的方法,比如添加监听器、添加订阅、取消订阅等。

package org.redisson.client;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;

import io.netty.channel.Channel;

public class RedisPubSubConnection extends RedisConnection {
	// 保存所有的Pub/Sub监听器
    final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();  

    public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
        super(redisClient, channel);
    }
	
    // 添加监听器,收到消息的时候,监听器的方法会收到回调
    public void addListener(RedisPubSubListener listener) {
        listeners.add(listener);
    }
	
    // 添加一个一次性监听器,该监听器在接收到一条消息后被被移除,不再在接收后续的消息
    public void addOneShotListener(RedisPubSubListener listener) {
        listeners.add(new OneShotPubSubListener<Object>(this, listener));
    }

    // 移除监听器
    public void removeListener(RedisPubSubListener<?> listener) {
        listeners.remove(listener);
    }

    // 收到普通的发布订阅状态消息时,通知所有监听器。 这里包括订阅确认这种状态消息。
    public void onMessage(PubSubStatusMessage message) {
        for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
            redisPubSubListener.onStatus(message.getType(), message.getChannel());
        }
    }
	
    // 收到普通的发布订阅消息的时候,通知所有监听器
    public void onMessage(PubSubMessage message) {
        for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
            redisPubSubListener.onMessage(message.getChannel(), message.getValue());
        }
    }
	
    // 收到普通的模式匹配的发布订阅消息的时候,通知所有监听器
    public void onMessage(PubSubPatternMessage message) {
        for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
            redisPubSubListener.onPatternMessage(message.getPattern(), message.getChannel(), message.getValue());
        }
    }
	
    // 订阅指定的 channel 
    public void subscribe(Codec codec, String ... channel) {
        // 异步发送一个 SUBSCRIBE 命令到 Redis 服务器
        async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
    }
	
    // 订阅符合特定模式的 channel 
    public void psubscribe(Codec codec, String ... channel) {
        // 异步发送一个 PSUBSCRIBE  命令到 Redis 服务器
        async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
    }
	
    // 取消订阅指定的 channel 
    public void unsubscribe(String ... channel) {
        // 异步发送一个 UNSUBSCRIBE  命令到 Redis 服务器
        async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
    }
	
     // 取消订阅符合特定模式的 channel
    public void punsubscribe(String ... channel) {
       // 异步发送一个 PUNSUBSCRIBE  命令到 Redis 服务器
        async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
    }

    private <T, R> void async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
        // 立刻通过 netty 的 channel 把消息发送出去
        channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params));
    }

}

4. 连接看门狗 ConnectionWatchdog

连接看门狗,主要是监控连接状态并在连接断开时进行重连。 跟 Redisson V1 Lettuce 中的看门狗代码几乎一模一样。

核心的知识点就是重连,重连逻辑是采用递归调用来实现的。重连失败后,也不会立即就重连。 重连间隔时间会根据 attempts 值按指数增加。

每次失败的重连尝试后,等待的时间是 2 << attempts 毫秒。 这是通过位移来实现的。具体的等待时间如下:

  • 第一次重连失败后,等待 2^2 = 4 毫秒。
  • 第二次重连失败后,等待 2^3 = 8 毫秒。
  • 第三次重连失败后,等待 2^4 = 16 毫秒。
  • 以此类推。

但是为了防止后面等待的时间越来越长,重试的最大等待时间将会是 2^13 = 8192 毫秒。 这个跟变量 BACKOFF_CAP 有关系,也就是重试了12次以上的话,间隔时间将不会继续变大。

这样可以减少对 Redis 服务器的压力,同时还可以保证中断后可以正常重连恢复。

package org.redisson.client.handler;

import java.util.concurrent.TimeUnit;

import org.redisson.client.RedisConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GenericFutureListener;

public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final Bootstrap bootstrap;  // 重新建立连接需要的 bootstrap 对象
    
    // 管理所有活跃的 channel 。 ChannelGroup 是 Netty 中的工具,可以对一组 Channel 进行批量操作
    private final ChannelGroup channels;  
    private static final int BACKOFF_CAP = 12;  //最大重试间隔指数

    public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
        this.bootstrap = bootstrap;
        this.channels  = channels;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 连接活跃状态时,添加到 ChannelGroup 中
        channels.add(ctx.channel());
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get();
        // 连接未关闭,尝试重新连接
        if (!connection.isClosed()) {
            EventLoopGroup group = ctx.channel().eventLoop().parent();
            // 尝试进行重连
            reconnect(group, connection);
        }
        ctx.fireChannelInactive();
    }

    private void reconnect(final EventLoopGroup group, final RedisConnection connection){
        // 往 netty EveentLoppGroup 添加延迟执行任务, 会延时 100 毫秒
        group.schedule(new Runnable() {
            @Override
            public void run() {
                doReConnect(group, connection, 1);
            }
        }, 100, TimeUnit.MILLISECONDS);
    }

    private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
        // 连接已关闭,直接返回
        if (connection.isClosed()) {
            return;
        }

        log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);

        // 尝试重新连接
        bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 连接已关闭,直接返回
                if (connection.isClosed()) {
                    return;
                }

                // 连接成功,更新连接的 channel 
                if (future.isSuccess()) {
                    log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
                    connection.updateChannel(future.channel());
                    return;
                }

                // 连接失败,计划下一次重试任务
                // 每次重试都会增加等待时间。 比如 2毫秒、4毫秒、8毫秒。。。。
                int timeout = 2 << attempts;
                group.schedule(new Runnable() {
                    @Override
                    public void run() {
                        doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
                    }
                }, timeout, TimeUnit.MILLISECONDS);
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }

}

5. 连接管理业务接口 ConnectionManager

这个接口定义了一组用于管理 Redis 连接的方法,提供了创建、管理和关闭连接的功能。不过接口定义的也是很奇怪,各种方法混在一起。

这个接口包含: - 客户端管理 - 集群下的 slot 位置计算 - 编解码配置 - 连接管理 - 客户端连接创建 - 发布订阅 - 事件管理

package org.redisson.connection;

import java.util.Collection;
import java.util.NavigableMap;
import java.util.concurrent.TimeUnit;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.misc.InfinitySemaphoreLatch;

import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

public interface ConnectionManager {

    // 获取所有 Redis 客户端列表
    Collection<RedisClientEntry> getClients();

    // 异步关闭指定的 Redis 客户端
    void shutdownAsync(RedisClient client);

    // 计算给定 key 的槽位。 这个应该是集群环境中的。
    int calcSlot(String key);

    // 获取时间轮实例
    HashedWheelTimer getTimer();

    // 获取主从服务器配置
    MasterSlaveServersConfig getConfig();

    // 获取编解码器
    Codec getCodec();

    // 获取所有主从映射
    NavigableMap<Integer, MasterSlaveEntry> getEntries();

    // 创建新的 Promise
    <R> Promise<R> newPromise();

    // 释放读操作连接
    void releaseRead(int slot, RedisConnection connection);

    // 释放写操作连接
    void releaseWrite(int slot, RedisConnection connection);

    // 拿到读操作连接
    RedisConnection connectionReadOp(int slot);

    // 拿到写操作连接
    RedisConnection connectionWriteOp(int slot);

    // 创建释放读操作连接的监听器
    <T> FutureListener<T> createReleaseReadListener(int slot,
            RedisConnection conn, Timeout timeout);

    // 创建释放写操作连接的监听器
    <T> FutureListener<T> createReleaseWriteListener(int slot,
            RedisConnection conn, Timeout timeout);

    // 创建新的Redis客户端,指定IP、端口和超时时间
    RedisClient createClient(String host, int port, int timeout);

    // 创建新的Redis客户端,指定 IP 和端口
    RedisClient createClient(String host, int port);

    // 获取指定 channel 的Pub/Sub连接
    PubSubConnectionEntry getEntry(String channelName);

    // 订阅指定 channel
    PubSubConnectionEntry subscribe(String channelName);

    // 模式 channel 订阅
    PubSubConnectionEntry psubscribe(String pattern);

    // 订阅指定 channel , 添加监听器
    <V> void subscribe(RedisPubSubListener<V> listener, String channelName);

    // 取消订阅指定 channel
    void unsubscribe(String channelName);

    // 取消模式 channel订阅
    void punsubscribe(String channelName);

    // 关闭连接管理器
    void shutdown();

    // 获取 netty 的 EventLoopGroup
    EventLoopGroup getGroup();

    // 创建新的定时任务
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    // 拿到信号量锁
    InfinitySemaphoreLatch getShutdownLatch();

}