七的博客

Redisson2.0源码分析9-分布式锁

源码分析

Redisson2.0源码分析9-分布式锁

大部分人接触到 Redisson 估计都是从 Redisson 支持分布锁这个特性,所以 Redisson 在这块肯定是实现上比较可靠的。

这个版本的分布式锁比 Redisson V1 的逻辑要完善许多,代码量也是比之前多很多。 实现方式上已经从 setnx 命令实现分布式锁切换到使用 lua 脚本 去实现分布式锁。

1. 源码定义

package org.redisson;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.core.RLock;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

public class RedissonLock extends RedissonExpirable implements RLock {

	// 默认锁过期 30 秒
    public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;

    // 保存每个锁的刷新任务,定期刷新锁的过期时间,防止锁意外释放。
    private static final ConcurrentMap<String, Timeout> refreshTaskMap = PlatformDependent.newConcurrentHashMap();

    // 锁的内部租约时间,毫秒单位,控制锁的自动释放时间
    protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);

    private final UUID id;  // 锁的唯一ID

    private static final Integer unlockMessage = 0;  // 解锁消息标识符

    // 管理锁的状态和订阅
    private static final ConcurrentMap<String, RedissonLockEntry> ENTRIES = PlatformDependent.newConcurrentHashMap();

    protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
        super(commandExecutor, name);
        this.id = id;
    }

    // 取消锁的订阅.
    // 当锁被释放时,检查是否需要取消频道的订阅。
    private void unsubscribe() {
    	// 一直到取消成功为止
        while (true) {
            RedissonLockEntry entry = ENTRIES.get(getEntryName());
            // 没有这个锁就直接返回
            if (entry == null) {
                return;
            }

            // 创建一个新的锁条目并释放锁
            RedissonLockEntry newEntry = new RedissonLockEntry(entry);
            newEntry.release();

            // 使用CAS操作确保线程安全地更新锁条目
            if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
                if (newEntry.isFree()
                        && ENTRIES.remove(getEntryName(), newEntry)) {
                    synchronized (ENTRIES) {
                        // 是否需要取消订阅频道
                        if (!ENTRIES.containsKey(getEntryName())) {
                        	// 取消订阅频道,释放资源,防止资源泄漏
                            commandExecutor.getConnectionManager().unsubscribe(getChannelName());
                        }
                    }
                }
                return;
            }
        }
    }

    // 获取锁的唯一标识符
    private String getEntryName() {
        return id + ":" + getName();
    }

    // 获取锁的条目
    private Promise<Boolean> aquire() {
    	// 直到操作成功为止
        while (true) {
            RedissonLockEntry entry = ENTRIES.get(getEntryName());
            // 如果锁条目不存在,返回null。  表示没有可用的锁
            if (entry == null) {
                return null;
            }

             // 创建一个新的锁条目并获取锁,增加引用计数
            RedissonLockEntry newEntry = new RedissonLockEntry(entry);
            newEntry.aquire();
            if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
            	// 成功更新锁条目后,返回Promise对象,用于异步操作的结果
                return newEntry.getPromise();
            }
        }
    }

    // 订阅锁的 channel ,在锁释放时可以收到通知
    private Future<Boolean> subscribe() {
        Promise<Boolean> promise = aquire();
        // 有锁条目,直接返回对应的Promise,表示已经订阅成功
        if (promise != null) {
            return promise;
        }

        Promise<Boolean> newPromise = newPromise();
        final RedissonLockEntry value = new RedissonLockEntry(newPromise);
        value.aquire();

        // 创建新的Promise和锁条目,准备进行订阅
        RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
        if (oldValue != null) {
            Promise<Boolean> oldPromise = aquire();
            if (oldPromise == null) {
            	// 旧的Promise不存在,递归调用subscribe以确保订阅成功
                return subscribe();
            }
            return oldPromise;
        }

        RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {

            @Override
            public void onMessage(String channel, Integer message) {
                if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
                	 // 当收到解锁消息时,释放锁,通知等待的线程 
                    value.getLatch().release();
                }
            }

            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
                	// 订阅成功时,设置Promise为成功状态,通知等待的线程
                    value.getPromise().setSuccess(true);
                    return true;
                }
                return false;
            }

        };

        // 订阅锁的 channel ,以便在锁被释放时接收到通知
        // 用 Redis 的发布/订阅机制来实现锁的通知
        synchronized (ENTRIES) {
            commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
        }
        return newPromise;
    }

    // 生成锁 channel 名称
    private String getChannelName() {
        return "redisson__lock__channel__{" + getName() + "}";
    }

    // 获取锁,不响应中断
    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // 支持超时时间获取锁,不响应中断
    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }


    // 响应中断获取锁
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

    // 获取锁,在等待锁的过程中响应中断。
    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    	// 计算 ttl 时间 
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }

        // 尝试获取锁,如果成功,ttl将为null
        if (ttl == null) {
            return;
        }

        // 等待锁释放消息
        subscribe().awaitUninterruptibly();

        try {
        	// 一直到拿到锁为止
            while (true) {
            	// 尝试去获取锁
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }

                // ttl 为空意味着拿到了锁,不用再循环了
                if (ttl == null) {
                	// 锁已成功获取,退出循环
                    break;
                }

                RedissonLockEntry entry = ENTRIES.get(getEntryName());
                // ttl 是被别人占有的锁过期时间, 大于 0 则当前线程尝试等待这个时间。
                // 等这个时间到了,立马就去抢锁。
                if (ttl >= 0) {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                	// 无限制等待锁释放通知
                    entry.getLatch().acquire();
                }
            }
        } finally {
        	// 确保在方法结束时取消订阅
            unsubscribe();
        }
    }

    // 尝试获取锁,会立马返回结果,返回拿到锁的状态
    @Override
    public boolean tryLock() {
        return tryLockInner() == null;
    }

    // 尝试获取锁,返回锁的剩余存活时间
    private Long tryLockInner() {
    	// 查询锁剩余时间
        Long ttlRemaining = tryLockInner(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS);
         // 如果锁成功获取,创建一个新的刷新任务,定期刷新锁的过期时间
        if (ttlRemaining == null) {
            newRefreshTask();
        }
        return ttlRemaining;
    }

    // 创建一个新的刷新任务,以定期刷新锁的过期时间
    private void newRefreshTask() {
    	// 已经存在刷新任务,直接返回,避免重复创建
        if (refreshTaskMap.containsKey(getName())) {
            return;
        }

        // 定义刷新锁过期时间的定时任务,通过 netty 的时间轮去执行。
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 调用 expire 方法重新设置锁的过期时间,默认为 30,秒.
                expire(internalLockLeaseTime, TimeUnit.MILLISECONDS);
                // internalLockLeaseTime/3 是任务的执行间隔,默认情况下为 30/3=10秒,即10000毫秒。
                refreshTaskMap.remove(getName());
                newRefreshTask(); // reschedule itself
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        // 如果这个锁已经有刷新任务了,这次的刷新任务就取消掉
        if (refreshTaskMap.putIfAbsent(getName(), task) != null) {
            task.cancel();
        }
    }

    // 通知刷新任务
    private void stopRefreshTask() {
        Timeout task = refreshTaskMap.remove(getName());
        if (task != null) {
            task.cancel();
        }
    }

    // 尝试获取锁,获取成功返回剩余存活时间
    private Long tryLockInner(final long leaseTime, final TimeUnit unit) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        // 使用Redis的EVAL命令执行Lua脚本,确保锁的获取操作是原子性的
        // 如果锁不存在,则创建锁并设置过期时间
        // 如果锁已存在且由当前线程持有,则增加持有计数
        // 否则就会返回锁的剩余存活时间
        // 调用参数 锁的名称,锁的持有者ID,由UUID和线程ID组成,锁的过期时间
        return commandExecutor.evalWrite(getName(), RedisCommands.EVAL_INTEGER,
                "local v = redis.call('get', KEYS[1]); " +
                                "if (v == false) then " +
                                "  redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]); " +
                                "  return nil; " +
                                "else " +
                                "  local o = cjson.decode(v); " +
                                "  if (o['o'] == ARGV[1]) then " +
                                "    o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]); " +
                                "    return nil; " +
                                "  end;" +
                                "  return redis.call('pttl', KEYS[1]); " +
                                "end",
                        Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), internalLockLeaseTime);
    }

    // 尝试在指定时间获取锁
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    	// 将等待时间转换为毫秒
        long time = unit.toMillis(waitTime);
        Long ttl;
        if (leaseTime != -1) {
            ttl = tryLockInner(leaseTime, unit);
        } else {
            ttl = tryLockInner();
        }


        // 如果拿到了锁,直接返回 true
        if (ttl == null) {
            return true;
        }

        if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
        	// 如果在指定的等待时间内未能订阅到锁的释放通知,返回false
            return false;
        }

        try {
            while (true) {
            	// 再次尝试获取锁
                if (leaseTime != -1) {
                    ttl = tryLockInner(leaseTime, unit);
                } else {
                    ttl = tryLockInner();
                }
              
                if (ttl == null) {
                  // 如果锁成功获取,退出循环
                    break;
                }

                if (time <= 0) {
                	// 等待时间已耗尽,返回false
                    return false;
                }

                // 记录当前时间,用于计算等待时间
                long current = System.currentTimeMillis();
                RedissonLockEntry entry = ENTRIES.get(getEntryName());

                if (ttl >= 0 && ttl < time) {
                	// 等待锁释放通知,或直到ttl过期
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                	 // 等待锁释放通知,或直到剩余等待时间耗尽
                    entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                 // 计算等待操作消耗的时间
                long elapsed = System.currentTimeMillis() - current;
                 // 减少剩余的等待时间
                time -= elapsed;
            }

            // 成功获取锁,返回true
            return true;
        } finally {
        	// 结束时取消订阅
            unsubscribe();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return tryLock(time, -1, unit);
    }

    // 释放锁
    @Override
    public void unlock() {
    	// 执行 Redis lua 脚本
    	// 1. 先尝试获取锁的当前值,锁不存在就直接发布结果消息通知等待的线程。
    	// 2. 锁存在并且持有者是当前线程,减少持有技术,持有计数大于0更新锁的值跟剩余时间,同时返回 false 表示未释放。
    	// 3. 锁存在但不是当前线程持有,返回 nil 表示无法释放锁。
        Boolean opStatus = commandExecutor.evalWrite(getName(), RedisCommands.EVAL_BOOLEAN,
                "local v = redis.call('get', KEYS[1]); " +
                                "if (v == false) then " +
                                "  redis.call('publish', ARGV[4], ARGV[2]); " +
                                "  return true; " +
                                "else " +
                                "  local o = cjson.decode(v); " +
                                "  if (o['o'] == ARGV[1]) then " +
                                "    o['c'] = o['c'] - 1; " +
                                "    if (o['c'] > 0) then " +
                                "      redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]); " +
                                "      return false;"+
                                "    else " +
                                "      redis.call('del', KEYS[1]);" +
                                "      redis.call('publish', ARGV[4], ARGV[2]); " +
                                "      return true;"+
                                "    end" +
                                "  end;" +
                                "  return nil; " +
                                "end",
                        Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId(), unlockMessage, internalLockLeaseTime, getChannelName());
        
        // 为空则表示无法释放锁
        if (opStatus == null) {
            throw new IllegalStateException("Can't unlock lock Current id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        // 为 true 停止锁续期任务
        if (opStatus) {
            stopRefreshTask();
        }
    }

    @Override
    public Condition newCondition() {
        // TODO implement
        throw new UnsupportedOperationException();
    }

    @Override
    public void forceUnlock() {
        get(forceUnlockAsync());
    }

    // 强制解锁
    private Future<Boolean> forceUnlockAsync() {
    	// 停止锁续期任务
        stopRefreshTask();
        // 删除锁并且发布解锁消息
        return commandExecutor.evalWriteAsync(getName(), RedisCommands.EVAL_BOOLEAN,
                "redis.call('del', KEYS[1]); redis.call('publish', ARGV[2], ARGV[1]); return true",
                        Collections.<Object>singletonList(getName()), unlockMessage, getChannelName());
    }

    @Override
    public boolean isLocked() {
    	// 判断锁是否存在
        return commandExecutor.read(getName(), RedisCommands.EXISTS, getName());
    }

	// 判断锁是否当前线程持有
    @Override
    public boolean isHeldByCurrentThread() {

        Boolean opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_BOOLEAN,
                            "local v = redis.call('get', KEYS[1]); " +
                                "if (v == false) then " +
                                "  return false; " +
                                "else " +
                                "  local o = cjson.decode(v); " +
                                "  if (o['o'] == ARGV[1]) then " +
                                "    return true; " +
                                "  else" +
                                "    return false; " +
                                "  end;" +
                                "end",
                        Collections.<Object>singletonList(getName()), id.toString() + "-" + Thread.currentThread().getId());
        return opStatus;
    }

    // 获取锁持有计数
    @Override
    public int getHoldCount() {
        Long opStatus = commandExecutor.evalRead(getName(), RedisCommands.EVAL_INTEGER,
                "local v = redis.call('get', KEYS[1]); " +
                                "if (v == false) then " +
                                "  return 0; " +
                                "else " +
                                "  local o = cjson.decode(v); " +
                                "  return o['c']; " +
                                "end",
                        Collections.<Object>singletonList(getName()));
        return opStatus.intValue();
    }

    // 删除锁
    @Override
    public boolean delete() {
        forceUnlock();
        return true;
    }

    @Override
    public Future<Boolean> deleteAsync() {
        return forceUnlockAsync();
    }

}

这个分布式锁的核心:

  • 利用 lua 脚本去获取锁、更新锁的值。
  • 每个锁都会订阅一个 channel ,用来接收加锁、解锁的消息。
  • 每次获取锁前,先订阅这个锁专属的 channel ,等待加锁解锁消息。

2. 核心的 lua 脚本

大部分核心的逻辑其实都是在 lua 脚本中,理解了这些 lua 脚本的逻辑分布锁这块的逻辑基本就可以理清。

2.1 获取锁

假设有一个锁名为 myLock,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 , 过期时间为30000毫秒(30秒),lua脚本参数示例为:

  • KEYS[1]:myLock
  • ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
  • ARGV[2]:30000

获取锁的 lua 代码:

--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);

--- 如果锁不存在,说明可以获取到锁
if (v == false) then

  --- 尝试使用 set 命令设置一个锁。 锁的值是一个 JSON 对象, o表示锁的持有者,也就是线程的UUID。 c表示重入计数,默认就是1。
  --- px 就是设置锁的过期时间,也就是参数 ARGV[2]。
  redis.call('set', KEYS[1], cjson.encode({['o'] = ARGV[1], ['c'] = 1}), 'px', ARGV[2]);
  --- 返回nil表示锁已获取
  return nil;
else

  --- 这里说明锁已经存在了。
  --- 将已经存在的锁对象数据解析成JSON。  
  local o = cjson.decode(v);

  --- o表示锁的持有者,也就是线程的UUID , 判断这个锁的持有者跟当前尝试获取锁的线程是不是一样
  if (o['o'] == ARGV[1]) then

    --- 如果一样说明是当前线程重复获取锁,直接增加这个锁的重入次数,重新设置过期时间。
    o['c'] = o['c'] + 1; redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[2]);
    
    --- 返回nil表示锁已获取
    return nil;
  end;
  
  -- 走到这里说明锁被其他线程持有,只能继续等,返回锁的剩余毫秒TTL。
  return redis.call('pttl', KEYS[1]);
end

2.2 解锁

假设有一个锁名为myLock,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 ,解锁消息为 0,过期时间为30000毫秒(30秒),channel 名为redissonlockchannel__{myLock},则 lua 脚本参数参数示例为:

  • KEYS[1]:myLock
  • ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
  • ARGV[2]:0
  • ARGV[3]:30000
  • ARGV[4]:redissonlockchannel__{myLock}
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);

--- 如果锁不存在,直接发布一个解锁消息
if (v == false) then
  redis.call('publish', ARGV[4], ARGV[2]);    --- ARGV[4]是 channel 名,ARGV[2] 是解锁消息,外面传进来的这个值 0
  return true;  --- 返回true表示锁已成功释放

else
  local o = cjson.decode(v);
  
  --- 检查当前线程是不是这个锁的持有者
  if (o['o'] == ARGV[1]) then  
    
    --- 如果是的话,锁重入次数就要减1
    o['c'] = o['c'] - 1;
    
    --- 如果减完1之后重入次数还是大于0,说明还不能立即释放这个锁,其他地方还在用
    if (o['c'] > 0) then
      redis.call('set', KEYS[1], cjson.encode(o), 'px', ARGV[3]);  --- 更新锁的值和过期时间。
      return false;  --- 返回false表示锁仍然被持有
    else
     
      --- 重入次数为0,就可以删除这个锁了。  
      redis.call('del', KEYS[1]);   --- 删除锁
      redis.call('publish', ARGV[4], ARGV[2]); -- 发布一个解锁消息
      return true;  --- 返回true表示锁已成功释放
    end
  end;
  
  --- 如果当前线程不是锁的持有者,返回nil
  return nil;
end

2.3 查看锁是否被当前线程持有

假设有一个锁名为 myLock ,持有者为线程ID 123e4567-e89b-12d3-a456-426614174000-1 , lua 脚本参数示例为:

  • KEYS[1]:myLock
  • ARGV[1]:123e4567-e89b-12d3-a456-426614174000-1
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);

if (v == false) then
  
  --- 锁不存在,返回false,表示锁未被持有。  
  return false; 
else
  local o = cjson.decode(v);
  
  --- 如果当前线程是锁的持有者返回true , 不是返回false。
  if (o['o'] == ARGV[1]) then
    return true;
  else
    return false;
  end;
end

2.4 获取锁的重入次数

假设有一个锁名为 myLock, lua 脚本参数示例为:

  • KEYS[1]:myLock
--- 从 Redis 中获取锁的当前值
local v = redis.call('get', KEYS[1]);

--- 如果锁不存在,返回0,表示当前线程没有持有锁
if (v == false) then
  return 0;
else
  
  \--- 返回当前锁数据中的重入计数 c,c表示当前线程持有锁的次数 
  local o = cjson.decode(v);
  return o['c'];
end

2.5 强制解锁

假设有一个锁名为myLock,解锁消息为0,频道名为 redissonlockchannel__{myLock} ,则 lua 脚本参数示例为:

  • KEYS[1]:myLock
  • ARGV[1]:0
  • ARGV[2]:redissonlockchannel__{myLock}
redis.call('del', KEYS[1]);    --- 从 Redis 中删除锁,KEYS[1]是锁的键名。这一步是强制的,无论当前锁的持有者是谁都会直接删除。
redis.call('publish', ARGV[2], ARGV[1]);   --- 发布一个解锁消息
return true   --- 返回true表示锁已成功强制释放

3. 主要逻辑梳理

3.1 获取锁

简单的流程可以看下面这张图。

获取锁简单流程

文字版本:

  • 先去 Redis 中查找对应的锁是否存在,不存在就直接设置锁。 同时开启一个定时任务,定期去刷新锁的过期时间。
  • 如果存在锁,查看这把锁是否是当前线程持有,如果是当前线程持有的话,则执行重入操作。 也就是持有的数值+1。
  • 如果存在锁,但不是当前线程持有,就通过 Redis channel 订阅这把锁的解锁消息。
  • 当收到解锁消息后,重新开始尝试获取锁。

3.2 释放锁

释放锁简单逻辑

释放锁的逻辑更简单一点:

  • 先去 Redis 中查找对应的锁是否存在,不存在就直接跳过处理。
  • 锁存在的话,比较锁的持有者是否是当前线程,不是当前线程不能解锁,跳过处理。
  • 是当前线程持有锁,对持有次数 -1 。 如果持有次数等于0,那么删除锁,然后发送解锁消息到 channel 中,最后停止锁的自动续期任务。 如果持有次数还大于0则暂时不释放锁。