七的博客

Redisson2.0源码分析15-其他分布式相关工具

源码分析

Redisson2.0源码分析15-其他分布式相关工具

这篇主要是一些不太好具体分类的分布式工具或者特性支持。

目前有如下几个:

  • 分布式对象过期特性支持 RedissonExpirable
  • 单个值分布式对象 RBucket
  • 基数统计器 RHyperLogLog
  • 键管理 RKeys
  • 批量命令执行 RBatch
  • 执行Lua脚本 RScript

这些工具类的源码都不是特别的复杂,逻辑比较少。 稍微了解下即可。

1. 分布式对象过期特性支持 RedissonExpirable

package org.redisson;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RExpirable;

import io.netty.util.concurrent.Future;

/**
 * Base class for distributed objects whose Redis key can be given an expiration.
 *
 * <p>Every synchronous method delegates to its {@code ...Async} counterpart and waits for the
 * returned future through {@code commandExecutor.get(...)}. All commands are sent with
 * {@link StringCodec#INSTANCE} and are keyed by {@link #getName()}.
 */
abstract class RedissonExpirable extends RedissonObject implements RExpirable {

    RedissonExpirable(CommandExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    /**
     * Sets a relative time to live and waits for the result.
     *
     * @param timeToLive amount of time, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeToLive}
     * @return the value reported by the EXPIRE command
     */
    @Override
    public boolean expire(long timeToLive, TimeUnit timeUnit) {
        Future<Boolean> pending = expireAsync(timeToLive, timeUnit);
        return commandExecutor.get(pending);
    }

    /**
     * Issues EXPIRE for this object's key. The time to live is converted to whole seconds
     * with {@link TimeUnit#toSeconds(long)} before being sent.
     */
    @Override
    public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
        String key = getName();
        long seconds = timeUnit.toSeconds(timeToLive);
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.EXPIRE, key, seconds);
    }

    /**
     * Sets an absolute expiration and waits for the result.
     *
     * @param timestamp value passed unchanged to EXPIREAT
     */
    @Override
    public boolean expireAt(long timestamp) {
        Future<Boolean> pending = expireAtAsync(timestamp);
        return commandExecutor.get(pending);
    }

    /** Issues EXPIREAT for this object's key with the given timestamp, passed through as-is. */
    @Override
    public Future<Boolean> expireAtAsync(long timestamp) {
        String key = getName();
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.EXPIREAT, key, timestamp);
    }

    /**
     * Synchronous {@link Date} variant: the date's epoch milliseconds are divided by 1000
     * and handed to {@link #expireAt(long)}.
     */
    @Override
    public boolean expireAt(Date timestamp) {
        long epochSeconds = timestamp.getTime() / 1000;
        return expireAt(epochSeconds);
    }

    /**
     * Asynchronous {@link Date} variant: the date's epoch milliseconds are divided by 1000
     * and handed to {@link #expireAtAsync(long)}.
     */
    @Override
    public Future<Boolean> expireAtAsync(Date timestamp) {
        long epochSeconds = timestamp.getTime() / 1000;
        return expireAtAsync(epochSeconds);
    }

    /** Removes the expiration from this object's key and waits for the result. */
    @Override
    public boolean clearExpire() {
        Future<Boolean> pending = clearExpireAsync();
        return commandExecutor.get(pending);
    }

    /** Issues PERSIST for this object's key. */
    @Override
    public Future<Boolean> clearExpireAsync() {
        String key = getName();
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.PERSIST, key);
    }

    /** Returns the remaining time to live as reported by the TTL command, waiting for the reply. */
    @Override
    public long remainTimeToLive() {
        Future<Long> pending = remainTimeToLiveAsync();
        return commandExecutor.get(pending);
    }

    /** Issues TTL for this object's key through the read path of the executor. */
    @Override
    public Future<Long> remainTimeToLiveAsync() {
        String key = getName();
        return commandExecutor.readAsync(key, StringCodec.INSTANCE, RedisCommands.TTL, key);
    }

}

2. 单个值分布式对象 RBucket

存储和管理单个对象的 RBucket,没什么逻辑。

package org.redisson;

import java.util.concurrent.TimeUnit;

import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RBucket;

import io.netty.util.concurrent.Future;

/**
 * Holder for a single value stored under one Redis key.
 *
 * <p>Synchronous methods obtain the future from the matching {@code ...Async} method and
 * wait on it via the inherited {@code get(Future)} helper.
 *
 * @param <V> type of the stored value
 */
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {

    protected RedissonBucket(CommandExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    /** Reads the stored value, waiting for the GET reply. */
    @Override
    public V get() {
        Future<V> pending = getAsync();
        return get(pending);
    }

    /** Issues GET for this bucket's key. */
    @Override
    public Future<V> getAsync() {
        String key = getName();
        return commandExecutor.readAsync(key, RedisCommands.GET, key);
    }

    /** Stores {@code value}, waiting for the SET reply. */
    @Override
    public void set(V value) {
        Future<Void> pending = setAsync(value);
        get(pending);
    }

    /** Issues SET for this bucket's key with the given value. */
    @Override
    public Future<Void> setAsync(V value) {
        String key = getName();
        return commandExecutor.writeAsync(key, RedisCommands.SET, key, value);
    }

    /** Stores {@code value} together with a time to live, waiting for the SETEX reply. */
    @Override
    public void set(V value, long timeToLive, TimeUnit timeUnit) {
        Future<Void> pending = setAsync(value, timeToLive, timeUnit);
        get(pending);
    }

    /**
     * Issues SETEX for this bucket's key. The time to live is converted to whole seconds
     * with {@link TimeUnit#toSeconds(long)} and sent before the value.
     */
    @Override
    public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
        String key = getName();
        long seconds = timeUnit.toSeconds(timeToLive);
        return commandExecutor.writeAsync(key, RedisCommands.SETEX, key, seconds, value);
    }

    /** Reports whether this bucket's key exists, waiting for the EXISTS reply. */
    @Override
    public boolean exists() {
        Future<Boolean> pending = existsAsync();
        return get(pending);
    }

    /** Issues EXISTS for this bucket's key. */
    @Override
    public Future<Boolean> existsAsync() {
        String key = getName();
        return commandExecutor.readAsync(key, RedisCommands.EXISTS, key);
    }

}

3. 基数统计器 RHyperLogLog

HyperLogLog 本身是一种估算算法,但是 Redis 中将 HyperLogLog 定义成一种数据结构。

在 Redis 中 HyperLogLog 是一种用于估算集合中唯一元素数量的数据结构。 比如能够在内存占用很小的情况下,快速估算集合中不同元素的数量。

所以它的优点是占用内存小且计算速度快,缺点是结果是近似值,而不是精确值, 适合用于对精度要求不高的场景。

在实际项目中具备一定的使用场景,但是不是特别的常见。

package org.redisson;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RHyperLogLog;

import io.netty.util.concurrent.Future;

/**
 * Cardinality estimator backed by the Redis HyperLogLog commands (PFADD, PFCOUNT, PFMERGE).
 *
 * <p>Synchronous methods obtain the future from the matching {@code ...Async} method and
 * wait on it via the inherited {@code get(Future)} helper.
 *
 * @param <V> type of the elements added to the estimator
 */
public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperLogLog<V> {

    protected RedissonHyperLogLog(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    /** Adds a single element and waits for the PFADD reply. */
    @Override
    public boolean add(V obj) {
        return get(addAsync(obj));
    }

    /** Adds every element of {@code objects} and waits for the PFADD reply. */
    @Override
    public boolean addAll(Collection<V> objects) {
        return get(addAllAsync(objects));
    }

    /** Returns the estimated number of distinct elements, waiting for the PFCOUNT reply. */
    @Override
    public long count() {
        return get(countAsync());
    }

    /**
     * Returns the estimated cardinality of this log combined with the named logs,
     * waiting for the PFCOUNT reply.
     */
    @Override
    public long countWith(String... otherLogNames) {
        return get(countWithAsync(otherLogNames));
    }

    /** Merges this log with the named logs, waiting for the PFMERGE reply. */
    @Override
    public void mergeWith(String... otherLogNames) {
        get(mergeWithAsync(otherLogNames));
    }

    /** Issues PFADD with this log's key and one element. */
    @Override
    public Future<Boolean> addAsync(V obj) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), obj);
    }

    /**
     * Issues a single PFADD with this log's key followed by every element of {@code objects}.
     *
     * <p>The flat argument array is passed as the varargs parameter list itself, exactly like
     * {@link #countWithAsync} and {@link #mergeWithAsync} do. The previous form passed
     * {@code getName()} and the array as two separate varargs arguments, so the command
     * received the key plus one nested {@code Object[]} (which already contained the key)
     * instead of the individual elements.
     *
     * @param objects elements to add; must not be {@code null}
     */
    @Override
    public Future<Boolean> addAllAsync(Collection<V> objects) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, nameFollowedBy(objects));
    }

    /** Issues PFCOUNT for this log's key only (sent through the executor's write path). */
    @Override
    public Future<Long> countAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, getName());
    }

    /** Issues PFCOUNT with this log's key followed by {@code otherLogNames}. */
    @Override
    public Future<Long> countWithAsync(String... otherLogNames) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT,
                nameFollowedBy(Arrays.asList(otherLogNames)));
    }

    /** Issues PFMERGE with this log's key followed by {@code otherLogNames}. */
    @Override
    public Future<Void> mergeWithAsync(String... otherLogNames) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFMERGE,
                nameFollowedBy(Arrays.asList(otherLogNames)));
    }

    /** Builds the flat command argument array: this log's key first, then {@code rest} in order. */
    private Object[] nameFollowedBy(Collection<?> rest) {
        List<Object> args = new ArrayList<Object>(rest.size() + 1);
        args.add(getName());
        args.addAll(rest);
        return args.toArray();
    }

}

4. 键管理 RKeys

这个类提供了对 Redis 键的管理操作,包括查找、删除和随机获取键等功能。

package org.redisson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RKeys;
import org.redisson.misc.CompositeIterable;

import io.netty.util.concurrent.Future;

/**
 * Key-space operations: iterating, searching, picking a random key and deleting keys.
 *
 * <p>Iteration is performed per entry of the connection manager
 * ({@code getConnectionManager().getEntries()}) using SCAN; the per-entry iterables are
 * chained with {@link CompositeIterable}.
 */
public class RedissonKeys implements RKeys {

    private final CommandExecutor commandExecutor;

    public RedissonKeys(CommandExecutor commandExecutor) {
        super();
        this.commandExecutor = commandExecutor;
    }

    /**
     * Returns a lazily evaluated view over the keys matching {@code pattern}.
     *
     * @param pattern glob-style pattern forwarded to SCAN ... MATCH, or {@code null} to
     *                iterate over every key
     * @return an iterable that starts a fresh SCAN cursor each time it is iterated
     */
    @Override
    public Iterable<String> getKeysByPattern(final String pattern) {
        List<Iterable<String>> iterables = new ArrayList<Iterable<String>>();
        for (final Integer slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
            Iterable<String> iterable = new Iterable<String>() {
                @Override
                public Iterator<String> iterator() {
                    return createKeysIterator(slot, pattern);
                }
            };
            iterables.add(iterable);
        }
        return new CompositeIterable<String>(iterables);
    }

    /** Returns a lazily evaluated view over all keys (SCAN without a MATCH clause). */
    @Override
    public Iterable<String> getKeys() {
        // A null pattern makes scanIterator omit the MATCH clause.
        return getKeysByPattern(null);
    }

    /**
     * Runs one SCAN step against the given slot.
     *
     * @param slot     slot handed to the executor to select the target
     * @param startPos cursor to continue from; 0 starts a new scan
     * @param pattern  MATCH pattern, or {@code null} for no filtering
     * @return the page of keys together with the next cursor
     */
    private ListScanResult<String> scanIterator(int slot, long startPos, String pattern) {
        if (pattern == null) {
            return commandExecutor.write(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
        }
        return commandExecutor.write(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
    }

    /**
     * Creates an iterator that walks the SCAN cursor of one slot page by page.
     * {@link Iterator#remove()} deletes the most recently returned key from Redis.
     */
    private Iterator<String> createKeysIterator(final int slot, final String pattern) {
        return new Iterator<String>() {

            private Iterator<String> iter;
            private Long iterPos;

            private boolean removeExecuted;
            private String value;

            /** Fetches the page at {@code cursor} and remembers the cursor that follows it. */
            private void loadPage(long cursor) {
                ListScanResult<String> page = scanIterator(slot, cursor, pattern);
                iter = page.getValues().iterator();
                iterPos = page.getPos();
            }

            @Override
            public boolean hasNext() {
                if (iter == null) {
                    loadPage(0);
                }
                // SCAN may legitimately return an empty page together with a non-zero cursor
                // (typical with MATCH). Keep fetching until a page has elements or the cursor
                // returns to 0; stopping after a single empty page would silently skip keys.
                while (!iter.hasNext() && iterPos != 0) {
                    loadPage(iterPos);
                }
                return iter.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element");
                }

                value = iter.next();
                removeExecuted = false;
                return value;
            }

            @Override
            public void remove() {
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                if (value == null) {
                    // Iterator contract: remove() is only valid after a call to next().
                    throw new IllegalStateException("next() has not been called");
                }

                iter.remove();
                delete(value);
                removeExecuted = true;
            }

        };
    }

    /** Returns a random key, waiting for the RANDOMKEY reply. */
    @Override
    public String randomKey() {
        return commandExecutor.get(randomKeyAsync());
    }

    /** Issues RANDOMKEY through the executor's {@code readRandomAsync}. */
    @Override
    public Future<String> randomKeyAsync() {
        return commandExecutor.readRandomAsync(RedisCommands.RANDOM_KEY);
    }

    /**
     * Finds all keys matching a glob-style pattern and waits for the result.
     *
     * <p>Pattern examples:
     * <ul>
     *   <li>{@code h?llo} matches hello, hallo and hxllo</li>
     *   <li>{@code h*llo} matches hllo and heeeello</li>
     *   <li>{@code h[ae]llo} matches hello and hallo, but not hillo</li>
     * </ul>
     */
    @Override
    public Collection<String> findKeysByPattern(String pattern) {
        return commandExecutor.get(findKeysByPatternAsync(pattern));
    }

    /** Issues KEYS with {@code pattern} through the executor's {@code readAllAsync}. */
    @Override
    public Future<Collection<String>> findKeysByPatternAsync(String pattern) {
        return commandExecutor.readAllAsync(RedisCommands.KEYS, pattern);
    }

    /**
     * Deletes every key matching {@code pattern} (same syntax as
     * {@link #findKeysByPattern(String)}) and waits for the result.
     *
     * @return the number of deleted keys
     */
    @Override
    public long deleteByPattern(String pattern) {
        return commandExecutor.get(deleteByPatternAsync(pattern));
    }

    /**
     * Runs a Lua script via {@code evalWriteAllAsync} that looks keys up with KEYS and
     * deletes them with DEL in batches of at most 5000 arguments. The per-slot counts are
     * summed into the final result.
     */
    @Override
    public Future<Long> deleteByPatternAsync(String pattern) {
        return commandExecutor.evalWriteAllAsync(RedisCommands.EVAL_INTEGER, new SumCallback(),
                "local keys = redis.call('keys', ARGV[1]) "
                + "local n = 0 "
                + "for i=1, table.getn(keys),5000 do "
                    + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
                + "end "
            + "return n;", Collections.emptyList(), pattern);
    }

    /**
     * Deletes the given keys and waits for the result.
     *
     * @return the number of deleted keys
     */
    @Override
    public long delete(String ... keys) {
        return commandExecutor.get(deleteAsync(keys));
    }

    /** Issues DEL with {@code keys} via {@code writeAllAsync}; per-slot counts are summed. */
    @Override
    public Future<Long> deleteAsync(String ... keys) {
        return commandExecutor.writeAllAsync(RedisCommands.DEL, new SumCallback(), (Object[]) keys);
    }

    /** Adds up the numeric result reported for each slot and returns the total on completion. */
    private static final class SumCallback implements SlotCallback<Long, Long> {

        private final AtomicLong total = new AtomicLong();

        @Override
        public void onSlotResult(Long result) {
            total.addAndGet(result);
        }

        @Override
        public Long onFinish() {
            return total.get();
        }
    }

}

主要看这个删除符合模式的 key lua脚本:

假设要删除所有以 user: 开头的 key,那么传递的参数 ARGV[1] 就是模式 【user:*】。

--- Look up every key matching the pattern in ARGV[1] with the KEYS command and keep the list in `keys`
local keys = redis.call('keys', ARGV[1])

--- Counter for the number of keys deleted so far
local n = 0

--- Walk over `keys` in steps of 5000; table.getn(keys) is the length of `keys`
for i=1, table.getn(keys),5000 do
    --- Delete the keys at positions i .. i+4999 with a single DEL call
    --- unpack expands that index range into separate arguments; math.min caps the upper bound at the list length
    --- DEL's return value (number of keys removed) is added to n
    n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys))))
end
return n;

这里删除看着也有问题:

  • KEYS 命令会扫描 Redis 数据库中的所有 key,key 数量多的时候可能导致性能问题和阻塞。
  • 虽然限制了每次最多删除 5000 个 key,但在 key 数量非常多时,批量删除仍然可能导致 Redis 负载过高。

不过即使把 KEYS 替换成 SCAN 命令,在大批量删除的场景下,性能也只会稍微好一点点。

5. 批量命令执行 RBatch

这个类主要是批量执行 Redis 命令,原理就是通过分布式对象的构造函数传入 CommandBatchExecutorService 来达到批量积攒命令的目的。

package org.redisson;

import java.util.List;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RBucketAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;

import io.netty.util.concurrent.Future;

public class RedissonBatch implements RBatch {

    // 这个是核心,批量命令执行服务
    private final CommandBatchExecutorService executorService;

    public RedissonBatch(ConnectionManager connectionManager) {
        this.executorService = new CommandBatchExecutorService(connectionManager);
    }

    @Override
    public <V> RBucketAsync<V> getBucket(String name) {
        return new RedissonBucket<V>(executorService, name);
    }

    @Override
    public <V> RHyperLogLogAsync<V> getHyperLogLog(String name) {
        return new RedissonHyperLogLog<V>(executorService, name);
    }

    @Override
    public <V> RListAsync<V> getList(String name) {
        return new RedissonList<V>(executorService, name);
    }

    @Override
    public <K, V> RMapAsync<K, V> getMap(String name) {
        return new RedissonMap<K, V>(executorService, name);
    }

    @Override
    public <V> RSetAsync<V> getSet(String name) {
        return new RedissonSet<V>(executorService, name);
    }

    @Override
    public <M> RTopicAsync<M> getTopic(String name) {
        return new RedissonTopic<M>(executorService, name);
    }

    @Override
    public <V> RQueueAsync<V> getQueue(String name) {
        return new RedissonQueue<V>(executorService, name);
    }

    @Override
    public <V> RBlockingQueueAsync<V> getBlockingQueue(String name) {
        return new RedissonBlockingQueue<V>(executorService, name);
    }

    @Override
    public <V> RDequeAsync<V> getDequeAsync(String name) {
        return new RedissonDeque<V>(executorService, name);
    }

    @Override
    public RAtomicLongAsync getAtomicLongAsync(String name) {
        return new RedissonAtomicLong(executorService, name);
    }

    @Override
    public RScriptAsync getScript() {
        return new RedissonScript(executorService);
    }

    @Override
    public RKeysAsync getKeys() {
        return new RedissonKeys(executorService);
    }

    // 调用执行命令,这里实际上调用的是批量命令执行服务。
    // 会将队列里的命令一次性执行
    @Override
    public List<?> execute() {
        return executorService.execute();
    }

    @Override
    public Future<List<?>> executeAsync() {
        return executorService.executeAsync();
    }

}

6. 执行Lua脚本 RScript

这个类主要是用于在 Redis中执行Lua脚本。

package org.redisson;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RScript;

import io.netty.util.concurrent.Future;

/**
 * {@link RScript} implementation for loading, running and managing Lua scripts.
 *
 * <p>Most operations come in two flavours: one without a key, which goes through the
 * executor's {@code ...All...} methods or passes a {@code null} key, and one with an
 * explicit {@code key} that is handed to the executor for routing. Synchronous methods wait
 * on the future of their asynchronous counterpart via {@code commandExecutor.get(...)}.
 */
public class RedissonScript implements RScript {

    /** Executor every command of this class is sent through. */
    private final CommandExecutor commandExecutor;

    protected RedissonScript(CommandExecutor commandExecutor) {
        this.commandExecutor = commandExecutor;
    }

    /** Loads the script via {@link #scriptLoadAsync(String)} and waits for the returned string. */
    @Override
    public String scriptLoad(String luaScript) {
        Future<String> pending = scriptLoadAsync(luaScript);
        return commandExecutor.get(pending);
    }

    /** Loads the script via {@link #scriptLoadAsync(String, String)} and waits for the result. */
    public String scriptLoad(String key, String luaScript) {
        Future<String> pending = scriptLoadAsync(key, luaScript);
        return commandExecutor.get(pending);
    }

    /**
     * Sends SCRIPT LOAD through {@code writeAllAsync}; the future completes with the last
     * per-slot result handed to the callback.
     */
    @Override
    public Future<String> scriptLoadAsync(String luaScript) {
        SlotCallback<String, String> keepLatest = new LatestValueCallback();
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_LOAD, keepLatest, luaScript);
    }

    /** Sends SCRIPT LOAD through {@code writeAsync}, routed by {@code key}. */
    public Future<String> scriptLoadAsync(String key, String luaScript) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_LOAD, luaScript);
    }

    /** Evaluates a script without keys or arguments; a {@code null} routing key is used. */
    @Override
    public <R> R eval(Mode mode, String luaScript, ReturnType returnType) {
        return eval(null, mode, luaScript, returnType);
    }

    /** Evaluates a script without keys or arguments, routed by {@code key}. */
    public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType) {
        return eval(key, mode, luaScript, returnType, Collections.emptyList());
    }

    /** Evaluates a script with keys and arguments; a {@code null} routing key is used. */
    @Override
    public <R> R eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        return eval(null, mode, luaScript, returnType, keys, values);
    }

    /** Evaluates a script routed by {@code key} and waits for the result. */
    public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        Future<R> pending = evalAsync(key, mode, luaScript, returnType, keys, values);
        return commandExecutor.get(pending);
    }

    /** Asynchronous evaluation without keys or arguments; a {@code null} routing key is used. */
    @Override
    public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType) {
        return evalAsync(null, mode, luaScript, returnType, Collections.emptyList());
    }

    /** Asynchronous evaluation with keys and arguments; a {@code null} routing key is used. */
    @Override
    public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        return evalAsync(null, mode, luaScript, returnType, keys, values);
    }

    /**
     * Asynchronous evaluation routed by {@code key}. {@link Mode#READ_ONLY} uses the
     * executor's {@code evalReadAsync}; any other mode uses {@code evalWriteAsync}.
     */
    public <R> Future<R> evalAsync(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        if (mode != Mode.READ_ONLY) {
            return commandExecutor.evalWriteAsync(key, returnType.getCommand(), luaScript, keys, values);
        }
        return commandExecutor.evalReadAsync(key, returnType.getCommand(), luaScript, keys, values);
    }

    /** Runs the script identified by {@code shaDigest} without keys or arguments. */
    @Override
    public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType) {
        return evalSha(null, mode, shaDigest, returnType);
    }

    /** Runs the script identified by {@code shaDigest} without keys or arguments, routed by {@code key}. */
    public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType) {
        return evalSha(key, mode, shaDigest, returnType, Collections.emptyList());
    }

    /** Runs the script identified by {@code shaDigest} with keys and arguments. */
    @Override
    public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        return evalSha(null, mode, shaDigest, returnType, keys, values);
    }

    /** Runs the script identified by {@code shaDigest}, routed by {@code key}, and waits for the result. */
    public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        Future<R> pending = evalShaAsync(key, mode, shaDigest, returnType, keys, values);
        return commandExecutor.get(pending);
    }

    /** Asynchronous EVALSHA without keys or arguments; a {@code null} routing key is used. */
    @Override
    public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType) {
        return evalShaAsync(null, mode, shaDigest, returnType, Collections.emptyList());
    }

    /** Asynchronous EVALSHA with keys and arguments; a {@code null} routing key is used. */
    @Override
    public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        return evalShaAsync(null, mode, shaDigest, returnType, keys, values);
    }

    /**
     * Asynchronous EVALSHA routed by {@code key}. A command named "EVALSHA" is derived from
     * the return type's command, then dispatched like {@code evalAsync}:
     * {@link Mode#READ_ONLY} through {@code evalReadAsync}, otherwise {@code evalWriteAsync}.
     */
    public <R> Future<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA");
        if (mode != Mode.READ_ONLY) {
            return commandExecutor.evalWriteAsync(key, command, shaDigest, keys, values);
        }
        return commandExecutor.evalReadAsync(key, command, shaDigest, keys, values);
    }

    /** Sends SCRIPT KILL via {@link #scriptKillAsync()} and waits for completion. */
    @Override
    public void scriptKill() {
        Future<Void> pending = scriptKillAsync();
        commandExecutor.get(pending);
    }

    /** Sends SCRIPT KILL via {@link #scriptKillAsync(String)} and waits for completion. */
    public void scriptKill(String key) {
        Future<Void> pending = scriptKillAsync(key);
        commandExecutor.get(pending);
    }

    /** Sends SCRIPT KILL through {@code writeAllAsync}. */
    @Override
    public Future<Void> scriptKillAsync() {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_KILL);
    }

    /** Sends SCRIPT KILL through {@code writeAsync}, routed by {@code key}. */
    public Future<Void> scriptKillAsync(String key) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_KILL);
    }

    /** Checks the given digests via {@link #scriptExistsAsync(String...)} and waits for the result. */
    @Override
    public List<Boolean> scriptExists(String ... shaDigests) {
        Future<List<Boolean>> pending = scriptExistsAsync(shaDigests);
        return commandExecutor.get(pending);
    }

    /**
     * Sends SCRIPT EXISTS through {@code writeAllAsync}. Per-slot answers are OR-ed position
     * by position, so an entry is {@code true} when at least one slot reported {@code true}.
     */
    @Override
    public Future<List<Boolean>> scriptExistsAsync(final String ... shaDigests) {
        SlotCallback<List<Boolean>, List<Boolean>> orMerge = new AnyTrueCallback(shaDigests.length);
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_EXISTS, orMerge, (Object[]) shaDigests);
    }

    /** Checks the given digests via {@link #scriptExistsAsync(String, String...)} and waits for the result. */
    public List<Boolean> scriptExists(String key, String ... shaDigests) {
        Future<List<Boolean>> pending = scriptExistsAsync(key, shaDigests);
        return commandExecutor.get(pending);
    }

    /** Sends SCRIPT EXISTS through {@code writeAsync}, routed by {@code key}. */
    public Future<List<Boolean>> scriptExistsAsync(String key, String ... shaDigests) {
        // The digests array is passed as the varargs parameter list itself.
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_EXISTS, (Object[]) shaDigests);
    }

    /** Sends SCRIPT FLUSH via {@link #scriptFlushAsync()} and waits for completion. */
    @Override
    public void scriptFlush() {
        Future<Void> pending = scriptFlushAsync();
        commandExecutor.get(pending);
    }

    /** Sends SCRIPT FLUSH via {@link #scriptFlushAsync(String)} and waits for completion. */
    public void scriptFlush(String key) {
        Future<Void> pending = scriptFlushAsync(key);
        commandExecutor.get(pending);
    }

    /** Sends SCRIPT FLUSH through {@code writeAllAsync}. */
    @Override
    public Future<Void> scriptFlushAsync() {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_FLUSH);
    }

    /** Sends SCRIPT FLUSH through {@code writeAsync}, routed by {@code key}. */
    public Future<Void> scriptFlushAsync(String key) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_FLUSH);
    }

    /** Remembers the most recent per-slot value and returns it on completion. */
    private static final class LatestValueCallback implements SlotCallback<String, String> {

        private volatile String latest;

        @Override
        public void onSlotResult(String result) {
            this.latest = result;
        }

        @Override
        public String onFinish() {
            return latest;
        }
    }

    /** Combines per-slot boolean lists with a position-wise OR. */
    private static final class AnyTrueCallback implements SlotCallback<List<Boolean>, List<Boolean>> {

        private volatile List<Boolean> merged;

        AnyTrueCallback(int expectedSize) {
            this.merged = new ArrayList<Boolean>(expectedSize);
        }

        @Override
        public synchronized void onSlotResult(List<Boolean> result) {
            for (int i = 0; i < result.size(); i++) {
                // Grow the accumulator by one slot the first time position i is seen.
                if (merged.size() == i) {
                    merged.add(false);
                }
                boolean soFar = merged.get(i);
                boolean reported = result.get(i);
                merged.set(i, soFar | reported);
            }
        }

        @Override
        public List<Boolean> onFinish() {
            // Hand out a copy of the accumulated list.
            return new ArrayList<Boolean>(merged);
        }
    }

}