Redisson2.0源码分析13-分布式Set
Redisson2.0源码分析13-分布式Set
1. 分布式Set RSet
跟 V1 版本差不多。逻辑有一部分替换成 lua 实现。
1.1 源码分析
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RSet;
import io.netty.util.concurrent.Future;
public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
protected RedissonSet(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
// 获取集合的大小
@Override
public int size() {
return get(sizeAsync());
}
@Override
public Future<Integer> sizeAsync() {
// 使用Redis的SCARD命令获取集合的元素数量
return commandExecutor.readAsync(getName(), RedisCommands.SCARD, getName());
}
@Override
public boolean isEmpty() {
return size() == 0;
}
// 检查集合中是否包含指定的元素
@Override
public boolean contains(Object o) {
return get(containsAsync(o));
}
@Override
public Future<Boolean> containsAsync(Object o) {
// 使用 Redis 的 SISMEMBER 命令实现
return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
}
// 返回一个迭代器,用于遍历集合中的所有元素
private ListScanResult<V> scanIterator(long startPos) {
// SSCAN 命令实现分批次遍历
return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos);
}
// 实现的迭代器
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private Iterator<V> iter;
private Long iterPos;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null) {
ListScanResult<V> res = scanIterator(0);
iter = res.getValues().iterator();
iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) {
ListScanResult<V> res = scanIterator(iterPos);
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// lazy init iterator
// hasNext();
iter.remove();
RedissonSet.this.remove(value);
removeExecuted = true;
}
};
}
// 异步地读取集合中的所有元素
@Override
public Future<Collection<V>> readAllAsync() {
// SMEMBERS命令实现
return commandExecutor.readAsync(getName(), RedisCommands.SMEMBERS, getName());
}
// 将集合中的元素转换为数组
@Override
public Object[] toArray() {
List<Object> res = (List<Object>) get(readAllAsync());
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<Object> res = (List<Object>) get(readAllAsync());
return res.toArray(a);
}
//将元素添加到集合中
@Override
public boolean add(V e) {
return get(addAsync(e));
}
@Override
public Future<Boolean> addAsync(V e) {
// SADD 命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.SADD_SINGLE, getName(), e);
}
// 移除并返回集合中的一个随机元素
@Override
public V removeRandom() {
return get(removeRandomAsync());
}
@Override
public Future<V> removeRandomAsync() {
// SPOP命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.SPOP_SINGLE, getName());
}
// 移除指定的元素
@Override
public Future<Boolean> removeAsync(Object o) {
// SREM命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.SREM_SINGLE, getName(), o);
}
@Override
public boolean remove(Object value) {
return get(removeAsync((V)value));
}
// 检查集合是否包含指定集合中的所有元素
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
// 使用Lua脚本遍历集合元素进行检查
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
// 将指定集合中的所有元素添加到当前集合
@Override
public boolean addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return false;
}
return get(addAllAsync(c));
}
@Override
public Future<Boolean> addAllAsync(Collection<? extends V> c) {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
return commandExecutor.writeAsync(getName(), RedisCommands.SADD, args.toArray());
}
// 保留指定的集合元素
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
// 使用Lua脚本遍历并移除不在指定集合中的元素
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[1], element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}
// 移除当前集合中包含在指定集合中的所有元素
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
// 清空集合中的所有元素
@Override
public void clear() {
delete();
}
}
1.2 核心 lua 脚本分析
1.2.1 containsAllAsync
- KEYS[1]:集合的名称
- ARGV:要检查的元素数组
--- 使用 smembers 命令获取集合的所有元素
local s = redis.call('smembers', KEYS[1]);
--- 遍历集合元素和指定的元素数组,移除匹配的元素
for i = 0, table.getn(s), 1 do
for j = 0, table.getn(ARGV), 1 do
if ARGV[j] == s[i] then
table.remove(ARGV, j)
end
end;
end;
--- 如果指定的元素数组为空,返回true,表示集合包含所有指定元素.
---- 否则返回false
return table.getn(ARGV) == 0;
1.2.2 保留集合中包含在指定集合中的元素 retainAllAsync
- KEYS[1]:集合的名称
- ARGV:要保留的元素数组
local changed = false --- 标记集合是否发生了变化
--- 使用 smembers 命令获取 Redis 集合中所有的元素保存到 s 中
local s = redis.call('smembers', KEYS[1])
local i = 0 --- 初始化一个计数器i
--- i小于等于集合s的长度. table.getn(s)返回表s的长度
while i <= table.getn(s) do
local element = s[i] --- 获取集合中的第i个元素
local isInAgrs = false --- 标记当前元素是否在参数列表中
--- 历参数列表ARGV. table.getn(ARGV)返回参数列表的长度
for j = 0, table.getn(ARGV), 1 do
--- 相等就设置为在参数列表中,跳出本次循环
if ARGV[j] == element then
isInAgrs = true
break
end
end
--- 当前元素不在参数列表中
if isInAgrs == false then
redis.call('SREM', KEYS[1], element) --- 使用SREM命令从集合中移除该元素
changed = true --- 变更标记设置为 true
end
i = i + 1
end
return changed --- true表示集合发生了变化,有元素被移除,false表示集合没有变化
1.2.3 移除集合中包含在指定集合中的所有元素 removeAllAsync
- KEYS[1]:集合的名称
- ARGV:要移除的元素数组
local v = false --- 移除元素标记
--- 遍历指定的元素数组
for i = 0, table.getn(ARGV), 1 do
---- 每个元素使用 SREM 命令尝试从集合中移除
if redis.call('srem', KEYS[1], ARGV[i]) == 1 then
--- 成功移除至少一个元素,标记为true
v = true
end
end
--- 如果成功移除至少一个元素,返回true, 否则返回false
return v
2. 分布式SortSet RSortedSet
这个类主要是管理排序集合。
2.1 源码分析
package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSortedSet;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V> {
// 自然顺序比较器
private static class NaturalComparator<V> implements Comparator<V>, Serializable {
private static final long serialVersionUID = 7207038068494060240L;
static final NaturalComparator NATURAL_ORDER = new NaturalComparator();
public int compare(V c1, V c2) {
Comparable<Object> c1co = (Comparable<Object>) c1;
Comparable<Object> c2co = (Comparable<Object>) c2;
return c1co.compareTo(c2co);
}
}
// 保存二分搜索的结果,包括找到的值和索引
public static class BinarySearchResult<V> {
private V value;
private int index;
public BinarySearchResult(V value) {
super();
this.value = value;
}
public BinarySearchResult() {
}
public void setIndex(int index) {
this.index = index;
}
public int getIndex() {
return index;
}
public V getValue() {
return value;
}
}
// 用于对集合中的元素进行排序。 默认使用自然排序
private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER;
protected RedissonSortedSet(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
loadComparator();
// 使用 SETNX 命令在Redis中设置 key 的初始值为0
commandExecutor.write(getName(), StringCodec.INSTANCE, RedisCommands.SETNX, getCurrentVersionKey(), 0L);
}
private void loadComparator() {
commandExecutor.read(getName(), new SyncOperation<Void>() {
@Override
public Void execute(Codec codec, RedisConnection conn) {
loadComparator(conn);
return null;
}
});
}
private void loadComparator(RedisConnection connection) {
try {
// 获取比较器的签名
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());
// 签名不为空则校验
if (comparatorSign != null) {
// 切分为类名和签名值
String[] parts = comparatorSign.split(":");
String className = parts[0];
String sign = parts[1];
// 计算类签名并进行比较
String result = calcClassSign(className);
// 不一致则抛出异常
if (!result.equals(sign)) {
throw new IllegalStateException("Local class signature of " + className + " differs from used by this SortedSet!");
}
// 实例化比较器类赋值给 comparator
Class<?> clazz = Class.forName(className);
comparator = (Comparator<V>) clazz.newInstance();
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
// 计算给定类的签名
private static String calcClassSign(String name) {
try {
// 查找类
Class<?> clazz = Class.forName(name);
// 将对象序列化成字节数组
ByteArrayOutputStream result = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(result);
outputStream.writeObject(clazz);
outputStream.close();
// 使用 SHA-1 摘要算法计算字节数组的哈希值
MessageDigest crypt = MessageDigest.getInstance("SHA-1");
crypt.reset();
crypt.update(result.toByteArray());
// 将哈希值转换为十六进制字符串返回
return new BigInteger(1, crypt.digest()).toString(16);
} catch (Exception e) {
throw new IllegalStateException("Can't calculate sign of " + name, e);
}
}
// 返回集合的大小
@Override
public int size() {
// LLEN命令获取列表的长度
return commandExecutor.read(getName(), RedisCommands.LLEN, getName());
}
private int size(RedisConnection connection) {
return connection.sync(RedisCommands.LLEN, getName()).intValue();
}
// 检查集合是否为空
@Override
public boolean isEmpty() {
return size() == 0;
}
// 检查集合中是否包含指定的对象
@Override
public boolean contains(final Object o) {
return commandExecutor.read(getName(), new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
// 通过二分搜索查找对象,返回索引是否大于等于零
return binarySearch((V)o, codec, conn).getIndex() >= 0;
}
});
}
// 内部迭代器,用于遍历集合中的元素
public Iterator<V> iterator() {
final int ind = 0;
// 实现 JDK 迭代器
return new Iterator<V>() {
private int currentIndex = ind - 1;
private boolean removeExecuted; // 当前元素是否执行过移除
@Override
public boolean hasNext() {
// 获取集合的大小
int size = size();
// 判断有没有下一个元素
return currentIndex+1 < size && size > 0;
}
@Override
public V next() {
// 没有下一个元素就抛出异常
if (!hasNext()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
// 当前索引要往前
currentIndex++;
// 重置移除标记
removeExecuted = false;
// 返回当前索引的元素
return RedissonSortedSet.this.get(currentIndex);
}
@Override
public void remove() {
// 当前元素已经执行过移除
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// 移除当前元素
RedissonSortedSet.this.remove(currentIndex);
// 当前下标往前移
currentIndex--;
// 设置移除标记
removeExecuted = true;
}
};
}
// 移除指定索引元素
private void remove(final int index) {
commandExecutor.write(getName(), new SyncOperation<V>() {
@Override
public V execute(Codec codec, RedisConnection conn) {
// 索引为0
if (index == 0) {
// LPOP命令移除并返回列表的第一个元素
return conn.sync(codec, RedisCommands.LPOP, getName());
}
// 循环处理
while (true) {
// 监视列表防止数据变动
conn.sync(RedisCommands.WATCH, getName());
// LRANGE 获取索引之后的所有元素
List<Object> tail = conn.sync(codec, RedisCommands.LRANGE, getName(), index + 1, size());
// 开启事务
conn.sync(RedisCommands.MULTI);
// LTRIM 保留索引之前的元素
conn.sync(codec, RedisCommands.LTRIM, getName(), 0, index - 1);
// 尾部空
if (tail.isEmpty()) {
// 事务执行成功返回 null
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
return null;
}
} else {
// 将列表名称添加到头部
tail.add(0, getName());
/// RPUSH 将尾部元素重新推入列表
conn.sync(codec, RedisCommands.RPUSH, tail.toArray());
// 事务执行成功返回 null
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 2) {
return null;
}
}
}
}
});
}
// 获取指定索引元素
private V get(final int index) {
// LINDEX 命令获取
return commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), index);
}
// 以数组形式返回集合中所有元素
@Override
public Object[] toArray() {
// 使用LRANGE命令获取列表的所有元素
List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray();
}
// 以数组形式返回集合中所有元素
@Override
public <T> T[] toArray(T[] a) {
// 使用LRANGE命令获取列表的所有元素
List<V> res = commandExecutor.read(getName(), RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray(a);
}
private String getCurrentVersionKey() {
return "redisson__sortedset__version__{" + getName() + "}";
}
private Long getCurrentVersion(Codec codec, RedisConnection simpleConnection) {
return simpleConnection.sync(LongCodec.INSTANCE, RedisCommands.GET, getCurrentVersionKey());
}
// 往向集合中添加一个元素
@Override
public boolean add(final V value) {
return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return add(value, codec, conn);
}
});
}
// 异步往向集合中添加一个元素
public Future<Boolean> addAsync(final V value) {
EventLoop loop = commandExecutor.getConnectionManager().getGroup().next();
final Promise<Boolean> promise = loop.newPromise();
loop.execute(new Runnable() {
@Override
public void run() {
try {
boolean result = add(value);
promise.setSuccess(result);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
boolean add(V value, Codec codec, RedisConnection connection) {
// 无限循环,直到成功添加
while (true) {
// 监视集合和比较器键,防止数据变更
connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName());
checkComparator(connection);
// 获取当前版本
Long version = getCurrentVersion(codec, connection);
// 执行二分搜索,查找插入位置
BinarySearchResult<V> res = binarySearch(value, codec, connection);
// 元素不存在
if (res.getIndex() < 0) {
// 如果版本不匹配,取消监视, 重新尝试
if (!version.equals(getCurrentVersion(codec, connection))) {
connection.sync(RedisCommands.UNWATCH);
continue;
}
// 插入位置的基准元素
V pivot = null;
// 标记为在基准元素之前插入
boolean before = false;
int index = -(res.getIndex() + 1); // 计算插入索引
// 如果索引在集合范围内
if (index < size()) {
before = true; // 标记为在基准元素之前插入
// 获取基准元素
pivot = connection.sync(codec, RedisCommands.LINDEX, getName(), index);
}
// 开始事务
connection.sync(RedisCommands.MULTI);
// 索引超出集合范围
if (index >= size()) {
// RPUSH 在列表末尾插入元素
connection.sync(codec, RedisCommands.RPUSH, getName(), value);
} else {
// 在基准元素之前或之后插入元素
connection.sync(codec, RedisCommands.LINSERT, getName(), before ? "BEFORE" : "AFTER", pivot, value);
}
// 增加版本号
connection.sync(RedisCommands.INCR, getCurrentVersionKey());
// 执行事务
List<Object> re = connection.sync(codec, RedisCommands.EXEC);
// 事务执行成功返回成功
if (re.size() == 2) {
return true;
}
} else {
// 元素已存在,就取消监视,返回失败
connection.sync(RedisCommands.UNWATCH);
return false;
}
}
}
private void checkComparator(RedisConnection connection) {
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());
if (comparatorSign != null) {
String[] vals = comparatorSign.split(":");
String className = vals[0];
if (!comparator.getClass().getName().equals(className)) {
loadComparator(connection);
}
}
}
public static double calcIncrement(double value) {
BigDecimal b = BigDecimal.valueOf(value);
BigDecimal r = b.remainder(BigDecimal.ONE);
if (r.compareTo(BigDecimal.ZERO) == 0) {
return 1;
}
double res = 1/Math.pow(10, r.scale());
return res;
}
// 异步地从集合中移除一个元素
@Override
public Future<Boolean> removeAsync(final V value) {
EventLoopGroup group = commandExecutor.getConnectionManager().getGroup();
final Promise<Boolean> promise = group.next().newPromise();
group.execute(new Runnable() {
@Override
public void run() {
try {
// 调用同步的remove方法
boolean result = remove(value);
// 设置调用成功
promise.setSuccess(result);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
// 从集合中移除一个元素
@Override
public boolean remove(final Object value) {
return commandExecutor.write(getName(), new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return remove(value, codec, conn);
}
});
}
// 从集合中移除元素
boolean remove(Object value, Codec codec, RedisConnection conn) {
// 无限循环,直到成功移除
while (true) {
// 监视集合
conn.sync(RedisCommands.WATCH, getName());
// 执行二分搜索,查找元素位置
BinarySearchResult<V> res = binarySearch((V) value, codec, conn);
// 没有找到元素,取消监视,返回失败
if (res.getIndex() < 0) {
conn.sync(RedisCommands.UNWATCH);
return false;
}
// 如果在第一个位置
if (res.getIndex() == 0) {
// 开启事务 > 使用LPOP命令移除第一个元素 > 判断事务执行成功直接返回成功
conn.sync(RedisCommands.MULTI);
conn.sync(codec, RedisCommands.LPOP, getName());
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
return true;
}
}
// LRANGE 获取索引之后的所有元素
List<Object> tail = conn.sync(codec, RedisCommands.LRANGE, getName(), res.getIndex() + 1, size());
// 开启事务
conn.sync(RedisCommands.MULTI);
// LTRIM 裁剪列表, 只保留索引之前的元素
conn.sync(RedisCommands.LTRIM, getName(), 0, res.getIndex() - 1);
// 尾部元素为空
if (tail.isEmpty()) {
// 事务执行成功就直接返回成功
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
return true;
}
} else {
// 尾部不为空将列表名称添加到头部
tail.add(0, getName());
// 将尾部元素重新推入列表 > 判断事务执行成功就返回成功
conn.sync(codec, RedisCommands.RPUSH, tail.toArray());
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 2) {
return true;
}
}
}
}
// 判断当前集合是否包含指定集合中的所有元素
@Override
public boolean containsAll(Collection<?> c) {
for (Object object : c) {
if (!contains(object)) {
return false;
}
}
return true;
}
// 将指定集合中的所有元素添加到当前集合中
@Override
public boolean addAll(Collection<? extends V> c) {
boolean changed = false; // 变更标记
for (V v : c) { // 遍历集合元素处理
if (add(v)) { // 尝试添加到集合中
changed = true; // 添加成功就设置变更标记
}
}
return changed; // 返回变化标记
}
// 保留当前集合中同时存在于指定集合中的元素
@Override
public boolean retainAll(Collection<?> c) {
boolean changed = false;
// 迭代器遍历当前集合
for (Iterator iterator = iterator(); iterator.hasNext();) {
// 获取下一个元素判断是否在当前集合
Object object = (Object) iterator.next();
if (!c.contains(object)) {
// 移除当前元素
iterator.remove();
changed = true;
}
}
return changed; // 返回变更标记
}
// 移除指定的集合元素
@Override
public boolean removeAll(Collection<?> c) {
boolean changed = false;
// 遍历元素进行移除
for (Object obj : c) {
if (remove(obj)) {
changed = true;
}
}
return changed;
}
// 清空集合
@Override
public void clear() {
delete();
}
@Override
public Comparator<? super V> comparator() {
return comparator;
}
// 返回当前集合中指定范围的子集
@Override
public SortedSet<V> subSet(V fromElement, V toElement) {
throw new UnsupportedOperationException();
}
// 返回当前集合中小于指定元素的子集
@Override
public SortedSet<V> headSet(V toElement) {
return subSet(null, toElement);
}
// 返回当前集合中大于等于指定元素的子集
@Override
public SortedSet<V> tailSet(V fromElement) {
return subSet(fromElement, null);
}
// 获取集合中第一个元素
@Override
public V first() {
// 使用LINDEX命令获取列表的第一个元素
V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), 0);
if (res == null) {
throw new NoSuchElementException();
}
return res;
}
// 获取集合中最后一个元素
@Override
public V last() {
// 使用LINDEX命令获取列表的最后一个元素
V res = commandExecutor.read(getName(), RedisCommands.LINDEX, getName(), -1);
if (res == null) {
throw new NoSuchElementException();
}
return res;
}
private String getScoreKeyName(int index) {
return "redisson__sortedset__score__" + getName() + "__" + index;
}
private String getComparatorKeyName() {
return "redisson__sortedset__comparator__{" + getName() + "}";
}
@Override
public boolean trySetComparator(Comparator<? super V> comparator) {
String className = comparator.getClass().getName();
final String comparatorSign = className + ":" + calcClassSign(className);
Boolean res = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"if redis.call('llen', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[1]); return true; "
+ "else return false; end",
Arrays.<Object>asList(getName(), getComparatorKeyName()), comparatorSign);
if (res) {
this.comparator = comparator;
}
return res;
}
// 获取指定索引位置的值
private V getAtIndex(Codec codec, int index, RedisConnection connection) {
return connection.sync(codec, RedisCommands.LINDEX, getName(), index);
}
// 二分查找
private BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection, int lowerIndex, int upperIndex) {
// 当低位索引小于等于高位索引时,继续搜索
while (lowerIndex <= upperIndex) {
// 计算中间索引,避免溢出
int index = lowerIndex + (upperIndex - lowerIndex) / 2;
// 获取中间元素
V res = getAtIndex(codec, index, connection);
// 比较目标值跟中间元素
int cmp = comparator.compare(value, res);
// 相等则找到了目标值
if (cmp == 0) {
// 封装结果返回
BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
indexRes.setIndex(index);
return indexRes;
} else if (cmp < 0) {
// 小于说明目标值在左半部分,调整搜索范围至左半部分
upperIndex = index - 1;
} else {
// 大于说明目标值在右半部分,调整搜索范围至右半部分
lowerIndex = index + 1;
}
}
// 没有找到的情况, 返回一个负数,表示目标值的插入点
BinarySearchResult<V> indexRes = new BinarySearchResult<V>();
indexRes.setIndex(-(lowerIndex + 1));
return indexRes;
}
public BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection) {
int upperIndex = size(connection) - 1;
return binarySearch(value, codec, connection, 0, upperIndex);
}
double score(V value, RedisConnection connection, int indexDiff, boolean tail) {
return -1;
}
public String toString() {
Iterator<V> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
V e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
}
2.2 核心 lua 脚本源码分析
2.2.1 设置比较器
- KEYS[1]:集合的键,用于检查集合是否为空。
- KEYS[2]:比较器键,用于存储比较器的签名。
- ARGV[1]:比较器的签名,由类名和类签名组成。
--- 使用redis.call('llen', KEYS[1])获取集合的长度
--- 如果长度为0,表示集合为空
if redis.call('llen', KEYS[1]) == 0 then
--- 使用 redis.call('set', KEYS[2], ARGV[1])将比较器签名存储在Redis中
redis.call('set', KEYS[2], ARGV[1])
return true -- true表示成功设置
else
return false --- false,表示无法设置新的比较器
end