七的博客

Redisson2.0源码分析10-分布式List

源码分析

Redisson2.0源码分析10-分布式List

相比于 Redisson V1 版本,主要是将 Java 层面的 for 循环处理,提取到了 lua 脚本中一次性处理掉。

在实现上,依旧是采用 Redis 的 List 结构去存储,同时对 Java List 的操作也就是使用 Redis List 的相关命令实现。

1. 源码分析

package org.redisson;

import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT;
import static org.redisson.client.protocol.RedisCommands.LINDEX;
import static org.redisson.client.protocol.RedisCommands.LLEN;
import static org.redisson.client.protocol.RedisCommands.LPOP;
import static org.redisson.client.protocol.RedisCommands.LPUSH;
import static org.redisson.client.protocol.RedisCommands.LRANGE;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH;
import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.core.RList;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class RedissonList<V> extends RedissonExpirable implements RList<V> {

    protected RedissonList(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    // LLEN 命令获取 List 大小
    @Override
    public int size() {
        return get(sizeAsync());
    }

    public Future<Integer> sizeAsync() {
        return commandExecutor.readAsync(getName(), LLEN, getName());
    }

    // 调用 LLEN 命令获取 List 大小是否为0
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override
    public boolean contains(Object o) {
        return get(containsAsync(o));
    }

    @Override
    public Iterator<V> iterator() {
        return listIterator();
    }

    @Override
    public Object[] toArray() {
        List<V> list = readAll();
        return list.toArray();
    }

    private List<V> readAll() {
        return (List<V>) get(readAllAsync());
    }

    // 
    @Override
    public Future<Collection<V>> readAllAsync() {
        return commandExecutor.readAsync(getName(), LRANGE, getName(), 0, -1);
    }

    @Override
    public <T> T[] toArray(T[] a) {
        List<V> list = readAll();
        return list.toArray(a);
    }

    // 调用 Redis 命令 RPUSH 添加元素到队尾
    @Override
    public boolean add(V e) {
        return get(addAsync(e));
    }

    @Override
    public Future<Boolean> addAsync(V e) {
        return commandExecutor.writeAsync(getName(), RPUSH_BOOLEAN, getName(), e);
    }

    // 调用 Redis 命令 LREM 移除元素
    @Override
    public boolean remove(Object o) {
        return remove(o, 1);
    }

    @Override
    public Future<Boolean> removeAsync(Object o) {
        return removeAsync(o, 1);
    }

    protected Future<Boolean> removeAsync(Object o, int count) {
        return commandExecutor.writeAsync(getName(), LREM_SINGLE, getName(), count, o);
    }

    protected boolean remove(Object o, int count) {
        return get(removeAsync(o, count));
    }

    // 使用Lua脚本遍历列表和参数集合,检查所有元素是否存在
    @Override
    public Future<Boolean> containsAllAsync(Collection<?> c) {
        return commandExecutor.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                "local items = redis.call('lrange', KEYS[1], 0, -1) " +
                "for i=1, #items do " +
                    "for j = 0, table.getn(ARGV), 1 do " +
                        "if items[i] == ARGV[j] then " +
                            "table.remove(ARGV, j) " +
                        "end " +
                    "end " +
                "end " +
                "return table.getn(ARGV) == 0",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return get(containsAllAsync(c));
    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        return get(addAllAsync(c));
    }

    // 使用 Redis 命令 RPUSH 添加所有元素到 List 中
    @Override
    public Future<Boolean> addAllAsync(final Collection<? extends V> c) {
        final Promise<Boolean> promise = newPromise();
        if (c.isEmpty()) {
            promise.setSuccess(false);
            return promise;
        }
        final int listSize = size();
        List<Object> args = new ArrayList<Object>(c.size() + 1);
        args.add(getName());
        args.addAll(c);
        Future<Long> res = commandExecutor.writeAsync(getName(), RPUSH, args.toArray());
        res.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (future.isSuccess()) {
                    promise.setSuccess(listSize != future.getNow());
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }

    @Override
    public boolean addAll(final int index, final Collection<? extends V> coll) {
        checkPosition(index);
        if (coll.isEmpty()) {
            return false;
        }
        int size = size();
        if (index < size) {

            if (index == 0) { // prepend elements to list
                List<Object> elements = new ArrayList<Object>(coll);
                Collections.reverse(elements);
                elements.add(0, getName());

                Long newSize = commandExecutor.write(getName(), LPUSH, elements.toArray());
                return newSize != size;
            }

            // insert into middle of list

            List<Object> args = new ArrayList<Object>(coll.size() + 1);
            args.add(index);
            args.addAll(coll);
            return commandExecutor.evalWrite(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
                    "local ind = table.remove(ARGV, 1); " + // index is the first parameter
                            "local tail = redis.call('lrange', KEYS[1], ind, -1); " +
                            "redis.call('ltrim', KEYS[1], 0, ind - 1); " +
                            "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
                            "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
                            "return true",
                    Collections.<Object>singletonList(getName()), args.toArray());
        } else {
            // append to list
            return addAll(coll);
        }
    }

    // 使用 Lua 脚本遍历参数集合,再调用 Redis LREM 命令移除每个元素
    @Override
    public Future<Boolean> removeAllAsync(Collection<?> c) {
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                        "local v = false " +
                        "for i = 0, table.getn(ARGV), 1 do "
                            + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
                            + "then v = true end "
                        +"end "
                       + "return v ",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return get(removeAllAsync(c));
    }

    // 保留 List 中包含在指定集合中的元素
    @Override
    public boolean retainAll(Collection<?> c) {
        return get(retainAllAsync(c));
    }

    @Override
    public Future<Boolean> retainAllAsync(Collection<?> c) {
        // 使用 Lua 脚本遍历 list ,移除不在参数集合中的元素
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
                "local changed = false " +
                "local items = redis.call('lrange', KEYS[1], 0, -1) "
                   + "local i = 1 "
                   + "local s = table.getn(items) "
                   + "while i <= s do "
                        + "local element = items[i] "
                        + "local isInAgrs = false "
                        + "for j = 0, table.getn(ARGV), 1 do "
                            + "if ARGV[j] == element then "
                                + "isInAgrs = true "
                                + "break "
                            + "end "
                        + "end "
                        + "if isInAgrs == false then "
                            + "redis.call('LREM', KEYS[1], 0, element) "
                            + "changed = true "
                        + "end "
                        + "i = i + 1 "
                   + "end "
                   + "return changed ",
                Collections.<Object>singletonList(getName()), c.toArray());
    }

    // 清空 list 
    @Override
    public void clear() {
        delete();
    }

    @Override
    public Future<V> getAsync(int index) {
        // 调用 Redis LINDEX 命令获取指定下标的元素
        return commandExecutor.readAsync(getName(), LINDEX, getName(), index);
    }

    @Override
    public V get(int index) {
        checkIndex(index);
        return getValue(index);
    }

    V getValue(int index) {
        return get(getAsync(index));
    }

    private void checkIndex(int index) {
        int size = size();
        if (!isInRange(index, size))
            throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
    }

    private boolean isInRange(int index, int size) {
        return index >= 0 && index < size;
    }

    private void checkPosition(int index) {
        int size = size();
        if (!isPositionInRange(index, size))
            throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
    }

    private boolean isPositionInRange(int index, int size) {
        return index >= 0 && index <= size;
    }

    // 给 list 指定下标设置元素
    @Override
    public V set(int index, V element) {
        checkIndex(index);
        return get(setAsync(index, element));
    }

    @Override
    public Future<V> setAsync(int index, V element) {
        // 执行 lua 脚本设置元素
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", 5),
                "local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
                        "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
                        "return v",
                Collections.<Object>singletonList(getName()), index, element);
    }

    @Override
    public void fastSet(int index, V element) {
        checkIndex(index);
        get(fastSetAsync(index, element));
    }

    @Override
    public Future<Void> fastSetAsync(int index, V element) {
        return commandExecutor.writeAsync(getName(), RedisCommands.LSET, getName(), index, element);
    }

    @Override
    public void add(int index, V element) {
        addAll(index, Collections.singleton(element));
    }

    // 移除某个下标的元素
    @Override
    public V remove(int index) {
        checkIndex(index);

        // 如果移除的是第一个元素,直接 LPOP 弹出即可
        if (index == 0) {
            return commandExecutor.write(getName(), LPOP, getName());
        }

        // 执行 lua 脚本
        // 先拿到尾部的元素,将目标元素删除后,把删除元素后面的元素重新加到 list 
        return commandExecutor.evalWrite(getName(), EVAL_OBJECT,
                "local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
                        "local tail = redis.call('lrange', KEYS[1], ARGV[1]);" +
                        "redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
                        "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
                        "return v",
                Collections.<Object>singletonList(getName()), index);
    }

    @Override
    public int indexOf(Object o) {
        return get(indexOfAsync(o));
    }

    @Override
    public Future<Boolean> containsAsync(Object o) {
        return indexOfAsync(o, new BooleanNumberReplayConvertor());
    }

    // 获取元素的下标
    private <R> Future<R> indexOfAsync(Object o, Convertor<R> convertor) {
        return commandExecutor.evalReadAsync(getName(), new RedisCommand<R>("EVAL", convertor, 4),
                "local key = KEYS[1] " +
                "local obj = ARGV[1] " +
                "local items = redis.call('lrange', key, 0, -1) " +
                "for i=1,#items do " +
                    "if items[i] == obj then " +
                        "return i - 1 " +
                    "end " +
                "end " +
                "return -1",
                Collections.<Object>singletonList(getName()), o);
    }

    @Override
    public Future<Integer> indexOfAsync(Object o) {
        return indexOfAsync(o, new IntegerReplayConvertor());
    }

    @Override
    public int lastIndexOf(Object o) {
        return get(lastIndexOfAsync(o));
    }

    @Override
    public Future<Integer> lastIndexOfAsync(Object o) {
        return commandExecutor.evalReadAsync(getName(), new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
                "local key = KEYS[1] " +
                "local obj = ARGV[1] " +
                "local items = redis.call('lrange', key, 0, -1) " +
                "for i = table.getn(items), 0, -1 do " +
                    "if items[i] == obj then " +
                        "return i - 1 " +
                    "end " +
                "end " +
                "return -1",
                Collections.<Object>singletonList(getName()), o);
    }

    @Override
    public ListIterator<V> listIterator() {
        return listIterator(0);
    }

    // List 迭代器,这里不详细解读
    @Override
    public ListIterator<V> listIterator(final int ind) {
        return new ListIterator<V>() {

            private V prevCurrentValue;
            private V nextCurrentValue;
            private V currentValueHasRead;
            private int currentIndex = ind - 1;
            private boolean removeExecuted;

            @Override
            public boolean hasNext() {
                V val = RedissonList.this.getValue(currentIndex+1);
                if (val != null) {
                    nextCurrentValue = val;
                }
                return val != null;
            }

            @Override
            public V next() {
                if (nextCurrentValue == null && !hasNext()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                currentIndex++;
                currentValueHasRead = nextCurrentValue;
                nextCurrentValue = null;
                removeExecuted = false;
                return currentValueHasRead;
            }

            @Override
            public void remove() {
                if (currentValueHasRead == null) {
                    throw new IllegalStateException("Neither next nor previous have been called");
                }
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                RedissonList.this.remove(currentValueHasRead);
                currentIndex--;
                removeExecuted = true;
                currentValueHasRead = null;
            }

            @Override
            public boolean hasPrevious() {
                if (currentIndex < 0) {
                    return false;
                }
                V val = RedissonList.this.getValue(currentIndex);
                if (val != null) {
                    prevCurrentValue = val;
                }
                return val != null;
            }

            @Override
            public V previous() {
                if (prevCurrentValue == null && !hasPrevious()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                currentIndex--;
                removeExecuted = false;
                currentValueHasRead = prevCurrentValue;
                prevCurrentValue = null;
                return currentValueHasRead;
            }

            @Override
            public int nextIndex() {
                return currentIndex + 1;
            }

            @Override
            public int previousIndex() {
                return currentIndex;
            }

            @Override
            public void set(V e) {
                if (currentIndex >= size()-1) {
                    throw new IllegalStateException();
                }
                RedissonList.this.set(currentIndex, e);
            }

            @Override
            public void add(V e) {
                RedissonList.this.add(currentIndex+1, e);
                currentIndex++;
            }
        };
    }

    // 截取指定下标到结束下标中的元素
    @Override
    public List<V> subList(int fromIndex, int toIndex) {
        int size = size();
        if (fromIndex < 0 || toIndex > size) {
            throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size);
        }
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
        }
        // 调用 Redis LRANGE 命令获取元素
        return commandExecutor.read(getName(), LRANGE, getName(), fromIndex, toIndex - 1);
    }

    public String toString() {
        Iterator<V> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            V e = it.next();
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

}

2. 核心的 lua 脚本

比较核心的操作逻辑,依旧是替换成 lua 脚本进行实现。

2.1 检查Redis List中是否包含指定集合中的所有元素

参数示例:

  • KEYS: myList ,Redis List 的 key。
  • ARGV:[“element1”, “element2”],表示要检查的元素集合。
--- 使用LRANGE命令获取列表中所有元素,存储在 items 中。 KEYS[1]表示 list 的 key。
local items = redis.call('lrange', KEYS[1], 0, -1)

--- 双重循环。 外层循环遍历 items 中的每个元素
--- 内层循环遍历ARGV中的每个参数,即每一个待检查的集合元素
for i=1, #items do
    for j = 0, table.getn(ARGV), 1 do

        --- 如果items[i]等于ARGV[j],从ARGV中移除该元素
        if items[i] == ARGV[j] then
            table.remove(ARGV, j)
        end
    end
end

--- 如果所有元素都在列表中找到,返回true,否则返回false。
return table.getn(ARGV) == 0

2.2 在指定索引处插入多个元素

--- 从ARGV中移除并获取第一个元素,作为插入的索引
local ind = table.remove(ARGV, 1);

--- 使用 LRANGE 命令获取从索引 ind 到列表末尾的所有元素,存储在tail变量中
local tail = redis.call('lrange', KEYS[1], ind, -1);

--- 使用 LTRIM 命令保留列表中从开始到索引 ind-1 的元素
redis.call('ltrim', KEYS[1], 0, ind - 1);

--- 遍历 ARGV 中的剩余元素,使用 RPUSH 命令添加到列表中
for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;

--- 遍历tail中的元素,使用RPUSH命令将它们添加回列表中
for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;


--- 返回true表示操作成功
return true

这个 lua 脚本支持任意下标位置插入元素,代价是要先删除目标下标后的位置。 再插入元素,再把原先删掉的元素还原回去。

2.3 保留Redis列表中存在于指定集合中的元素

-- 记录列表是否发生变化
local changed = false

-- 使用LRANGE命令获取列表中所有元素存在 items
local items = redis.call('lrange', KEYS[1], 0, -1)

----- 初始化索引变量i,用于遍历items
local i = 1

------ 获取items的长度,存储在s中
local s = table.getn(items)

--- 使用while循环遍历items中的每个元素
while i <= s do
    local element = items[i]  --- 获取当前遍历的元素
    local isInAgrs = false   --- 记录当前元素是否在参数集合中

    --- 遍历ARGV中的每个参数, ARGV是要保留的集合元素
    for j = 0, table.getn(ARGV), 1 do
        --- 如果当前参数等于当前元素
        if ARGV[j] == element then
            isInAgrs = true  --- 标记当前元素在参数集合中
            break   -- 退出内层循环
        end
    end

    --- 如果当前元素不在参数集合中,就直接 LREM 命令删除掉元素
    if isInAgrs == false then
        redis.call('LREM', KEYS[1], 0, element)
        changed = true   --- 切换标记表示列表发生了变化
    end
    i = i + 1   ---- 递增索引i  继续遍历下一个元素
end
return changed

2.4 替换列表中指定索引处的元素

--- 使用 LINDEX 命令获取指定索引处的当前元素,存储在v
local v = redis.call('lindex', KEYS[1], ARGV[1]);
--- 使用 LSET 命令将指定索引处的元素设置为新的值
redis.call('lset', KEYS[1], ARGV[1], ARGV[2]);
--- 返回旧的值
return v

2.5 移除列表中指定索引处的元素

--- 使用LINDEX命令获取指定索引处的元素,存储在v
local v = redis.call('lindex', KEYS[1], ARGV[1]);

--- 使用 LRANGE 命令获取从索引 ARGV[1] 到列表末尾的所有元素,存储在tail中
local tail = redis.call('lrange', KEYS[1], ARGV[1]);

--- 使用 LTRIM 命令保留列表中从开始到索引 ARGV[1]-1 的元素
redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);
 
--- 遍历 tail 中的元素,使用RPUSH命令将它们添加回列表中
for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;

-- 返回被移除的元素
return v

2.6 从Redis列表中移除指定集合中所有元素

参数示例:

  • KEYS:[“myList”],表示Redis列表的键名。
  • ARGV:[“element1”, “element2”],表示要移除的元素集合。
--- 记录是否有元素被移除
local v = false

-- 遍历ARGV中的每个元素
for i = 0, table.getn(ARGV), 1 do
    -- 使用 LREM 命令从列表中移除所有匹配的元素
    if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 then
        v = true  -- 如果成功移除至少一个元素,将v设置为true
    end
end


-- 返回true表示至少有一个元素被移除, 否则返回false。
return v

2.7 查找列表中某个元素的索引

--- 分别获取列表的键名和要查找的对象
local key = KEYS[1]
local obj = ARGV[1]

--- 使用LRANGE命令获取列表中所有元素
local items = redis.call('lrange', key, 0, -1)

--- 从头到尾顺序遍历items中的每个元素
for i=1,#items do
    --- 如果找到与obj相等的元素,返回从0开始的索引
    if items[i] == obj then
        return i - 1
    end
end
--- 如果未找到,返回-1
return -1

2.8 查找列表中某个元素的最后一个索引

--- 分别获取列表的键名和要查找的对象
local key = KEYS[1]
local obj = ARGV[1]

--- 使用LRANGE命令获取列表中所有元素
local items = redis.call('lrange', key, 0, -1)

--- 从尾到头逆序遍历items中的每个元素
for i = table.getn(items), 0, -1 do
    --- 如果找到与obj相等的元素,返回从0开始的索引
    if items[i] == obj then
        return i - 1
    end
end

--- 如果未找到,返回-1
return -1