七的博客

Redisson2.0源码分析13-分布式Set

源码分析

Redisson2.0源码分析13-分布式Set

1. 分布式Set RSet

跟 V1 版本差不多。逻辑有一部分替换成 lua 实现。

1.1 源码分析

package org.redisson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RSet;

import io.netty.util.concurrent.Future;


public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {

    protected RedissonSet(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    // 获取集合的大小
    @Override
    public int size() {
        return get(sizeAsync());
    }

    @Override
    public Future<Integer> sizeAsync() {
        // 使用Redis的SCARD命令获取集合的元素数量
        return commandExecutor.readAsync(getName(), RedisCommands.SCARD, getName());
    }

    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    // 检查集合中是否包含指定的元素
    @Override
    public boolean contains(Object o) {
        return get(containsAsync(o));
    }

    @Override
    public Future<Boolean> containsAsync(Object o) {
        // 使用 Redis 的 SISMEMBER 命令实现
        return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
    }


    // 返回一个迭代器,用于遍历集合中的所有元素
    private ListScanResult<V> scanIterator(long startPos) {
        // SSCAN 命令实现分批次遍历
        return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos);
    }

    // 实现的迭代器
    @Override
    public Iterator<V> iterator() {
        return new Iterator<V>() {

            private Iterator<V> iter;
            private Long iterPos;

            private boolean removeExecuted;
            private V value;

            @Override
            public boolean hasNext() {
                if (iter == null) {
                    ListScanResult<V> res = scanIterator(0);
                    iter = res.getValues().iterator();
                    iterPos = res.getPos();
                } else if (!iter.hasNext() && iterPos != 0) {
                    ListScanResult<V> res = scanIterator(iterPos);
                    iter = res.getValues().iterator();
                    iterPos = res.getPos();
                }
                return iter.hasNext();
            }

            @Override
            public V next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element at index");
                }

                value = iter.next();
                removeExecuted = false;
                return value;
            }

            @Override
            public void remove() {
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }

                // lazy init iterator
//                hasNext();
                iter.remove();
                RedissonSet.this.remove(value);
                removeExecuted = true;
            }

        };
    }

    // 异步地读取集合中的所有元素
    @Override
    public Future<Collection<V>> readAllAsync() {
        // SMEMBERS命令实现
        return commandExecutor.readAsync(getName(), RedisCommands.SMEMBERS, getName());
    }

    // 将集合中的元素转换为数组
    @Override
    public Object[] toArray() {
        List<Object> res = (List<Object>) get(readAllAsync());
        return res.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        List<Object> res = (List<Object>) get(readAllAsync());
        return res.toArray(a);
    }

    //将元素添加到集合中
    @Override
    public boolean add(V e) {
        return get(addAsync(e));
    }

    @Override
    public Future<Boolean> addAsync(V e) {
        // SADD 命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.SADD_SINGLE, getName(), e);
    }

    // 移除并返回集合中的一个随机元素
    @Override
    public V removeRandom() {
        return get(removeRandomAsync());
    }

    @Override
    public Future<V> removeRandomAsync() {
        // SPOP命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.SPOP_SINGLE, getName());
    }

    // 移除指定的元素
    @Override
    public Future<Boolean> removeAsync(Object o) {
        // SREM命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), o);
    }

    @Override
    public boolean remove(Object value) {
        return get(removeAsync((V)value));
    }

    // 检查集合是否包含指定集合中的所有元素
    @Override
    public boolean containsAll(Collection<?> c) {
        return get(containsAllAsync(c));
    }

    @Override
    public Future<Boolean> containsAllAsync(Collection<?> c) {
        // 使用Lua脚本遍历集合元素进行检查
        return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                "local s = redis.call('smembers', KEYS[1]);" +
                        "for i = 0, table.getn(s), 1 do " +
                            "for j = 0, table.getn(ARGV), 1 do "
                            + "if ARGV[j] == s[i] "
                            + "then table.remove(ARGV, j) end "
                        + "end; "
                       + "end;"
                       + "return table.getn(ARGV) == 0; ",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    // 将指定集合中的所有元素添加到当前集合
    @Override
    public boolean addAll(Collection<? extends V> c) {
        if (c.isEmpty()) {
            return false;
        }

        return get(addAllAsync(c));
    }

    @Override
    public Future<Boolean> addAllAsync(Collection<? extends V> c) {
        List<Object> args = new ArrayList<Object>(c.size() + 1);
        args.add(getName());
        args.addAll(c);
        return commandExecutor.writeAsync(getName(), RedisCommands.SADD, args.toArray());
    }

    // 保留指定的集合元素
    @Override
    public boolean retainAll(Collection<?> c) {
        return get(retainAllAsync(c));
    }

    @Override
    public Future<Boolean> retainAllAsync(Collection<?> c) {
        // 使用Lua脚本遍历并移除不在指定集合中的元素
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                    "local changed = false " +
                    "local s = redis.call('smembers', KEYS[1]) "
                       + "local i = 0 "
                       + "while i <= table.getn(s) do "
                            + "local element = s[i] "
                            + "local isInAgrs = false "
                            + "for j = 0, table.getn(ARGV), 1 do "
                                + "if ARGV[j] == element then "
                                    + "isInAgrs = true "
                                    + "break "
                                + "end "
                            + "end "
                            + "if isInAgrs == false then "
                                + "redis.call('SREM', KEYS[1], element) "
                                + "changed = true "
                            + "end "
                            + "i = i + 1 "
                       + "end "
                       + "return changed ",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    // 移除当前集合中包含在指定集合中的所有元素
    @Override
    public Future<Boolean> removeAllAsync(Collection<?> c) {
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                        "local v = false " +
                        "for i = 0, table.getn(ARGV), 1 do "
                            + "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
                            + "then v = true end "
                        +"end "
                       + "return v ",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return get(removeAllAsync(c));
    }

    // 清空集合中的所有元素
    @Override
    public void clear() {
        delete();
    }

}

1.2 核心 lua 脚本分析

1.2.1 containsAllAsync

  • KEYS[1]:集合的名称
  • ARGV:要检查的元素数组
--- 使用 smembers 命令获取集合的所有元素
local s = redis.call('smembers', KEYS[1]);

--- 遍历集合元素和指定的元素数组,移除匹配的元素
for i = 0, table.getn(s), 1 do 
    for j = 0, table.getn(ARGV), 1 do 
        if ARGV[j] == s[i] then 
            table.remove(ARGV, j) 
        end 
    end; 
end;

--- 如果指定的元素数组为空,返回true,表示集合包含所有指定元素.
---- 否则返回false
return table.getn(ARGV) == 0;

1.2.2 保留集合中包含在指定集合中的元素 retainAllAsync

  • KEYS[1]:集合的名称
  • ARGV:要保留的元素数组
local changed = false --- 标记集合是否发生了变化

--- 使用 smembers 命令获取 Redis 集合中所有的元素保存到 s 中
local s = redis.call('smembers', KEYS[1]) 

local i = 0   --- 初始化一个计数器i

--- i小于等于集合s的长度. table.getn(s)返回表s的长度
while i <= table.getn(s) do 
    local element = s[i]   --- 获取集合中的第i个元素
    local isInAgrs = false  --- 标记当前元素是否在参数列表中

    --- 历参数列表ARGV. table.getn(ARGV)返回参数列表的长度
    for j = 0, table.getn(ARGV), 1 do 

        --- 相等就设置为在参数列表中,跳出本次循环
        if ARGV[j] == element then 
            isInAgrs = true 
            break 
        end 
    end 

    --- 当前元素不在参数列表中
    if isInAgrs == false then 
        redis.call('SREM', KEYS[1], element)   --- 使用SREM命令从集合中移除该元素
        changed = true --- 变更标记设置为 true
    end 
    i = i + 1 
end 
return changed   --- true表示集合发生了变化,有元素被移除,false表示集合没有变化

1.2.3 移除集合中包含在指定集合中的所有元素 removeAllAsync

  • KEYS[1]:集合的名称
  • ARGV:要移除的元素数组
local v = false --- 移除元素标记

--- 遍历指定的元素数组
for i = 0, table.getn(ARGV), 1 do 
    ---- 每个元素使用 SREM 命令尝试从集合中移除
    if redis.call('srem', KEYS[1], ARGV[i]) == 1 then 
        --- 成功移除至少一个元素,标记为true
        v = true 
    end 
end 

--- 如果成功移除至少一个元素,返回true, 否则返回false
return v

2. 分布式SortSet RSortedSet

这个类主要是管理排序集合。

2.1 源码分析

package org.redisson;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;

import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSortedSet;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;


public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V> {

    // 自然顺序比较器
    private static class NaturalComparator<V> implements Comparator<V>, Serializable {

        private static final long serialVersionUID = 7207038068494060240L;

        static final NaturalComparator NATURAL_ORDER = new NaturalComparator();

        public int compare(V c1, V c2) {
            Comparable<Object> c1co = (Comparable<Object>) c1;
            Comparable<Object> c2co = (Comparable<Object>) c2;
            return c1co.compareTo(c2co);
        }

    }

    // 保存二分搜索的结果,包括找到的值和索引
    public static class BinarySearchResult<V> {

        private V value;
        private int index;

        public BinarySearchResult(V value) {
            super();
            this.value = value;
        }

        public BinarySearchResult() {
        }

        public void setIndex(int index) {
            this.index = index;
        }
        public int getIndex() {
            return index;
        }

        public V getValue() {
            return value;
        }


    }

    // 用于对集合中的元素进行排序。 默认使用自然排序
    private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER;

    protected RedissonSortedSet(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);

        loadComparator();
        // 使用 SETNX 命令在Redis中设置 key 的初始值为0
        commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.SETNX, getCurrentVersionKey(), 0L);
    }

    private void loadComparator() {
        commandExecutor.read(getName(), new SyncOperation<Void>() {
            @Override
            public Void execute(Codec codec, RedisConnection conn) {
                loadComparator(conn);
                return null;
            }
        });
    }

    private void loadComparator(RedisConnection connection) {
        try {
            // 获取比较器的签名
            String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());

            // 签名不为空则校验
            if (comparatorSign != null) {
                // 切分为类名和签名值
                String[] parts = comparatorSign.split(":");
                String className = parts[0];
                String sign = parts[1];

                // 计算类签名并进行比较
                String result = calcClassSign(className);
                // 不一致则抛出异常
                if (!result.equals(sign)) {
                    throw new IllegalStateException("Local class signature of " + className + " differs from used by this SortedSet!");
                }

                // 实例化比较器类赋值给 comparator
                Class<?> clazz = Class.forName(className);
                comparator = (Comparator<V>) clazz.newInstance();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // 计算给定类的签名
    private static String calcClassSign(String name) {
        try {
            // 查找类
            Class<?> clazz = Class.forName(name);

            // 将对象序列化成字节数组
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            ObjectOutputStream outputStream = new ObjectOutputStream(result);
            outputStream.writeObject(clazz);
            outputStream.close();

            // 使用 SHA-1 摘要算法计算字节数组的哈希值
            MessageDigest crypt = MessageDigest.getInstance("SHA-1");
            crypt.reset();
            crypt.update(result.toByteArray());

            // 将哈希值转换为十六进制字符串返回
            return new BigInteger(1, crypt.digest()).toString(16);
        } catch (Exception e) {
            throw new IllegalStateException("Can't calculate sign of " + name, e);
        }
    }

    // 返回集合的大小
    @Override
    public int size() {
        // LLEN命令获取列表的长度
        return commandExecutor.read(getName(), RedisCommands.LLEN, getName());
    }

    private int size(RedisConnection connection) {
        return connection.sync(RedisCommands.LLEN, getName()).intValue();
    }


    // 检查集合是否为空
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }



    // 检查集合中是否包含指定的对象
    @Override
    public boolean contains(final Object o) {
        return commandExecutor.read(getName(), new SyncOperation<Boolean>() {
            @Override
            public Boolean execute(Codec codec, RedisConnection conn) {
                // 通过二分搜索查找对象,返回索引是否大于等于零
                return binarySearch((V)o, codec, conn).getIndex() >= 0;
            }
        });
    }

    // 内部迭代器,用于遍历集合中的元素
    public Iterator<V> iterator() {
        final int ind = 0;

        // 实现 JDK 迭代器
        return new Iterator<V>() {

            private int currentIndex = ind - 1;
            private boolean removeExecuted;  // 当前元素是否执行过移除

            @Override
            public boolean hasNext() {
                 // 获取集合的大小
                int size = size();
                // 判断有没有下一个元素
                return currentIndex+1 < size && size > 0;
            }

            @Override
            public V next() {
                // 没有下一个元素就抛出异常
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                // 当前索引要往前
                currentIndex++;
                // 重置移除标记
                removeExecuted = false;
                // 返回当前索引的元素
                return RedissonSortedSet.this.get(currentIndex);
            }

            @Override
            public void remove() {
                // 当前元素已经执行过移除
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                // 移除当前元素
                RedissonSortedSet.this.remove(currentIndex);
                // 当前下标往前移
                currentIndex--;
                // 设置移除标记
                removeExecuted = true;
            }

        };

    }


    // 移除指定索引元素
    private void remove(final int index) {
        commandExecutor.write(getName(), new SyncOperation<V>() {
            @Override
            public V execute(Codec codec, RedisConnection conn) {
                // 索引为0
                if (index == 0) {
                    // LPOP命令移除并返回列表的第一个元素
                    return conn.sync(codec, RedisCommands.LPOP, getName());
                }

                // 循环处理
                while (true) {
                    // 监视列表防止数据变动
                    conn.sync(RedisCommands.WATCH, getName());

                    // LRANGE 获取索引之后的所有元素
                    List<Object> tail = conn.sync(codec, RedisCommands.LRANGE, getName(), index + 1, size());

                    // 开启事务
                    conn.sync(RedisCommands.MULTI);

                    // LTRIM 保留索引之前的元素
                    conn.sync(codec, RedisCommands.LTRIM, getName(), 0, index - 1);

                    // 尾部空
                    if (tail.isEmpty()) {
                        // 事务执行成功返回 null
                        if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
                            return null;
                        }
                    } else {
                        // 将列表名称添加到头部
                        tail.add(0, getName());
                        /// RPUSH 将尾部元素重新推入列表
                        conn.sync(codec, RedisCommands.RPUSH, tail.toArray());
                         // 事务执行成功返回 null
                        if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 2) {
                            return null;
                        }
                    }
                }
            }
        });
    }

    // 获取指定索引元素
    private V get(final int index) {
        // LINDEX 命令获取
        return commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), index);
    }

    // 以数组形式返回集合中所有元素
    @Override
    public Object[] toArray() {
        // 使用LRANGE命令获取列表的所有元素
        List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
        return res.toArray();
    }

    // 以数组形式返回集合中所有元素
    @Override
    public <T> T[] toArray(T[] a) {
        // 使用LRANGE命令获取列表的所有元素
        List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
        return res.toArray(a);
    }

    private String getCurrentVersionKey() {
        return "redisson__sortedset__version__{" + getName() + "}";
    }

    private Long getCurrentVersion(Codec codec, RedisConnection simpleConnection) {
        return simpleConnection.sync(LongCodec.INSTANCE, RedisCommands.GET, getCurrentVersionKey());
    }

    // 往向集合中添加一个元素
    @Override
    public boolean add(final V value) {
        return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
            @Override
            public Boolean execute(Codec codec, RedisConnection conn) {
                return add(value, codec, conn);
            }
        });
    }

    // 异步往向集合中添加一个元素
    public Future<Boolean> addAsync(final V value) {
        EventLoop loop = commandExecutor.getConnectionManager().getGroup().next();
        final Promise<Boolean> promise = loop.newPromise();

        loop.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    boolean result = add(value);
                    promise.setSuccess(result);
                } catch (Exception e) {
                    promise.setFailure(e);
                }
            }
        });

        return promise;
    }

    boolean add(V value, Codec codec, RedisConnection connection) {
        // 无限循环,直到成功添加
        while (true) {
           // 监视集合和比较器键,防止数据变更
            connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName());

            checkComparator(connection);

            // 获取当前版本
            Long version = getCurrentVersion(codec, connection);

            // 执行二分搜索,查找插入位置
            BinarySearchResult<V> res = binarySearch(value, codec, connection);
            // 元素不存在
            if (res.getIndex() < 0) {
                // 如果版本不匹配,取消监视, 重新尝试
                if (!version.equals(getCurrentVersion(codec, connection))) {
                    connection.sync(RedisCommands.UNWATCH);
                    continue;
                }

                // 插入位置的基准元素
                V pivot = null;
                // 标记为在基准元素之前插入
                boolean before = false;
                int index = -(res.getIndex() + 1);  // 计算插入索引

                // 如果索引在集合范围内
                if (index < size()) {
                    before = true;  // 标记为在基准元素之前插入
                    // 获取基准元素
                    pivot = connection.sync(codec, RedisCommands.LINDEX, getName(), index);
                }

                // 开始事务
                connection.sync(RedisCommands.MULTI);
                // 索引超出集合范围
                if (index >= size()) {
                    // RPUSH 在列表末尾插入元素
                    connection.sync(codec, RedisCommands.RPUSH, getName(), value);
                } else {
                    // 在基准元素之前或之后插入元素
                    connection.sync(codec, RedisCommands.LINSERT, getName(), before ? "BEFORE" : "AFTER", pivot, value);
                }

                // 增加版本号
                connection.sync(RedisCommands.INCR, getCurrentVersionKey());
                // 执行事务
                List<Object> re = connection.sync(codec, RedisCommands.EXEC);
                // 事务执行成功返回成功
                if (re.size() == 2) {
                    return true;
                }
            } else {
                // 元素已存在,就取消监视,返回失败
                connection.sync(RedisCommands.UNWATCH);
                return false;
            }
        }
    }

    private void checkComparator(RedisConnection connection) {
        String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());
        if (comparatorSign != null) {
            String[] vals = comparatorSign.split(":");
            String className = vals[0];
            if (!comparator.getClass().getName().equals(className)) {
                    loadComparator(connection);
            }
        }
    }

    public static double calcIncrement(double value) {
        BigDecimal b = BigDecimal.valueOf(value);
        BigDecimal r = b.remainder(BigDecimal.ONE);
        if (r.compareTo(BigDecimal.ZERO) == 0) {
            return 1;
        }
        double res = 1/Math.pow(10, r.scale());
        return res;
    }

    // 异步地从集合中移除一个元素
    @Override
    public Future<Boolean> removeAsync(final V value) {
        EventLoopGroup group = commandExecutor.getConnectionManager().getGroup();
        final Promise<Boolean> promise = group.next().newPromise();

        group.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 调用同步的remove方法
                    boolean result = remove(value);
                    // 设置调用成功
                    promise.setSuccess(result);
                } catch (Exception e) {
                    promise.setFailure(e);
                }
            }
        });

        return promise;
    }

    // 从集合中移除一个元素
    @Override
    public boolean remove(final Object value) {
        return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
            @Override
            public Boolean execute(Codec codec, RedisConnection conn) {
                return remove(value, codec, conn);
            }
        });
    }

    // 从集合中移除元素
    boolean remove(Object value, Codec codec, RedisConnection conn) {
         // 无限循环,直到成功移除
        while (true) {
            // 监视集合
            conn.sync(RedisCommands.WATCH, getName());

            // 执行二分搜索,查找元素位置
            BinarySearchResult<V> res = binarySearch((V) value, codec, conn);
            // 没有找到元素,取消监视,返回失败
            if (res.getIndex() < 0) {
                conn.sync(RedisCommands.UNWATCH);
                return false;
            }

            // 如果在第一个位置
            if (res.getIndex() == 0) {
                // 开启事务 >  使用LPOP命令移除第一个元素 > 判断事务执行成功直接返回成功
                conn.sync(RedisCommands.MULTI);
                conn.sync(codec, RedisCommands.LPOP, getName());
                if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
                    return true;
                }
            }

            // LRANGE 获取索引之后的所有元素
            List<Object> tail = conn.sync(codec, RedisCommands.LRANGE, getName(), res.getIndex() + 1, size());

            // 开启事务
            conn.sync(RedisCommands.MULTI);
            // LTRIM 裁剪列表, 只保留索引之前的元素
            conn.sync(RedisCommands.LTRIM, getName(), 0, res.getIndex() - 1);

            // 尾部元素为空
            if (tail.isEmpty()) {
                // 事务执行成功就直接返回成功
                if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
                    return true;
                }
            } else {
                // 尾部不为空将列表名称添加到头部
                tail.add(0, getName());

                // 将尾部元素重新推入列表  > 判断事务执行成功就返回成功
                conn.sync(codec, RedisCommands.RPUSH, tail.toArray());
                if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 2) {
                    return true;
                }
            }
        }
    }

    // 判断当前集合是否包含指定集合中的所有元素
    @Override
    public boolean containsAll(Collection<?> c) {
        for (Object object : c) {
            if (!contains(object)) {
                return false;
            }
        }
        return true;
    }


    // 将指定集合中的所有元素添加到当前集合中
    @Override
    public boolean addAll(Collection<? extends V> c) {
        boolean changed = false;  // 变更标记
        for (V v : c) { // 遍历集合元素处理
            if (add(v)) { // 尝试添加到集合中
                changed = true;  // 添加成功就设置变更标记
            }
        }
        return changed; // 返回变化标记
    }


    // 保留当前集合中同时存在于指定集合中的元素
    @Override
    public boolean retainAll(Collection<?> c) {
        boolean changed = false;
        // 迭代器遍历当前集合
        for (Iterator iterator = iterator(); iterator.hasNext();) {
            // 获取下一个元素判断是否在当前集合
            Object object = (Object) iterator.next();
            if (!c.contains(object)) {
                // 移除当前元素
                iterator.remove();
                changed = true;  
            }
        }
        return changed;  // 返回变更标记
    }

    // 移除指定的集合元素
    @Override
    public boolean removeAll(Collection<?> c) {
        boolean changed = false;
        // 遍历元素进行移除
        for (Object obj : c) {
            if (remove(obj)) {
                changed = true;
            }
        }
        return changed;
    }

    // 清空集合
    @Override
    public void clear() {
        delete();
    }

    @Override
    public Comparator<? super V> comparator() {
        return comparator;
    }

    // 返回当前集合中指定范围的子集
    @Override
    public SortedSet<V> subSet(V fromElement, V toElement) {
        throw new UnsupportedOperationException();
    }

    // 返回当前集合中小于指定元素的子集
    @Override
    public SortedSet<V> headSet(V toElement) {
        return subSet(null, toElement);
    }

    // 返回当前集合中大于等于指定元素的子集
    @Override
    public SortedSet<V> tailSet(V fromElement) {
        return subSet(fromElement, null);
    }

    //  获取集合中第一个元素
    @Override
    public V first() {
        // 使用LINDEX命令获取列表的第一个元素
        V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), 0);
        if (res == null) {
            throw new NoSuchElementException();
        }
        return res;
    }

    //  获取集合中最后一个元素
    @Override
    public V last() {
        // 使用LINDEX命令获取列表的最后一个元素
        V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), -1);
        if (res == null) {
            throw new NoSuchElementException();
        }
        return res;
    }

    private String getScoreKeyName(int index) {
        return "redisson__sortedset__score__" + getName() + "__" + index;
    }

    private String getComparatorKeyName() {
        return "redisson__sortedset__comparator__{" + getName() + "}";
    }


    @Override
    public boolean trySetComparator(Comparator<? super V> comparator) {
        String className = comparator.getClass().getName();
        final String comparatorSign = className + ":" + calcClassSign(className);

        Boolean res = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
                "if redis.call('llen', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[1]); return true; "
                + "else return false; end",
                Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);
        if (res) {
            this.comparator = comparator;
        }
        return res;
    }

    // 获取指定索引位置的值
    private V getAtIndex(Codec codec, int index, RedisConnection connection) {
        return connection.sync(codec, RedisCommands.LINDEX, getName(), index);
    }

 
    // 二分查找
    private BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection, int lowerIndex, int upperIndex) {

        // 当低位索引小于等于高位索引时,继续搜索
        while (lowerIndex <= upperIndex) {
            // 计算中间索引,避免溢出
            int index = lowerIndex + (upperIndex - lowerIndex) / 2;

            // 获取中间元素
            V res = getAtIndex(codec, index, connection);
            // 比较目标值跟中间元素
            int cmp = comparator.compare(value, res);

            // 相等则找到了目标值
            if (cmp == 0) {
                // 封装结果返回
                BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
                indexRes.setIndex(index);
                return indexRes;
            } else if (cmp < 0) {
                // 小于说明目标值在左半部分,调整搜索范围至左半部分
                upperIndex = index - 1;
            } else {
                // 大于说明目标值在右半部分,调整搜索范围至右半部分
                lowerIndex = index + 1;
            }
        }

        // 没有找到的情况, 返回一个负数,表示目标值的插入点
        BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
        indexRes.setIndex(-(lowerIndex + 1));
        return indexRes;
    }

    public BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection) {
        int upperIndex = size(connection) - 1;
        return binarySearch(value, codec, connection, 0, upperIndex);
    }

    double score(V value, RedisConnection connection, int indexDiff, boolean tail) {
        return -1;
    }

    public String toString() {
        Iterator<V> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            V e = it.next();
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

}

2.2 核心 lua 脚本源码分析

2.2.1 设置比较器

  • KEYS[1]:集合的键,用于检查集合是否为空。
  • KEYS[2]:比较器键,用于存储比较器的签名。
  • ARGV[1]:比较器的签名,由类名和类签名组成。
--- 使用redis.call('llen', KEYS[1])获取集合的长度
--- 如果长度为0,表示集合为空
if redis.call('llen', KEYS[1]) == 0 then
    --- 使用 redis.call('set', KEYS[2], ARGV[1])将比较器签名存储在Redis中
    redis.call('set', KEYS[2], ARGV[1])
    return true -- true表示成功设置
else
    return false --- false,表示无法设置新的比较器
end