RedissonV1.0源码分析-开篇
RedissonV1.0源码分析-开篇
最近项目中引用了 Redisson 这个组件,组件的特点是可以将 Redis 的操作像在 JAVA 的一些数据集合一样。同时这些数据集合是支持分布式的,意味着你在多个 JVM 实例上可以操作同一份数据。
对于 Redisson 是怎么实现这些分布式数据结构的有点好奇,这就是这次分析源码的目的。
没有目标盲目的去看源码,很容易不知道该看些什么。本次源码分析目标很明确,分析怎么实现的,这样在日常使用中更容易用对 API 以及处理相关问题。
1. 代码拉取
Redisson 从发布到现在已经很多年了,代码工程也是比较的复杂。 这里为了快速了解核心的原理,可以直接找到最初发布的稳定版本。这个版本的代码肯定不多,可以更加容易分析。
但是需要注意的是,开源软件在迭代过程中架构、实现机制都会有点变化。 但是像 Redisson 这种无论怎么改动肯定都还是对 Redis 的封装,所以老版本不太影响原理分析。
首先从 Github 上下载代码:
git clone https://github.com/redisson/redisson.git
然后回退到 v1.0 分支 915c238b :
git reset --hard 915c238b
2. 代码结构
回退到指定分支的代码结构大致如下:
.
├── com
│ └── lambdaworks // 这个包下就是 Redis 客户端库 Lettuce 的源代码
│ ├── codec
│ │ └── Base16.java
│ └── redis // Redis 协议编解码、连接相关源码
│ ├── KeyValue.java
│ ├── RedisAsyncConnection.java
│ ├── RedisClient.java
│ ├── RedisCommandInterruptedException.java
│ ├── RedisConnection.java
│ ├── RedisException.java
│ ├── ScoredValue.java
│ ├── ScriptOutputType.java
│ ├── SortArgs.java
│ ├── ZStoreArgs.java
│ ├── codec // 编解码接口以及默认实现
│ │ ├── RedisCodec.java
│ │ └── Utf8StringCodec.java
│ ├── output // Redis 数据结构输出对象
│ │ ├── BooleanListOutput.java
│ │ ├── BooleanOutput.java
│ │ ├── ByteArrayOutput.java
│ │ ├── DateOutput.java
│ │ ├── DoubleOutput.java
│ │ ├── IntegerOutput.java
│ │ ├── KeyListOutput.java
│ │ ├── KeyOutput.java
│ │ ├── KeyValueOutput.java
│ │ ├── MapOutput.java
│ │ ├── MultiOutput.java
│ │ ├── NestedMultiOutput.java
│ │ ├── ScoredValueListOutput.java
│ │ ├── StatusOutput.java
│ │ ├── StringListOutput.java
│ │ ├── ValueListOutput.java
│ │ ├── ValueOutput.java
│ │ └── ValueSetOutput.java
│ ├── protocol // 协议编解码、看门狗等
│ │ ├── Charsets.java
│ │ ├── Command.java
│ │ ├── CommandArgs.java
│ │ ├── CommandHandler.java
│ │ ├── CommandKeyword.java
│ │ ├── CommandOutput.java
│ │ ├── CommandType.java
│ │ ├── ConnectionWatchdog.java
│ │ └── RedisStateMachine.java
│ └── pubsub // 发布订阅相关
│ ├── PubSubCommandHandler.java
│ ├── PubSubOutput.java
│ ├── RedisPubSubAdapter.java
│ ├── RedisPubSubConnection.java
│ └── RedisPubSubListener.java
└── org
└── redisson // redisson 的代码
├── Config.java // Redisson 配置
├── RedisPubSubTopicListenerWrapper.java // Redisson 对 lettuce 发布订阅监听器包裹类
├── Redisson.java // Redisson 工具主类
├── RedissonAtomicLong.java // AtomicLong 分布式实现
├── RedissonCountDownLatch.java // CountDownLatch 分布式实现
├── RedissonList.java // List 分布式实现
├── RedissonLock.java // Lock 分布式实现
├── RedissonMap.java // Map 分布式实现
├── RedissonObject.java // Redisson 分布式对象实现
├── RedissonQueue.java // Queue 分布式实现
├── RedissonSet.java // Set 分布式实现
├── RedissonTopic.java // Topic 分布式实现
├── codec // 编解码
│ ├── JsonJacksonCodec.java // Jackson Json 序列化反序列化实现
│ ├── RedisCodecWrapper.java // 对 lettuce 的编解码接口包裹
│ ├── RedissonCodec.java // 序列化反序列化接口
│ └── SerializationCodec.java // JDK 默认序列化反序列化实现
├── connection // 连接管理相关
│ ├── ConnectionManager.java // 连接管理
│ ├── LoadBalancer.java // 负载均衡实现接口
│ ├── RandomLoadBalancer.java // 随机负载均衡实现
│ └── RoundRobinLoadBalancer.java // 轮训负载均衡实现
├── core // Redisson 相关分布式数据结构接口定义
│ ├── MessageListener.java // 消息监听器接口
│ ├── RAtomicLong.java // AtomicLong 接口
│ ├── RCountDownLatch.java // CountDownLatch 接口
│ ├── RList.java // List 分布式接口
│ ├── RLock.java // Lock 分布式接口
│ ├── RMap.java // Map 分布式接口
│ ├── RObject.java // Object 分布式接口
│ ├── RQueue.java // Queue 分布式接口
│ ├── RSet.java // Set 分布式接口
│ └── RTopic.java // Topic 分布式接口
└── misc // 杂项
├── ReclosableLatch.java // 可重复开关线程门栓
└── ReferenceMap.java // Redisson 定制的引用 HashMap 实现
从上面的结构中可以看到 , Redisson V1.0 banben 实际上是引用了一个开源客户端 lambdaworks 。 后来 lambdaworks 后来被更名为 Lettuce,并继续作为一个独立的 Redis 客户端库使用。
关于 Lettuce 在另外的文章中已经分析过了,不再单独分析。 这几篇文章主要还是以分析 Redisson 的实现为主,具体的 Redis 协议编解码以及主要的通信主要是在 lambdaworks 这个包下。
3. 源码分析思路
有了上面的这个结构,那么就可以划分出源码分析的路径。 按照下面的思路进行分析:
- Redisson 对数据的编解码是怎么实现的。
- Redisson 对于 Netty 连接怎么管理的,包括一些重连机制。
- Redisson 对于 Redis 服务的负载均衡怎么实现。
- Redisson 配置文件包含哪些属性,怎么配置。
- Redisson 一些工具类的实现,包括可重复开关的门闩工具、引用 Map 工具。
- Redisson 分布式接口怎么实现的。
针对上面分析,为了不让一篇文章内容过多,拆分出三篇文章:
- 编解码、连接管理、负载均衡、配置管理。
- Redisson 工具类实现。
- Redisson 分布式接口实现。
第一篇文章的内容将会直接本文中进行编写。其他两篇文章单独开一章。
3. 第一部分源码分析
3.1 编解码部分
编解码是指对数据进行序列化以及反序列化的过程,因为数据在 Redis 中是以字节数组的形式存储。
Redisson 跟 Redis 交互需要将对象转为字节数组,或者从字节数组转成 Java 对象。
3.1.1 编解码接口 RedissonCodec
这是 Redisson 定义的编解码器接口,用来实现数据的序列化和反序列化,通过实现这个接口可以实现不同的序列化算法。
这个接口的定义跟 leetuce 中的 org.redisson.codec.RedissonCodec 定义是一模一样的。
package org.redisson.codec;
import java.nio.ByteBuffer;
public interface RedissonCodec {
// 解码 key
Object decodeKey(ByteBuffer bytes);
// 解码 value
Object decodeValue(ByteBuffer bytes);
// 编码 key
byte[] encodeKey(Object key);
// 编码 value
byte[] encodeValue(Object value);
}
3.1.2 Lettuce 编解码接口包裹类 RedisCodecWrapper
RedisCodecWrapper 这个类继了 lettuce 的编解码接口 com.lambdaworks.redis.codec.RedisCodec。
主要作用是充当一个包装器,它将 RedissonCodec 接口适配到 com.lambdaworks.redis.codec.RedisCodec 接口。
通过这个包装器,Redisson 可以使用自己的编解码器接口,不需要直接依赖于lettuce的接口,后续也可以替换成其他实现。
package org.redisson.codec;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.RedisCodec;
public class RedisCodecWrapper extends RedisCodec<Object, Object> {
private final RedissonCodec redissonCodec;
public RedisCodecWrapper(RedissonCodec redissonCodec) {
this.redissonCodec = redissonCodec;
}
@Override
public Object decodeKey(ByteBuffer bytes) {
return redissonCodec.decodeKey(bytes);
}
@Override
public Object decodeValue(ByteBuffer bytes) {
return redissonCodec.decodeValue(bytes);
}
@Override
public byte[] encodeKey(Object key) {
return redissonCodec.encodeKey(key);
}
@Override
public byte[] encodeValue(Object value) {
return redissonCodec.encodeValue(value);
}
}
3.1.3 Java标准序列化机制编解码实现 SerializationCodec
这个类就是利用的 Java 中的序列化以及反序列化机制,效率不是特别的高。 优势在不依赖于第三方的库,只限于 Java 语言。
package org.redisson.codec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
public class SerializationCodec implements RedissonCodec {
@Override
public Object decodeKey(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public Object decodeValue(ByteBuffer bytes) {
return decode(bytes);
}
// 使用 ObjectInputStream 来反序列化 ByteBuffer 中的数据
private Object decode(ByteBuffer bytes) {
try {
ByteArrayInputStream in = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit());
ObjectInputStream inputStream = new ObjectInputStream(in);
return inputStream.readObject();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public byte[] encodeKey(Object key) {
return encodeValue(key);
}
// 创建 ByteArrayOutputStream 和 ObjectOutputStream
// 使用 writeObject() 方法来序列化对象
@Override
public byte[] encodeValue(Object value) {
try {
ByteArrayOutputStream result = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(result);
outputStream.writeObject(value);
outputStream.close();
return result.toByteArray();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
3.1.4 Jackson Json 编解码实现 JsonJacksonCodec
调用 Jackson 来反序列化或者序列化成 JSON 的实现。 这是一种比较常用的编解码实现。
package org.redisson.codec;
import java.nio.ByteBuffer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
public class JsonJacksonCodec implements RedissonCodec {
// ObjectMapper 是一个主要的工具类,用于各种类型数据和 Json 类型的转换
private final ObjectMapper objectMapper = new ObjectMapper();
public JsonJacksonCodec() {
// 做一些 JSON 序列化反序列化的配置
// 包括序列化时只包含非 null 值 、 反序列化时忽略未知属性、BigDecimal 的写入方式、属性按字母顺序排序、包含类型信息的方式
objectMapper.setSerializationInclusion(Include.NON_NULL);
objectMapper.setVisibilityChecker(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.ANY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
// type info inclusion
TypeResolverBuilder<?> typer = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL);
typer.init(JsonTypeInfo.Id.CLASS, null);
typer.inclusion(JsonTypeInfo.As.PROPERTY);
objectMapper.setDefaultTyping(typer);
}
@Override
public Object decodeKey(ByteBuffer bytes) {
return decode(bytes);
}
@Override
public Object decodeValue(ByteBuffer bytes) {
return decode(bytes);
}
// 反序列化 JSON 数据
private Object decode(ByteBuffer bytes) {
try {
return objectMapper.readValue(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.limit(), Object.class);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public byte[] encodeKey(Object key) {
return encodeValue(key);
}
// 序列化 JSON 数据
@Override
public byte[] encodeValue(Object value) {
try {
return objectMapper.writeValueAsBytes(value);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
3.2 连接管理部分
Redisson 的连接管理主要是还是依赖于 Lettuce 的一些接口进行包装,适配 Redisson 自己的逻辑。
3.2.1 Redis连接管理 ConnectionManager
ConnectionManager 是 Redisson 管理 Redis 连接的核心类。
主要负责创建、维护和分配 Redis 连接,包括普通连接和发布/订阅(PubSub)连接,而这一块调用的还是 Lettuce 的相关 API 。
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
import org.redisson.codec.RedisCodecWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
public class ConnectionManager {
// 管理发布/订阅连接
public static class PubSubEntry {
// 定义信号量、连接和每个连接的订阅数量
private final Semaphore semaphore;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.semaphore = new Semaphore(subscriptionsPerConnection);
}
// 添加发布/订阅监听器
public void addListener(RedisPubSubListener listener) {
conn.addListener(listener);
}
// 移除发布/订阅监听器
public void removeListener(RedisPubSubListener listener) {
conn.removeListener(listener);
}
public boolean subscribe(RedisPubSubAdapter listener, Object channel) {
// 获取信号量成功就执行订阅操作
if (semaphore.tryAcquire()) {
conn.addListener(listener);
conn.subscribe(channel);
return true;
}
return false;
}
public void unsubscribe(Object channel) {
// 取消订阅,释放信号量
conn.unsubscribe(channel);
semaphore.release();
}
public boolean tryClose() {
if (semaphore.tryAcquire(subscriptionsPerConnection)) {
conn.close();
return true;
}
return false;
}
}
private final Logger log = LoggerFactory.getLogger(getClass());
// 保存可以重用的 Redis 连接。
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
// 发布订阅连接
private final Queue<PubSubEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubEntry>();
// Redis 客户端实例
private final List<RedisClient> clients = new ArrayList<RedisClient>();
private final Semaphore activeConnections; // 信号量控制活跃连接的数量 防止超过配置的连接池大小
private final RedisCodec codec; // 数据编解码策略
private final Config config; // 参数配置
private final LoadBalancer balancer; // 负载均衡策略
public ConnectionManager(Config config) {
// 遍历配置文件中的地址信息,给每个地址创建一个 RedisClient 实例,保存到 List 中备用。
for (URI address : config.getAddresses()) {
RedisClient client = new RedisClient(address.getHost(), address.getPort());
clients.add(client);
}
// 初始化负载均衡
balancer = config.getLoadBalancer();
balancer.init(clients);
codec = new RedisCodecWrapper(config.getCodec());
// 从配置文件中取最大连接池数量
activeConnections = new Semaphore(config.getConnectionPoolSize());
this.config = config;
}
public <K, V> RedisConnection<K, V> connection() {
// 尝试获取一个连接
acquireConnection();
// 从连接池中获取一个连接,没获取到就建立一个连接
RedisConnection<K, V> conn = connections.poll();
if (conn == null) {
conn = balancer.nextClient().connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
}
return conn;
}
// 频道订阅
public <K, V> PubSubEntry subscribe(RedisPubSubAdapter<K, V> listener, K channel) {
// 尝试在现有的连接上直接订阅,如果订阅成功就直接返回可用的 PubSubEntry
for (PubSubEntry entry : pubSubConnections) {
if (entry.subscribe(listener, channel)) {
return entry;
}
}
// 如果没有可用的连接,调用 acquireConnection 方法拿到新的连接。
acquireConnection();
// 从负载均衡列表中拿到一个连接
RedisPubSubConnection<K, V> conn = balancer.nextClient().connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
// 创建新的 PubSubEntry ,然后添加到队列中
PubSubEntry entry = new PubSubEntry(conn,config.getSubscriptionsPerConnection());
entry.subscribe(listener, channel);
pubSubConnections.add(entry);
return entry;
}
// 获取连接
private void acquireConnection() {
// 从信号量中获取一个许可,拿不到就使用 acquireUninterruptibly 方法阻塞当前线程
// 一直拿到到一个许可为止
if (!activeConnections.tryAcquire()) {
log.warn("Connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
activeConnections.acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
}
}
// 取消指定频道的订阅
public <K> void unsubscribe(PubSubEntry entry, K channel) {
entry.unsubscribe(channel);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
activeConnections.release();
}
}
// 释放连接
public void release(RedisConnection сonnection) {
// 释放信号量
activeConnections.release();
connections.add(сonnection);
}
// 遍历 clients 信息,循环调用每个 RedisClient 的 shutdown 方法
// 释放资源并关闭连接
public void shutdown() {
for (RedisClient client : clients) {
client.shutdown();
}
}
}
这个类算是很关键的一个组件了,其中设计到挺多的知识点,比如:
- 连接池机制。用连接池来管理多个 Redis 连接,减少连接建立的开销。
- 信号量,使用信号量来控制并发。
- 使用了负载均衡,这样可以自动选择合适的 Redis 节点。
3.3 负载均衡部分
负载均衡主要是指在连接 Redis 集群的时候,怎么样将请求分配到不同的 Redis 节点效率会更高。
可能集群节点配置不一样,有的节点高配,有的节点低配。 那么就需要高配的多分担一点请求,低配的少分担一点请求。
3.3.1 负载均衡接口定义 LoadBalancer
package org.redisson.connection;
import java.util.List;
import com.lambdaworks.redis.RedisClient;
public interface LoadBalancer {
void init(List<RedisClient> clients);
RedisClient nextClient();
}
3.3.2 随机策略实现 RandomLoadBalancer
这个比较简单就是随机从 Redis 节点列表中取一个返回。
package org.redisson.connection;
import java.util.List;
import java.util.Random;
import com.lambdaworks.redis.RedisClient;
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new Random();
private List<RedisClient> clients;
public void init(List<RedisClient> clients) {
this.clients = clients;
}
public RedisClient nextClient() {
// 生成一个随机数,随机数的范围在列表的大小范围内。 最后返回 clients 列表中该索引的对象
return clients.get(random.nextInt(clients.size()));
}
}
假设有三个 Redis 节点:
- 节点1:redis://127.0.0.1:6379
- 节点2:redis://127.0.0.2:6379
- 节点3:redis://127.0.0.3:6379
随机策略下,节点选择是随机的,可能会出现如下情况:
第一次请求:选择节点2
第二次请求:选择节点3
第三次请求:选择节点1
第四次请求:选择节点3
第五次请求:选择节点2
依此类推。。。
可以看到节点选择毫无规律,每一次都是随机选择。不过如果在大量请求的情况下,节点的选择还是会趋于均匀。
3.3.3 轮询策略实现 RoundRobinLoadBalancer
这种也比较好理解,就是请求按照顺序依次分配给每个 Redis 节点:
package org.redisson.connection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.lambdaworks.redis.RedisClient;
public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1);
private List<RedisClient> clients;
@Override
public void init(List<RedisClient> clients) {
this.clients = clients;
}
@Override
public RedisClient nextClient() {
// 每次访问,上面这个 index 数值都会自增
// 使用取模运算, 确保下标在客户端列表范围内进行循环
// Math.abs() 确保下标为非负数
int ind = Math.abs(index.incrementAndGet() % clients.size());
return clients.get(ind);
}
}
假设有三个 Redis 节点:
- 节点1:redis://127.0.0.1:6379
- 节点2:redis://127.0.0.2:6379
- 节点3:redis://127.0.0.3:6379
轮询策略下,节点选择的顺序如下:
- 第一次请求:选择节点1
- 第二次请求:选择节点2
- 第三次请求:选择节点3
- 第四次请求:回到节点1
- 第五次请求:选择节点2
- 依此类推…
可以看出,轮训策略下,每一个节点分配的请求都是均匀的。
3.4 配置管理部分
Redisson 目前的配置管理还是比较简单的。
3.4.1 主要的参数配置类 Config
Redisson 参数配置类,目前这个版本配置还是相当的简单,可配置的属性少。
package org.redisson;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.RedissonCodec;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer;
public class Config {
// 负载均衡算法配置。 默认轮询算法
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
// 数据编解码策略,默认 Jackson JSON 编解码
private RedissonCodec codec = new JsonJacksonCodec();
// 每个 Redis 连接的最大订阅数。 默认 5
private int subscriptionsPerConnection = 5;
// Redis 连接池的大小限制。 默认 100
private int connectionPoolSize = 100;
// 认证密码 默认为 null,表示不需要密码
private String password;
// Redis 服务器地址列表
private List<URI> addresses = new ArrayList<URI>();
public Config() {
}
Config(Config oldConf) {
setCodec(oldConf.getCodec());
setConnectionPoolSize(oldConf.getConnectionPoolSize());
setPassword(oldConf.getPassword());
setSubscriptionsPerConnection(oldConf.getSubscriptionsPerConnection());
setAddresses(oldConf.getAddresses());
setLoadBalancer(oldConf.getLoadBalancer());
}
public void setCodec(RedissonCodec codec) {
this.codec = codec;
}
public RedissonCodec getCodec() {
return codec;
}
public void setPassword(String password) {
this.password = password;
}
public String getPassword() {
return password;
}
public void setSubscriptionsPerConnection(int subscriptionsPerConnection) {
this.subscriptionsPerConnection = subscriptionsPerConnection;
}
public int getSubscriptionsPerConnection() {
return subscriptionsPerConnection;
}
public void setConnectionPoolSize(int connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
}
public int getConnectionPoolSize() {
return connectionPoolSize;
}
// 添加 Redis 服务器地址 。 格式为 "host:port"
public void addAddress(String ... addressesVar) {
for (String address : addressesVar) {
try {
addresses.add(new URI("//" + address));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse " + address);
}
}
}
public List<URI> getAddresses() {
return addresses;
}
void setAddresses(List<URI> addresses) {
this.addresses = addresses;
}
public void setLoadBalancer(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
}
public LoadBalancer getLoadBalancer() {
return loadBalancer;
}
}