七的博客

RedissonV1.0源码分析-分布式对象篇

源码分析

RedissonV1.0源码分析-分布式对象篇

这篇主要分析的是 Redisson V1.0 版本分布式对象的实现,也是最为核心的源码分析。

这个版本的 Redisson 一共提供了下面几个分布式对象:

  • 分布式 AtomicLong

  • 分布式 CountDownLatch

  • 分布式 List

  • 分布式 Map

  • 分布式 Queue

  • 分布式 Set

  • 分布式 Topic

  • 分布式 Lock

可以看出 V1.0 版本这些分布式对象基本可以满足实际项目中大部分的场景需求。

Redisson 中每一个分布式对象都有各自的接口抽象,从接口的定义就可以直接看出来接口可以实现什么功能。

1. 分布式对象基类

1.1 RObject 接口定义

这个接口是 Redisson 中所有分布式对象的基础接口。

package org.redisson.core;


public interface RObject {

    // 获取对象的名称
    String getName();

}


1.2 RedissonObject 抽象类定义

RedissonObject 是一个基础的抽象类,不能被直接实例化,只能用作其他类的基类。

package org.redisson;

abstract class RedissonObject {

    private String name;  // 分布式对象的名称,用于 Redis 里面存储

    public RedissonObject(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void close() {
    }

}


2. 分布式 AtomicLong

分布式 AtomicLong 对象主要是利用了 Redis 的元素值自增、递减等特性。

2.1 RAtomicLong 接口定义

Java 标准库中 AtomicLong 的分布式替代版本,跟 AtomicLong 的功能几乎一样。

package org.redisson.core;

public interface RAtomicLong extends RObject {

    // 减 1, 再返回减之前的值
    long getAndDecrement();

    // 加指定数值,再返回加之后的值
    long addAndGet(long delta);

    // 如果当前的值等于预期的值,原子性地设置为新的值。 返回值是操作是否成功
    boolean compareAndSet(long expect, long update);

    // 减 1, 再返回减之后的值
    long decrementAndGet();

    // 获取当前值
    long get();

    // 加指定数值,再返回加之前的值
    long getAndAdd(long delta);

    // 设置新的值,返回旧的值
    long getAndSet(long newValue);

    // 自增1,然后返回自增后的值
    long incrementAndGet();

    // 自增1,然后返回自增前的值
    long getAndIncrement();

    // 设置新的值
    void set(long newValue);

}

2.2 RedissonAtomicLong 源码实现

分布式 AtomicLong 的实现。

package org.redisson;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLong;

import com.lambdaworks.redis.RedisConnection;


public class RedissonAtomicLong extends RedissonObject implements RAtomicLong {

    private final ConnectionManager connectionManager;

    RedissonAtomicLong(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }


    @Override
    public long addAndGet(long delta) {
    	// 获取一个可用连接
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 incrby 方法,实际上调用的是 INCRBY 指令
            return conn.incrby(getName(), delta);
        } finally {
        	// 释放当前连接
            connectionManager.release(conn);
        }
    }

    @Override
    public boolean compareAndSet(long expect, long update) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 无限循环,直到设置成功为止
            while (true) {
            	// 使用 Redis 的 WATCH 命令监视 key
                conn.watch(getName());

                // 如果当前值不等于期望值,放弃事务并直接返回 false
                Long value = (Long) conn.get(getName());
                if (value != expect) {
                    conn.discard();
                    return false;
                }

                // 开始事务、设置新的值
                conn.multi();
                conn.set(getName(), update);

                // 返回结果大小为1 则说明事务成功,返回 true。
                if (conn.exec().size() == 1) {
                    return true;
                }
            }
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public long decrementAndGet() {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 decr 方法,实际上调用的是 Redis 的 DECR 指令        	
            return conn.decr(getName());
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public long get() {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 decr 方法,实际上调用的是 Redis 的 GET 指令        	
            return (Long) conn.get(getName());
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public long getAndAdd(long delta) {
        while (true) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    @Override
    public long getAndSet(long newValue) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 实际上调用的是 Redis 的 GETSET 指令        	
            return (Long) conn.getset(getName(), newValue);
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public long incrementAndGet() {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 incr 方法,实际上调用的是 Redis 的 INCR 指令        	
            return conn.incr(getName());
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public long getAndIncrement() {
        return getAndAdd(1);
    }

    public long getAndDecrement() {
        return getAndAdd(-1);
    }

    @Override
    public void set(long newValue) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 Redis SET 指令设置值
            conn.set(getName(), newValue);
        } finally {
            connectionManager.release(conn);
        }
    }

    public String toString() {
        return Long.toString(get());
    }

}

3. 分布式 CountDownLatch

分布式 CountDownLatch 主要是利用了 Redis 的 get、set 、发布订阅机制实现。

3.1 RCountDownLatch 接口定义

这个就是相当于 Java 标准库里 java.util.concurrent.CountDownLatch 的分布式替代版本。

比如 java.util.concurrent.CountDownLatch 更有优势的地方在于计数值可以通过 trySetCount 方法重置。

运用在多个 JVM 或服务器之间去共享或者同步状态。

package org.redisson.core;

import java.util.concurrent.TimeUnit;

public interface RCountDownLatch extends RObject {

    // 方法会导致当前线程等待,直到计数器降到零。除非是当前线程被中断。
    // 计数器为零,方法会立即返回。
    void await() throws InterruptedException;

    // 支持超时时间的等待。
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    // 计数器减1
    void countDown();

    // 获取当前计数器的值。 用于调试跟测试
    long getCount();


    // 在计数器到0或者还没有设置的时候,设置一个新的计数器值。
    /  如果之前设置成功了计数值则返回 true
    /  如果之前的计数值还没有到达到 0 就返回 false
    boolean trySetCount(long count);

}


3.2 RedissonCountDownLatch 源码实现

Java java.util.concurrent.CountDownLatch 的分布式实现。

package org.redisson;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.RCountDownLatch;
import org.redisson.misc.ReclosableLatch;

import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;


public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

    private final CountDownLatch subscribeLatch = new CountDownLatch(1);  // 确保订阅操作只执行一次

    private final String groupName = "redisson_countdownlatch_";  // 构建 Redis 中的 channel 名称的前缀

    private static final Integer zeroCountMessage = 0;  // 发布/订阅模式中传递消息的常量
    private static final Integer newCountMessage = 1;

    private final AtomicBoolean subscribeOnce = new AtomicBoolean();  // 确保只订阅一次

    private final ReclosableLatch msg = new ReclosableLatch();  // 可重复的闭锁,用于线程间的同步
 
    private final ConnectionManager connectionManager;  // 连接管理
    private PubSubEntry pubSubEntry;   // 发布订阅的对象

    RedissonCountDownLatch(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    public void subscribe() {
    	// 防止多线程环境下重复订阅
        if (subscribeOnce.compareAndSet(false, true)) {
            RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {

                @Override
                public void subscribed(String channel, long count) {
                	// 查订阅的 channel 是否是期望的。是预期的调用 subscribeLatch.countDown(),释放等待订阅完成的线程
                    if (getChannelName().equals(channel)) {
                        subscribeLatch.countDown();
                    }
                }

                @Override
                public void message(String channel, Integer message) {
                    if (!getChannelName().equals(channel)) {
                        return;
                    }
					// 收到值为0 表示计数已经到达零,调用 msg.open() 来释放等待的线程
                    if (message.equals(zeroCountMessage)) {
                        msg.open();
                    }

                   // 收到值为 1 表示设置了新的计数,调用 msg.close() 来重置等待状态
                    if (message.equals(newCountMessage)) {
                        msg.close();
                    }
                }

            };
            // 订阅指定的 channel,并关联创建的监听器
            pubSubEntry = connectionManager.subscribe(listener, getChannelName());
        }

        // 等待订阅操作完成。await() 会阻塞当前线程,直到 countDown() 被调用
        try {
            subscribeLatch.await();
        } catch (InterruptedException e) {
        	// 等待过程中被中断,重新设置线程的中断状态
            Thread.currentThread().interrupt();
        }
    }

    public void await() throws InterruptedException {
    	// 循环会一直执行,直到计数器的值变为 0 或小于 0
    	// 这里会循环通过访问 Redis 来获取最新的计数值

        while (getCount() > 0) {
            // waiting for message
            msg.await();
        }
    }


    // 带超时时间的等待
    @Override
    public boolean await(long time, TimeUnit unit) throws InterruptedException {
        time = unit.toMillis(time);
        while (getCount() > 0) {
            if (time <= 0) {
                return false;
            }
            // 记录下当前的时间
            long current = System.currentTimeMillis();
            // 等待消息,传入超时时间等待
            msg.await(time, TimeUnit.MILLISECONDS);

            // 扣减本次用掉的时间
            long elapsed = System.currentTimeMillis() - current;
            time = time - elapsed;
        }

        return true;
    }

    @Override
    public void countDown() {
    	// 当前计数已经小于等于 0 直接返回
        if (getCount() <= 0) {
            return;
        }


        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 调用 Redis 指令对数值减 1
            Long val = connection.decr(getName());

            // 等于 0 的操作
            if (val == 0) {
            	// 开启事务 > 删除计数器  > 发布消息通知等待的其他线程  >  执行事务检查操作结果
                connection.multi();
                connection.del(getName());
                connection.publish(getChannelName(), zeroCountMessage);
                if (connection.exec().size() != 2) {
                    throw new IllegalStateException();
                }
            } else if (val < 0) {
            	// 小于 0 是异常情况,直接删除掉
                connection.del(getName());
            }
        } finally {
            connectionManager.release(connection);
        }
    }

    private String getChannelName() {
        return groupName + getName();
    }

    // 获取计数器当前值
    @Override
    public long getCount() {
        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 直接调用 Redis get 指令查询计数器的当前值, 查询不到默认为 0 
            Number val = (Number) connection.get(getName());
            if (val == null) {
                return 0;
            }
            return val.longValue();
        } finally {
            connectionManager.release(connection);
        }
    }

    // 尝试设置计数器的值
    @Override
    public boolean trySetCount(long count) {

        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 使用 WATCH 命令监视计数器键,为后续的事务做准备
            connection.watch(getName());

            // 获取这个名称的计数器是不是有值,有值就放弃本次操作
            Long oldValue = (Long) connection.get(getName());
            if (oldValue != null) {
                connection.discard();
                return false;
            }

            开始事务 > 设置计数器的值到 Redis > 发布一个消息通知其他线程有新的计数值设置 > 执行事务检查结果
            connection.multi();
            connection.set(getName(), count);
            connection.publish(getChannelName(), newCountMessage);
            return connection.exec().size() == 2;
        } finally {
            connectionManager.release(connection);
        }
    }

    public void close() {
        connectionManager.unsubscribe(pubSubEntry, getChannelName());
    }

}

4. 分布式 List

分布式 List 对象主要是利用了 Redis 的 List 类型存储数据。

4.1 RList 接口定义

这接口就是内置的 List 接口的分布式实现,没很特殊的地方。

package org.redisson.core;
import java.util.List;

public interface RList<V> extends List<V>, RObject {
}

4.2 RedissonList 源码实现

java.util.List 接口的分布式实现,主要是利用了 Redis 的 List 结构。

package org.redisson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RList;

import com.lambdaworks.redis.RedisConnection;


public class RedissonList<V> extends RedissonObject implements RList<V> {

    private int batchSize = 50;   // 批量操作的大小  默认为50。 这个参数会影响一些批量操作的性能

    private final ConnectionManager connectionManager;

    RedissonList(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    protected ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    @Override
    public int size() {
        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 调用 Redis LLEN 命令获取 List 的长度
            return connection.llen(getName()).intValue();
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override
    public boolean contains(Object o) {
        return indexOf(o) != -1;
    }

    @Override
    public Iterator<V> iterator() {
        return listIterator();
    }

    @Override
    public Object[] toArray() {
        List<V> list = subList(0, size());
        return list.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        List<V> list = subList(0, size());
        return list.toArray(a);
    }

    @Override
    public boolean add(V e) {
        return addAll(Collections.singleton(e));
    }

    @Override
    public boolean remove(Object o) {
        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 使用 Redis LREM 移除目标元素。
            return connection.lrem(getName(), 1, o) > 0;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean containsAll(Collection<?> c) {
    	// 发送 Redis 命令查询是否非空
        if (isEmpty()) {
            return false;
        }

        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// 创建集合的副本
            Collection<Object> copy = new ArrayList<Object>(c);

            // 计算需要分多少批操作
            int to = div(size(), batchSize);
            for (int i = 0; i < to; i++) {
            	// 调用 LRANGE 命令获取一批元素
                List<Object> range = connection.lrange(getName(), i*batchSize, i*batchSize + batchSize - 1);

                // 迭代需要判断是否存在的元素
                for (Iterator<Object> iterator = copy.iterator(); iterator.hasNext();) {
                    Object obj = iterator.next();
                    // 看元素是不是在本次 Redis 查询出来的数据中
                    int index = range.indexOf(obj);
                    // 找到了从副本集合中移除
                    if (index != -1) {
                        iterator.remove();
                    }
                }
            }
            // 复制集合为空,说明所有元素都被找到
            return copy.isEmpty();
        } finally {
            connectionManager.release(connection);
        }

    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 调用 Redis RPUSH  将集合中的所有元素添加到 List 的末尾
            conn.rpush(getName(), c.toArray());
            return true;
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public boolean addAll(int index, Collection<? extends V> coll) {
    	// 先检查下标是不是超过 redis list 总元素大小了
        checkPosition(index);

        // 如果index小于当前列表的大小,则需要在中间插入元素
        if (index < size()) {
            RedisConnection<String, Object> conn = connectionManager.connection();
            try {
                while (true) {
                	// WATCH命令监视列表的变化
                    conn.watch(getName());
                    // LRANGE 获取从 index 到 List 末尾的所有元素
                    List<Object> tail = conn.lrange(getName(), index, size());
                    // 开启事务
                    conn.multi();
                    
                    // LTRIM 命令保留从开始到 index-1 的元素,移除 index 及其后的元素
                    conn.ltrim(getName(), 0, index - 1);
                    
                    // RPUSH命令将集合入参中的元素插入到 List 中
                    conn.rpush(getName(), coll.toArray());
                    
                    // 再次使用 RPUSH 命令将之前获取的尾部元素重新添加到列表中
                    conn.rpush(getName(), tail.toArray());

                    // 执行事务。 返回的结果大小为3则为执行成功  
                    if (conn.exec().size() == 3) {
                        return true;
                    }
                }
            } finally {
                connectionManager.release(conn);
            }
        } else {
        	// 下标在 List 末尾的情况
            return addAll(coll);
        }
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
            boolean result = false;
            // 使用 LREM 命令逐个移除
            for (Object object : c) {
                boolean res = conn.lrem(getName(), 0, object) > 0;
                if (!result) {
                    result = res;
                }
            }
            return result;
        } finally {
            connectionManager.release(conn);
        }

    }

    // 保留传入集合中的元素,其他的删除掉
    @Override
    public boolean retainAll(Collection<?> c) {
        boolean changed = false;
        // 通过迭代器遍历 List,移除不在集合中的元素
        for (Iterator<V> iterator = iterator(); iterator.hasNext();) {
            V object = iterator.next();
            if (!c.contains(object)) {
                iterator.remove();
                changed = true;
            }
        }
        return changed;
    }

    @Override
    public void clear() {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// DEL 命令删除元素
            conn.del(getName());
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public V get(int index) {
    	// 先检查下标是不是超过 redis list 总元素大小

        checkIndex(index);
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {

        	// LINDEX 命令获取指定索引的元素
            return (V) conn.lindex(getName(), index);
        } finally {
            connectionManager.release(conn);
        }
    }

    private void checkIndex(int index) {
        int size = size();
        if (!isInRange(index, size))
            throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
    }

    private boolean isInRange(int index, int size) {
        return index >= 0 && index < size;
    }

    private void checkPosition(int index) {
        int size = size();
        if (!isPositionInRange(index, size))
            throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
    }

    private boolean isPositionInRange(int index, int size) {
        return index >= 0 && index <= size;
    }


    @Override
    public V set(int index, V element) {
    	// 先检查下标是不是超过 redis list 总元素大小
        checkIndex(index);
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// LINDEX 获取之前的对象
            V prev = (V) conn.lindex(getName(), index);
            // LSET 设置值  指定下标
            conn.lset(getName(), index, element);
            return prev;
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public void add(int index, V element) {
        addAll(index, Collections.singleton(element));
    }

    // 计算要处理的总批次
    private int div(int p, int q) {
        int div = p / q;
        int rem = p - q * div; // equal to p % q

        if (rem == 0) {
          return div;
        }

        return div + 1;
    }

    @Override
    public V remove(int index) {
        checkIndex(index);

        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
            if (index == 0) {
                return (V) conn.lpop(getName());
            }
            while (true) {
            	// WATCH 命令监视列表的变化
                conn.watch(getName());
                // LINDEX 命令获取指定下标元素
                V prev = (V) conn.lindex(getName(), index);

                // LRANG 取出从 index+1 到列表末尾的所有元素。
                List<Object> tail = conn.lrange(getName(), index + 1, size());

                // 开启事务
                conn.multi();
                // LTRIM 命令保留  Redis List 从开始到 index-1 的元素 。  移除index及其后的元素
                conn.ltrim(getName(), 0, index - 1);
                // RPUSH 命令将之前获取的尾部元素重新添加到 Redis List中
                conn.rpush(getName(), tail.toArray());
                // 返回的结果大小为 2 则为事务执行成功,返回上一个元素的值
                if (conn.exec().size() == 2) {
                    return prev;
                }
            }
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public int indexOf(Object o) {
        if (isEmpty()) {
            return -1;
        }

        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 计算总批次
            int to = div(size(), batchSize);
            for (int i = 0; i < to; i++) {
            	
            	// LRANG 命令取出一批数据,判断是否在这一批数据里面
                List<Object> range = conn.lrange(getName(), i*batchSize, i*batchSize + batchSize - 1);
                int index = range.indexOf(o);
                if (index != -1) {
                    return index + i*batchSize;
                }
            }

            return -1;
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public int lastIndexOf(Object o) {
        if (isEmpty()) {
            return -1;
        }

        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// 拿到 Redis List 总元素大小
            int size = size();
            // 计算要处理的总批次
            int to = div(size, batchSize);

            // 循环处理
            for (int i = 1; i <= to; i++) {
                int startIndex = -i*batchSize;

                // LRANGE命令获取当前批次的元素
                List<Object> range = conn.lrange(getName(), startIndex, size - (i-1)*batchSize);
                // 从 Redis 获取的数据范围内查找元素的最后一个索引
                int index = range.lastIndexOf(o);
                if (index != -1) {
                	// 找到了该元素,计算其在整个列表中的实际索引并返回
                    return Math.max(size + startIndex, 0) + index;
                }
            }

            return -1;
        } finally {
            connectionManager.release(conn);
        }
    }

    @Override
    public ListIterator<V> listIterator() {
        return listIterator(0);
    }

    // 迭代器实现
    // 支持双向遍历和修改

    @Override
    public ListIterator<V> listIterator(final int ind) {
        return new ListIterator<V>() {
        	// 当前迭代器指向的索引,初始化为ind - 1,因为 next() 会先增加索引
            private int currentIndex = ind - 1;
            // 是否执行过remove 操作,防止重复删除
            private boolean removeExecuted;

            @Override
            public boolean hasNext() {
            	// 检查是否存在下一个元素,确保 currentIndex+1 小于列表大小且列表非空
                int size = size();
                return currentIndex+1 < size && size > 0;
            }

            @Override
            public V next() {
            	// 返回下一个元素,如果没有下一个元素则抛出NoSuchElementException
                if (!hasNext()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                // 更新 currentIndex 并重置 removeExecuted 状态
                currentIndex++;
                removeExecuted = false;
                return RedissonList.this.get(currentIndex);
            }

            @Override
            public void remove() {
            	// 已执行过remove()则抛出IllegalStateException。
                if (removeExecuted) {
                    throw new IllegalStateException("Element been already deleted");
                }
                // 调用  remove 方法   更新 currentIndex 和 removeExecuted
                RedissonList.this.remove(currentIndex);
                currentIndex--;
                removeExecuted = true;
            }

            @Override
            public boolean hasPrevious() {
                int size = size();
                return currentIndex-1 < size && size > 0 && currentIndex >= 0;
            }

            @Override
            public V previous() {
                if (!hasPrevious()) {
                    throw new NoSuchElementException("No such element at index " + currentIndex);
                }
                removeExecuted = false;
                V res = RedissonList.this.get(currentIndex);
                currentIndex--;
                return res;
            }

            @Override
            public int nextIndex() {
                return currentIndex + 1;
            }

            @Override
            public int previousIndex() {
                return currentIndex;
            }

            @Override
            public void set(V e) {
                if (currentIndex >= size()-1) {
                    throw new IllegalStateException();
                }
                RedissonList.this.set(currentIndex, e);
            }

            @Override
            public void add(V e) {
                RedissonList.this.add(currentIndex+1, e);
                currentIndex++;
            }
        };
    }

    @Override
    public List<V> subList(int fromIndex, int toIndex) {
        int size = size();
        if (fromIndex < 0 || toIndex > size) {
            throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size);
        }
        if (fromIndex > toIndex) {
            throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
        }

        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
        	// LRANGE 命令获取指定范围的元素
            return (List<V>) conn.lrange(getName(), fromIndex, toIndex - 1);
        } finally {
            connectionManager.release(conn);
        }
    }

    public String toString() {
        Iterator<V> it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            V e = it.next();
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

}

5. 分布式 Map

分布式 Map 对象主要是利用了 Redis 的 Hash 类型存储数据。

5.1 RMap 接口定义

分布式 Map 实现,也没有很特殊的地方。 实现了 java.util.concurrent.ConcurrentMap 跟 java.util.Map 接口

package org.redisson.core;

import java.util.concurrent.ConcurrentMap;

public interface RMap<K, V> extends ConcurrentMap<K, V>, RObject {

}

5.2 RedissonMap 源码实现

分布式 Map 实现

package org.redisson;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RMap;

import com.lambdaworks.redis.RedisConnection;

//TODO implement watching by keys instead of map name
public class RedissonMap<K, V> extends RedissonObject implements RMap<K, V> {

    private final ConnectionManager connectionManager;

    RedissonMap(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    @Override
    public int size() {
        RedisConnection<String, Object> connection = connectionManager.connection();
        try {
        	// HLEN 命令查询 hash 总元素个数
            return connection.hlen(getName()).intValue();
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override
    public boolean containsKey(Object key) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HEXISTS 命令查看元素是否存在
            return connection.hexists(getName(), key);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean containsValue(Object value) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HVALS 命令查看 value 是否存在
            return connection.hvals(getName()).contains(value);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public V get(Object key) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HGET 命令查询 key
            return (V) connection.hget(getName(), key);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public V put(K key, V value) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HGET 命令查询旧的值
            V prev = (V) connection.hget(getName(), key);
            // HSET 命令设置新的值
            connection.hset(getName(), key, value);
            return prev;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public V remove(Object key) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HGET 命令查询旧的值
            V prev = (V) connection.hget(getName(), key);
            // HDEL 移除元素
            connection.hdel(getName(), key);
            return prev;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HMSET 设置一批 hash 值
            connection.hmset(getName(), (Map<Object, Object>) map);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public void clear() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HDEL 命令删除元素
            connection.del(getName());
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public Set<K> keySet() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        		// HKEYS 返回所有的 hash key
            return new LinkedHashSet<K>((Collection<? extends K>) connection.hkeys(getName()));
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public Collection<V> values() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HVALS 返回所有的 hash value
            return (Collection<V>) connection.hvals(getName());
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public Set<java.util.Map.Entry<K, V>> entrySet() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// HKEYS 命令获取 Redis 哈希表中所有的键s, 并将其转换为 LinkedHashSet 保持顺序
            Map<Object, Object> map = connection.hgetall(getName());
            Map<K, V> result = new HashMap<K, V>();
            for (java.util.Map.Entry<Object, Object> entry : map.entrySet()) {
                result.put((K)entry.getKey(), (V)entry.getValue());
            }
            return result.entrySet();
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public V putIfAbsent(K key, V value) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            while (true) {
            	// HSETNX 命令设置键值对,如果键不存在则设置成功
                Boolean res = connection.hsetnx(getName(), key, value);

                // 设置失败,说明键已存在,返回当前键的值
                if (!res) {
                    V result = get(key);
                    if (result != null) {
                        return result;
                    }
                } else {
                    return null;
                }
            }
        } finally {
            connectionManager.release(connection);
        }

    }

    private boolean isEquals(RedisConnection<Object, Object> connection, Object key, Object value) {
        Object val = connection.hget(getName(), key);
        return (value != null && value.equals(val)) || (value == null && val == null);
    }


    @Override
    public boolean remove(Object key, Object value) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            while (true) {
            	// watch命令监视键
                connection.watch(getName());
                // 检查键是否存在并且值又匹配
                if (connection.hexists(getName(), key)
                        && isEquals(connection, key, value)) {
                	// 开启事务 > 删除键 > 判断事务执行成功
                    connection.multi();
                    connection.hdel(getName(), key);
                    if (connection.exec().size() == 1) {
                        return true;
                    }
                } else {
                    return false;
                }
            }
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean replace(K key, V oldValue, V newValue) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            while (true) {
            	// watch命令监视键
                connection.watch(getName());

                // 检查键是否存在并且值又匹配
                if (connection.hexists(getName(), key)
                        && isEquals(connection, key, oldValue)) {
                	// 开启事务 > 设置键 > 判断事务执行成功

                    connection.multi();
                    connection.hset(getName(), key, newValue);
                    if (connection.exec().size() == 1) {
                        return true;
                    }
                } else {
                    return false;
                }
            }
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public V replace(K key, V value) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            while (true) {
            	// watch命令监视键
                connection.watch(getName());

                 // 检查键是否存在并且值又匹配
                if (connection.hexists(getName(), key)) {
                    // 获取旧的值
                    V prev = (V) connection.hget(getName(), key);
                    // 开始事务 > 设置值 > 检查事务执行是否成功
                    connection.multi();
                    connection.hset(getName(), key, value);
                    if (connection.exec().size() == 1) {
                        // 返回旧的值
                        return prev;
                    }
                }
                return null;
            }
        } finally {
            connectionManager.release(connection);
        }
    }

}

6. 分布式 Queue

分布式 Queue 对象主要是利用了 Redis 的 List 类型存储数据。

6.1 RQueue 接口定义

分布式队列接口,实现了 java.util.List 接口。

package org.redisson.core;

import java.util.Queue;


public interface RQueue<V> extends Queue<V>, RObject {

}


6.1 RedissonQueue 源码实现

分布式队列实现。 总体来说实现还是比较简单,主要是利用了 Redis List 的相关 API 实现。

package org.redisson;

import java.util.NoSuchElementException;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RQueue;

import com.lambdaworks.redis.RedisConnection;

public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {

    RedissonQueue(ConnectionManager connectionManager, String name) {
        super(connectionManager, name);
    }

    // 将元素添加到队列中
    @Override
    public boolean offer(V e) {
        return add(e);
    }

    // 获取队列的第一个元素
    public V getFirst() {
        RedisConnection<String, Object> connection = getConnectionManager().connection();
        try {
            // LINDEX 命令读取 Redis List 的第一个元素
            V value = (V) connection.lindex(getName(), 0);
            if (value == null) {
                // 元素不存在,抛出异常
                throw new NoSuchElementException();
            }
            return value;
        } finally {
            getConnectionManager().release(connection);
        }
    }

    // 移除并返回队列的第一个元素
    public V removeFirst() {
        RedisConnection<String, Object> connection = getConnectionManager().connection();
        try {
            // lpop 命令从 Redis List 中弹出第一个元素
            V value = (V) connection.lpop(getName());
            if (value == null) {
                 // 元素不存在,抛出异常
                throw new NoSuchElementException();
            }
            return value;
        } finally {
            getConnectionManager().release(connection);
        }
    }

    @Override
    public V remove() {
        return removeFirst();
    }

    @Override
    public V poll() {
        RedisConnection<String, Object> connection = getConnectionManager().connection();
        try {
            // 尝试从队列中移除  并返回第一个元素
            return (V) connection.lpop(getName());
        } finally {
            getConnectionManager().release(connection);
        }
    }

    // 返回队列的第一个元素
    @Override
    public V element() {
        return getFirst();
    }

    // 返回队列的第一个元素 但不移除
    @Override
    public V peek() {
        if (isEmpty()) {
            return null;
        }
        return get(0);
    }

}

7. 分布式 Set

分布式 List 对象主要是利用了 Redis 的 Set 类型存储数据。

7.1 RSet 接口定义

分布式集合接口,实现了 java.util.Set 接口。

package org.redisson.core;

import java.util.Set;


public interface RSet<V> extends Set<V>, RObject {

}


7.2 RedissonSet 源码实现

java.util.Set 的分布式集合实现。

package org.redisson;

import java.util.Collection;
import java.util.Iterator;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSet;

import com.lambdaworks.redis.RedisConnection;


public class RedissonSet<V> extends RedissonObject implements RSet<V> {

    private final ConnectionManager connectionManager;

    RedissonSet(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    @Override
    public int size() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 调用 scard 命令获取集合的大小
            return connection.scard(getName()).intValue();
        } finally {
            connectionManager.release(connection);
        }
    }

    // 集合是否为空
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override
    public boolean contains(Object o) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 使用 Redis 的 sismember 命令进行判断包含该元素
            return connection.sismember(getName(), o);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public Iterator<V> iterator() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // TODO use SSCAN in case of usage Redis 2.8    后面用 SSCAN 命令来优化大集合的迭代
            // 使用 Redis 的 smembers 命令获取集合中的所有元素
            return (Iterator<V>) connection.smembers(getName()).iterator();
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public Object[] toArray() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 将集合中的元素转换为数组返回
            return connection.smembers(getName()).toArray();
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public <T> T[] toArray(T[] a) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            return connection.smembers(getName()).toArray(a);
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean add(V e) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // sadd 命令添加元素   返回是否成功添加
            return connection.sadd(getName(), e) > 0;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean remove(Object o) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // srem 命令移除元素,并返回是否成功移除
            return connection.srem(getName(), o) > 0;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        // 循环执行 sismember 命令进行判断包含该元素
        for (Object object : c) {
            if (!contains(object)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean addAll(Collection<? extends V> c) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 调用 sadd 命令添加所有元素
            return connection.sadd(getName(), c.toArray()) > 0;
        } finally {
            connectionManager.release(connection);
        }
    }

    // 保留传入集合中的这些元素,其他的删除掉
    @Override
    public boolean retainAll(Collection<?> c) {
        boolean changed = false;
        for (Iterator<V> iterator = iterator(); iterator.hasNext();) {
            V object = iterator.next();
            // 循环调用 sismember 命令进行判断包含该元素
            if (!c.contains(object)) {
                // 调用删除命令
                iterator.remove();
                changed = true;
            }
        }
        return changed;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 使用 srem 命令移除指定的元素
            return connection.srem(getName(), c.toArray()) > 0;
        } finally {
            connectionManager.release(connection);
        }
    }

    @Override
    public void clear() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            // 使用 Redis 的 del 命令删除集合
            connection.del(getName());
        } finally {
            connectionManager.release(connection);
        }
    }

}

8. 分布式 Topic

分布式 List 对象主要是利用了 Redis 的发布订阅机制,Redisson 中很多功能都会利用 Topic 机制去实现。

8.1 Topic MessageListener 接口定义

用于消息监听的接口,这个接口实现了 java.util.EventListener 接口,表明这是一个事件监听器。 EventListener 接口本身是个空接口,不包含任务方法定义。

package org.redisson.core;
import java.util.EventListener;

public interface MessageListener<M> extends EventListener {

    // 订阅的 Redis topic 接收到新消息时,onMessage 方法会被调用
    void onMessage(M msg);
}

8.2 RTopic 接口定义

分布式 Topic 接口,实现了发布-订阅模式,可以用来在分布式系统中进行消息传递。

package org.redisson.core;

// 消息会被传递到 Redis 集群中的所有消息监听器。
public interface RTopic<M> extends RObject {

    // 向此 Topic 的所有订阅者发布消息
    void publish(M message);

    // 订阅此 Topic 。当任何消息发布到这个 Topic 的时候,会调用 MessageListener.onMessage() 。
    int addListener(MessageListener<M> listener);

    // 通过 listenerId 移除这个 Topic 的监听器。
    void removeListener(int listenerId);

}

8.3 RedissonTopic 源码实现

分布式 Topic 的实现。

package org.redisson;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;

import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;


public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {

    private final CountDownLatch subscribeLatch = new CountDownLatch(1);   // 确保订阅操作只执行一次
    private final AtomicBoolean subscribeOnce = new AtomicBoolean();   // 标记是否已经订阅

    private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
                                new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();   // 所有的监听器
    private final ConnectionManager connectionManager;

    private PubSubEntry pubSubEntry;  // 订阅的条目

    RedissonTopic(ConnectionManager connectionManager, String name) {
        super(name);
        this.connectionManager = connectionManager;
    }

    public void subscribe() {
        // 保证只订阅一次
        if (subscribeOnce.compareAndSet(false, true)) {
            RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {

                @Override
                public void subscribed(String channel, long count) {
                    // 订阅成功时,调用 countDown 方法
                    if (channel.equals(getName())) {
                        subscribeLatch.countDown();
                    }
                }

            };

            // 执行订阅操作
            pubSubEntry = connectionManager.subscribe(listener, getName());
        }

        try {
            // 确保订阅完成后再继续执行后续的逻辑
            subscribeLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // 发布消息到 topic 
    @Override
    public void publish(M message) {
        RedisConnection<String, Object> conn = connectionManager.connection();
        try {
            // 使用 publish 命令发送消息
            conn.publish(getName(), message);
        } finally {
            connectionManager.release(conn);
        }
    }

    // 添加消息监听器
    @Override
    public int addListener(MessageListener<M> listener) {
        RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
        listeners.put(pubSubListener.hashCode(), pubSubListener);
        pubSubEntry.addListener(pubSubListener);
        return pubSubListener.hashCode();
    }

    // 移除监听器
    @Override
    public void removeListener(int listenerId) {
        RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
        pubSubEntry.removeListener(pubSubListener);
    }

    // 取消订阅
    @Override
    public void close() {
        // 使用 unsubscribe 命令取消订阅
        connectionManager.unsubscribe(pubSubEntry, getName());
    }

}

9. 分布式 Lock

这一个版本的分布式锁,主要是采用 Redis 的 SETNX 命令实现分布式锁的获取和释放。

9.1 RLock 接口定义

Java 标准库中 java.util.concurrent.locks.Lock 接口的分布式实现,可以用来实现分布式锁。

package org.redisson.core;

import java.util.concurrent.locks.Lock;


public interface RLock extends Lock, RObject {

    // 强制解锁,不考虑锁的状态
    void forceUnlock();

    // 检查这个锁是否被 Redisson 集群中的任何线程锁定。 锁定返回 true,否则返回 false
    boolean isLocked();

    // 检查这个锁是否被当前线程持有。 当前线程持有则返回 true,否则返回 false
    boolean isHeldByCurrentThread();

    // 当前线程对这个锁的持有次数。如果这个锁没有被当前线程持有则返回 0
    // 这个主要是支持重入特性,可以通过这个值来判断重入多少次
    int getHoldCount();

}

9.2 RedissonLock 源码实现

java.util.concurrent.locks.Lock 的可重入分布式锁实现。 Redisson 用的最多的场景估计就是这个了,很多人都是从这个功能了解到 Redisson 的。

package org.redisson;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;

import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.RLock;

import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;


public class RedissonLock extends RedissonObject implements RLock {

    public static class LockValue implements Serializable {

        private static final long serialVersionUID = -8895632286065689476L;

        private UUID id;   // 锁的唯一ID
        private Long threadId;  // 线程ID
        // need for reentrant support
        private int counter; // 计数器,支持重入锁

        public LockValue() {
        }

        public LockValue(UUID id, Long threadId) {
            super();
            this.id = id;
            this.threadId = threadId;
        }

        public void decCounter() {
            counter--;
        }

        public void incCounter() {
            counter++;
        }

        public int getCounter() {
            return counter;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((id == null) ? 0 : id.hashCode());
            result = prime * result + ((threadId == null) ? 0 : threadId.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            LockValue other = (LockValue) obj;
            if (id == null) {
                if (other.id != null)
                    return false;
            } else if (!id.equals(other.id))
                return false;
            if (threadId == null) {
                if (other.threadId != null)
                    return false;
            } else if (!threadId.equals(other.threadId))
                return false;
            return true;
        }

    }

    private final ConnectionManager connectionManager;

    private final UUID id;   // 锁的唯一ID
    private final String groupName = "redisson_lock_";  // 锁的组名前缀,所有锁的名称都将以此为前缀

    private static final Integer unlockMessage = 0;  // 发布订阅机制中的解锁消息值

    private final CountDownLatch subscribeLatch = new CountDownLatch(1);  // 确保订阅操作完成后再进行其他操作
    private final AtomicBoolean subscribeOnce = new AtomicBoolean();  // 确保订阅操作只执行一次的标记

    private final Semaphore msg = new Semaphore(1);  // 信号量控制锁的获取和释放,初始许可数为1。

    private PubSubEntry pubSubEntry;

    RedissonLock(ConnectionManager connectionManager, String name, UUID id) {
        super(name);
        this.connectionManager = connectionManager;
        this.id = id;
    }

 
 	// 订阅 Redis channel ,接收解锁消息
    public void subscribe() {
    	// 保证订阅操作只执行一次
        if (subscribeOnce.compareAndSet(false, true)) {
        	// 获取信号量,确保在订阅完成之前不进行锁操作
            msg.acquireUninterruptibly();

            RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {

                @Override
                public void subscribed(String channel, long count) {
                	// 减少 subscribeLatch 的计数
                    if (getChannelName().equals(channel)) {
                        subscribeLatch.countDown();
                    }
                }

                @Override
                public void message(String channel, Integer message) {
                	// 接收到解锁消息时释放信号量
                    if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
                        msg.release();
                    }
                }

            };
            // 订阅指定 channel
            pubSubEntry = connectionManager.subscribe(listener, getChannelName());
        }

        try {
        	// 等待订阅完成
            subscribeLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // 支持打断的锁获取
    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
        	// 如果线程在等待获取锁时被中断,捕获异常并设置线程的中断状态
            Thread.currentThread().interrupt();
            return;
        }
    }

    private String getKeyName() {
        return groupName + getName();
    }

    private String getChannelName() {
        return groupName + getName();
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {
    	// 循环调用 tryLock 方法直到成功拿到锁
        while (!tryLock()) {
            // 没拿到锁阻塞当前线程,一直到信号量可用。 这是通过订阅 Redis channel 来监听解锁消息实现
            msg.acquire();
        }
    }


    @Override
    public boolean tryLock() {
    	// 表示当前线程的锁状态,并增加计数器
        LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
        currentLock.incCounter();

        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// 调用 setnx 加锁
            Boolean res = connection.setnx(getKeyName(), currentLock);
            // 锁已被其他线程持有
            if (!res) {
                LockValue lock = (LockValue) connection.get(getKeyName());
                // 如果 lock 不为 null且等于 currentLock , 表示当前线程已持有锁 , 就增加计数器并更新锁状态
                if (lock != null && lock.equals(currentLock)) {
                    lock.incCounter();
                    connection.set(getKeyName(), lock);
                    return true;
                }
            }
            return res;
        } finally {
            connectionManager.release(connection);
        }
    }

    // 带超时时间获取锁,过了超时时间就算获取锁失败
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        time = unit.toMillis(time);
        while (!tryLock()) {
            if (time <= 0) {
                return false;
            }
            long current = System.currentTimeMillis();
            // waiting for message
            msg.tryAcquire(time, TimeUnit.MILLISECONDS);
            long elapsed = System.currentTimeMillis() - current;
            time -= elapsed;
        }
        return true;
    }


    // 解锁流程
    @Override
    public void unlock() {
    	// 当前线程的锁状态
        LockValue currentLock = new LockValue(id, Thread.currentThread().getId());

        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// 查看当前连接的锁信息
            LockValue lock = (LockValue) connection.get(getKeyName());
            // 如果 lock 不为 null 且等于 currentLock , 说明已经被当前线程持有了
            if (lock != null && lock.equals(currentLock)) {
            	// 计数器大于0,说明其他地方还要用这个锁
                if (lock.getCounter() > 1) {
                    lock.decCounter();
                    connection.set(getKeyName(), lock);
                } else {
                	// 释放锁
                    unlock(connection);
                }
            } else {
            	// 锁不属于当前线程,抛出异常
                throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
                        + id + " thread-id: " + Thread.currentThread().getId());
            }
        } finally {
            connectionManager.release(connection);
        }
    }

    // 实际执行锁的释放,从 redis 删除锁
    private void unlock(RedisConnection<Object, Object> connection) {
        int counter = 0;
        while (counter < 5) {
        	// 开启事务 > 删除 redis 锁  > 发布解锁消息到 channel > 判断事务执行是否成功
            connection.multi();
            connection.del(getKeyName());
            connection.publish(getChannelName(), unlockMessage);
            if (connection.exec().size() == 2) {
                return;
            }

 
            // 如果事务失败,重试最多5次.  5次尝试后仍未成功抛出异常
            counter++;
        }
        throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }

    @Override
    public Condition newCondition() {
        // TODO implement
        throw new UnsupportedOperationException();
    }

    // 关闭锁的订阅
    @Override
    public void close() {
        connectionManager.unsubscribe(pubSubEntry, getChannelName());
    }


    // 强制结果
    @Override
    public void forceUnlock() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// 循环解锁
        	// 这里看着会导致无限循环,应该在某个条件下退出循环。
            while (true) {
                LockValue lock = (LockValue) connection.get(getKeyName());
                // 如果锁存在,则调用 unlock() 方法释放锁
                if (lock != null) {
                    unlock(connection);
                }
            }
        } finally {
            connectionManager.release(connection);
        }
    }

    // 锁是否被持有
    @Override
    public boolean isLocked() {
        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// 获取当前锁的值
            LockValue lock = (LockValue) connection.get(getKeyName());
            // 锁存在返回true
            return lock != null;
        } finally {
            connectionManager.release(connection);
        }
    }

    // 检查锁是否被当前线程持有
    @Override
    public boolean isHeldByCurrentThread() {
        LockValue currentLock = new LockValue(id, Thread.currentThread().getId());

        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
        	// 获取当前锁的值
            LockValue lock = (LockValue) connection.get(getKeyName());
            // 如果锁存在且等于currentLock  返回true
            return lock != null && lock.equals(currentLock);
        } finally {
            connectionManager.release(connection);
        }
    }

    // 返回当前线程持有锁的次数
    @Override
    public int getHoldCount() {
    	// 当前线程的锁状态
        LockValue currentLock = new LockValue(id, Thread.currentThread().getId());

        RedisConnection<Object, Object> connection = connectionManager.connection();
        try {
            LockValue lock = (LockValue) connection.get(getKeyName());
            // 如果锁存在且等于currentLock,则返回计数器的值
            if (lock != null && lock.equals(currentLock)) {
                return lock.getCounter();
            }
            return 0;
        } finally {
            connectionManager.release(connection);
        }
    }

}

重点看下这个锁的实现机制:

  • 核心是利用 Redis SETNX 命令来实现分布式锁的获取和释放。
  • 获取锁的时候,SETNX 命令尝试设置一个KEY(通常是锁的标识符)。如果 KEY 不存在,SETNX 会成功设置 KEY 并返回 true,表示锁获取成功。KEY 已经存在,SETNX 命令会返回 false,表示锁已经被其他客户端持有。
  • 释放锁的时候,只有持有锁的客户端可以释放锁。这个是通过检查持有锁时设置的唯一标识来确保锁可以安全释放。释放锁是通过 Redis 的 DEL 命令删除元素。
  • 锁是支持重入的。

这个版本的锁问题还是挺多。比如没有设置超时时间的,比如像客户端在持有锁的过程中关闭了,这样锁就会一直不释放。 通过设置 Redis 元素的过期时间,就可以达到自动释放锁的目的。

10. 对外核心类工具类 Redisson

这个类提供了访问各种 Redisson 分布式对象和功能的入口点,我们平时使用 Redisson 通常也就是直接使用这个工具类。

这个工具类没啥特别,主要是以下几个点:

  • 获取各种 Redisson 分布式对象时,都是先检查缓存,缓存中有就直接获取缓存对象。如果不存在则创建新实例,然后返回。这里说的缓存,是指的 Redisson 分布式对象,不是说的 Redis 的数据缓存。
  • 对象缓存采用了 Key Value 的引用类型,Key 都是使用的是强引用,Value 是软引用。引用主要是用到了 ReferenceMap 这个工具类。这样做的目的是在内存不足时自动回收不再使用的对象,减少内存泄漏的风险。软引用可以在内存不足的时候回收掉,强引用则不允许被回收。
package org.redisson;

import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RList;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RQueue;
import org.redisson.core.RSet;
import org.redisson.core.RTopic;
import org.redisson.misc.ReferenceMap;
import org.redisson.misc.ReferenceMap.ReferenceType;
import org.redisson.misc.ReferenceMap.RemoveValueListener;


public class Redisson {

    RemoveValueListener listener = new RemoveValueListener() {

        @Override
        public void onRemove(Object value) {
            if (value instanceof RedissonObject) {
                ((RedissonObject)value).close();
            }
        }

    };

    // 存储各种 Redisson 对象的线程安全映射
    // 用 ReferenceMap 来管理对象的生命周期
    private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ReferenceMap<String, RedissonCountDownLatch>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
    private final ConcurrentMap<String, RedissonTopic> topicsMap = new ReferenceMap<String, RedissonTopic>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
    private final ConcurrentMap<String, RedissonLock> locksMap = new ReferenceMap<String, RedissonLock>(ReferenceType.STRONG, ReferenceType.SOFT, listener);

    private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ReferenceMap<String, RedissonAtomicLong>(ReferenceType.STRONG, ReferenceType.SOFT);
    private final ConcurrentMap<String, RedissonQueue> queuesMap = new ReferenceMap<String, RedissonQueue>(ReferenceType.STRONG, ReferenceType.SOFT);
    private final ConcurrentMap<String, RedissonSet> setsMap = new ReferenceMap<String, RedissonSet>(ReferenceType.STRONG, ReferenceType.SOFT);
    private final ConcurrentMap<String, RedissonList> listsMap = new ReferenceMap<String, RedissonList>(ReferenceType.STRONG, ReferenceType.SOFT);
    private final ConcurrentMap<String, RedissonMap> mapsMap = new ReferenceMap<String, RedissonMap>(ReferenceType.STRONG, ReferenceType.SOFT);


    private final ConnectionManager connectionManager;   // 连接管理
    private final Config config;  // Redisson 配置

    private final UUID id = UUID.randomUUID();  // 生成唯一标识

    Redisson(Config config) {
        this.config = config;
        Config configCopy = new Config(config);
        connectionManager = new ConnectionManager(configCopy);
    }

    // 默认连接到 127.0.0.1:6379
    public static Redisson create() {
        Config config = new Config();
        config.addAddress("127.0.0.1:6379");
        return create(config);
    }

    public static Redisson create(Config config) {
        return new Redisson(config);
    }

    // 下面的 getxx() 都是用于获取各种 Redisson 分布式对象
    // 逻辑都是类似的,先检查缓存,如果不存在则创建新实例,然后返回。
    public <V> RList<V> getList(String name) {
        RedissonList<V> list = listsMap.get(name);
        if (list == null) {
            list = new RedissonList<V>(connectionManager, name);
            RedissonList<V> oldList = listsMap.putIfAbsent(name, list);
            if (oldList != null) {
                list = oldList;
            }
        }

        return list;
    }


    public <K, V> RMap<K, V> getMap(String name) {
        RedissonMap<K, V> map = mapsMap.get(name);
        if (map == null) {
            map = new RedissonMap<K, V>(connectionManager, name);
            RedissonMap<K, V> oldMap = mapsMap.putIfAbsent(name, map);
            if (oldMap != null) {
                map = oldMap;
            }
        }

        return map;
    }

    public RLock getLock(String name) {
        RedissonLock lock = locksMap.get(name);
        if (lock == null) {
            lock = new RedissonLock(connectionManager, name, id);
            RedissonLock oldLock = locksMap.putIfAbsent(name, lock);
            if (oldLock != null) {
                lock = oldLock;
            }
        }

        lock.subscribe();
        return lock;
    }

    public <V> RSet<V> getSet(String name) {
        RedissonSet<V> set = setsMap.get(name);
        if (set == null) {
            set = new RedissonSet<V>(connectionManager, name);
            RedissonSet<V> oldSet = setsMap.putIfAbsent(name, set);
            if (oldSet != null) {
                set = oldSet;
            }
        }

        return set;
    }

    public <M> RTopic<M> getTopic(String name) {
        RedissonTopic<M> topic = topicsMap.get(name);
        if (topic == null) {
            topic = new RedissonTopic<M>(connectionManager, name);
            RedissonTopic<M> oldTopic = topicsMap.putIfAbsent(name, topic);
            if (oldTopic != null) {
                topic = oldTopic;
            }
        }

        topic.subscribe();
        return topic;

    }

    public <V> RQueue<V> getQueue(String name) {
        RedissonQueue<V> queue = queuesMap.get(name);
        if (queue == null) {
            queue = new RedissonQueue<V>(connectionManager, name);
            RedissonQueue<V> oldQueue = queuesMap.putIfAbsent(name, queue);
            if (oldQueue != null) {
                queue = oldQueue;
            }
        }

        return queue;
    }

    public RAtomicLong getAtomicLong(String name) {
        RedissonAtomicLong atomicLong = atomicLongsMap.get(name);
        if (atomicLong == null) {
            atomicLong = new RedissonAtomicLong(connectionManager, name);
            RedissonAtomicLong oldAtomicLong = atomicLongsMap.putIfAbsent(name, atomicLong);
            if (oldAtomicLong != null) {
                atomicLong = oldAtomicLong;
            }
        }

        return atomicLong;

    }

    public RCountDownLatch getCountDownLatch(String name) {
        RedissonCountDownLatch latch = latchesMap.get(name);
        if (latch == null) {
            latch = new RedissonCountDownLatch(connectionManager, name);
            RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch);
            if (oldLatch != null) {
                latch = oldLatch;
            }
        }

        latch.subscribe();
        return latch;
    }

    //  关闭 Redisson 实例   但不会关闭 Redis 服务器
    public void shutdown() {
        connectionManager.shutdown();
    }

    public Config getConfig() {
        return config;
    }

}