Redisson2.0源码分析11-分布式Queue
Redisson2.0源码分析11-分布式Queue
几个分布式 Queue 相关的工具类,底层的数据存储跟 RList 保持一致,也是采用 Redis List 进行数据存储。
在数据操作上,也是调用 Redis List 相关的命令进行操作。
1. 分布式阻塞队列 RBlockingQueue
依旧是对 Redis List 结构的封装。
1.1 源码分析
package org.redisson;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.core.RBlockingQueue;
import io.netty.util.concurrent.Future;
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
protected RedissonBlockingQueue(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@Override
public Future<Boolean> putAsync(V e) {
return offerAsync(e);
}
@Override
public void put(V e) throws InterruptedException {
offer(e);
}
@Override
public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
// 从队列中获取并移除头部元素
@Override
public Future<V> takeAsync() {
// 使用 Redis 的 BLPOP 命令实现阻塞获取操作
return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), 0);
}
@Override
public V take() throws InterruptedException {
Future<V> res = takeAsync();
return res.await().getNow();
}
// 从队列中获取并移除头部元素,带有超时机制
@Override
public Future<V> pollAsync(long timeout, TimeUnit unit) {
// 使用Redis的BLPOP命令,指定超时时间
return commandExecutor.writeAsync(getName(), RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollAsync(timeout, unit);
return res.await().getNow();
}
@Override
public V pollLastAndOfferFirstTo(RBlockingQueue<V> queue, long timeout, TimeUnit unit)
throws InterruptedException {
return pollLastAndOfferFirstTo(queue.getName(), timeout, unit);
}
// 从当前队列中移除尾部元素,并将其添加到另一个指定队列的头部
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
// 使用 Redis 的 BRPOPLPUSH 命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return res.await().getNow();
}
@Override
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
// 将队列中的所有元素移除并添加到指定集合中
@Override
public int drainTo(Collection<? super V> c) {
return get(drainToAsync(c));
}
// 将队列中的所有元素移除并添加到指定集合中
@Override
public Future<Integer> drainToAsync(Collection<? super V> c) {
if (c == null) {
throw new NullPointerException();
}
// 执行 lua 脚本
// 主要是通过 LRANGE 命令获取所有元素,再通过 LTRIM 命令清空队列
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
}
@Override
public int drainTo(Collection<? super V> c, int maxElements) {
if (maxElements <= 0) {
return 0;
}
return get(drainToAsync(c, maxElements));
}
// 将队列中最多 maxElements 个元素移除然后添加到指定集合中
@Override
public Future<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
// 通过 LRANGE 命令获取指定数量的元素,再通过 LTRIM 命令移除这些元素
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"return vals",
Collections.<Object>singletonList(getName()), maxElements);
}
}
1.2 lua 源码分析
主要注意下面这几个 lua 脚本
1.2.1 将Redis列表中的所有元素移除并返回 drainTo
-- 使用 LRANGE 命令获取列表中所有元素,保存在vals
local vals = redis.call('lrange', KEYS[1], 0, -1);
--- 使用 LTRIM 命令清空列表。 -1, 0表示保留从索引-1到0之间的元素,清空列表
redis.call('ltrim', KEYS[1], -1, 0);
--- 返回获取的所有元素
return vals
1.2.2 将Redis列表中的最多maxElements个元素移除并返回 drainTo
--- 计算要移除的元素数量。使用LLEN命令获取列表长度,并与ARGV[1] (即maxElements) 去进行比较,取较小值
-- 减1是因为Lua的索引从1开始,而Redis的索引从0开始。
local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;
--- 使用LRANGE命令获取从头部开始的指定个元素
local vals = redis.call('lrange', KEYS[1], 0, elemNum);
--- 使用 LTRIM 命令移除已获取的元素,保留从 elemNum+1 到列表末尾的元素
redis.call('ltrim', KEYS[1], elemNum + 1, -1);
---- 返回获取的元素
return vals
2. 分布式双端队列 RDeque
RBlockingQueue 的实现,说明还是基于 Redis List 实现,并且大部分的逻辑都已经在父类中了,这个类中不会有太多的逻辑。
package org.redisson;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
import org.redisson.core.RDeque;
import io.netty.util.concurrent.Future;
public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonDeque(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
//同步将元素添加到队列的头部
@Override
public void addFirst(V e) {
get(addFirstAsync(e));
}
@Override
public Future<Void> addFirstAsync(V e) {
// LPUSH 命令实现
return commandExecutor.writeAsync(getName(), LPUSH_VOID, getName(), e);
}
//同步将元素添加到队列的尾部
@Override
public void addLast(V e) {
get(addLastAsync(e));
}
@Override
public Future<Void> addLastAsync(V e) {
// RPUSH命令实现
return commandExecutor.writeAsync(getName(), RPUSH_VOID, getName(), e);
}
// 返回一个从队列尾部到头部的迭代器
@Override
public Iterator<V> descendingIterator() {
return new Iterator<V>() {
private int currentIndex = size();
private boolean removeExecuted;
@Override
public boolean hasNext() {
int size = size();
return currentIndex > 0 && size > 0;
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
currentIndex--;
removeExecuted = false;
return RedissonDeque.this.get(currentIndex);
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
RedissonDeque.this.remove(currentIndex);
currentIndex++;
removeExecuted = true;
}
};
}
// 获取队列的最后一个元素
@Override
public Future<V> getLastAsync() {
// LRANGE命令获取最后一个元素
return commandExecutor.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1);
}
@Override
public V getLast() {
V result = get(getLastAsync());
if (result == null) {
throw new NoSuchElementException();
}
return result;
}
// 尝试将元素添加到队列的头部
@Override
public boolean offerFirst(V e) {
return get(offerFirstAsync(e));
}
@Override
public Future<Boolean> offerFirstAsync(V e) {
return commandExecutor.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e);
}
// 尝试将元素添加到队列的尾部
@Override
public Future<Boolean> offerLastAsync(V e) {
return offerAsync(e);
}
@Override
public boolean offerLast(V e) {
return get(offerLastAsync(e));
}
@Override
public Future<V> peekFirstAsync() {
return getAsync(0);
}
// 查看队列的第一个元素
@Override
public V peekFirst() {
return get(peekFirstAsync());
}
@Override
public Future<V> peekLastAsync() {
return getLastAsync();
}
// 查看队列的最后个元素
@Override
public V peekLast() {
return get(getLastAsync());
}
@Override
public Future<V> pollFirstAsync() {
return pollAsync();
}
// 移除队列头部元素
@Override
public V pollFirst() {
return poll();
}
@Override
public Future<V> pollLastAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
}
// 移除队列尾部元素
@Override
public V pollLast() {
return get(pollLastAsync());
}
// 从队列的头部移除并返回元素
@Override
public Future<V> popAsync() {
return pollAsync();
}
@Override
public V pop() {
return removeFirst();
}
// 将元素推入队列的头部
@Override
public Future<Void> pushAsync(V e) {
return addFirstAsync(e);
}
@Override
public void push(V e) {
addFirst(e);
}
// 移除队列中第一次出现的指定元素
@Override
public Future<Boolean> removeFirstOccurrenceAsync(Object o) {
return removeAsync(o, 1);
}
@Override
public boolean removeFirstOccurrence(Object o) {
return remove(o, 1);
}
// 从队列的头部移除元素
@Override
public Future<V> removeFirstAsync() {
return pollAsync();
}
// 从队列的尾部移除元素
@Override
public Future<V> removeLastAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.RPOP, getName());
}
@Override
public V removeLast() {
V value = get(removeLastAsync());
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
// 移除队列中最后一次出现的指定元素
@Override
public Future<Boolean> removeLastOccurrenceAsync(Object o) {
return removeAsync(o, -1);
}
@Override
public boolean removeLastOccurrence(Object o) {
return remove(o, -1);
}
}
3. 分布式Queue RQueue
package org.redisson;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RQueue;
import io.netty.util.concurrent.Future;
public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
protected RedissonQueue(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
// 将元素添加到队列中
@Override
public boolean offer(V e) {
// 调用父类方法,实际上是 RPUSH 命令将元素添加到列表末尾
return add(e);
}
@Override
public Future<Boolean> offerAsync(V e) {
return addAsync(e);
}
// 获取队列的第一个元素
public V getFirst() {
V value = getValue(0);
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
// 移除队列的第一个元素并返回
public V removeFirst() {
V value = poll();
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
@Override
public V remove() {
return removeFirst();
}
// 移除并返回队列的第一个元素
@Override
public Future<V> pollAsync() {
// LPOP 命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.LPOP, getName());
}
@Override
public V poll() {
return get(pollAsync());
}
// 获取队列第一个元素
@Override
public V element() {
return getFirst();
}
// 查看队列第一个元素
@Override
public Future<V> peekAsync() {
return getAsync(0);
}
// 查看队列第一个元素
@Override
public V peek() {
if (isEmpty()) {
return null;
}
return get(0);
}
// 下面几个是元素转移方法
// 移除队列的最后一个元素,然后添加到另一个队列的头部
@Override
public V pollLastAndOfferFirstTo(String queueName) {
return get(pollLastAndOfferFirstToAsync(queueName));
}
@Override
public Future<V> pollLastAndOfferFirstToAsync(String queueName) {
// RPOPLPUSH 命令实现
return commandExecutor.writeAsync(getName(), RedisCommands.RPOPLPUSH, getName(), queueName);
}
// 将当前队列的最后一个元素移到另一个 RQueue 的头部
@Override
public Future<V> pollLastAndOfferFirstToAsync(RQueue<V> queue) {
return pollLastAndOfferFirstToAsync(queue.getName());
}
@Override
public V pollLastAndOfferFirstTo(RQueue<V> queue) {
return pollLastAndOfferFirstTo(queue.getName());
}
}