七的博客

RedissonV1.0源码分析-开篇

源码分析

RedissonV1.0源码分析-开篇

最近项目中引用了 Redisson 这个组件,组件的特点是可以将 Redis 的操作像在 JAVA 的一些数据集合一样。同时这些数据集合是支持分布式的,意味着你在多个 JVM 实例上可以操作同一份数据。

对于 Redisson 是怎么实现这些分布式数据结构的有点好奇,这就是这次分析源码的目的。

没有目标盲目的去看源码,很容易不知道该看些什么。本次源码分析目标很明确,分析怎么实现的,这样在日常使用中更容易用对 API 以及处理相关问题。

1. 代码拉取

Redisson 从发布到现在已经很多年了,代码工程也是比较的复杂。 这里为了快速了解核心的原理,可以直接找到最初发布的稳定版本。这个版本的代码肯定不多,可以更加容易分析。

但是需要注意的是,开源软件在迭代过程中架构、实现机制都会有点变化。 但是像 Redisson 这种无论怎么改动肯定都还是对 Redis 的封装,所以老版本不太影响原理分析。

首先从 Github 上下载代码:

git clone https://github.com/redisson/redisson.git

然后回退到 v1.0 分支 915c238b :

git reset --hard 915c238b

2. 代码结构

回退到指定分支的代码结构大致如下:

.
├── com
│   └── lambdaworks  // 这个包下就是 Redis 客户端库 Lettuce 的源代码
│       ├── codec
│       │   └── Base16.java
│       └── redis    // Redis 协议编解码、连接相关源码
│           ├── KeyValue.java
│           ├── RedisAsyncConnection.java
│           ├── RedisClient.java
│           ├── RedisCommandInterruptedException.java
│           ├── RedisConnection.java
│           ├── RedisException.java
│           ├── ScoredValue.java
│           ├── ScriptOutputType.java
│           ├── SortArgs.java
│           ├── ZStoreArgs.java
│           ├── codec    // 编解码接口以及默认实现
│           │   ├── RedisCodec.java
│           │   └── Utf8StringCodec.java
│           ├── output   // Redis 数据结构输出对象
│           │   ├── BooleanListOutput.java
│           │   ├── BooleanOutput.java
│           │   ├── ByteArrayOutput.java
│           │   ├── DateOutput.java
│           │   ├── DoubleOutput.java
│           │   ├── IntegerOutput.java
│           │   ├── KeyListOutput.java
│           │   ├── KeyOutput.java
│           │   ├── KeyValueOutput.java
│           │   ├── MapOutput.java
│           │   ├── MultiOutput.java
│           │   ├── NestedMultiOutput.java
│           │   ├── ScoredValueListOutput.java
│           │   ├── StatusOutput.java
│           │   ├── StringListOutput.java
│           │   ├── ValueListOutput.java
│           │   ├── ValueOutput.java
│           │   └── ValueSetOutput.java
│           ├── protocol   // 协议编解码、看门狗等
│           │   ├── Charsets.java
│           │   ├── Command.java
│           │   ├── CommandArgs.java
│           │   ├── CommandHandler.java
│           │   ├── CommandKeyword.java
│           │   ├── CommandOutput.java
│           │   ├── CommandType.java
│           │   ├── ConnectionWatchdog.java
│           │   └── RedisStateMachine.java
│           └── pubsub   // 发布订阅相关
│               ├── PubSubCommandHandler.java
│               ├── PubSubOutput.java
│               ├── RedisPubSubAdapter.java
│               ├── RedisPubSubConnection.java
│               └── RedisPubSubListener.java
└── org
    └── redisson    // redisson 的代码
        ├── Config.java   // Redisson 配置
        ├── RedisPubSubTopicListenerWrapper.java   // Redisson 对 lettuce 发布订阅监听器包裹类
        ├── Redisson.java   // Redisson 工具主类
        ├── RedissonAtomicLong.java   // AtomicLong 分布式实现
        ├── RedissonCountDownLatch.java   // CountDownLatch 分布式实现
        ├── RedissonList.java   // List 分布式实现
        ├── RedissonLock.java   // Lock 分布式实现
        ├── RedissonMap.java    // Map 分布式实现
        ├── RedissonObject.java  // Redisson 分布式对象实现
        ├── RedissonQueue.java  // Queue 分布式实现 
        ├── RedissonSet.java    // Set 分布式实现
        ├── RedissonTopic.java  //  Topic 分布式实现
        ├── codec   // 编解码
        │   ├── JsonJacksonCodec.java   // Jackson Json 序列化反序列化实现
        │   ├── RedisCodecWrapper.java  // 对 lettuce 的编解码接口包裹
        │   ├── RedissonCodec.java   // 序列化反序列化接口
        │   └── SerializationCodec.java   // JDK 默认序列化反序列化实现
        ├── connection  // 连接管理相关
        │   ├── ConnectionManager.java   // 连接管理
        │   ├── LoadBalancer.java      // 负载均衡实现接口
        │   ├── RandomLoadBalancer.java   // 随机负载均衡实现
        │   └── RoundRobinLoadBalancer.java  //  轮训负载均衡实现
        ├── core  // Redisson 相关分布式数据结构接口定义
        │   ├── MessageListener.java   // 消息监听器接口
        │   ├── RAtomicLong.java   // AtomicLong 接口
        │   ├── RCountDownLatch.java  // CountDownLatch 接口
        │   ├── RList.java  // List 分布式接口
        │   ├── RLock.java  // Lock 分布式接口
        │   ├── RMap.java   // Map 分布式接口
        │   ├── RObject.java  // Object 分布式接口
        │   ├── RQueue.java   // Queue 分布式接口
        │   ├── RSet.java     // Set 分布式接口
        │   └── RTopic.java   // Topic 分布式接口
        └── misc  // 杂项
            ├── ReclosableLatch.java  // 可重复开关线程门栓
            └── ReferenceMap.java   // Redisson 定制的引用 HashMap 实现

从上面的结构中可以看到 , Redisson V1.0 banben 实际上是引用了一个开源客户端 lambdaworks 。 后来 lambdaworks 后来被更名为 Lettuce,并继续作为一个独立的 Redis 客户端库使用。

关于 Lettuce 在另外的文章中已经分析过了,不再单独分析。 这几篇文章主要还是以分析 Redisson 的实现为主,具体的 Redis 协议编解码以及主要的通信主要是在 lambdaworks 这个包下。

3. 源码分析思路

有了上面的这个结构,那么就可以划分出源码分析的路径。 按照下面的思路进行分析:

  • Redisson 对数据的编解码是怎么实现的。
  • Redisson 对于 Netty 连接怎么管理的,包括一些重连机制。
  • Redisson 对于 Redis 服务的负载均衡怎么实现。
  • Redisson 配置文件包含哪些属性,怎么配置。
  • Redisson 一些工具类的实现,包括可重复开关的门闩工具、引用 Map 工具。
  • Redisson 分布式接口怎么实现的。

针对上面分析,为了不让一篇文章内容过多,拆分出三篇文章:

  • 编解码、连接管理、负载均衡、配置管理。
  • Redisson 工具类实现。
  • Redisson 分布式接口实现。

第一篇文章的内容将会直接本文中进行编写。其他两篇文章单独开一章。

3. 第一部分源码分析

3.1 编解码部分

编解码是指对数据进行序列化以及反序列化的过程,因为数据在 Redis 中是以字节数组的形式存储。

Redisson 跟 Redis 交互需要将对象转为字节数组,或者从字节数组转成 Java 对象。

3.1.1 编解码接口 RedissonCodec

这是 Redisson 定义的编解码器接口,用来实现数据的序列化和反序列化,通过实现这个接口可以实现不同的序列化算法。

这个接口的定义跟 leetuce 中的 org.redisson.codec.RedissonCodec 定义是一模一样的。

package org.redisson.codec;

import java.nio.ByteBuffer;


public interface RedissonCodec {
	// 解码 key 
    Object decodeKey(ByteBuffer bytes);
	// 解码 value
    Object decodeValue(ByteBuffer bytes);
    // 编码 key 
    byte[] encodeKey(Object key);
    // 编码 value
    byte[] encodeValue(Object value);

}


3.1.2 Lettuce 编解码接口包裹类 RedisCodecWrapper

RedisCodecWrapper 这个类继了 lettuce 的编解码接口 com.lambdaworks.redis.codec.RedisCodec。

主要作用是充当一个包装器,它将 RedissonCodec 接口适配到 com.lambdaworks.redis.codec.RedisCodec 接口。

通过这个包装器,Redisson 可以使用自己的编解码器接口,不需要直接依赖于lettuce的接口,后续也可以替换成其他实现。

package org.redisson.codec;

import java.nio.ByteBuffer;

import com.lambdaworks.redis.codec.RedisCodec;


public class RedisCodecWrapper extends RedisCodec<Object, Object> {

    private final RedissonCodec redissonCodec;

    public RedisCodecWrapper(RedissonCodec redissonCodec) {
        this.redissonCodec = redissonCodec;
    }

    @Override
    public Object decodeKey(ByteBuffer bytes) {
        return redissonCodec.decodeKey(bytes);
    }

    @Override
    public Object decodeValue(ByteBuffer bytes) {
        return redissonCodec.decodeValue(bytes);
    }

    @Override
    public byte[] encodeKey(Object key) {
        return redissonCodec.encodeKey(key);
    }

    @Override
    public byte[] encodeValue(Object value) {
        return redissonCodec.encodeValue(value);
    }

}


3.1.3 Java标准序列化机制编解码实现 SerializationCodec

这个类就是利用的 Java 中的序列化以及反序列化机制,效率不是特别的高。 优势在不依赖于第三方的库,只限于 Java 语言。

package org.redisson.codec;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;


public class SerializationCodec implements RedissonCodec {

    @Override
    public Object decodeKey(ByteBuffer bytes) {
        return decode(bytes);
    }

    @Override
    public Object decodeValue(ByteBuffer bytes) {
        return decode(bytes);
    }

    // 使用 ObjectInputStream 来反序列化 ByteBuffer 中的数据
    private Object decode(ByteBuffer bytes) {
        try {
            ByteArrayInputStream in = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit());
            ObjectInputStream inputStream = new ObjectInputStream(in);
            return inputStream.readObject();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public byte[] encodeKey(Object key) {
        return encodeValue(key);
    }

    // 创建 ByteArrayOutputStream 和 ObjectOutputStream
    // 使用 writeObject() 方法来序列化对象
    @Override
    public byte[] encodeValue(Object value) {
        try {
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            ObjectOutputStream outputStream = new ObjectOutputStream(result);
            outputStream.writeObject(value);
            outputStream.close();
            return result.toByteArray();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

}


3.1.4 Jackson Json 编解码实现 JsonJacksonCodec

调用 Jackson 来反序列化或者序列化成 JSON 的实现。 这是一种比较常用的编解码实现。

package org.redisson.codec;

import java.nio.ByteBuffer;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;

public class JsonJacksonCodec implements RedissonCodec {
	// ObjectMapper 是一个主要的工具类,用于各种类型数据和 Json 类型的转换 
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JsonJacksonCodec() {
    	// 做一些 JSON 序列化反序列化的配置
    	// 包括序列化时只包含非 null 值 、 反序列化时忽略未知属性、BigDecimal 的写入方式、属性按字母顺序排序、包含类型信息的方式
        objectMapper.setSerializationInclusion(Include.NON_NULL);
        objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
                                            .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                                            .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                                            .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                                            .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
        objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);

        // type info inclusion
        TypeResolverBuilder<?> typer = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL);
        typer.init(JsonTypeInfo.Id.CLASS, null);
        typer.inclusion(JsonTypeInfo.As.PROPERTY);
        objectMapper.setDefaultTyping(typer);
    }

    @Override
    public Object decodeKey(ByteBuffer bytes) {
        return decode(bytes);
    }

    @Override
    public Object decodeValue(ByteBuffer bytes) {
        return decode(bytes);
    }

    // 反序列化 JSON 数据
    private Object decode(ByteBuffer bytes) {
        try {
            return objectMapper.readValue(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit(), Object.class);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public byte[] encodeKey(Object key) {
        return encodeValue(key);
    }

    // 序列化 JSON 数据
    @Override
    public byte[] encodeValue(Object value) {
        try {
            return objectMapper.writeValueAsBytes(value);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

}


3.2 连接管理部分

Redisson 的连接管理主要是还是依赖于 Lettuce 的一些接口进行包装,适配 Redisson 自己的逻辑。

3.2.1 Redis连接管理 ConnectionManager

ConnectionManager 是 Redisson 管理 Redis 连接的核心类。

主要负责创建、维护和分配 Redis 连接,包括普通连接和发布/订阅(PubSub)连接,而这一块调用的还是 Lettuce 的相关 API 。

package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

import org.redisson.Config;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;


public class ConnectionManager {

	// 管理发布/订阅连接
    public static class PubSubEntry {
    	// 定义信号量、连接和每个连接的订阅数量
        private final Semaphore semaphore;
        private final RedisPubSubConnection conn;
        private final int subscriptionsPerConnection;

        public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
            super();
            this.conn = conn;
            this.subscriptionsPerConnection = subscriptionsPerConnection;
            this.semaphore = new Semaphore(subscriptionsPerConnection);
        }

        // 添加发布/订阅监听器
        public void addListener(RedisPubSubListener listener) {
            conn.addListener(listener);
        }

        // 移除发布/订阅监听器
        public void removeListener(RedisPubSubListener listener) {
            conn.removeListener(listener);
        }

        public boolean subscribe(RedisPubSubAdapter listener, Object channel) {
        	// 获取信号量成功就执行订阅操作
            if (semaphore.tryAcquire()) {
                conn.addListener(listener);
                conn.subscribe(channel);
                return true;
            }
            return false;
        }

        public void unsubscribe(Object channel) {
        	// 取消订阅,释放信号量
            conn.unsubscribe(channel);
            semaphore.release();
        }

        public boolean tryClose() {
            if (semaphore.tryAcquire(subscriptionsPerConnection)) {
                conn.close();
                return true;
            }
            return false;
        }

    }

    private final Logger log = LoggerFactory.getLogger(getClass());

    // 保存可以重用的 Redis 连接。
    private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
    // 发布订阅连接
    private final Queue<PubSubEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubEntry>();
    // Redis 客户端实例
    private final List<RedisClient> clients = new ArrayList<RedisClient>();


    private final Semaphore activeConnections;  // 信号量控制活跃连接的数量   防止超过配置的连接池大小
    private final RedisCodec codec;   // 数据编解码策略
    private final Config config;   // 参数配置
    private final LoadBalancer balancer;   // 负载均衡策略

    public ConnectionManager(Config config) {
    	// 遍历配置文件中的地址信息,给每个地址创建一个 RedisClient 实例,保存到 List 中备用。
        for (URI address : config.getAddresses()) {
            RedisClient client = new RedisClient(address.getHost(), address.getPort());
            clients.add(client);
        }
        // 初始化负载均衡
        balancer = config.getLoadBalancer();
        balancer.init(clients);

        codec = new RedisCodecWrapper(config.getCodec());
        // 从配置文件中取最大连接池数量
        activeConnections = new Semaphore(config.getConnectionPoolSize());
        this.config = config;
    }

    public <K, V> RedisConnection<K, V> connection() {
    	// 尝试获取一个连接
        acquireConnection();

        // 从连接池中获取一个连接,没获取到就建立一个连接
        RedisConnection<K, V> conn = connections.poll();
        if (conn == null) {
            conn = balancer.nextClient().connect(codec);
            if (config.getPassword() != null) {
                conn.auth(config.getPassword());
            }
        }
        return conn;
    }

    // 频道订阅
    public <K, V> PubSubEntry subscribe(RedisPubSubAdapter<K, V> listener, K channel) {
    	// 尝试在现有的连接上直接订阅,如果订阅成功就直接返回可用的 PubSubEntry
        for (PubSubEntry entry : pubSubConnections) {
            if (entry.subscribe(listener, channel)) {
                return entry;
            }
        }

        // 如果没有可用的连接,调用 acquireConnection 方法拿到新的连接。
        acquireConnection();

        // 从负载均衡列表中拿到一个连接
        RedisPubSubConnection<K, V> conn = balancer.nextClient().connectPubSub(codec);
        if (config.getPassword() != null) {
            conn.auth(config.getPassword());
        }
        // 创建新的 PubSubEntry ,然后添加到队列中
        PubSubEntry entry = new PubSubEntry(conn,config.getSubscriptionsPerConnection());
        entry.subscribe(listener, channel);
        pubSubConnections.add(entry);
        return entry;
    }

    // 获取连接
    private void acquireConnection() {
    	// 从信号量中获取一个许可,拿不到就使用 acquireUninterruptibly 方法阻塞当前线程
    	// 一直拿到到一个许可为止
        if (!activeConnections.tryAcquire()) {
            log.warn("Connection pool gets exhausted! Trying to acquire connection ...");
            long time = System.currentTimeMillis();
            activeConnections.acquireUninterruptibly();
            long endTime = System.currentTimeMillis() - time;
            log.warn("Connection acquired, time spended: {} ms", endTime);
        }
    }

    // 取消指定频道的订阅
    public <K> void unsubscribe(PubSubEntry entry, K channel) {
        entry.unsubscribe(channel);
        if (entry.tryClose()) {
            pubSubConnections.remove(entry);
            activeConnections.release();
        }
    }

    // 释放连接
    public void release(RedisConnection сonnection) {
    	// 释放信号量
        activeConnections.release();
        connections.add(сonnection);
    }

    // 遍历 clients 信息,循环调用每个 RedisClient 的 shutdown 方法
    // 释放资源并关闭连接
    public void shutdown() {
        for (RedisClient client : clients) {
            client.shutdown();
        }
    }

}


这个类算是很关键的一个组件了,其中设计到挺多的知识点,比如:

  • 连接池机制。用连接池来管理多个 Redis 连接,减少连接建立的开销。
  • 信号量,使用信号量来控制并发。
  • 使用了负载均衡,这样可以自动选择合适的 Redis 节点。

3.3 负载均衡部分

负载均衡主要是指在连接 Redis 集群的时候,怎么样将请求分配到不同的 Redis 节点效率会更高。

可能集群节点配置不一样,有的节点高配,有的节点低配。 那么就需要高配的多分担一点请求,低配的少分担一点请求。

3.3.1 负载均衡接口定义 LoadBalancer

package org.redisson.connection;

import java.util.List;

import com.lambdaworks.redis.RedisClient;

public interface LoadBalancer {

    void init(List<RedisClient> clients);

    RedisClient nextClient();
}

3.3.2 随机策略实现 RandomLoadBalancer

这个比较简单就是随机从 Redis 节点列表中取一个返回。

package org.redisson.connection;

import java.util.List;
import java.util.Random;

import com.lambdaworks.redis.RedisClient;

public class RandomLoadBalancer implements LoadBalancer {

    private final Random random = new Random();

    private List<RedisClient> clients;

    public void init(List<RedisClient> clients) {
        this.clients = clients;
    }

    public RedisClient nextClient() {
    	// 生成一个随机数,随机数的范围在列表的大小范围内。 最后返回 clients 列表中该索引的对象
        return clients.get(random.nextInt(clients.size()));
    }

}



假设有三个 Redis 节点:

  • 节点1:redis://127.0.0.1:6379
  • 节点2:redis://127.0.0.2:6379
  • 节点3:redis://127.0.0.3:6379

随机策略下,节点选择是随机的,可能会出现如下情况:

  • 第一次请求:选择节点2

  • 第二次请求:选择节点3

  • 第三次请求:选择节点1

  • 第四次请求:选择节点3

  • 第五次请求:选择节点2

  • 依此类推。。。

可以看到节点选择毫无规律,每一次都是随机选择。不过如果在大量请求的情况下,节点的选择还是会趋于均匀。

3.3.3 轮询策略实现 RoundRobinLoadBalancer

这种也比较好理解,就是请求按照顺序依次分配给每个 Redis 节点:

package org.redisson.connection;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.lambdaworks.redis.RedisClient;

public class RoundRobinLoadBalancer implements LoadBalancer {

    private final AtomicInteger index = new AtomicInteger(-1);

    private List<RedisClient> clients;

    @Override
    public void init(List<RedisClient> clients) {
        this.clients = clients;
    }

    @Override
    public RedisClient nextClient() {
    	// 每次访问,上面这个 index 数值都会自增
    	// 使用取模运算, 确保下标在客户端列表范围内进行循环
    	// Math.abs() 确保下标为非负数
    	
        int ind = Math.abs(index.incrementAndGet() % clients.size());
        return clients.get(ind);
    }

}

假设有三个 Redis 节点:

  • 节点1:redis://127.0.0.1:6379
  • 节点2:redis://127.0.0.2:6379
  • 节点3:redis://127.0.0.3:6379

轮询策略下,节点选择的顺序如下:

  • 第一次请求:选择节点1
  • 第二次请求:选择节点2
  • 第三次请求:选择节点3
  • 第四次请求:回到节点1
  • 第五次请求:选择节点2
  • 依此类推…

可以看出,轮训策略下,每一个节点分配的请求都是均匀的。

3.4 配置管理部分

Redisson 目前的配置管理还是比较简单的。

3.4.1 主要的参数配置类 Config

Redisson 参数配置类,目前这个版本配置还是相当的简单,可配置的属性少。

package org.redisson;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.RedissonCodec;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer;


public class Config {

    // 负载均衡算法配置。 默认轮询算法
    private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();

    // 数据编解码策略,默认 Jackson JSON 编解码
    private RedissonCodec codec = new JsonJacksonCodec();

    // 每个 Redis 连接的最大订阅数。 默认 5
    private int subscriptionsPerConnection = 5;

    // Redis 连接池的大小限制。 默认 100
    private int connectionPoolSize = 100;

    // 认证密码 默认为 null,表示不需要密码
    private String password;

    // Redis 服务器地址列表
    private List<URI> addresses = new ArrayList<URI>();

    public Config() {
    }

    Config(Config oldConf) {
        setCodec(oldConf.getCodec());
        setConnectionPoolSize(oldConf.getConnectionPoolSize());
        setPassword(oldConf.getPassword());
        setSubscriptionsPerConnection(oldConf.getSubscriptionsPerConnection());
        setAddresses(oldConf.getAddresses());
        setLoadBalancer(oldConf.getLoadBalancer());
    }

    public void setCodec(RedissonCodec codec) {
        this.codec = codec;
    }
    public RedissonCodec getCodec() {
        return codec;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public String getPassword() {
        return password;
    }

    public void setSubscriptionsPerConnection(int subscriptionsPerConnection) {
        this.subscriptionsPerConnection = subscriptionsPerConnection;
    }
    public int getSubscriptionsPerConnection() {
        return subscriptionsPerConnection;
    }

    public void setConnectionPoolSize(int connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }
    public int getConnectionPoolSize() {
        return connectionPoolSize;
    }

    // 添加 Redis 服务器地址 。 格式为 "host:port"
    public void addAddress(String ... addressesVar) {
        for (String address : addressesVar) {
            try {
                addresses.add(new URI("//" + address));
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("Can't parse " + address);
            }
        }
    }
    public List<URI> getAddresses() {
        return addresses;
    }
    void setAddresses(List<URI> addresses) {
        this.addresses = addresses;
    }

    public void setLoadBalancer(LoadBalancer loadBalancer) {
        this.loadBalancer = loadBalancer;
    }
    public LoadBalancer getLoadBalancer() {
        return loadBalancer;
    }

}