Redisson2.0源码分析15-其他分布式相关工具
Redisson2.0源码分析15-其他分布式相关工具
这篇主要是一些不太好具体分类的分布式工具或者特性支持。
目前有如下几个:
- 分布式对象过期特性支持 RedissonExpirable
- 单个值分布式对象 RBucket
- 基数统计器 RHyperLogLog
- 键管理 RKeys
- 批量命令执行 RBatch
- 执行Lua脚本 RScript
这些工具类的源码都不是特别的复杂,逻辑比较少。 稍微了解下即可。
1. 分布式对象过期特性支持 RedissonExpirable
package org.redisson;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RExpirable;
import io.netty.util.concurrent.Future;
/**
 * Base class for Redisson objects whose Redis key can carry an expiration.
 *
 * <p>Every blocking method hands the future produced by its {@code *Async}
 * counterpart to {@code commandExecutor.get(...)} and returns the outcome.
 * All commands are issued with {@link StringCodec#INSTANCE}.
 */
abstract class RedissonExpirable extends RedissonObject implements RExpirable {

    RedissonExpirable(CommandExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    // ------------------------------------------------------------------
    // Asynchronous operations
    // ------------------------------------------------------------------

    /**
     * Issues EXPIRE for this object's key.
     *
     * @param timeToLive time-to-live expressed in {@code timeUnit}
     * @param timeUnit   unit of {@code timeToLive}; the value is converted to
     *                   whole seconds with {@link TimeUnit#toSeconds(long)}
     * @return future holding the command's boolean reply
     */
    @Override
    public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
        final String key = getName();
        final long ttlSeconds = timeUnit.toSeconds(timeToLive);
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.EXPIRE, key, ttlSeconds);
    }

    /**
     * Issues EXPIREAT for this object's key.
     *
     * @param timestamp value forwarded unchanged to EXPIREAT (the {@link Date}
     *                  overloads pass epoch seconds here)
     * @return future holding the command's boolean reply
     */
    @Override
    public Future<Boolean> expireAtAsync(long timestamp) {
        final String key = getName();
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.EXPIREAT, key, timestamp);
    }

    /**
     * Asynchronous {@link Date} flavour of {@link #expireAtAsync(long)}.
     *
     * @param timestamp moment of expiration; its millisecond value is divided
     *                  by 1000 before being forwarded
     * @return future holding the command's boolean reply
     */
    @Override
    public Future<Boolean> expireAtAsync(Date timestamp) {
        final long epochSeconds = timestamp.getTime() / 1000;
        return expireAtAsync(epochSeconds);
    }

    /**
     * Issues PERSIST for this object's key, clearing its expiration.
     *
     * @return future holding the command's boolean reply
     */
    @Override
    public Future<Boolean> clearExpireAsync() {
        final String key = getName();
        return commandExecutor.writeAsync(key, StringCodec.INSTANCE, RedisCommands.PERSIST, key);
    }

    /**
     * Issues TTL for this object's key.
     *
     * @return future holding the value reported by TTL
     */
    @Override
    public Future<Long> remainTimeToLiveAsync() {
        final String key = getName();
        return commandExecutor.readAsync(key, StringCodec.INSTANCE, RedisCommands.TTL, key);
    }

    // ------------------------------------------------------------------
    // Blocking operations: run the async variant and wait for its result
    // ------------------------------------------------------------------

    /** Blocking form of {@link #expireAsync(long, TimeUnit)}. */
    @Override
    public boolean expire(long timeToLive, TimeUnit timeUnit) {
        Future<Boolean> pending = expireAsync(timeToLive, timeUnit);
        return commandExecutor.get(pending);
    }

    /** Blocking form of {@link #expireAtAsync(long)}. */
    @Override
    public boolean expireAt(long timestamp) {
        Future<Boolean> pending = expireAtAsync(timestamp);
        return commandExecutor.get(pending);
    }

    /** Blocking {@link Date} flavour; converts milliseconds to seconds first. */
    @Override
    public boolean expireAt(Date timestamp) {
        final long epochSeconds = timestamp.getTime() / 1000;
        return expireAt(epochSeconds);
    }

    /** Blocking form of {@link #clearExpireAsync()}. */
    @Override
    public boolean clearExpire() {
        Future<Boolean> pending = clearExpireAsync();
        return commandExecutor.get(pending);
    }

    /** Blocking form of {@link #remainTimeToLiveAsync()}. */
    @Override
    public long remainTimeToLive() {
        Future<Long> pending = remainTimeToLiveAsync();
        return commandExecutor.get(pending);
    }
}
2. 单个值分布式对象 RBucket
存储和管理单个对象的 RBucket,没什么逻辑。
package org.redisson;
import java.util.concurrent.TimeUnit;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RBucket;
import io.netty.util.concurrent.Future;
/**
 * Holder for a single value stored under one Redis key.
 *
 * <p>Blocking methods wait on the future returned by the matching
 * {@code *Async} method via the inherited {@code get(Future)} helper.
 *
 * @param <V> type of the stored value
 */
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {

    protected RedissonBucket(CommandExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    // ---- read the value (GET) ----

    /** Issues GET for this bucket's key. */
    @Override
    public Future<V> getAsync() {
        final String key = getName();
        return commandExecutor.readAsync(key, RedisCommands.GET, key);
    }

    /** Blocking form of {@link #getAsync()}. */
    @Override
    public V get() {
        Future<V> pending = getAsync();
        return get(pending);
    }

    // ---- store the value (SET) ----

    /** Issues SET for this bucket's key with the given value. */
    @Override
    public Future<Void> setAsync(V value) {
        final String key = getName();
        return commandExecutor.writeAsync(key, RedisCommands.SET, key, value);
    }

    /** Blocking form of {@link #setAsync(Object)}. */
    @Override
    public void set(V value) {
        Future<Void> pending = setAsync(value);
        get(pending);
    }

    // ---- store the value with a time-to-live (SETEX) ----

    /**
     * Issues SETEX for this bucket's key.
     *
     * @param value      value to store
     * @param timeToLive time-to-live expressed in {@code timeUnit}
     * @param timeUnit   unit of {@code timeToLive}; converted to whole seconds
     */
    @Override
    public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
        final String key = getName();
        final long ttlSeconds = timeUnit.toSeconds(timeToLive);
        return commandExecutor.writeAsync(key, RedisCommands.SETEX, key, ttlSeconds, value);
    }

    /** Blocking form of {@link #setAsync(Object, long, TimeUnit)}. */
    @Override
    public void set(V value, long timeToLive, TimeUnit timeUnit) {
        Future<Void> pending = setAsync(value, timeToLive, timeUnit);
        get(pending);
    }

    // ---- existence check (EXISTS) ----

    /** Issues EXISTS for this bucket's key. */
    @Override
    public Future<Boolean> existsAsync() {
        final String key = getName();
        return commandExecutor.readAsync(key, RedisCommands.EXISTS, key);
    }

    /** Blocking form of {@link #existsAsync()}. */
    @Override
    public boolean exists() {
        Future<Boolean> pending = existsAsync();
        return get(pending);
    }
}
3. 基数统计器 RHyperLogLog
HyperLogLog 本身是一种估算算法,但是 Redis 中将 HyperLogLog 定义成一种数据结构。
在 Redis 中 HyperLogLog 是一种用于估算集合中唯一元素数量的数据结构。 比如能够在内存占用很小的情况下,快速估算集合中不同元素的数量。
所以它的优点是占用内存小且计算速度快,缺点是结果是近似值,而不是精确值, 适合用于对精度要求不高的场景。
在实际项目中具备一定的使用场景,但是不是特别的常见。
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RHyperLogLog;
import io.netty.util.concurrent.Future;
/**
 * {@link RHyperLogLog} implementation that maps each operation onto the Redis
 * PFADD / PFCOUNT / PFMERGE commands for the key returned by {@code getName()}.
 *
 * <p>Blocking methods wait on the future returned by the matching
 * {@code *Async} method via the inherited {@code get(Future)} helper.
 *
 * @param <V> type of the elements added to the log
 */
public class RedissonHyperLogLog<V> extends RedissonExpirable implements RHyperLogLog<V> {

    protected RedissonHyperLogLog(CommandExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    /** Adds a single element; blocking form of {@link #addAsync(Object)}. */
    @Override
    public boolean add(V obj) {
        return get(addAsync(obj));
    }

    /** Adds several elements; blocking form of {@link #addAllAsync(Collection)}. */
    @Override
    public boolean addAll(Collection<V> objects) {
        return get(addAllAsync(objects));
    }

    /** Returns the PFCOUNT result for this log; blocking form of {@link #countAsync()}. */
    @Override
    public long count() {
        return get(countAsync());
    }

    /** Returns the PFCOUNT result over this log and the named logs. */
    @Override
    public long countWith(String... otherLogNames) {
        return get(countWithAsync(otherLogNames));
    }

    /** Runs PFMERGE with this log's key first, followed by the named logs. */
    @Override
    public void mergeWith(String... otherLogNames) {
        get(mergeWithAsync(otherLogNames));
    }

    /** Issues PFADD with this log's key and one element. */
    @Override
    public Future<Boolean> addAsync(V obj) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, getName(), obj);
    }

    /**
     * Issues PFADD with this log's key followed by every element of {@code objects}.
     *
     * @param objects elements to add
     * @return future holding the command's boolean reply
     */
    @Override
    public Future<Boolean> addAllAsync(Collection<V> objects) {
        // The command arguments must be one flat array: key, element, element, ...
        // Passing the key AND the array as two varargs would send the key twice
        // and wrap the real arguments in a nested Object[].
        return commandExecutor.writeAsync(getName(), RedisCommands.PFADD, keyFollowedBy(objects));
    }

    /** Issues PFCOUNT for this log's key only. */
    @Override
    public Future<Long> countAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT, getName());
    }

    /**
     * Issues PFCOUNT with this log's key followed by {@code otherLogNames}.
     *
     * @param otherLogNames names of the other logs to include
     */
    @Override
    public Future<Long> countWithAsync(String... otherLogNames) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFCOUNT,
                keyFollowedBy(Arrays.asList(otherLogNames)));
    }

    /**
     * Issues PFMERGE with this log's key followed by {@code otherLogNames}.
     *
     * @param otherLogNames names of the logs passed after this log's key
     */
    @Override
    public Future<Void> mergeWithAsync(String... otherLogNames) {
        return commandExecutor.writeAsync(getName(), RedisCommands.PFMERGE,
                keyFollowedBy(Arrays.asList(otherLogNames)));
    }

    /** Builds the flat command argument array: this log's key, then every item of {@code tail}. */
    private Object[] keyFollowedBy(Collection<?> tail) {
        List<Object> args = new ArrayList<Object>(tail.size() + 1);
        args.add(getName());
        args.addAll(tail);
        return args.toArray();
    }
}
4. 键管理 RKeys
这个类提供了对 Redis 键的管理操作,包括查找、删除和随机获取键等功能。
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RKeys;
import org.redisson.misc.CompositeIterable;
import io.netty.util.concurrent.Future;
/**
 * Key-management operations: iterating, finding, deleting and randomly
 * picking keys. Iteration and multi-key commands are run per slot entry of
 * the connection manager and their results are combined.
 */
public class RedissonKeys implements RKeys {

    private final CommandExecutor commandExecutor;

    public RedissonKeys(CommandExecutor commandExecutor) {
        super();
        this.commandExecutor = commandExecutor;
    }

    /**
     * Returns a lazy view over every key matching {@code pattern}.
     *
     * @param pattern pattern handed to SCAN's MATCH option
     * @return iterable chaining one SCAN-backed iterator per slot entry
     */
    @Override
    public Iterable<String> getKeysByPattern(final String pattern) {
        return keysOfAllSlots(pattern);
    }

    /**
     * Returns a lazy view over all keys (SCAN without a MATCH option).
     *
     * @return iterable chaining one SCAN-backed iterator per slot entry
     */
    @Override
    public Iterable<String> getKeys() {
        return keysOfAllSlots(null);
    }

    /** Builds the composite iterable shared by {@link #getKeys()} and {@link #getKeysByPattern(String)}. */
    private Iterable<String> keysOfAllSlots(final String pattern) {
        List<Iterable<String>> iterables = new ArrayList<Iterable<String>>();
        for (final Integer slot : commandExecutor.getConnectionManager().getEntries().keySet()) {
            iterables.add(new Iterable<String>() {
                @Override
                public Iterator<String> iterator() {
                    return createKeysIterator(slot, pattern);
                }
            });
        }
        return new CompositeIterable<String>(iterables);
    }

    /**
     * Executes a single SCAN call against the given slot.
     *
     * @param slot     slot to run the command on
     * @param startPos cursor to continue from (0 starts a new scan)
     * @param pattern  MATCH pattern, or {@code null} for no filter
     * @return the page of keys together with the next cursor
     */
    private ListScanResult<String> scanIterator(int slot, long startPos, String pattern) {
        if (pattern == null) {
            return commandExecutor.write(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
        }
        return commandExecutor.write(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
    }

    /**
     * Creates an iterator that pages through one slot with SCAN.
     * {@code remove()} deletes the most recently returned key from Redis.
     */
    private Iterator<String> createKeysIterator(final int slot, final String pattern) {
        return new Iterator<String>() {

            private Iterator<String> iter;
            private Long iterPos;
            private boolean removeExecuted;
            private String value;

            /** Fetches one SCAN page and remembers the cursor it returned. */
            private void loadPage(long cursor) {
                ListScanResult<String> res = scanIterator(slot, cursor, pattern);
                iter = res.getValues().iterator();
                iterPos = res.getPos();
            }

            @Override
            public boolean hasNext() {
                if (iter == null) {
                    loadPage(0);
                }
                // SCAN may return an empty page together with a non-zero cursor
                // (typical when MATCH filters everything out of a page). The scan
                // is only finished once the cursor is 0, so keep fetching.
                while (!iter.hasNext() && iterPos != 0) {
                    loadPage(iterPos);
                }
                return iter.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element");
                }
                value = iter.next();
                removeExecuted = false;
                return value;
            }

            @Override
            public void remove() {
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                if (iter == null) {
                    // Nothing has been fetched yet, so there is no element to remove.
                    throw new IllegalStateException("next() must be called before remove()");
                }
                // NOTE(review): if hasNext() replaced iter with a new page after the
                // last next(), this call acts on the new page's iterator - confirm
                // whether callers rely on next(); hasNext(); remove().
                iter.remove();
                delete(value);
                removeExecuted = true;
            }
        };
    }

    /** Blocking form of {@link #randomKeyAsync()}. */
    @Override
    public String randomKey() {
        return commandExecutor.get(randomKeyAsync());
    }

    /** Issues RANDOMKEY through {@code readRandomAsync}. */
    @Override
    public Future<String> randomKeyAsync() {
        return commandExecutor.readRandomAsync(RedisCommands.RANDOM_KEY);
    }

    /**
     * Finds the keys matching a glob-style pattern. Examples:
     * <ul>
     *   <li>{@code h?llo} matches hello, hallo and hxllo</li>
     *   <li>{@code h*llo} matches hllo and heeeello</li>
     *   <li>{@code h[ae]llo} matches hello and hallo, but not hillo</li>
     * </ul>
     * Blocking form of {@link #findKeysByPatternAsync(String)}.
     */
    @Override
    public Collection<String> findKeysByPattern(String pattern) {
        return commandExecutor.get(findKeysByPatternAsync(pattern));
    }

    /** Issues KEYS with {@code pattern} through {@code readAllAsync}. */
    @Override
    public Future<Collection<String>> findKeysByPatternAsync(String pattern) {
        return commandExecutor.readAllAsync(RedisCommands.KEYS, pattern);
    }

    /**
     * Deletes every key matching {@code pattern} (same pattern syntax as
     * {@link #findKeysByPattern(String)}); blocking form of
     * {@link #deleteByPatternAsync(String)}.
     *
     * @return sum of the per-slot script results
     */
    @Override
    public long deleteByPattern(String pattern) {
        return commandExecutor.get(deleteByPatternAsync(pattern));
    }

    /**
     * Runs a Lua script through {@code evalWriteAllAsync} that lists the
     * matching keys with KEYS and removes them with DEL in chunks of at most
     * 5000 keys; the per-slot counts are summed.
     */
    @Override
    public Future<Long> deleteByPatternAsync(String pattern) {
        return commandExecutor.evalWriteAllAsync(RedisCommands.EVAL_INTEGER, sumOfSlotResults(),
                "local keys = redis.call('keys', ARGV[1]) "
                    + "local n = 0 "
                    + "for i=1, table.getn(keys),5000 do "
                        + "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
                    + "end "
                + "return n;", Collections.emptyList(), pattern);
    }

    /** Deletes the given keys; blocking form of {@link #deleteAsync(String...)}. */
    @Override
    public long delete(String ... keys) {
        return commandExecutor.get(deleteAsync(keys));
    }

    /**
     * Issues DEL with the given keys through {@code writeAllAsync} and sums
     * the per-slot results.
     */
    @Override
    public Future<Long> deleteAsync(String ... keys) {
        return commandExecutor.writeAllAsync(RedisCommands.DEL, sumOfSlotResults(), (Object[]) keys);
    }

    /** Creates a callback that adds up the Long result reported for each slot. */
    private static SlotCallback<Long, Long> sumOfSlotResults() {
        return new SlotCallback<Long, Long>() {
            private final AtomicLong total = new AtomicLong();

            @Override
            public void onSlotResult(Long result) {
                total.addAndGet(result);
            }

            @Override
            public Long onFinish() {
                return total.get();
            }
        };
    }
}
主要看这个删除符合模式的 key 的 Lua 脚本:
假设要删除所有以 【user:】 开头的 key,那么传递的参数 ARGV[1] 就是模式 【user:*】。
--- Run the Redis KEYS command to collect every key matching the pattern in ARGV[1] into keys
local keys = redis.call('keys', ARGV[1])
--- Counter n accumulates the number of deleted keys
local n = 0
--- Walk over keys in steps of 5000; table.getn(keys) is the length of keys
for i=1, table.getn(keys),5000 do
--- Delete the keys at positions i through i+4999 with a single DEL call
--- unpack expands that slice of keys into separate arguments; math.min caps the upper index at the length of keys
--- The value returned by DEL is added to n
n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys))))
end
return n;
这里删除看着也有问题:
- KEYS 命令会扫描 Redis 数据库中的所有 key,key 数量多的时候可能导致性能问题和阻塞。
- 虽然做了每次最多删除 5000 个 key 的限制,但是在键数量非常多时,批量删除仍然可能导致 Redis 负载过高。
不过即使替换成 SCAN 命令,在大批量删除的时候性能也只是稍微好一点点。
5. 批量命令执行 RBatch
这个类主要是批量执行 Redis 命令,原理就是通过分布式对象的构造函数传入 CommandBatchExecutorService 来达到批量积攒命令的目的。
package org.redisson;
import java.util.List;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLongAsync;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RBucketAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;
import io.netty.util.concurrent.Future;
/**
 * {@link RBatch} implementation. Every object handed out by this class is
 * constructed around the same {@link CommandBatchExecutorService}, and
 * {@link #execute()} / {@link #executeAsync()} delegate to that service.
 */
public class RedissonBatch implements RBatch {

    /** Core of the batch: the executor shared by every object created here. */
    private final CommandBatchExecutorService executorService;

    public RedissonBatch(ConnectionManager connectionManager) {
        this.executorService = new CommandBatchExecutorService(connectionManager);
    }

    // ---- execution ----

    /**
     * Delegates to the batch executor's {@code execute()}.
     * NOTE(review): presumably sends all accumulated commands at once; the
     * executor's implementation is not shown here.
     */
    @Override
    public List<?> execute() {
        return this.executorService.execute();
    }

    /** Delegates to the batch executor's {@code executeAsync()}. */
    @Override
    public Future<List<?>> executeAsync() {
        return this.executorService.executeAsync();
    }

    // ---- single-value style objects ----

    @Override
    public <V> RBucketAsync<V> getBucket(String name) {
        RedissonBucket<V> bucket = new RedissonBucket<V>(this.executorService, name);
        return bucket;
    }

    @Override
    public RAtomicLongAsync getAtomicLongAsync(String name) {
        RedissonAtomicLong counter = new RedissonAtomicLong(this.executorService, name);
        return counter;
    }

    @Override
    public <V> RHyperLogLogAsync<V> getHyperLogLog(String name) {
        RedissonHyperLogLog<V> log = new RedissonHyperLogLog<V>(this.executorService, name);
        return log;
    }

    // ---- collections ----

    @Override
    public <V> RListAsync<V> getList(String name) {
        RedissonList<V> list = new RedissonList<V>(this.executorService, name);
        return list;
    }

    @Override
    public <V> RSetAsync<V> getSet(String name) {
        RedissonSet<V> set = new RedissonSet<V>(this.executorService, name);
        return set;
    }

    @Override
    public <K, V> RMapAsync<K, V> getMap(String name) {
        RedissonMap<K, V> map = new RedissonMap<K, V>(this.executorService, name);
        return map;
    }

    // ---- queues ----

    @Override
    public <V> RQueueAsync<V> getQueue(String name) {
        RedissonQueue<V> queue = new RedissonQueue<V>(this.executorService, name);
        return queue;
    }

    @Override
    public <V> RBlockingQueueAsync<V> getBlockingQueue(String name) {
        RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(this.executorService, name);
        return queue;
    }

    @Override
    public <V> RDequeAsync<V> getDequeAsync(String name) {
        RedissonDeque<V> deque = new RedissonDeque<V>(this.executorService, name);
        return deque;
    }

    // ---- messaging, scripting, key management ----

    @Override
    public <M> RTopicAsync<M> getTopic(String name) {
        RedissonTopic<M> topic = new RedissonTopic<M>(this.executorService, name);
        return topic;
    }

    @Override
    public RScriptAsync getScript() {
        RedissonScript script = new RedissonScript(this.executorService);
        return script;
    }

    @Override
    public RKeysAsync getKeys() {
        RedissonKeys keys = new RedissonKeys(this.executorService);
        return keys;
    }
}
6. 执行Lua脚本 RScript
这个类主要是用于在 Redis中执行Lua脚本。
package org.redisson;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RScript;
import io.netty.util.concurrent.Future;
/**
 * {@link RScript} implementation for loading, running, checking, killing and
 * flushing Lua scripts.
 *
 * <p>Most operations come in two flavours: one without a key, which goes
 * through the executor's {@code *AllAsync} methods or passes {@code null} as
 * the key, and one taking a {@code key} that is handed to the executor for
 * routing. Blocking methods wait on the matching {@code *Async} future via
 * {@code commandExecutor.get(...)}.
 */
public class RedissonScript implements RScript {

    /** Executor every Redis command of this class is sent through. */
    private final CommandExecutor commandExecutor;

    protected RedissonScript(CommandExecutor commandExecutor) {
        this.commandExecutor = commandExecutor;
    }

    // ------------------------------------------------------------------
    // SCRIPT LOAD
    // ------------------------------------------------------------------

    /** Blocking form of {@link #scriptLoadAsync(String)}. */
    @Override
    public String scriptLoad(String luaScript) {
        return commandExecutor.get(scriptLoadAsync(luaScript));
    }

    /** Blocking form of {@link #scriptLoadAsync(String, String)}. */
    public String scriptLoad(String key, String luaScript) {
        return commandExecutor.get(scriptLoadAsync(key, luaScript));
    }

    /**
     * Issues SCRIPT LOAD through {@code writeAllAsync}; the callback keeps the
     * most recently reported slot result and returns it on completion.
     */
    @Override
    public Future<String> scriptLoadAsync(String luaScript) {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
            volatile String result;

            @Override
            public void onSlotResult(String result) {
                this.result = result;
            }

            @Override
            public String onFinish() {
                return result;
            }
        }, luaScript);
    }

    /**
     * Issues SCRIPT LOAD routed by {@code key}.
     *
     * @return future holding the command's string reply (the script's SHA1 digest)
     */
    public Future<String> scriptLoadAsync(String key, String luaScript) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_LOAD, luaScript);
    }

    // ------------------------------------------------------------------
    // EVAL
    // ------------------------------------------------------------------

    /** Runs a script without keys or arguments; {@code null} means no routing key. */
    @Override
    public <R> R eval(Mode mode, String luaScript, ReturnType returnType) {
        return eval(null, mode, luaScript, returnType);
    }

    /** Runs a script without keys or arguments, routed by {@code key}. */
    public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType) {
        return eval(key, mode, luaScript, returnType, Collections.emptyList());
    }

    /** Runs a script with keys and arguments; {@code null} means no routing key. */
    @Override
    public <R> R eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        return eval(null, mode, luaScript, returnType, keys, values);
    }

    /** Blocking form of {@link #evalAsync(String, Mode, String, ReturnType, List, Object...)}. */
    public <R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        // The explicit type argument makes the future a Future<R>, so no unchecked cast is needed.
        return commandExecutor.get(this.<R>evalAsync(key, mode, luaScript, returnType, keys, values));
    }

    /** Asynchronous run without keys or arguments and without a routing key. */
    @Override
    public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType) {
        return evalAsync(null, mode, luaScript, returnType, Collections.emptyList());
    }

    /** Asynchronous run with keys and arguments and without a routing key. */
    @Override
    public <R> Future<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        return evalAsync(null, mode, luaScript, returnType, keys, values);
    }

    /**
     * Runs the script with the command supplied by {@code returnType}.
     * {@link Mode#READ_ONLY} goes through {@code evalReadAsync}, any other
     * mode through {@code evalWriteAsync}.
     */
    public <R> Future<R> evalAsync(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values) {
        if (mode == Mode.READ_ONLY) {
            return commandExecutor.evalReadAsync(key, returnType.getCommand(), luaScript, keys, values);
        }
        return commandExecutor.evalWriteAsync(key, returnType.getCommand(), luaScript, keys, values);
    }

    // ------------------------------------------------------------------
    // EVALSHA: run an already loaded script identified by its SHA1 digest
    // ------------------------------------------------------------------

    /** Runs a loaded script without keys or arguments; {@code null} means no routing key. */
    @Override
    public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType) {
        return evalSha(null, mode, shaDigest, returnType);
    }

    /** Runs a loaded script without keys or arguments, routed by {@code key}. */
    public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType) {
        return evalSha(key, mode, shaDigest, returnType, Collections.emptyList());
    }

    /** Runs a loaded script with keys and arguments; {@code null} means no routing key. */
    @Override
    public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        return evalSha(null, mode, shaDigest, returnType, keys, values);
    }

    /** Blocking form of {@link #evalShaAsync(String, Mode, String, ReturnType, List, Object...)}. */
    public <R> R evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        // The explicit type argument makes the future a Future<R>, so no unchecked cast is needed.
        return commandExecutor.get(this.<R>evalShaAsync(key, mode, shaDigest, returnType, keys, values));
    }

    /** Asynchronous run of a loaded script without keys, arguments or routing key. */
    @Override
    public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType) {
        return evalShaAsync(null, mode, shaDigest, returnType, Collections.emptyList());
    }

    /** Asynchronous run of a loaded script with keys and arguments, without a routing key. */
    @Override
    public <R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        return evalShaAsync(null, mode, shaDigest, returnType, keys, values);
    }

    /**
     * Runs a loaded script. A new command named "EVALSHA" is derived from the
     * command supplied by {@code returnType}; {@link Mode#READ_ONLY} goes
     * through {@code evalReadAsync}, any other mode through {@code evalWriteAsync}.
     */
    public <R> Future<R> evalShaAsync(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values) {
        RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA");
        if (mode == Mode.READ_ONLY) {
            return commandExecutor.evalReadAsync(key, command, shaDigest, keys, values);
        }
        return commandExecutor.evalWriteAsync(key, command, shaDigest, keys, values);
    }

    // ------------------------------------------------------------------
    // SCRIPT KILL: stop the script that is currently running
    // ------------------------------------------------------------------

    /** Blocking form of {@link #scriptKillAsync()}. */
    @Override
    public void scriptKill() {
        commandExecutor.get(scriptKillAsync());
    }

    /** Blocking form of {@link #scriptKillAsync(String)}. */
    public void scriptKill(String key) {
        commandExecutor.get(scriptKillAsync(key));
    }

    /** Issues SCRIPT KILL through {@code writeAllAsync}. */
    @Override
    public Future<Void> scriptKillAsync() {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_KILL);
    }

    /** Issues SCRIPT KILL routed by {@code key}. */
    public Future<Void> scriptKillAsync(String key) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_KILL);
    }

    // ------------------------------------------------------------------
    // SCRIPT EXISTS: check whether scripts with the given digests are loaded
    // ------------------------------------------------------------------

    /** Blocking form of {@link #scriptExistsAsync(String...)}. */
    @Override
    public List<Boolean> scriptExists(String ... shaDigests) {
        return commandExecutor.get(scriptExistsAsync(shaDigests));
    }

    /**
     * Issues SCRIPT EXISTS through {@code writeAllAsync}. The per-slot answers
     * are OR-ed position by position, so an entry is {@code true} as soon as
     * any slot reported {@code true} for that digest.
     */
    @Override
    public Future<List<Boolean>> scriptExistsAsync(final String ... shaDigests) {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() {
            volatile List<Boolean> result = new ArrayList<Boolean>(shaDigests.length);

            @Override
            public synchronized void onSlotResult(List<Boolean> result) {
                for (int i = 0; i < result.size(); i++) {
                    // Grow the accumulator the first time position i is seen.
                    if (this.result.size() == i) {
                        this.result.add(false);
                    }
                    this.result.set(i, this.result.get(i) | result.get(i));
                }
            }

            @Override
            public List<Boolean> onFinish() {
                // Hand out a copy of the accumulated list.
                return new ArrayList<Boolean>(result);
            }
        }, (Object[]) shaDigests);
    }

    /** Blocking form of {@link #scriptExistsAsync(String, String...)}. */
    public List<Boolean> scriptExists(String key, String ... shaDigests) {
        return commandExecutor.get(scriptExistsAsync(key, shaDigests));
    }

    /** Issues SCRIPT EXISTS routed by {@code key}, one argument per digest. */
    public Future<List<Boolean>> scriptExistsAsync(String key, String ... shaDigests) {
        // Explicit cast: pass the digests as the varargs array itself, the same
        // way the keyless overload does.
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_EXISTS, (Object[]) shaDigests);
    }

    // ------------------------------------------------------------------
    // SCRIPT FLUSH: drop the loaded scripts
    // ------------------------------------------------------------------

    /** Blocking form of {@link #scriptFlushAsync()}. */
    @Override
    public void scriptFlush() {
        commandExecutor.get(scriptFlushAsync());
    }

    /** Blocking form of {@link #scriptFlushAsync(String)}. */
    public void scriptFlush(String key) {
        commandExecutor.get(scriptFlushAsync(key));
    }

    /** Issues SCRIPT FLUSH through {@code writeAllAsync}. */
    @Override
    public Future<Void> scriptFlushAsync() {
        return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_FLUSH);
    }

    /** Issues SCRIPT FLUSH routed by {@code key}. */
    public Future<Void> scriptFlushAsync(String key) {
        return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_FLUSH);
    }
}