七的博客

Redisson2.0源码分析11-分布式Queue

源码分析

Redisson2.0源码分析11-分布式Queue

几个分布式 Queue 相关的工具类,底层的数据存储跟 RList 保持一致,也是采用 Redis List 进行数据存储。

在数据操作上,也是调用 Redis List 相关的命令进行操作。

1. 分布式阻塞队列 RBlockingQueue

依旧是对 Redis List 结构的封装。

1.1 源码分析

package org.redisson;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.core.RBlockingQueue;

import io.netty.util.concurrent.Future;


public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {

    protected RedissonBlockingQueue(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    @Override
    public Future<Boolean> putAsync(V e) {
        return offerAsync(e);
    }

    @Override
    public void put(V e) throws InterruptedException {
        offer(e);
    }

    @Override
    public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException {
        return offer(e);
    }

    // 从队列中获取并移除头部元素
    @Override
    public Future<V> takeAsync() {
        // 使用 Redis 的 BLPOP 命令实现阻塞获取操作
        return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), 0);
    }

    @Override
    public V take() throws InterruptedException {
        Future<V> res = takeAsync();
        return res.await().getNow();
    }

    // 从队列中获取并移除头部元素,带有超时机制
    @Override
    public Future<V> pollAsync(long timeout, TimeUnit unit) {
        // 使用Redis的BLPOP命令,指定超时时间
        return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
    }

    @Override
    public V poll(long timeout, TimeUnit unit) throws InterruptedException {
        Future<V> res = pollAsync(timeout, unit);
        return res.await().getNow();
    }

    @Override
    public V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit)
            throws InterruptedException {
        return pollLastAndOfferFirstTo(queue.getName(), timeout, unit);
    }

    // 从当前队列中移除尾部元素,并将其添加到另一个指定队列的头部
    @Override
    public Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
        // 使用 Redis 的 BRPOPLPUSH 命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
    }

    @Override
    public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
        Future<V> res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
        return res.await().getNow();
    }

    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    // 将队列中的所有元素移除并添加到指定集合中
    @Override
    public int drainTo(Collection<? super V> c) {
        return get(drainToAsync(c));
    }

    // 将队列中的所有元素移除并添加到指定集合中
    @Override
    public Future<Integer> drainToAsync(Collection<? super V> c) {
        if (c == null) {
            throw new NullPointerException();
        }

        // 执行 lua 脚本
        // 主要是通过 LRANGE 命令获取所有元素,再通过 LTRIM 命令清空队列
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
              "local vals = redis.call('lrange', KEYS[1], 0, -1); " +
              "redis.call('ltrim', KEYS[1], -1, 0); " +
              "return vals", Collections.<Object>singletonList(getName()));
    }

    @Override
    public int drainTo(Collection<? super V> c, int maxElements) {
        if (maxElements <= 0) {
            return 0;
        }

        return get(drainToAsync(c, maxElements));
    }

    // 将队列中最多 maxElements 个元素移除然后添加到指定集合中
    @Override
    public Future<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }

        // 通过 LRANGE 命令获取指定数量的元素,再通过 LTRIM 命令移除这些元素
        return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
                "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
                        "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
                        "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
                        "return vals",
                Collections.<Object>singletonList(getName()), maxElements);
    }
}

1.2 lua 源码分析

主要注意下面这几个 lua 脚本

1.2.1 将Redis列表中的所有元素移除并返回 drainTo

-- 使用 LRANGE 命令获取列表中所有元素,保存在vals
local vals = redis.call('lrange', KEYS[1], 0, -1); 

--- 使用 LTRIM 命令清空列表。 -1, 0表示保留从索引-1到0之间的元素,清空列表
redis.call('ltrim', KEYS[1], -1, 0); 

--- 返回获取的所有元素
return vals

1.2.2 将Redis列表中的最多maxElements个元素移除并返回 drainTo

--- 计算要移除的元素数量。使用LLEN命令获取列表长度,并与ARGV[1] (即maxElements) 去进行比较,取较小值
-- 减1是因为Lua的索引从1开始,而Redis的索引从0开始。
local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;

--- 使用LRANGE命令获取从头部开始的指定个元素
local vals = redis.call('lrange', KEYS[1], 0, elemNum); 

--- 使用 LTRIM 命令移除已获取的元素,保留从 elemNum+1 到列表末尾的元素
redis.call('ltrim', KEYS[1], elemNum + 1, -1); 

---- 返回获取的元素
return vals

2. 分布式双端队列 RDeque

RBlockingQueue 的实现,说明还是基于 Redis List 实现,并且大部分的逻辑都已经在父类中了,这个类中不会有太多的逻辑。

package org.redisson;

import java.util.Iterator;
import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
import org.redisson.core.RDeque;

import io.netty.util.concurrent.Future;


public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
    private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
    private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());

    private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);

    private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());


    protected RedissonDeque(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    //同步将元素添加到队列的头部
    @Override
    public void addFirst(V e) {
        get(addFirstAsync(e));
    }

    @Override
    public Future<Void> addFirstAsync(V e) {
        //  LPUSH 命令实现
        return commandExecutor.writeAsync(getName(), LPUSH_VOID, getName(), e);
    }

    //同步将元素添加到队列的尾部
    @Override
    public void addLast(V e) {
        get(addLastAsync(e));
    }

    @Override
    public Future<Void> addLastAsync(V e) {
        // RPUSH命令实现
        return commandExecutor.writeAsync(getName(), RPUSH_VOID, getName(), e);
    }

    // 返回一个从队列尾部到头部的迭代器
    @Override
    public Iterator<V> descendingIterator() {
        return new Iterator<V>() {

            private int currentIndex = size();
            private boolean removeExecuted;

            @Override
            public boolean hasNext() {
                int size = size();
                return currentIndex > 0 && size > 0;
            }

            @Override
            public V next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                currentIndex--;
                removeExecuted = false;
                return RedissonDeque.this.get(currentIndex);
            }

            @Override
            public void remove() {
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                RedissonDeque.this.remove(currentIndex);
                currentIndex++;
                removeExecuted = true;
            }

        };
    }

    // 获取队列的最后一个元素
    @Override
    public Future<V> getLastAsync() {
        // LRANGE命令获取最后一个元素
        return commandExecutor.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1);
    }

    @Override
    public V getLast() {
        V result = get(getLastAsync());
        if (result == null) {
            throw new NoSuchElementException();
        }
        return result;
    }

    // 尝试将元素添加到队列的头部
    @Override
    public boolean offerFirst(V e) {
        return get(offerFirstAsync(e));
    }

    @Override
    public Future<Boolean> offerFirstAsync(V e) {
        return commandExecutor.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e);
    }

    // 尝试将元素添加到队列的尾部
    @Override
    public Future<Boolean> offerLastAsync(V e) {
        return offerAsync(e);
    }

    @Override
    public boolean offerLast(V e) {
        return get(offerLastAsync(e));
    }

    @Override
    public Future<V> peekFirstAsync() {
        return getAsync(0);
    }

    // 查看队列的第一个元素
    @Override
    public V peekFirst() {
        return get(peekFirstAsync());
    }

    @Override
    public Future<V> peekLastAsync() {
        return getLastAsync();
    }

    // 查看队列的最后个元素
    @Override
    public V peekLast() {
        return get(getLastAsync());
    }

    @Override
    public Future<V> pollFirstAsync() {
        return pollAsync();
    }

    // 移除队列头部元素
    @Override
    public V pollFirst() {
        return poll();
    }

    @Override
    public Future<V> pollLastAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
    }

    // 移除队列尾部元素
    @Override
    public V pollLast() {
        return get(pollLastAsync());
    }

    // 从队列的头部移除并返回元素
    @Override
    public Future<V> popAsync() {
        return pollAsync();
    }

    @Override
    public V pop() {
        return removeFirst();
    }

    // 将元素推入队列的头部
    @Override
    public Future<Void> pushAsync(V e) {
        return addFirstAsync(e);
    }

    @Override
    public void push(V e) {
        addFirst(e);
    }

    // 移除队列中第一次出现的指定元素
    @Override
    public Future<Boolean> removeFirstOccurrenceAsync(Object o) {
        return removeAsync(o, 1);
    }

    @Override
    public boolean removeFirstOccurrence(Object o) {
        return remove(o, 1);
    }

    // 从队列的头部移除元素
    @Override
    public Future<V> removeFirstAsync() {
        return pollAsync();
    }

    // 从队列的尾部移除元素
    @Override
    public Future<V> removeLastAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
    }

    @Override
    public V removeLast() {
        V value = get(removeLastAsync());
        if (value == null) {
            throw new NoSuchElementException();
        }
        return value;
    }

    // 移除队列中最后一次出现的指定元素
    @Override
    public Future<Boolean> removeLastOccurrenceAsync(Object o) {
        return removeAsync(o, -1);
    }

    @Override
    public boolean removeLastOccurrence(Object o) {
        return remove(o, -1);
    }

}

3. 分布式Queue RQueue

package org.redisson;

import java.util.NoSuchElementException;

import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RQueue;

import io.netty.util.concurrent.Future;


public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {

    protected RedissonQueue(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    // 将元素添加到队列中
    @Override
    public boolean offer(V e) {
        // 调用父类方法,实际上是 RPUSH 命令将元素添加到列表末尾
        return add(e);
    }


    @Override
    public Future<Boolean> offerAsync(V e) {
        return addAsync(e);
    }


    // 获取队列的第一个元素
    public V getFirst() {
        V value = getValue(0);
        if (value == null) {
            throw new NoSuchElementException();
        }
        return value;
    }


    // 移除队列的第一个元素并返回
    public V removeFirst() {
        V value = poll();
        if (value == null) {
            throw new NoSuchElementException();
        }
        return value;
    }

    @Override
    public V remove() {
        return removeFirst();
    }

    // 移除并返回队列的第一个元素
    @Override
    public Future<V> pollAsync() {
        // LPOP 命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.LPOP, getName());
    }

    @Override
    public V poll() {
        return get(pollAsync());
    }

    // 获取队列第一个元素
    @Override
    public V element() {
        return getFirst();
    }

    // 查看队列第一个元素
    @Override
    public Future<V> peekAsync() {
        return getAsync(0);
    }

    // 查看队列第一个元素
    @Override
    public V peek() {
        if (isEmpty()) {
            return null;
        }
        return get(0);
    }

    // 下面几个是元素转移方法

    // 移除队列的最后一个元素,然后添加到另一个队列的头部
    @Override
    public V pollLastAndOfferFirstTo(String queueName) {
        return get(pollLastAndOfferFirstToAsync(queueName));
    }

    @Override
    public Future<V> pollLastAndOfferFirstToAsync(String queueName) {
        // RPOPLPUSH 命令实现
        return commandExecutor.writeAsync(getName(), RedisCommands.RPOPLPUSH, getName(), queueName);
    }

    // 将当前队列的最后一个元素移到另一个 RQueue 的头部
    @Override
    public Future<V> pollLastAndOfferFirstToAsync(RQueue<V> queue) {
        return pollLastAndOfferFirstToAsync(queue.getName());
    }

    @Override
    public V pollLastAndOfferFirstTo(RQueue<V> queue) {
        return pollLastAndOfferFirstTo(queue.getName());
    }

}