Redisson2.0源码分析9-分布式锁
Redisson2.0源码分析9-分布式锁
大部分人接触到 Redisson 估计都是从 Redisson 支持分布锁这个特性,所以 Redisson 在这块肯定是实现上比较可靠的。
这个版本的分布式锁比 Redisson V1 的逻辑要完善许多,代码量也是比之前多很多。 实现方式上已经从 setnx 命令实现分布式锁切换到使用 lua 脚本 去实现分布式锁。
1. 源码定义
package org.redisson;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.core.RLock;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
public class RedissonLock extends RedissonExpirable implements RLock {
// 默认锁过期 30 秒
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
// 保存每个锁的刷新任务,定期刷新锁的过期时间,防止锁意外释放。
private static final ConcurrentMap<String, Timeout> refreshTaskMap = PlatformDependent.newConcurrentHashMap();
// 锁的内部租约时间,毫秒单位,控制锁的自动释放时间
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
private final UUID id; // 锁的唯一ID
private static final Integer unlockMessage = 0; // 解锁消息标识符
// 管理锁的状态和订阅
private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.id = id;
}
// 取消锁的订阅.
// 当锁被释放时,检查是否需要取消频道的订阅。
private void unsubscribe() {
// 一直到取消成功为止
while (true) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
// 没有这个锁就直接返回
if (entry == null) {
return;
}
// 创建一个新的锁条目并释放锁
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release();
// 使用CAS操作确保线程安全地更新锁条目
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
synchronized (ENTRIES) {
// 是否需要取消订阅频道
if (!ENTRIES.containsKey(getEntryName())) {
// 取消订阅频道,释放资源,防止资源泄漏
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
return;
}
}
}
// 获取锁的唯一标识符
private String getEntryName() {
return id + ":" + getName();
}
// 获取锁的条目
private Promise<Boolean> aquire() {
// 直到操作成功为止
while (true) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
// 如果锁条目不存在,返回null。 表示没有可用的锁
if (entry == null) {
return null;
}
// 创建一个新的锁条目并获取锁,增加引用计数
RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
// 成功更新锁条目后,返回Promise对象,用于异步操作的结果
return newEntry.getPromise();
}
}
}
// 订阅锁的 channel ,在锁释放时可以收到通知
private Future<Boolean> subscribe() {
Promise<Boolean> promise = aquire();
// 有锁条目,直接返回对应的Promise,表示已经订阅成功
if (promise != null) {
return promise;
}
Promise<Boolean> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
// 创建新的Promise和锁条目,准备进行订阅
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
// 旧的Promise不存在,递归调用subscribe以确保订阅成功
return subscribe();
}
return oldPromise;
}
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
@Override
public void onMessage(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
// 当收到解锁消息时,释放锁,通知等待的线程
value.getLatch().release();
}
}
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
// 订阅成功时,设置Promise为成功状态,通知等待的线程
value.getPromise().setSuccess(true);
return true;
}
return false;
}
};
// 订阅锁的 channel ,以便在锁被释放时接收到通知
// 用 Redis 的发布/订阅机制来实现锁的通知
synchronized (ENTRIES) {
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
}
return newPromise;
}
// 生成锁 channel 名称
private String getChannelName() {
return "redisson__lock__channel__{" + getName() + "}";
}
// 获取锁,不响应中断
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 支持超时时间获取锁,不响应中断
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 响应中断获取锁
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
// 获取锁,在等待锁的过程中响应中断。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 计算 ttl 时间
Long ttl;
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
// 尝试获取锁,如果成功,ttl将为null
if (ttl == null) {
return;
}
// 等待锁释放消息
subscribe().awaitUninterruptibly();
try {
// 一直到拿到锁为止
while (true) {
// 尝试去获取锁
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
// ttl 为空意味着拿到了锁,不用再循环了
if (ttl == null) {
// 锁已成功获取,退出循环
break;
}
RedissonLockEntry entry = ENTRIES.get(getEntryName());
// ttl 是被别人占有的锁过期时间, 大于 0 则当前线程尝试等待这个时间。
// 等这个时间到了,立马就去抢锁。
if (ttl >= 0) {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 无限制等待锁释放通知
entry.getLatch().acquire();
}
}
} finally {
// 确保在方法结束时取消订阅
unsubscribe();
}
}
// 尝试获取锁,会立马返回结果,返回拿到锁的状态
@Override
public boolean tryLock() {
return tryLockInner() == null;
}
// 尝试获取锁,返回锁的剩余存活时间
private Long tryLockInner() {
// 查询锁剩余时间
Long ttlRemaining = tryLockInner(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS);
// 如果锁成功获取,创建一个新的刷新任务,定期刷新锁的过期时间
if (ttlRemaining == null) {
newRefreshTask();
}
return ttlRemaining;
}
// 创建一个新的刷新任务,以定期刷新锁的过期时间
private void newRefreshTask() {
// 已经存在刷新任务,直接返回,避免重复创建
if (refreshTaskMap.containsKey(getName())) {
return;
}
// 定义刷新锁过期时间的定时任务,通过 netty 的时间轮去执行。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 调用 expire 方法重新设置锁的过期时间,默认为 30,秒.
expire(internalLockLeaseTime, TimeUnit.MILLISECONDS);
// internalLockLeaseTime/3 是任务的执行间隔,默认情况下为 30/3=10秒,即10000毫秒。
refreshTaskMap.remove(getName());
newRefreshTask(); // reschedule itself
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 如果这个锁已经有刷新任务了,这次的刷新任务就取消掉
if (refreshTaskMap.putIfAbsent(getName(), task) != null) {
task.cancel();
}
}
// 通知刷新任务
private void stopRefreshTask() {
Timeout task = refreshTaskMap.remove(getName());
if (task != null) {
task.cancel();
}
}
// 尝试获取锁,获取成功返回剩余存活时间
private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 使用Redis的EVAL命令执行Lua脚本,确保锁的获取操作是原子性的
// 如果锁不存在,则创建锁并设置过期时间
// 如果锁已存在且由当前线程持有,则增加持有计数
// 否则就会返回锁的剩余存活时间
// 调用参数 锁的名称,锁的持有者ID,由UUID和线程ID组成,锁的过期时间
return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " +
" return nil; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " +
" return nil; " +
" end;" +
" return redis.call('pttl', KEYS[1]); " +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), internalLockLeaseTime);
}
// 尝试在指定时间获取锁
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 将等待时间转换为毫秒
long time = unit.toMillis(waitTime);
Long ttl;
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
// 如果拿到了锁,直接返回 true
if (ttl == null) {
return true;
}
if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
// 如果在指定的等待时间内未能订阅到锁的释放通知,返回false
return false;
}
try {
while (true) {
// 再次尝试获取锁
if (leaseTime != -1) {
ttl = tryLockInner(leaseTime, unit);
} else {
ttl = tryLockInner();
}
if (ttl == null) {
// 如果锁成功获取,退出循环
break;
}
if (time <= 0) {
// 等待时间已耗尽,返回false
return false;
}
// 记录当前时间,用于计算等待时间
long current = System.currentTimeMillis();
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (ttl >= 0 && ttl < time) {
// 等待锁释放通知,或直到ttl过期
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 等待锁释放通知,或直到剩余等待时间耗尽
entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 计算等待操作消耗的时间
long elapsed = System.currentTimeMillis() - current;
// 减少剩余的等待时间
time -= elapsed;
}
// 成功获取锁,返回true
return true;
} finally {
// 结束时取消订阅
unsubscribe();
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock(time, -1, unit);
}
// 释放锁
@Override
public void unlock() {
// 执行 Redis lua 脚本
// 1. 先尝试获取锁的当前值,锁不存在就直接发布结果消息通知等待的线程。
// 2. 锁存在并且持有者是当前线程,减少持有技术,持有计数大于0更新锁的值跟剩余时间,同时返回 false 表示未释放。
// 3. 锁存在但不是当前线程持有,返回 nil 表示无法释放锁。
Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" return true; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" o['c'] = o['c'] - 1; " +
" if (o['c'] > 0) then " +
" redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); " +
" return false;"+
" else " +
" redis.call('del', KEYS[1]);" +
" redis.call('publish', ARGV[4], ARGV[2]); " +
" return true;"+
" end" +
" end;" +
" return nil; " +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime, getChannelName());
// 为空则表示无法释放锁
if (opStatus == null) {
throw new IllegalStateException("Can't unlock lock Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 为 true 停止锁续期任务
if (opStatus) {
stopRefreshTask();
}
}
@Override
public Condition newCondition() {
// TODO implement
throw new UnsupportedOperationException();
}
@Override
public void forceUnlock() {
get(forceUnlockAsync());
}
// 强制解锁
private Future<Boolean> forceUnlockAsync() {
// 停止锁续期任务
stopRefreshTask();
// 删除锁并且发布解锁消息
return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
"redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true",
Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
}
@Override
public boolean isLocked() {
// 判断锁是否存在
return commandExecutor.read(getName(), RedisCommands.EXISTS, getName());
}
// 判断锁是否当前线程持有
@Override
public boolean isHeldByCurrentThread() {
Boolean opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return false; " +
"else " +
" local o = cjson.decode(v); " +
" if (o['o'] == ARGV[1]) then " +
" return true; " +
" else" +
" return false; " +
" end;" +
"end",
Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId());
return opStatus;
}
// 获取锁持有计数
@Override
public int getHoldCount() {
Long opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_INTEGER,
"local v = redis.call('get', KEYS[1]); " +
"if (v == false) then " +
" return 0; " +
"else " +
" local o = cjson.decode(v); " +
" return o['c']; " +
"end",
Collections.<Object>singletonList(getName()));
return opStatus.intValue();
}
// 删除锁
@Override
public boolean delete() {
forceUnlock();
return true;
}
@Override
public Future<Boolean> deleteAsync() {
return forceUnlockAsync();
}
}
这个分布式锁的核心:
- 利用 lua 脚本去获取锁、更新锁的值。
- 每个锁都会订阅一个 channel ,用来接收加锁、解锁的消息。
- 每次获取锁前,先订阅这个锁专属的 channel ,等待加锁解锁消息。
2. 核心的 lua 脚本
大部分核心的逻辑其实都是在 lua 脚本中,理解了这些 lua 脚本的逻辑分布锁这块的逻辑基本就可以理清。
2.1 获取锁
假设有一个锁名为 myLock,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 , 过期时间为30000毫秒(30秒),lua脚本参数示例为:
- KEYS[1]:myLock
- ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
- ARGV[2]:30000
获取锁的 lua 代码:
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);
--- 如果锁不存在,说明可以获取到锁
if (v == false) then
--- 尝试使用 set 命令设置一个锁。 锁的值是一个 JSON 对象, o表示锁的持有者,也就是线程的UUID。 c表示重入计数,默认就是1。
--- px 就是设置锁的过期时间,也就是参数 ARGV[2]。
redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]);
--- 返回nil表示锁已获取
return nil;
else
--- 这里说明锁已经存在了。
--- 将已经存在的锁对象数据解析成JSON。
local o = cjson.decode(v);
--- o表示锁的持有者,也就是线程的UUID , 判断这个锁的持有者跟当前尝试获取锁的线程是不是一样
if (o['o'] == ARGV[1]) then
--- 如果一样说明是当前线程重复获取锁,直接增加这个锁的重入次数,重新设置过期时间。
o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]);
--- 返回nil表示锁已获取
return nil;
end;
-- 走到这里说明锁被其他线程持有,只能继续等,返回锁的剩余毫秒TTL。
return redis.call('pttl', KEYS[1]);
end
2.2 解锁
假设有一个锁名为myLock,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 ,解锁消息为 0,过期时间为30000毫秒(30秒),channel 名为redissonlockchannel__{myLock},则 lua 脚本参数参数示例为:
- KEYS[1]:myLock
- ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
- ARGV[2]:0
- ARGV[3]:30000
- ARGV[4]:redissonlockchannel__{myLock}
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);
--- 如果锁不存在,直接发布一个解锁消息
if (v == false) then
redis.call('publish', ARGV[4], ARGV[2]); --- ARGV[4]是 channel 名,ARGV[2] 是解锁消息,外面传进来的这个值 0
return true; --- 返回true表示锁已成功释放
else
local o = cjson.decode(v);
--- 检查当前线程是不是这个锁的持有者
if (o['o'] == ARGV[1]) then
--- 如果是的话,锁重入次数就要减1
o['c'] = o['c'] - 1;
--- 如果减完1之后重入次数还是大于0,说明还不能立即释放这个锁,其他地方还在用
if (o['c'] > 0) then
redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); --- 更新锁的值和过期时间。
return false; --- 返回false表示锁仍然被持有
else
--- 重入次数为0,就可以删除这个锁了。
redis.call('del', KEYS[1]); --- 删除锁
redis.call('publish', ARGV[4], ARGV[2]); -- 发布一个解锁消息
return true; --- 返回true表示锁已成功释放
end
end;
--- 如果当前线程不是锁的持有者,返回nil
return nil;
end
2.3 查看锁是否被当前线程持有
假设有一个锁名为 myLock ,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 , lua 脚本参数示例为:
- KEYS[1]:myLock
- ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);
if (v == false) then
--- 锁不存在,返回false,表示锁未被持有。
return false;
else
local o = cjson.decode(v);
--- 如果当前线程是锁的持有者返回true , 不是返回false。
if (o['o'] == ARGV[1]) then
return true;
else
return false;
end;
end
2.4 获取锁的重入次数
假设有一个锁名为 myLock, lua 脚本参数示例为:
- KEYS[1]:myLock
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);
--- 如果锁不存在,返回0,表示当前线程没有持有锁
if (v == false) then
return 0;
else
\--- 返回当前锁数据中的重入计数 c,c表示当前线程持有锁的次数
local o = cjson.decode(v);
return o['c'];
end
2.5 强制解锁
假设有一个锁名为myLock,解锁消息为0,频道名为 redissonlockchannel__{myLock} ,则 lua 脚本参数示例为:
- KEYS[1]:myLock
- ARGV[1]:0
- ARGV[2]:redissonlockchannel__{myLock}
redis.call('del', KEYS[1]); --- 从 Redis 中删除锁,KEYS[1]是锁的键名。这一步是强制的,无论当前锁的持有者是谁都会直接删除。
redis.call('publish', ARGV[2], ARGV[1]); --- 发布一个解锁消息
return true --- 返回true表示锁已成功强制释放
3. 主要逻辑梳理
3.1 获取锁
简单的流程可以看下面这张图。
文字版本:
- 先去 Redis 中查找对应的锁是否存在,不存在就直接设置锁。 同时开启一个定时任务,定期去刷新锁的过期时间。
- 如果存在锁,查看这把锁是否是当前线程持有,如果是当前线程持有的话,则执行重入操作。 也就是持有的数值+1。
- 如果存在锁,但不是当前线程持有,就通过 Redis channel 订阅这把锁的解锁消息。
- 当收到解锁消息后,重新开始尝试获取锁。
3.2 释放锁
释放锁的逻辑更简单一点:
- 先去 Redis 中查找对应的锁是否存在,不存在就直接跳过处理。
- 锁存在的话,比较锁的持有者是否是当前线程,不是当前线程不能解锁,跳过处理。
- 是当前线程持有锁,对持有次数 -1 。 如果持有次数等于0,那么删除锁,然后发送解锁消息到 channel 中,最后停止锁的自动续期任务。 如果持有次数还大于0则暂时不释放锁。