Redisson2.0源码分析7-命令执行
Redisson2.0源码分析7-命令执行
Redisson 中的命令执行也进行了封装,目前一共涉及到下面几个:
- CommandExecutor 接口定义,定义了命令执行包含哪些方法。
- CommandExecutorService 单个命令执行实现,也是逻辑最为复杂的一个类。
- CommandBatchExecutorService 批量命令实现。
这几个类通常不对外进行使用,因为 Redisson 对外的工具类 org.redisson.Redisson 对这几个类进行了调用。
1. 命令执行器接口 CommandExecutor
这个接口提供了很多个方法来执行Redis命令,包括读、写和执行Lua脚本。同时支持同步和异步操作,允许使用不同的编解码器来处理数据的序列化和反序列化。 算是 Redisson 内部自己封装好的接口,然后提供给内部各个地方调用。
package org.redisson;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
public interface CommandExecutor {
<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
<R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
ConnectionManager getConnectionManager();
<T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<R> R read(String key, SyncOperation<R> operation);
<R> R write(String key, SyncOperation<R> operation);
<T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R read(String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, RedisCommand<T> command, Object ... params);
<T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);
<T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params);
<T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);
<V> V get(Future<V> future);
<T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params);
<T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
}
2. 单个命令执行器实现 CommandExecutorService
这个类主要是对单个命令执行的逻辑封装。主要包括: - 执行 Redis 读、写、执行脚本等操作。 - 支持同步、异步操作。 - 执行命令过程中,出现异常可以进行自动重试,确保命令可以执行成功。 - 动态根据 Redis slot 来分发到对应的 Redis 节点。
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnection;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class CommandExecutorService implements CommandExecutor {
final Logger log = LoggerFactory.getLogger(getClass());
// 负责管理与Redis的连接
final ConnectionManager connectionManager;
public CommandExecutorService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
@Override
public ConnectionManager getConnectionManager() {
return connectionManager;
}
// 在所有Redis节点上异步执行读命令
public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
final Promise<Collection<R>> mainPromise = connectionManager.newPromise();
// 处理从每个Redis节点读取的结果
Promise<R> promise = new DefaultPromise<R>() {
Queue<R> results = new ConcurrentLinkedQueue<R>();
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<R> setSuccess(R result) {
// 读取成功的时候回调函数
// 主要是把结果放到队列中去
if (result instanceof Collection) {
results.addAll((Collection)result);
} else {
results.add(result);
}
// 如果所有节点都完成了读取操作,设置成功标记
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.setSuccess(results);
}
return this;
}
@Override
public Promise<R> setFailure(Throwable cause) {
// 如果有任何一个节点失败,将失败原因设置到 Promise 里面去
mainPromise.setFailure(cause);
return this;
}
};
// 遍历所有Redis节点,异步执行命令
for (Integer slot : connectionManager.getEntries().keySet()) {
async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0);
}
return mainPromise;
}
// 随机从一个 Redis 节点读取数据
public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
// 打乱 slot 的顺序更加随机一点
final List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());
Collections.shuffle(slots);
retryReadRandomAsync(command, mainPromise, slots, params);
return mainPromise;
}
// 随机从一个 Redis 节点读取数据
private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final Promise<R> mainPromise,
final List<Integer> slots, final Object... params) {
final Promise<R> attemptPromise = connectionManager.newPromise();
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isSuccess()) {
// 结果为空
if (future.getNow() == null) {
if (slots.isEmpty()) {
// 所有 slor 都尝试过了
mainPromise.setSuccess(null);
} else {
// 还有未尝试的 slor,继续尝试
retryReadRandomAsync(command, mainPromise, slots, params);
}
} else {
// 成功获取结果,将结果保存到 Promise
mainPromise.setSuccess(future.getNow());
}
} else {
// 将失败原因设置到 Promise
mainPromise.setFailure(future.cause());
}
}
});
// 从随机打算的 slot 中获取首个元素
Integer slot = slots.remove(0);
async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0);
}
// 下面 2 个函数是在所有Redis节点上异步执行一个写命令
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
return writeAllAsync(command, null, params);
}
public <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
return allAsync(false, command, callback, params);
}
// 在所有节点上执行读或者写操作
public <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
// 设置 Promise 回调
Promise<T> promise = new DefaultPromise<T>() {
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<T> setSuccess(T result) {
// 有回调函数调用回调函数
if (callback != null) {
callback.onSlotResult(result);
}
// 所有节点都处理完成
if (counter.decrementAndGet() == 0) {
// 根据是否存在回调设置成功标记
if (callback != null) {
mainPromise.setSuccess(callback.onFinish());
} else {
mainPromise.setSuccess(null);
}
}
return this;
}
@Override
public Promise<T> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
// 遍历所有Redis节点,异步执行命令
for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0);
}
return mainPromise;
}
// 从Future中获取异步操作的结果
public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw future.cause() instanceof RedisException ?
(RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause());
}
// 同步读取Redis数据
public <T, R> R read(String key, RedisCommand<T> command, Object ... params) {
return read(key, connectionManager.getCodec(), command, params);
}
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(key, codec, command, params);
return get(res);
}
// 异步读取Redis数据
public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
// 计算 key 对应的槽位
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, 0);
return mainPromise;
}
// 同步写操作
public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(slot, codec, command, params);
return get(res);
}
// 异步写操作
public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(false, slot, null, codec, command, params, mainPromise, 0);
return mainPromise;
}
public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
return readAsync(key, connectionManager.getCodec(), command, params);
}
public <R> R write(String key, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(false, slot, operation, 0);
}
public <R> R read(String key, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(true, slot, operation, 0);
}
// 在指定的 Redis 节点上执行异步命令
private <R> R async(boolean readOnlyMode, int slot, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
return null;
}
try {
RedisConnection connection;
// 根据是否为只读模式获取相应的 Redis 连接
if (readOnlyMode) {
connection = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
}
try {
// 执行同步操作并返回结果
return operation.execute(connectionManager.getCodec(), connection);
} catch (RedisMovedException e) {
return async(readOnlyMode, e.getSlot(), operation, attempt);
} catch (RedisTimeoutException e) {
// 超时重试
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return async(readOnlyMode, slot, operation, attempt);
} finally {
// 释放连接关闭锁
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(slot, connection);
} else {
connectionManager.releaseWrite(slot, connection);
}
}
} catch (RedisException e) {
// 到了最大重试次数抛异常
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
// 等待一段时间后重试
try {
Thread.sleep(connectionManager.getConfig().getRetryInterval());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
// 增加重试次数
attempt++;
return async(readOnlyMode, slot, operation, attempt);
}
}
public <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(true, key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(true, key, codec, evalCommandType, script, keys, params);
}
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
public <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(false, key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalAsync(false, key, codec, evalCommandType, script, keys, params);
}
public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
return evalAllAsync(false, command, callback, script, keys, params);
}
// 在所有 Redis 节点上异步执行 lua 脚本
public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
final Promise<R> mainPromise = connectionManager.newPromise();
// Promise用于保存每个Redis节点的结果
Promise<T> promise = new DefaultPromise<T>() {
// 记录还有多少节点未返回结果
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<T> setSuccess(T result) {
callback.onSlotResult(result);
// 所有节点都返回了结果,直接标记为成功
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.setSuccess(callback.onFinish());
}
return this;
}
@Override
public Promise<T> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
// 构建 lua 脚本参数
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
// 在所有节点异步执行
for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0);
}
return mainPromise;
}
// 在特点节点执行 lua 脚本
private <T, R> Future<R> evalAsync(boolean readOnlyMode, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
}
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Future<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
}
public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, command, params);
return get(res);
}
public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
return writeAsync(key, connectionManager.getCodec(), command, params);
}
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, codec, command, params);
return get(res);
}
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(false, slot, null, codec, command, params, mainPromise, 0);
return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
}
// 处理当前尝试的结果
final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
// 定义定时任务处理重试
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 尝试已经完成,直接返回
if (attemptPromise.isDone()) {
return;
}
// 达到最大重试次数 直接返回
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.setFailure(ex.get());
return;
}
// 尝试未取消 直接返回
if (!attemptPromise.cancel(false)) {
return;
}
// 增加重试次数 再重试
int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count);
}
};
try {
org.redisson.client.RedisConnection connection;
// 根据读写模式获取不同的连接
if (readOnlyMode) {
connection = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
}
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());
// 发送命令到Redis
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 不成功超时任务就取消
timeout.cancel();
ex.set(new WriteRedisConnectionException(
"Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
// 根据读写模式释放连接
if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isCancelled()) {
return;
}
// 异常处理重试
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
return;
}
// 根据返回结果设置成功标记
if (future.isSuccess()) {
mainPromise.setSuccess(future.getNow());
} else {
mainPromise.setFailure(future.cause());
}
}
});
}
}
3. 批量命令执行器实现 CommandBatchExecutorService
这个类是 CommandExecutorService 的子类,在 CommandExecutorService 的基础上增加了批量执行的功能。主要做的逻辑有:
- 支持将多个 Redis 命令发送到 Redis 服务端,提升效率。
- 批量命令执行时按照添加顺序进行发送,返回结果时也会按照返回顺序进行处理。
- 支持动态选择读写操作节点。
- 支持执行异常时重试。
需要注意的是,因为是批量执行的,所以命令在添加时不会立即执行。 先暂存起来,等到到调用 execute() 或 executeAsync() 方法时才会批量执行。
因为是批量执行,所以这个类中实现的一些同步的方法默认都是不支持的。
package org.redisson;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
public class CommandBatchExecutorService extends CommandExecutorService {
// 保存命令以及执行数据
public static class CommandEntry implements Comparable<CommandEntry> {
final CommandData<?, ?> command; // 命令数据
final int index; // 命令顺序索引
public CommandEntry(CommandData<?, ?> command, int index) {
super();
this.command = command;
this.index = index;
}
public CommandData<?, ?> getCommand() {
return command;
}
@Override
public int compareTo(CommandEntry o) {
// 按照索引顺序比较
return index - o.index;
}
}
public static class Entry {
// 多生产者单消费者队列
Queue<CommandEntry> commands = PlatformDependent.newMpscQueue();
// 是否为只读模式
volatile boolean readOnlyMode = true;
public Queue<CommandEntry> getCommands() {
return commands;
}
public void setReadOnlyMode(boolean readOnlyMode) {
this.readOnlyMode = readOnlyMode;
}
public boolean isReadOnlyMode() {
return readOnlyMode;
}
}
// 用于生成命令的顺序索引
private final AtomicInteger index = new AtomicInteger();
// 保存命令 Map
private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();
private boolean executed; // 批处理执行标记
public CommandBatchExecutorService(ConnectionManager connectionManager) {
super(connectionManager);
}
@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
// 已经执行过抛异常
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
// 获取指定槽位的命令条目
Entry entry = commands.get(slot);
// 不存在就创建
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(slot, entry);
// 存在就用已有的
if (oldEntry != null) {
entry = oldEntry;
}
}
// 当前操作不是只读就设置非只读模式
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
// 添加命令到队列中
entry.getCommands().add(new CommandEntry(new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params), index.incrementAndGet()));
}
public List<?> execute() {
// 同步执行批处理命令并返回结果
return get(executeAsync());
}
// 异步执行批处理命令
public Future<List<?>> executeAsync() {
// 如果已执行则抛出异常
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
// 没有命令需要执行直接返回一个成功的Future
if (commands.isEmpty()) {
return connectionManager.getGroup().next().newSucceededFuture(null);
}
executed = true; // 标记批处理已执行
// 处理批处理的最终结果
Promise<Void> voidPromise = connectionManager.newPromise();
final Promise<List<?>> promise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
// 失败设置Promise为失败状态
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
// 按顺序添加命令
List<CommandEntry> entries = new ArrayList<CommandEntry>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
// 排序
Collections.sort(entries);
List<Object> result = new ArrayList<Object>();
// 结果也是按顺序
for (CommandEntry commandEntry : entries) {
result.add(commandEntry.getCommand().getPromise().getNow());
}
promise.setSuccess(result);
commands = null;
}
});
// 依次执行
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), e.getKey(), voidPromise, new AtomicInteger(commands.size()), 0);
}
return promise;
}
// 在指定的Redis节点上执行批处理命令
public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
}
final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();
// 跟上面套路差不多,定时重试
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 重试好了不重试
if (attemptPromise.isDone()) {
return;
}
// 到了最大重试次数不重试
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.setFailure(ex.get());
return;
}
attemptPromise.cancel(true);
// 增加重试次数重试
int count = attempt + 1;
execute(entry, slot, mainPromise, slots, count);
}
};
try {
org.redisson.client.RedisConnection connection;
// 根据模式选择不同的连接
if (entry.isReadOnlyMode()) {
connection = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
}
// 按顺序添加命令
ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
// 发送命令
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
// 添加回调处理
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 发送不成功
if (!future.isSuccess()) {
// 取消超时任务
timeout.cancel();
ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed"));
// 添加重试任务
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
// 根据读写模式添加监听器
if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
// 已经取消了不再处理
if (future.isCancelled()) {
return;
}
// 元素位置移动了,就重新执行一次,从异常中获取槽位置
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, ex.getSlot(), mainPromise, slots, attempt);
return;
}
// 成功并且所有的命令执行成功设置执行成功标记
if (future.isSuccess()) {
if (slots.decrementAndGet() == 0) {
mainPromise.setSuccess(future.getNow());
}
} else {
mainPromise.setFailure(future.cause());
}
}
});
}
// 下面都是不支持的操作,因为这个类实现就是批量操作,所以不支持所有同步的方法。
@Override
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
List<Object> keys, Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
List<Object> keys, Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <R> R read(String key, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}
@Override
public <R> R write(String key, SyncOperation<R> operation) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}
@Override
public <T, R> R write(String key, RedisCommand<T> command, Object... params) {
throw new UnsupportedOperationException();
}
}