Redisson2.0源码分析10-分布式List
Redisson2.0源码分析10-分布式List
相比于 Redisson V1 版本,主要是将 Java 层面的 for 循环处理,提取到了 lua 脚本中一次性处理掉。
在实现上,依旧是采用 Redis 的 List 结构去存储,同时对 Java List 的操作也就是使用 Redis List 的相关命令实现。
1. 源码分析
package org.redisson;
import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT;
import static org.redisson.client.protocol.RedisCommands.LINDEX;
import static org.redisson.client.protocol.RedisCommands.LLEN;
import static org.redisson.client.protocol.RedisCommands.LPOP;
import static org.redisson.client.protocol.RedisCommands.LPUSH;
import static org.redisson.client.protocol.RedisCommands.LRANGE;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH;
import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.core.RList;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class RedissonList<V> extends RedissonExpirable implements RList<V> {
protected RedissonList(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
// LLEN 命令获取 List 大小
@Override
public int size() {
return get(sizeAsync());
}
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), LLEN, getName());
}
// 调用 LLEN 命令获取 List 大小是否为0
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
return get(containsAsync(o));
}
@Override
public Iterator<V> iterator() {
return listIterator();
}
@Override
public Object[] toArray() {
List<V> list = readAll();
return list.toArray();
}
private List<V> readAll() {
return (List<V>) get(readAllAsync());
}
//
@Override
public Future<Collection<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), LRANGE, getName(), 0, -1);
}
@Override
public <T> T[] toArray(T[] a) {
List<V> list = readAll();
return list.toArray(a);
}
// 调用 Redis 命令 RPUSH 添加元素到队尾
@Override
public boolean add(V e) {
return get(addAsync(e));
}
@Override
public Future<Boolean> addAsync(V e) {
return commandExecutor.writeAsync(getName(), RPUSH_BOOLEAN, getName(), e);
}
// 调用 Redis 命令 LREM 移除元素
@Override
public boolean remove(Object o) {
return remove(o, 1);
}
@Override
public Future<Boolean> removeAsync(Object o) {
return removeAsync(o, 1);
}
protected Future<Boolean> removeAsync(Object o, int count) {
return commandExecutor.writeAsync(getName(), LREM_SINGLE, getName(), count, o);
}
protected boolean remove(Object o, int count) {
return get(removeAsync(o, count));
}
// 使用Lua脚本遍历列表和参数集合,检查所有元素是否存在
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
"for j = 0, table.getn(ARGV), 1 do " +
"if items[i] == ARGV[j] then " +
"table.remove(ARGV, j) " +
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public boolean addAll(Collection<? extends V> c) {
return get(addAllAsync(c));
}
// 使用 Redis 命令 RPUSH 添加所有元素到 List 中
@Override
public Future<Boolean> addAllAsync(final Collection<? extends V> c) {
final Promise<Boolean> promise = newPromise();
if (c.isEmpty()) {
promise.setSuccess(false);
return promise;
}
final int listSize = size();
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Future<Long> res = commandExecutor.writeAsync(getName(), RPUSH, args.toArray());
res.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(listSize != future.getNow());
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
@Override
public boolean addAll(final int index, final Collection<? extends V> coll) {
checkPosition(index);
if (coll.isEmpty()) {
return false;
}
int size = size();
if (index < size) {
if (index == 0) { // prepend elements to list
List<Object> elements = new ArrayList<Object>(coll);
Collections.reverse(elements);
elements.add(0, getName());
Long newSize = commandExecutor.write(getName(), LPUSH, elements.toArray());
return newSize != size;
}
// insert into middle of list
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
return commandExecutor.evalWrite(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
"for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return true",
Collections.<Object>singletonList(getName()), args.toArray());
} else {
// append to list
return addAll(coll);
}
}
// 使用 Lua 脚本遍历参数集合,再调用 Redis LREM 命令移除每个元素
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
// 保留 List 中包含在指定集合中的元素
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
// 使用 Lua 脚本遍历 list ,移除不在参数集合中的元素
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
+ "while i <= s do "
+ "local element = items[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}
// 清空 list
@Override
public void clear() {
delete();
}
@Override
public Future<V> getAsync(int index) {
// 调用 Redis LINDEX 命令获取指定下标的元素
return commandExecutor.readAsync(getName(), LINDEX, getName(), index);
}
@Override
public V get(int index) {
checkIndex(index);
return getValue(index);
}
V getValue(int index) {
return get(getAsync(index));
}
private void checkIndex(int index) {
int size = size();
if (!isInRange(index, size))
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
}
private boolean isInRange(int index, int size) {
return index >= 0 && index < size;
}
private void checkPosition(int index) {
int size = size();
if (!isPositionInRange(index, size))
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
}
private boolean isPositionInRange(int index, int size) {
return index >= 0 && index <= size;
}
// 给 list 指定下标设置元素
@Override
public V set(int index, V element) {
checkIndex(index);
return get(setAsync(index, element));
}
@Override
public Future<V> setAsync(int index, V element) {
// 执行 lua 脚本设置元素
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", 5),
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v",
Collections.<Object>singletonList(getName()), index, element);
}
@Override
public void fastSet(int index, V element) {
checkIndex(index);
get(fastSetAsync(index, element));
}
@Override
public Future<Void> fastSetAsync(int index, V element) {
return commandExecutor.writeAsync(getName(), RedisCommands.LSET, getName(), index, element);
}
@Override
public void add(int index, V element) {
addAll(index, Collections.singleton(element));
}
// 移除某个下标的元素
@Override
public V remove(int index) {
checkIndex(index);
// 如果移除的是第一个元素,直接 LPOP 弹出即可
if (index == 0) {
return commandExecutor.write(getName(), LPOP, getName());
}
// 执行 lua 脚本
// 先拿到尾部的元素,将目标元素删除后,把删除元素后面的元素重新加到 list
return commandExecutor.evalWrite(getName(), EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1]);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return v",
Collections.<Object>singletonList(getName()), index);
}
@Override
public int indexOf(Object o) {
return get(indexOfAsync(o));
}
@Override
public Future<Boolean> containsAsync(Object o) {
return indexOfAsync(o, new BooleanNumberReplayConvertor());
}
// 获取元素的下标
private <R> Future<R> indexOfAsync(Object o, Convertor<R> convertor) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<R>("EVAL", convertor, 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +
"for i=1,#items do " +
"if items[i] == obj then " +
"return i - 1 " +
"end " +
"end " +
"return -1",
Collections.<Object>singletonList(getName()), o);
}
@Override
public Future<Integer> indexOfAsync(Object o) {
return indexOfAsync(o, new IntegerReplayConvertor());
}
@Override
public int lastIndexOf(Object o) {
return get(lastIndexOfAsync(o));
}
@Override
public Future<Integer> lastIndexOfAsync(Object o) {
return commandExecutor.evalReadAsync(getName(), new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +
"for i = table.getn(items), 0, -1 do " +
"if items[i] == obj then " +
"return i - 1 " +
"end " +
"end " +
"return -1",
Collections.<Object>singletonList(getName()), o);
}
@Override
public ListIterator<V> listIterator() {
return listIterator(0);
}
// List 迭代器,这里不详细解读
@Override
public ListIterator<V> listIterator(final int ind) {
return new ListIterator<V>() {
private V prevCurrentValue;
private V nextCurrentValue;
private V currentValueHasRead;
private int currentIndex = ind - 1;
private boolean removeExecuted;
@Override
public boolean hasNext() {
V val = RedissonList.this.getValue(currentIndex+1);
if (val != null) {
nextCurrentValue = val;
}
return val != null;
}
@Override
public V next() {
if (nextCurrentValue == null && !hasNext()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
currentIndex++;
currentValueHasRead = nextCurrentValue;
nextCurrentValue = null;
removeExecuted = false;
return currentValueHasRead;
}
@Override
public void remove() {
if (currentValueHasRead == null) {
throw new IllegalStateException("Neither next nor previous have been called");
}
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
RedissonList.this.remove(currentValueHasRead);
currentIndex--;
removeExecuted = true;
currentValueHasRead = null;
}
@Override
public boolean hasPrevious() {
if (currentIndex < 0) {
return false;
}
V val = RedissonList.this.getValue(currentIndex);
if (val != null) {
prevCurrentValue = val;
}
return val != null;
}
@Override
public V previous() {
if (prevCurrentValue == null && !hasPrevious()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
currentIndex--;
removeExecuted = false;
currentValueHasRead = prevCurrentValue;
prevCurrentValue = null;
return currentValueHasRead;
}
@Override
public int nextIndex() {
return currentIndex + 1;
}
@Override
public int previousIndex() {
return currentIndex;
}
@Override
public void set(V e) {
if (currentIndex >= size()-1) {
throw new IllegalStateException();
}
RedissonList.this.set(currentIndex, e);
}
@Override
public void add(V e) {
RedissonList.this.add(currentIndex+1, e);
currentIndex++;
}
};
}
// 截取指定下标到结束下标中的元素
@Override
public List<V> subList(int fromIndex, int toIndex) {
int size = size();
if (fromIndex < 0 || toIndex > size) {
throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size);
}
if (fromIndex > toIndex) {
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
}
// 调用 Redis LRANGE 命令获取元素
return commandExecutor.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1);
}
public String toString() {
Iterator<V> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
V e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
}
2. 核心的 lua 脚本
比较核心的操作逻辑,依旧是替换成 lua 脚本进行实现。
2.1 检查Redis List中是否包含指定集合中的所有元素
参数示例:
- KEYS: myList ,Redis List 的 key。
- ARGV:[“element1”, “element2”],表示要检查的元素集合。
--- 使用LRANGE命令获取列表中所有元素,存储在 items 中。 KEYS[1]表示 list 的 key。
local items = redis.call('lrange', KEYS[1], 0, -1)
--- 双重循环。 外层循环遍历 items 中的每个元素
--- 内层循环遍历ARGV中的每个参数,即每一个待检查的集合元素
for i=1, #items do
for j = 0, table.getn(ARGV), 1 do
--- 如果items[i]等于ARGV[j],从ARGV中移除该元素
if items[i] == ARGV[j] then
table.remove(ARGV, j)
end
end
end
--- 如果所有元素都在列表中找到,返回true,否则返回false。
return table.getn(ARGV) == 0
2.2 在指定索引处插入多个元素
--- 从ARGV中移除并获取第一个元素,作为插入的索引
local ind = table.remove(ARGV, 1);
--- 使用 LRANGE 命令获取从索引 ind 到列表末尾的所有元素,存储在tail变量中
local tail = redis.call('lrange', KEYS[1], ind, -1);
--- 使用 LTRIM 命令保留列表中从开始到索引 ind-1 的元素
redis.call('ltrim', KEYS[1], 0, ind - 1);
--- 遍历 ARGV 中的剩余元素,使用 RPUSH 命令添加到列表中
for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;
--- 遍历tail中的元素,使用RPUSH命令将它们添加回列表中
for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;
--- 返回true表示操作成功
return true
这个 lua 脚本支持任意下标位置插入元素,代价是要先删除目标下标后的位置。 再插入元素,再把原先删掉的元素还原回去。
2.3 保留Redis列表中存在于指定集合中的元素
-- 记录列表是否发生变化
local changed = false
-- 使用LRANGE命令获取列表中所有元素存在 items
local items = redis.call('lrange', KEYS[1], 0, -1)
----- 初始化索引变量i,用于遍历items
local i = 1
------ 获取items的长度,存储在s中
local s = table.getn(items)
--- 使用while循环遍历items中的每个元素
while i <= s do
local element = items[i] --- 获取当前遍历的元素
local isInAgrs = false --- 记录当前元素是否在参数集合中
--- 遍历ARGV中的每个参数, ARGV是要保留的集合元素
for j = 0, table.getn(ARGV), 1 do
--- 如果当前参数等于当前元素
if ARGV[j] == element then
isInAgrs = true --- 标记当前元素在参数集合中
break -- 退出内层循环
end
end
--- 如果当前元素不在参数集合中,就直接 LREM 命令删除掉元素
if isInAgrs == false then
redis.call('LREM', KEYS[1], 0, element)
changed = true --- 切换标记表示列表发生了变化
end
i = i + 1 ---- 递增索引i 继续遍历下一个元素
end
return changed
2.4 替换列表中指定索引处的元素
--- 使用 LINDEX 命令获取指定索引处的当前元素,存储在v
local v = redis.call('lindex', KEYS[1], ARGV[1]);
--- 使用 LSET 命令将指定索引处的元素设置为新的值
redis.call('lset', KEYS[1], ARGV[1], ARGV[2]);
--- 返回旧的值
return v
2.5 移除列表中指定索引处的元素
--- 使用LINDEX命令获取指定索引处的元素,存储在v
local v = redis.call('lindex', KEYS[1], ARGV[1]);
--- 使用 LRANGE 命令获取从索引 ARGV[1] 到列表末尾的所有元素,存储在tail中
local tail = redis.call('lrange', KEYS[1], ARGV[1]);
--- 使用 LTRIM 命令保留列表中从开始到索引 ARGV[1]-1 的元素
redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);
--- 遍历 tail 中的元素,使用RPUSH命令将它们添加回列表中
for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;
-- 返回被移除的元素
return v
2.6 从Redis列表中移除指定集合中所有元素
参数示例:
- KEYS:[“myList”],表示Redis列表的键名。
- ARGV:[“element1”, “element2”],表示要移除的元素集合。
--- 记录是否有元素被移除
local v = false
-- 遍历ARGV中的每个元素
for i = 0, table.getn(ARGV), 1 do
-- 使用 LREM 命令从列表中移除所有匹配的元素
if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 then
v = true -- 如果成功移除至少一个元素,将v设置为true
end
end
-- 返回true表示至少有一个元素被移除, 否则返回false。
return v
2.7 查找列表中某个元素的索引
--- 分别获取列表的键名和要查找的对象
local key = KEYS[1]
local obj = ARGV[1]
--- 使用LRANGE命令获取列表中所有元素
local items = redis.call('lrange', key, 0, -1)
--- 从头到尾顺序遍历items中的每个元素
for i=1,#items do
--- 如果找到与obj相等的元素,返回从0开始的索引
if items[i] == obj then
return i - 1
end
end
--- 如果未找到,返回-1
return -1
2.8 查找列表中某个元素的最后一个索引
--- 分别获取列表的键名和要查找的对象
local key = KEYS[1]
local obj = ARGV[1]
--- 使用LRANGE命令获取列表中所有元素
local items = redis.call('lrange', key, 0, -1)
--- 从尾到头逆序遍历items中的每个元素
for i = table.getn(items), 0, -1 do
--- 如果找到与obj相等的元素,返回从0开始的索引
if items[i] == obj then
return i - 1
end
end
--- 如果未找到,返回-1
return -1