七的博客

Redisson2.0源码分析7-命令执行

源码分析

Redisson2.0源码分析7-命令执行

Redisson 中的命令执行也进行了封装,目前一共涉及到下面几个:

  • CommandExecutor 接口定义,定义了命令执行包含哪些方法。
  • CommandExecutorService 单个命令执行实现,也是逻辑最为复杂的一个类。
  • CommandBatchExecutorService 批量命令实现。

这几个类通常不对外进行使用,因为 Redisson 对外的工具类 org.redisson.Redisson 对这几个类进行了调用。

1. 命令执行器接口 CommandExecutor

这个接口提供了很多个方法来执行Redis命令,包括读、写和执行Lua脚本。同时支持同步和异步操作,允许使用不同的编解码器来处理数据的序列化和反序列化。 算是 Redisson 内部自己封装好的接口,然后提供给内部各个地方调用。

package org.redisson;

import java.util.Collection;
import java.util.List;

import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;

import io.netty.util.concurrent.Future;

/**
 * Internal contract for executing Redis commands: reads, writes and Lua
 * scripts, each available in blocking and {@link Future}-based form, with an
 * optional explicit {@link Codec}.
 */
public interface CommandExecutor {

    // ------------------------------------------------------------------
    // Infrastructure
    // ------------------------------------------------------------------

    /** Returns the connection manager used by this executor. */
    ConnectionManager getConnectionManager();

    /** Returns the result carried by {@code future}. */
    <V> V get(Future<V> future);

    // ------------------------------------------------------------------
    // Blocking operations expressed as a SyncOperation
    // ------------------------------------------------------------------

    <R> R read(String key, SyncOperation<R> operation);

    <R> R write(String key, SyncOperation<R> operation);

    // ------------------------------------------------------------------
    // Blocking reads
    // ------------------------------------------------------------------

    <T, R> R read(String key, RedisCommand<T> command, Object ... params);

    <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params);

    // ------------------------------------------------------------------
    // Asynchronous reads
    // ------------------------------------------------------------------

    <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params);

    <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);

    <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);

    <T, R> Future<R> readRandomAsync(RedisCommand<T> command, Object ... params);

    // ------------------------------------------------------------------
    // Blocking writes
    // ------------------------------------------------------------------

    <T, R> R write(String key, RedisCommand<T> command, Object ... params);

    <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params);

    <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);

    // ------------------------------------------------------------------
    // Asynchronous writes
    // ------------------------------------------------------------------

    <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params);

    <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);

    <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params);

    <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

    // ------------------------------------------------------------------
    // Lua scripts, read mode
    // ------------------------------------------------------------------

    <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    // ------------------------------------------------------------------
    // Lua scripts, write mode
    // ------------------------------------------------------------------

    <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

    <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);

}


2. 单个命令执行器实现 CommandExecutorService

这个类主要是对单个命令执行的逻辑封装。主要包括:

  • 执行 Redis 读、写、执行脚本等操作。
  • 支持同步、异步操作。
  • 执行命令过程中,出现异常可以进行自动重试(重试次数有上限,由配置决定),尽量保证命令可以执行成功。
  • 动态根据 Redis slot 来分发到对应的 Redis 节点。

package org.redisson;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisConnection;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class CommandExecutorService implements CommandExecutor {

    final Logger log = LoggerFactory.getLogger(getClass());

    // Connection manager used to obtain and release Redis connections
    final ConnectionManager connectionManager;

    /**
     * Creates an executor that runs commands through the given connection manager.
     *
     * @param connectionManager source of connections, promises, codec and retry settings
     */
    public CommandExecutorService(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    /** Returns the connection manager supplied at construction time. */
    @Override
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /**
     * Runs a read command against every slot entry known to the connection
     * manager and gathers the per-entry results into a single collection.
     * A result that is itself a {@link Collection} is flattened into the output.
     *
     * <p>The returned future fails with the first failure reported by any entry;
     * later failures or successes are ignored.
     *
     * @param command command to execute on each entry
     * @param params  command arguments
     * @return future completed with all collected results
     */
    public <T, R> Future<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
        final Promise<Collection<R>> mainPromise = connectionManager.newPromise();

        // Snapshot the slots once so that the countdown below and the dispatch
        // loop are guaranteed to agree on the number of entries.
        final List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());

        // Shared per-entry promise: every entry reports into it and it forwards
        // the aggregated outcome to mainPromise.
        Promise<R> promise = new DefaultPromise<R>() {
            Queue<R> results = new ConcurrentLinkedQueue<R>();
            AtomicInteger counter = new AtomicInteger(slots.size());
            @Override
            public Promise<R> setSuccess(R result) {
                if (result instanceof Collection) {
                    results.addAll((Collection)result);
                } else {
                    results.add(result);
                }

                // Last entry reported: publish the results. trySuccess is a
                // no-op when mainPromise already failed, instead of throwing.
                if (counter.decrementAndGet() == 0) {
                    mainPromise.trySuccess(results);
                }
                return this;
            }

            @Override
            public Promise<R> setFailure(Throwable cause) {
                // Several entries may fail; only the first failure is recorded.
                mainPromise.tryFailure(cause);
                return this;
            }

        };

        for (Integer slot : slots) {
            async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0);
        }
        return mainPromise;
    }

    /**
     * Executes a read command on slot entries taken in random order, handing the
     * shuffled slot list to {@code retryReadRandomAsync}.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return future carrying the outcome of the random read
     */
    public <T, R> Future<R> readRandomAsync(final RedisCommand<T> command, final Object ... params) {
        // Copy the slot keys and shuffle them so the visiting order is random
        final List<Integer> shuffledSlots = new ArrayList<Integer>(connectionManager.getEntries().keySet());
        Collections.shuffle(shuffledSlots);

        final Promise<R> result = connectionManager.newPromise();
        retryReadRandomAsync(command, result, shuffledSlots, params);
        return result;
    }

     // 随机从一个 Redis 节点读取数据
    /**
     * Executes the command on the first slot of {@code slots} (removing it from
     * the list) and, while the result is {@code null} and slots remain, repeats
     * with the next one.
     *
     * <p>{@code mainPromise} succeeds with the first non-null result, succeeds
     * with {@code null} when no slot is left, and fails with the first failure.
     *
     * @param command     command to execute
     * @param mainPromise promise receiving the final outcome
     * @param slots       mutable list of slots still to try
     * @param params      command arguments
     */
    private <R, T> void retryReadRandomAsync(final RedisCommand<T> command, final Promise<R> mainPromise,
            final List<Integer> slots, final Object... params) {
        // No slot to try (e.g. no entries at all): report the same outcome as
        // "every slot returned null" instead of failing on remove(0).
        if (slots.isEmpty()) {
            mainPromise.setSuccess(null);
            return;
        }

        final Promise<R> attemptPromise = connectionManager.newPromise();
        attemptPromise.addListener(new FutureListener<R>() {
            @Override
            public void operationComplete(Future<R> future) throws Exception {
                if (future.isSuccess()) {
                    if (future.getNow() == null) {
                        if (slots.isEmpty()) {
                            // Every slot has been tried without a result
                            mainPromise.setSuccess(null);
                        } else {
                            // Untried slots remain: move on to the next one
                            retryReadRandomAsync(command, mainPromise, slots, params);
                        }
                    } else {
                        mainPromise.setSuccess(future.getNow());
                    }
                } else {
                    mainPromise.setFailure(future.cause());
                }
            }
        });

        // Take the head of the (already shuffled) slot list
        Integer slot = slots.remove(0);
        async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0);
    }

   // 下面 2 个函数是在所有Redis节点上异步执行一个写命令
    /**
     * Executes a write command on every slot entry without a result callback.
     *
     * @param command command to execute
     * @param params  command arguments
     * @return future completed once all entries have answered
     */
    public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
        final SlotCallback<T, Void> noCallback = null;
        return writeAllAsync(command, noCallback, params);
    }

    /**
     * Executes a write command on every slot entry, feeding each result to
     * {@code callback} via {@link #allAsync}.
     *
     * @param command  command to execute
     * @param callback receiver of per-entry results; may be {@code null}
     * @param params   command arguments
     * @return future carrying the callback's final value
     */
    public <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
        final boolean readOnly = false;
        return allAsync(readOnly, command, callback, params);
    }

    /**
     * Executes a command on every slot entry in read or write mode.
     *
     * <p>Each per-entry result is passed to {@code callback.onSlotResult} when a
     * callback is given. After the last entry answers, the returned future
     * succeeds with {@code callback.onFinish()} (or {@code null} without a
     * callback). The first failure fails the future; later outcomes are ignored.
     *
     * @param readOnlyMode {@code true} to use read connections, {@code false} for write
     * @param command      command to execute
     * @param callback     receiver of per-entry results; may be {@code null}
     * @param params       command arguments
     * @return future carrying the aggregated outcome
     */
    public <T, R> Future<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
        final Promise<R> mainPromise = connectionManager.newPromise();

        // Snapshot the slots once so the countdown and the dispatch loop agree.
        final List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());

        Promise<T> promise = new DefaultPromise<T>() {
            AtomicInteger counter = new AtomicInteger(slots.size());
            @Override
            public Promise<T> setSuccess(T result) {
                if (callback != null) {
                    callback.onSlotResult(result);
                }

                // Last entry answered and nothing failed meanwhile. trySuccess
                // keeps a concurrent failure from turning into an exception.
                if (counter.decrementAndGet() == 0
                      && !mainPromise.isDone()) {
                    if (callback != null) {
                        mainPromise.trySuccess(callback.onFinish());
                    } else {
                        mainPromise.trySuccess(null);
                    }
                }
                return this;
            }

            @Override
            public Promise<T> setFailure(Throwable cause) {
                // Only the first failure is recorded; repeats must not throw.
                mainPromise.tryFailure(cause);
                return this;
            }
        };

        for (Integer slot : slots) {
            async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0);
        }
        return mainPromise;
    }

    /**
     * Waits (uninterruptibly) for {@code future} to finish and returns its result.
     *
     * @param future operation to wait for
     * @return the value the future completed with
     * @throws RedisException the future's own cause when it is a
     *         {@code RedisException}, otherwise a new one wrapping the cause
     */
    public <V> V get(Future<V> future) {
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            Throwable failure = future.cause();
            if (failure instanceof RedisException) {
                throw (RedisException) failure;
            }
            throw new RedisException("Unexpected exception while processing command", failure);
        }
        return future.getNow();
    }

    /**
     * Blocking read using the connection manager's default codec.
     *
     * @param key     key the command operates on
     * @param command command to execute
     * @param params  command arguments
     * @return the command result
     */
    public <T, R> R read(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return read(key, defaultCodec, command, params);
    }

    /**
     * Blocking read with an explicit codec: starts the asynchronous read and
     * waits for it through {@link #get(Future)}.
     */
    public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        Future<R> pending = readAsync(key, codec, command, params);
        return get(pending);
    }

    /**
     * Asynchronous read with an explicit codec, routed by the slot computed
     * from {@code key}.
     *
     * @param key     key used to compute the target slot
     * @param codec   codec passed along with the command
     * @param command command to execute
     * @param params  command arguments
     * @return future carrying the command result
     */
    public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        final int keySlot = connectionManager.calcSlot(key);
        Promise<R> result = connectionManager.newPromise();
        async(true, keySlot, null, codec, command, params, result, 0);
        return result;
    }

    /**
     * Blocking write addressed directly by slot: starts the asynchronous write
     * and waits for its result.
     *
     * @param slot    target slot
     * @param codec   codec passed along with the command
     * @param command command to execute
     * @param params  command arguments
     * @return the command result
     */
    public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
        Future<R> pending = writeAsync(slot, codec, command, params);
        return get(pending);
    }

    /**
     * Asynchronous write addressed directly by slot.
     *
     * @param slot    target slot
     * @param codec   codec passed along with the command
     * @param command command to execute
     * @param params  command arguments
     * @return future carrying the command result
     */
    public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
        Promise<R> result = connectionManager.newPromise();
        final boolean readOnly = false;
        async(readOnly, slot, null, codec, command, params, result, 0);
        return result;
    }

    /** Asynchronous read using the connection manager's default codec. */
    public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return readAsync(key, defaultCodec, command, params);
    }

    /**
     * Runs {@code operation} on a write connection for the slot computed from
     * {@code key}, starting at attempt 0.
     */
    public <R> R write(String key, SyncOperation<R> operation) {
        final int keySlot = connectionManager.calcSlot(key);
        final boolean readOnly = false;
        return async(readOnly, keySlot, operation, 0);
    }

    /**
     * Runs {@code operation} on a read connection for the slot computed from
     * {@code key}, starting at attempt 0.
     */
    public <R> R read(String key, SyncOperation<R> operation) {
        final int keySlot = connectionManager.calcSlot(key);
        final boolean readOnly = true;
        return async(readOnly, keySlot, operation, 0);
    }

    /**
     * Runs a {@link SyncOperation} on a connection for the given slot, retrying
     * on failure.
     *
     * <ul>
     *   <li>{@code RedisMovedException}: retried on the slot carried by the
     *       exception, without consuming an attempt.</li>
     *   <li>{@code RedisTimeoutException}: retried immediately with the attempt
     *       counter increased, until the configured retry attempts are reached.</li>
     *   <li>Any other {@code RedisException}: retried after sleeping for the
     *       configured retry interval, until the retry attempts are reached.</li>
     * </ul>
     *
     * NOTE(review): a failure raised by a nested retry passes through the outer
     * {@code catch (RedisException)} of the calling frame as well, so the total
     * number of executions can exceed the configured retry attempts - confirm
     * whether that is intended.
     *
     * @param readOnlyMode {@code true} to use a read connection, {@code false} for write
     * @param slot         slot whose connection is used
     * @param operation    operation to execute
     * @param attempt      number of attempts already consumed
     * @return the operation result, or {@code null} when the shutdown latch
     *         cannot be acquired
     */
    private <R> R async(boolean readOnlyMode, int slot, SyncOperation<R> operation, int attempt) {
        if (!connectionManager.getShutdownLatch().acquire()) {
            return null;
        }

        try {
            RedisConnection connection;
            boolean connected = false;
            try {
                if (readOnlyMode) {
                    connection = connectionManager.connectionReadOp(slot);
                } else {
                    connection = connectionManager.connectionWriteOp(slot);
                }
                connected = true;
            } finally {
                // The release in the finally block below is only reached once a
                // connection exists; without this the latch acquired above would
                // stay held whenever obtaining the connection fails.
                if (!connected) {
                    connectionManager.getShutdownLatch().release();
                }
            }
            try {
                return operation.execute(connectionManager.getCodec(), connection);
            } catch (RedisMovedException e) {
                // Redirected: repeat on the slot named by the server
                return async(readOnlyMode, e.getSlot(), operation, attempt);
            } catch (RedisTimeoutException e) {
                if (attempt == connectionManager.getConfig().getRetryAttempts()) {
                    throw e;
                }
                attempt++;
                return async(readOnlyMode, slot, operation, attempt);
            } finally {
                // Give back the shutdown latch and the connection for this attempt
                connectionManager.getShutdownLatch().release();
                if (readOnlyMode) {
                    connectionManager.releaseRead(slot, connection);
                } else {
                    connectionManager.releaseWrite(slot, connection);
                }
            }
        } catch (RedisException e) {
            // Give up once the configured number of attempts is reached
            if (attempt == connectionManager.getConfig().getRetryAttempts()) {
                throw e;
            }
            // Back off before the next attempt
            try {
                Thread.sleep(connectionManager.getConfig().getRetryInterval());
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            }
            attempt++;
            return async(readOnlyMode, slot, operation, attempt);
        }
    }

    /** Asynchronous read-mode script execution using the default codec. */
    public <T, R> Future<R> evalReadAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return evalAsync(true, key, defaultCodec, evalCommandType, script, keys, params);
    }

    /** Asynchronous read-mode script execution with an explicit codec. */
    public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        final boolean readOnly = true;
        return evalAsync(readOnly, key, codec, evalCommandType, script, keys, params);
    }

    /** Blocking read-mode script execution using the default codec. */
    public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return evalRead(key, defaultCodec, evalCommandType, script, keys, params);
    }

    /**
     * Blocking read-mode script execution with an explicit codec: starts the
     * asynchronous variant and waits for its result.
     */
    public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Future<R> pending = evalReadAsync(key, codec, evalCommandType, script, keys, params);
        return get(pending);
    }

    /** Asynchronous write-mode script execution using the default codec. */
    public <T, R> Future<R> evalWriteAsync(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return evalAsync(false, key, defaultCodec, evalCommandType, script, keys, params);
    }

    /** Asynchronous write-mode script execution with an explicit codec. */
    public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        final boolean readOnly = false;
        return evalAsync(readOnly, key, codec, evalCommandType, script, keys, params);
    }

    /** Runs a script in write mode on every slot entry via {@link #evalAllAsync}. */
    public <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        final boolean readOnly = false;
        return evalAllAsync(readOnly, command, callback, script, keys, params);
    }

    /**
     * Executes a Lua script on every slot entry.
     *
     * <p>The command arguments are assembled as: script, number of keys, the
     * keys, then {@code params}. Each per-entry result is passed to
     * {@code callback.onSlotResult} when a callback is given; after the last
     * entry answers the returned future succeeds with {@code callback.onFinish()}
     * (or {@code null} without a callback). The first failure fails the future.
     *
     * @param readOnlyMode {@code true} to use read connections, {@code false} for write
     * @param command      eval command to execute
     * @param callback     receiver of per-entry results; may be {@code null}
     * @param script       script text
     * @param keys         keys passed to the script
     * @param params       additional script arguments
     * @return future carrying the aggregated outcome
     */
    public <T, R> Future<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
        final Promise<R> mainPromise = connectionManager.newPromise();

        // Snapshot the slots once so the countdown and the dispatch loop agree.
        final List<Integer> slots = new ArrayList<Integer>(connectionManager.getEntries().keySet());

        Promise<T> promise = new DefaultPromise<T>() {
            // Number of entries that have not answered yet
            AtomicInteger counter = new AtomicInteger(slots.size());
            @Override
            public Promise<T> setSuccess(T result) {
                // Same null tolerance as allAsync
                if (callback != null) {
                    callback.onSlotResult(result);
                }

                if (counter.decrementAndGet() == 0
                      && !mainPromise.isDone()) {
                    R finished = callback != null ? callback.onFinish() : null;
                    // trySuccess: a failure racing with the isDone() check must
                    // not surface as an IllegalStateException.
                    mainPromise.trySuccess(finished);
                }
                return this;
            }

            @Override
            public Promise<T> setFailure(Throwable cause) {
                // Only the first failure is recorded; repeats must not throw.
                mainPromise.tryFailure(cause);
                return this;
            }
        };

        // Argument layout: script, key count, keys..., params...
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        for (Integer slot : slots) {
            async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0);
        }
        return mainPromise;
    }

    /**
     * Executes a Lua script on the entry owning the slot computed from
     * {@code key}. Arguments are sent as: script, number of keys, the keys,
     * then {@code params}.
     *
     * @param readOnlyMode    {@code true} to use a read connection, {@code false} for write
     * @param key             key used to compute the target slot
     * @param codec           codec passed along with the command
     * @param evalCommandType eval command to execute
     * @param script          script text
     * @param keys            keys passed to the script
     * @param params          additional script arguments
     * @return future carrying the script result
     */
    private <T, R> Future<R> evalAsync(boolean readOnlyMode, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        final int keyCount = keys.size();
        List<Object> scriptArgs = new ArrayList<Object>(2 + keyCount + params.length);
        scriptArgs.add(script);
        scriptArgs.add(keyCount);
        scriptArgs.addAll(keys);
        for (Object param : params) {
            scriptArgs.add(param);
        }

        Promise<R> result = connectionManager.newPromise();
        final int keySlot = connectionManager.calcSlot(key);
        async(readOnlyMode, keySlot, null, codec, evalCommandType, scriptArgs.toArray(), result, 0);
        return result;
    }

    /** Blocking write-mode script execution using the default codec. */
    public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return evalWrite(key, defaultCodec, evalCommandType, script, keys, params);
    }

    /**
     * Blocking write-mode script execution with an explicit codec: starts the
     * asynchronous variant and waits for its result.
     */
    public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
        Future<R> pending = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
        return get(pending);
    }

    /** Blocking write using the default codec; waits for the asynchronous write. */
    public <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
        Future<R> pending = writeAsync(key, command, params);
        return get(pending);
    }

    /** Asynchronous write using the connection manager's default codec. */
    public <T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
        Codec defaultCodec = connectionManager.getCodec();
        return writeAsync(key, defaultCodec, command, params);
    }

    /** Blocking write with an explicit codec; waits for the asynchronous write. */
    public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        Future<R> pending = writeAsync(key, codec, command, params);
        return get(pending);
    }

    /**
     * Asynchronous write with an explicit codec, routed by the slot computed
     * from {@code key}.
     */
    public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        final int keySlot = connectionManager.calcSlot(key);
        Promise<R> result = connectionManager.newPromise();
        async(false, keySlot, null, codec, command, params, result, 0);
        return result;
    }

    /**
     * Sends one command asynchronously on a connection for the given slot and
     * reports the outcome to {@code mainPromise}, retrying on failure.
     *
     * <p>Each invocation is one attempt with its own attempt promise. A timer
     * task fires after the configured timeout (or after the retry interval when
     * the connection could not be obtained or the write failed); if the attempt
     * is still pending it is cancelled and a new attempt is started, until the
     * configured retry attempts are reached. A {@code RedisMovedException}
     * restarts the command on the slot carried by the exception without
     * consuming an attempt.
     *
     * @param readOnlyMode   {@code true} to use a read connection, {@code false} for write
     * @param slot           slot whose connection is used
     * @param messageDecoder decoder passed along with the command; may be {@code null}
     * @param codec          codec passed along with the command
     * @param command        command to send
     * @param params         command arguments
     * @param mainPromise    promise receiving the final outcome
     * @param attempt        number of attempts already consumed
     */
    protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
                            final Object[] params, final Promise<R> mainPromise, final int attempt) {
        if (!connectionManager.getShutdownLatch().acquire()) {
            mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
            return;
        }

        // Outcome of this single attempt
        final Promise<R> attemptPromise = connectionManager.newPromise();
        // Failure to report if this attempt turns out to be the last one
        final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

        final TimerTask retryTimerTask = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // The attempt already finished: nothing to retry
                if (attemptPromise.isDone()) {
                    return;
                }
                // Out of attempts: fail with the recorded exception. tryFailure
                // tolerates the attempt completing after the isDone() check.
                if (attempt == connectionManager.getConfig().getRetryAttempts()) {
                    attemptPromise.tryFailure(ex.get());
                    return;
                }
                // Could not cancel, i.e. the attempt completed meanwhile
                if (!attemptPromise.cancel(false)) {
                    return;
                }

                int count = attempt + 1;
                async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count);
            }
        };

        try {
            RedisConnection connection;
            if (readOnlyMode) {
                connection = connectionManager.connectionReadOp(slot);
            } else {
                connection = connectionManager.connectionWriteOp(slot);
            }
            log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());

            ChannelFuture writeFuture = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));

            ex.set(new RedisTimeoutException());
            final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        // The write failed: replace the response timeout with a
                        // retry scheduled after the retry interval.
                        timeout.cancel();
                        // Arrays.toString renders the argument values; plain
                        // concatenation would only print the array identity.
                        ex.set(new WriteRedisConnectionException(
                                "Can't send command: " + command + ", params: " + Arrays.toString(params) + ", channel: " + future.channel(), future.cause()));
                        connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
                    }
                }
            });

            // Release listener matching the kind of connection taken above
            if (readOnlyMode) {
                attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
            } else {
                attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
            }
        } catch (RedisException e) {
            ex.set(e);
            connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
        }
        attemptPromise.addListener(new FutureListener<R>() {
            @Override
            public void operationComplete(Future<R> future) throws Exception {
                // Cancelled by the retry task: the next attempt takes over
                if (future.isCancelled()) {
                    return;
                }

                if (future.cause() instanceof RedisMovedException) {
                    // Redirected: run again on the slot named by the server.
                    // No retry timer is scheduled here: this attempt is already
                    // done, so the retry task would return immediately.
                    RedisMovedException moved = (RedisMovedException) future.cause();
                    async(readOnlyMode, moved.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
                    return;
                }

                // setSuccess/setFailure (not try*) on purpose: callers in this
                // class pass promises that override exactly these two methods.
                if (future.isSuccess()) {
                    mainPromise.setSuccess(future.getNow());
                } else {
                    mainPromise.setFailure(future.cause());
                }
            }
        });
    }

}

3. 批量命令执行器实现 CommandBatchExecutorService

这个类是 CommandExecutorService 的子类,在 CommandExecutorService 的基础上增加了批量执行的功能。主要做的逻辑有:

  • 支持将多个 Redis 命令发送到 Redis 服务端,提升效率。
  • 批量命令执行时按照添加顺序进行发送,返回结果时也会按照命令的添加顺序排序后返回。
  • 支持动态选择读写操作节点。
  • 支持执行异常时重试。

需要注意的是,因为是批量执行的,所以命令在添加时不会立即执行。 先暂存起来,等到调用 execute() 或 executeAsync() 方法时才会批量执行。

因为是批量执行,所以这个类中实现的一些同步的方法默认都是不支持的。

package org.redisson;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

public class CommandBatchExecutorService extends CommandExecutorService {

    /**
     * A queued command together with the position at which it was added to the
     * batch. Entries order themselves by that position.
     */
    public static class CommandEntry implements Comparable<CommandEntry> {

        final CommandData<?, ?> command;  // the queued command
        final int index;  // insertion order within the batch

        public CommandEntry(CommandData<?, ?> command, int index) {
            super();
            this.command = command;
            this.index = index;
        }

        public CommandData<?, ?> getCommand() {
            return command;
        }

        /**
         * Orders entries by ascending index.
         *
         * <p>Uses explicit comparisons instead of subtraction, which would
         * overflow (and report the wrong order) for indices far apart, e.g. a
         * large negative and a large positive value.
         */
        @Override
        public int compareTo(CommandEntry o) {
            if (index < o.index) {
                return -1;
            }
            return index == o.index ? 0 : 1;
        }

    }

    /**
     * Commands queued for one slot plus a flag telling whether all of them are
     * read-only.
     */
    public static class Entry {
        // Starts as read-only; switched off through setReadOnlyMode
        volatile boolean readOnlyMode = true;
        // Queue obtained from PlatformDependent.newMpscQueue()
        Queue<CommandEntry> commands = PlatformDependent.newMpscQueue();

        public boolean isReadOnlyMode() {
            return this.readOnlyMode;
        }

        public void setReadOnlyMode(boolean readOnlyMode) {
            this.readOnlyMode = readOnlyMode;
        }

        public Queue<CommandEntry> getCommands() {
            return this.commands;
        }

    }
    // Generates the insertion-order index assigned to each queued command
    private final AtomicInteger index = new AtomicInteger();
    // Queued commands, keyed by slot
    private ConcurrentMap<Integer, Entry> commands = PlatformDependent.newConcurrentHashMap();

    private boolean executed;   // set once the batch has been started

    /**
     * Creates a batch executor on top of the given connection manager.
     */
    public CommandBatchExecutorService(ConnectionManager connectionManager) {
        super(connectionManager);
    }

    /**
     * Queues the command for its slot instead of sending it. The command gets
     * the next insertion index, and the slot entry leaves read-only mode as soon
     * as one non-read-only command is queued.
     *
     * @throws IllegalStateException if the batch has already been executed
     */
    @Override
    protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
            Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) {
        if (executed) {
            throw new IllegalStateException("Batch already executed!");
        }

        // Look up the slot's entry, creating it on first use
        Entry slotEntry = commands.get(slot);
        if (slotEntry == null) {
            Entry created = new Entry();
            Entry existing = commands.putIfAbsent(slot, created);
            // Another caller may have registered an entry first; use that one
            slotEntry = (existing == null) ? created : existing;
        }

        if (!readOnlyMode) {
            slotEntry.setReadOnlyMode(false);
        }

        CommandData<V, R> data = new CommandData<V, R>(mainPromise, messageDecoder, codec, command, params);
        int order = index.incrementAndGet();
        slotEntry.getCommands().add(new CommandEntry(data, order));
    }

    /**
     * Executes the batch and waits for its results.
     *
     * @return the list produced by {@link #executeAsync()}
     */
    public List<?> execute() {
        Future<List<?>> pending = executeAsync();
        return get(pending);
    }

    /**
     * Starts the batch: the queued commands of every slot are handed to
     * {@code execute(Entry, int, Promise, AtomicInteger, int)}.
     *
     * <p>When the internal completion promise succeeds, the result of every
     * command is collected in the order the commands were added; when it fails,
     * the returned future fails with the same cause.
     *
     * @return future carrying the ordered results, or an already succeeded
     *         future with {@code null} when nothing was queued
     * @throws IllegalStateException if the batch has already been executed
     */
    public Future<List<?>> executeAsync() {
        if (executed) {
            throw new IllegalStateException("Batch already executed!");
        }
        if (commands.isEmpty()) {
            return connectionManager.getGroup().next().newSucceededFuture(null);
        }
        executed = true;

        Promise<Void> voidPromise = connectionManager.newPromise();
        final Promise<List<?>> promise = connectionManager.newPromise();
        voidPromise.addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                if (!future.isSuccess()) {
                    promise.setFailure(future.cause());
                    return;
                }
                // Gather the commands of all slots ...
                List<CommandEntry> entries = new ArrayList<CommandEntry>();
                for (Entry e : commands.values()) {
                    entries.addAll(e.getCommands());
                }
                // ... and restore the order in which they were added
                Collections.sort(entries);
                List<Object> result = new ArrayList<Object>();
                for (CommandEntry commandEntry : entries) {
                    result.add(commandEntry.getCommand().getPromise().getNow());
                }
                promise.setSuccess(result);
                commands = null;
            }
        });

        // One counter for the whole batch, created before the loop. A counter
        // created per slot could not track completion across slots, and
        // re-reading the 'commands' field inside the loop could hit the null
        // assigned by the listener above.
        // NOTE(review): presumably execute(...) counts this down once per
        // finished slot - confirm against the rest of that method.
        AtomicInteger slots = new AtomicInteger(commands.size());
        for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
            execute(e.getValue(), e.getKey(), voidPromise, slots, 0);
        }
        return promise;
    }

    /**
     * Sends every command queued in {@code entry} as a single batch to the node serving
     * {@code slot}, and reports the outcome on {@code mainPromise}.
     *
     * <p>A timer task re-invokes this method with {@code attempt + 1} when the attempt has not
     * completed in time or the write to the channel failed, until {@code attempt} equals the
     * configured retry-attempts value; the attempt is then failed with the last recorded
     * exception. A {@link RedisMovedException} triggers an immediate re-execution against the
     * slot carried by the exception, with the same attempt number.
     *
     * <p>{@code mainPromise} is shared by all slots of the batch, so it is failed with
     * {@code tryFailure}: a second failing slot must not throw
     * {@link IllegalStateException} from inside a listener because the promise is already done.
     *
     * @param entry       the queued commands for one slot and their read-only flag
     * @param slot        the slot used to pick the read or write connection
     * @param mainPromise the promise shared by the whole batch; succeeded once {@code slots}
     *                    reaches zero, failed on the first failing attempt
     * @param slots       countdown of slots that still have to succeed
     * @param attempt     zero-based number of the current attempt
     */
    public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
        // NOTE(review): this method never releases the shutdown latch itself; presumably the
        // release listeners registered below do - confirm, and check the catch path where
        // no such listener is registered.
        if (!connectionManager.getShutdownLatch().acquire()) {
            // The shared promise may already have been completed by another slot.
            mainPromise.tryFailure(new IllegalStateException("Redisson is shutdown"));
            return;
        }

        final Promise<Void> attemptPromise = connectionManager.newPromise();
        // Holds the exception that will be reported if the retries are exhausted.
        final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

        // Timer-driven retry: gives up, or starts a fresh attempt with an incremented counter.
        final TimerTask retryTimerTask = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // The attempt already finished (success, failure or cancel): nothing to retry.
                if (attemptPromise.isDone()) {
                    return;
                }
                // Retry budget exhausted: fail the attempt with the last recorded exception.
                if (attempt == connectionManager.getConfig().getRetryAttempts()) {
                    // tryFailure: the response may complete the promise between the
                    // isDone() check above and this call.
                    attemptPromise.tryFailure(ex.get());
                    return;
                }
                // Cancel this attempt so its completion listener ignores it, then start the next one.
                attemptPromise.cancel(true);
                int count = attempt + 1;
                execute(entry, slot, mainPromise, slots, count);
            }
        };

        try {
            org.redisson.client.RedisConnection connection;
            // Pick a read or a write connection depending on the entry mode.
            if (entry.isReadOnlyMode()) {
                connection = connectionManager.connectionReadOp(slot);
            } else {
                connection = connectionManager.connectionWriteOp(slot);
            }

            // Collect the commands in their queued order.
            ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
            for (CommandEntry c : entry.getCommands()) {
                list.add(c.getCommand());
            }
            // Send the whole batch in one message.
            ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));

            // From here on, an unanswered attempt is reported as a timeout.
            ex.set(new RedisTimeoutException());
            final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // The write itself failed: swap the response timeout for a retry
                    // scheduled after the retry interval.
                    if (!future.isSuccess()) {
                        timeout.cancel();
                        ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed"));
                        connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
                    }
                }
            });
            // Register the release listener matching the connection kind.
            if (entry.isReadOnlyMode()) {
                attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
            } else {
                attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
            }
        } catch (RedisException e) {
            // Acquiring the connection or sending failed: remember the cause and retry later.
            ex.set(e);
            connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
        }
        attemptPromise.addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                // A cancelled attempt was superseded by a retry; ignore it.
                if (future.isCancelled()) {
                    return;
                }
                // The slot moved: run again against the slot reported by the exception.
                if (future.cause() instanceof RedisMovedException) {
                    RedisMovedException moved = (RedisMovedException)future.cause();
                    execute(entry, moved.getSlot(), mainPromise, slots, attempt);
                    return;
                }

                if (future.isSuccess()) {
                    // The batch as a whole succeeds only when the last slot has succeeded.
                    if (slots.decrementAndGet() == 0) {
                        mainPromise.setSuccess(future.getNow());
                    }
                } else {
                    // Several slots may fail; only the first failure completes the shared promise.
                    mainPromise.tryFailure(future.cause());
                }
            }
        });
    }

    // 下面都是不支持的操作,因为这个类实现就是批量操作,所以不支持所有同步的方法。
    /**
     * Synchronous script evaluation for reading; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
            Object... params) {
        throw new UnsupportedOperationException("Synchronous evalRead is not supported in batch mode");
    }

    /**
     * Synchronous script evaluation for reading with an explicit codec; not available on this
     * executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
            List<Object> keys, Object... params) {
        throw new UnsupportedOperationException("Synchronous evalRead is not supported in batch mode");
    }

    /**
     * Synchronous script evaluation for writing; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys,
            Object... params) {
        throw new UnsupportedOperationException("Synchronous evalWrite is not supported in batch mode");
    }

    /**
     * Synchronous script evaluation for writing with an explicit codec; not available on this
     * executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script,
            List<Object> keys, Object... params) {
        throw new UnsupportedOperationException("Synchronous evalWrite is not supported in batch mode");
    }

    /**
     * Synchronous read through a {@code SyncOperation}; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R> R read(String key, SyncOperation<R> operation) {
        throw new UnsupportedOperationException("Synchronous read is not supported in batch mode");
    }

    /**
     * Synchronous write through a {@code SyncOperation}; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <R> R write(String key, SyncOperation<R> operation) {
        throw new UnsupportedOperationException("Synchronous write is not supported in batch mode");
    }

    /**
     * Synchronous read command with an explicit codec; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
        throw new UnsupportedOperationException("Synchronous read is not supported in batch mode");
    }

    /**
     * Synchronous read command; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
        throw new UnsupportedOperationException("Synchronous read is not supported in batch mode");
    }

    /**
     * Synchronous write command with an explicit codec; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object... params) {
        throw new UnsupportedOperationException("Synchronous write is not supported in batch mode");
    }

    /**
     * Synchronous write command; not available on this executor.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T, R> R write(String key, RedisCommand<T> command, Object... params) {
        throw new UnsupportedOperationException("Synchronous write is not supported in batch mode");
    }

}