RedissonV1.0源码分析-分布式对象篇
RedissonV1.0源码分析-分布式对象篇
这篇主要分析的是 Redisson V1.0 版本分布式对象的实现,也是最为核心的源码分析。
这个版本的 Redisson 一共提供了下面几个分布式对象:
分布式 AtomicLong
分布式 CountDownLatch
分布式 List
分布式 Map
分布式 Queue
分布式 Set
分布式 Topic
分布式 Lock
可以看出 V1.0 版本这些分布式对象基本可以满足实际项目中大部分的场景需求。
Redisson 中每一个分布式对象都有各自的接口抽象,从接口的定义就可以直接看出来接口可以实现什么功能。
1. 分布式对象基类
1.1 RObject 接口定义
这个接口是 Redisson 中所有分布式对象的基础接口。
package org.redisson.core;
public interface RObject {
// 获取对象的名称
String getName();
}
1.2 RedissonObject 抽象类定义
RedissonObject 是一个基础的抽象类,不能被直接实例化,只能用作其他类的基类。
package org.redisson;
abstract class RedissonObject {
private String name; // 分布式对象的名称,用于 Redis 里面存储
public RedissonObject(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void close() {
}
}
2. 分布式 AtomicLong
分布式 AtomicLong 对象主要是利用了 Redis 的元素值自增、递减等特性。
2.1 RAtomicLong 接口定义
Java 标准库中 AtomicLong 的分布式替代版本,跟 AtomicLong 的功能几乎一样。
package org.redisson.core;
public interface RAtomicLong extends RObject {
// 减 1, 再返回减之前的值
long getAndDecrement();
// 加指定数值,再返回加之后的值
long addAndGet(long delta);
// 如果当前的值等于预期的值,原子性地设置为新的值。 返回值是操作是否成功
boolean compareAndSet(long expect, long update);
// 减 1, 再返回减之后的值
long decrementAndGet();
// 获取当前值
long get();
// 加指定数值,再返回加之前的值
long getAndAdd(long delta);
// 设置新的值,返回旧的值
long getAndSet(long newValue);
// 自增1,然后返回自增后的值
long incrementAndGet();
// 自增1,然后返回自增前的值
long getAndIncrement();
// 设置新的值
void set(long newValue);
}
2.2 RedissonAtomicLong 源码实现
分布式 AtomicLong 的实现。
package org.redisson;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLong;
import com.lambdaworks.redis.RedisConnection;
public class RedissonAtomicLong extends RedissonObject implements RAtomicLong {
private final ConnectionManager connectionManager;
RedissonAtomicLong(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
@Override
public long addAndGet(long delta) {
// 获取一个可用连接
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 incrby 方法,实际上调用的是 INCRBY 指令
return conn.incrby(getName(), delta);
} finally {
// 释放当前连接
connectionManager.release(conn);
}
}
@Override
public boolean compareAndSet(long expect, long update) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 无限循环,直到设置成功为止
while (true) {
// 使用 Redis 的 WATCH 命令监视 key
conn.watch(getName());
// 如果当前值不等于期望值,放弃事务并直接返回 false
Long value = (Long) conn.get(getName());
if (value != expect) {
conn.discard();
return false;
}
// 开始事务、设置新的值
conn.multi();
conn.set(getName(), update);
// 返回结果大小为1 则说明事务成功,返回 true。
if (conn.exec().size() == 1) {
return true;
}
}
} finally {
connectionManager.release(conn);
}
}
@Override
public long decrementAndGet() {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 decr 方法,实际上调用的是 Redis 的 DECR 指令
return conn.decr(getName());
} finally {
connectionManager.release(conn);
}
}
@Override
public long get() {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 decr 方法,实际上调用的是 Redis 的 GET 指令
return (Long) conn.get(getName());
} finally {
connectionManager.release(conn);
}
}
@Override
public long getAndAdd(long delta) {
while (true) {
long current = get();
long next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
@Override
public long getAndSet(long newValue) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 实际上调用的是 Redis 的 GETSET 指令
return (Long) conn.getset(getName(), newValue);
} finally {
connectionManager.release(conn);
}
}
@Override
public long incrementAndGet() {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 incr 方法,实际上调用的是 Redis 的 INCR 指令
return conn.incr(getName());
} finally {
connectionManager.release(conn);
}
}
@Override
public long getAndIncrement() {
return getAndAdd(1);
}
public long getAndDecrement() {
return getAndAdd(-1);
}
@Override
public void set(long newValue) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 Redis SET 指令设置值
conn.set(getName(), newValue);
} finally {
connectionManager.release(conn);
}
}
public String toString() {
return Long.toString(get());
}
}
3. 分布式 CountDownLatch
分布式 CountDownLatch 主要是利用了 Redis 的 get、set 、发布订阅机制实现。
3.1 RCountDownLatch 接口定义
这个就是相当于 Java 标准库里 java.util.concurrent.CountDownLatch 的分布式替代版本。
比如 java.util.concurrent.CountDownLatch 更有优势的地方在于计数值可以通过 trySetCount 方法重置。
运用在多个 JVM 或服务器之间去共享或者同步状态。
package org.redisson.core;
import java.util.concurrent.TimeUnit;
public interface RCountDownLatch extends RObject {
// 方法会导致当前线程等待,直到计数器降到零。除非是当前线程被中断。
// 计数器为零,方法会立即返回。
void await() throws InterruptedException;
// 支持超时时间的等待。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 计数器减1
void countDown();
// 获取当前计数器的值。 用于调试跟测试
long getCount();
// 在计数器到0或者还没有设置的时候,设置一个新的计数器值。
/ 如果之前设置成功了计数值则返回 true
/ 如果之前的计数值还没有到达到 0 就返回 false
boolean trySetCount(long count);
}
3.2 RedissonCountDownLatch 源码实现
Java java.util.concurrent.CountDownLatch 的分布式实现。
package org.redisson;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.RCountDownLatch;
import org.redisson.misc.ReclosableLatch;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {
private final CountDownLatch subscribeLatch = new CountDownLatch(1); // 确保订阅操作只执行一次
private final String groupName = "redisson_countdownlatch_"; // 构建 Redis 中的 channel 名称的前缀
private static final Integer zeroCountMessage = 0; // 发布/订阅模式中传递消息的常量
private static final Integer newCountMessage = 1;
private final AtomicBoolean subscribeOnce = new AtomicBoolean(); // 确保只订阅一次
private final ReclosableLatch msg = new ReclosableLatch(); // 可重复的闭锁,用于线程间的同步
private final ConnectionManager connectionManager; // 连接管理
private PubSubEntry pubSubEntry; // 发布订阅的对象
RedissonCountDownLatch(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
public void subscribe() {
// 防止多线程环境下重复订阅
if (subscribeOnce.compareAndSet(false, true)) {
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override
public void subscribed(String channel, long count) {
// 查订阅的 channel 是否是期望的。是预期的调用 subscribeLatch.countDown(),释放等待订阅完成的线程
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
@Override
public void message(String channel, Integer message) {
if (!getChannelName().equals(channel)) {
return;
}
// 收到值为0 表示计数已经到达零,调用 msg.open() 来释放等待的线程
if (message.equals(zeroCountMessage)) {
msg.open();
}
// 收到值为 1 表示设置了新的计数,调用 msg.close() 来重置等待状态
if (message.equals(newCountMessage)) {
msg.close();
}
}
};
// 订阅指定的 channel,并关联创建的监听器
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
}
// 等待订阅操作完成。await() 会阻塞当前线程,直到 countDown() 被调用
try {
subscribeLatch.await();
} catch (InterruptedException e) {
// 等待过程中被中断,重新设置线程的中断状态
Thread.currentThread().interrupt();
}
}
public void await() throws InterruptedException {
// 循环会一直执行,直到计数器的值变为 0 或小于 0
// 这里会循环通过访问 Redis 来获取最新的计数值
while (getCount() > 0) {
// waiting for message
msg.await();
}
}
// 带超时时间的等待
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
time = unit.toMillis(time);
while (getCount() > 0) {
if (time <= 0) {
return false;
}
// 记录下当前的时间
long current = System.currentTimeMillis();
// 等待消息,传入超时时间等待
msg.await(time, TimeUnit.MILLISECONDS);
// 扣减本次用掉的时间
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;
}
return true;
}
@Override
public void countDown() {
// 当前计数已经小于等于 0 直接返回
if (getCount() <= 0) {
return;
}
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 调用 Redis 指令对数值减 1
Long val = connection.decr(getName());
// 等于 0 的操作
if (val == 0) {
// 开启事务 > 删除计数器 > 发布消息通知等待的其他线程 > 执行事务检查操作结果
connection.multi();
connection.del(getName());
connection.publish(getChannelName(), zeroCountMessage);
if (connection.exec().size() != 2) {
throw new IllegalStateException();
}
} else if (val < 0) {
// 小于 0 是异常情况,直接删除掉
connection.del(getName());
}
} finally {
connectionManager.release(connection);
}
}
private String getChannelName() {
return groupName + getName();
}
// 获取计数器当前值
@Override
public long getCount() {
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 直接调用 Redis get 指令查询计数器的当前值, 查询不到默认为 0
Number val = (Number) connection.get(getName());
if (val == null) {
return 0;
}
return val.longValue();
} finally {
connectionManager.release(connection);
}
}
// 尝试设置计数器的值
@Override
public boolean trySetCount(long count) {
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 使用 WATCH 命令监视计数器键,为后续的事务做准备
connection.watch(getName());
// 获取这个名称的计数器是不是有值,有值就放弃本次操作
Long oldValue = (Long) connection.get(getName());
if (oldValue != null) {
connection.discard();
return false;
}
开始事务 > 设置计数器的值到 Redis > 发布一个消息通知其他线程有新的计数值设置 > 执行事务检查结果
connection.multi();
connection.set(getName(), count);
connection.publish(getChannelName(), newCountMessage);
return connection.exec().size() == 2;
} finally {
connectionManager.release(connection);
}
}
public void close() {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
}
}
4. 分布式 List
分布式 List 对象主要是利用了 Redis 的 List 类型存储数据。
4.1 RList 接口定义
这接口就是内置的 List 接口的分布式实现,没很特殊的地方。
package org.redisson.core;
import java.util.List;
public interface RList<V> extends List<V>, RObject {
}
4.2 RedissonList 源码实现
java.util.List 接口的分布式实现,主要是利用了 Redis 的 List 结构。
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RList;
import com.lambdaworks.redis.RedisConnection;
public class RedissonList<V> extends RedissonObject implements RList<V> {
private int batchSize = 50; // 批量操作的大小 默认为50。 这个参数会影响一些批量操作的性能
private final ConnectionManager connectionManager;
RedissonList(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
protected ConnectionManager getConnectionManager() {
return connectionManager;
}
@Override
public int size() {
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 调用 Redis LLEN 命令获取 List 的长度
return connection.llen(getName()).intValue();
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
return indexOf(o) != -1;
}
@Override
public Iterator<V> iterator() {
return listIterator();
}
@Override
public Object[] toArray() {
List<V> list = subList(0, size());
return list.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<V> list = subList(0, size());
return list.toArray(a);
}
@Override
public boolean add(V e) {
return addAll(Collections.singleton(e));
}
@Override
public boolean remove(Object o) {
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 使用 Redis LREM 移除目标元素。
return connection.lrem(getName(), 1, o) > 0;
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean containsAll(Collection<?> c) {
// 发送 Redis 命令查询是否非空
if (isEmpty()) {
return false;
}
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// 创建集合的副本
Collection<Object> copy = new ArrayList<Object>(c);
// 计算需要分多少批操作
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
// 调用 LRANGE 命令获取一批元素
List<Object> range = connection.lrange(getName(), i*batchSize, i*batchSize + batchSize - 1);
// 迭代需要判断是否存在的元素
for (Iterator<Object> iterator = copy.iterator(); iterator.hasNext();) {
Object obj = iterator.next();
// 看元素是不是在本次 Redis 查询出来的数据中
int index = range.indexOf(obj);
// 找到了从副本集合中移除
if (index != -1) {
iterator.remove();
}
}
}
// 复制集合为空,说明所有元素都被找到
return copy.isEmpty();
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean addAll(Collection<? extends V> c) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 调用 Redis RPUSH 将集合中的所有元素添加到 List 的末尾
conn.rpush(getName(), c.toArray());
return true;
} finally {
connectionManager.release(conn);
}
}
@Override
public boolean addAll(int index, Collection<? extends V> coll) {
// 先检查下标是不是超过 redis list 总元素大小了
checkPosition(index);
// 如果index小于当前列表的大小,则需要在中间插入元素
if (index < size()) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
while (true) {
// WATCH命令监视列表的变化
conn.watch(getName());
// LRANGE 获取从 index 到 List 末尾的所有元素
List<Object> tail = conn.lrange(getName(), index, size());
// 开启事务
conn.multi();
// LTRIM 命令保留从开始到 index-1 的元素,移除 index 及其后的元素
conn.ltrim(getName(), 0, index - 1);
// RPUSH命令将集合入参中的元素插入到 List 中
conn.rpush(getName(), coll.toArray());
// 再次使用 RPUSH 命令将之前获取的尾部元素重新添加到列表中
conn.rpush(getName(), tail.toArray());
// 执行事务。 返回的结果大小为3则为执行成功
if (conn.exec().size() == 3) {
return true;
}
}
} finally {
connectionManager.release(conn);
}
} else {
// 下标在 List 末尾的情况
return addAll(coll);
}
}
@Override
public boolean removeAll(Collection<?> c) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
boolean result = false;
// 使用 LREM 命令逐个移除
for (Object object : c) {
boolean res = conn.lrem(getName(), 0, object) > 0;
if (!result) {
result = res;
}
}
return result;
} finally {
connectionManager.release(conn);
}
}
// 保留传入集合中的元素,其他的删除掉
@Override
public boolean retainAll(Collection<?> c) {
boolean changed = false;
// 通过迭代器遍历 List,移除不在集合中的元素
for (Iterator<V> iterator = iterator(); iterator.hasNext();) {
V object = iterator.next();
if (!c.contains(object)) {
iterator.remove();
changed = true;
}
}
return changed;
}
@Override
public void clear() {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// DEL 命令删除元素
conn.del(getName());
} finally {
connectionManager.release(conn);
}
}
@Override
public V get(int index) {
// 先检查下标是不是超过 redis list 总元素大小
checkIndex(index);
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// LINDEX 命令获取指定索引的元素
return (V) conn.lindex(getName(), index);
} finally {
connectionManager.release(conn);
}
}
private void checkIndex(int index) {
int size = size();
if (!isInRange(index, size))
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
}
private boolean isInRange(int index, int size) {
return index >= 0 && index < size;
}
private void checkPosition(int index) {
int size = size();
if (!isPositionInRange(index, size))
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
}
private boolean isPositionInRange(int index, int size) {
return index >= 0 && index <= size;
}
@Override
public V set(int index, V element) {
// 先检查下标是不是超过 redis list 总元素大小
checkIndex(index);
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// LINDEX 获取之前的对象
V prev = (V) conn.lindex(getName(), index);
// LSET 设置值 指定下标
conn.lset(getName(), index, element);
return prev;
} finally {
connectionManager.release(conn);
}
}
@Override
public void add(int index, V element) {
addAll(index, Collections.singleton(element));
}
// 计算要处理的总批次
private int div(int p, int q) {
int div = p / q;
int rem = p - q * div; // equal to p % q
if (rem == 0) {
return div;
}
return div + 1;
}
@Override
public V remove(int index) {
checkIndex(index);
RedisConnection<String, Object> conn = connectionManager.connection();
try {
if (index == 0) {
return (V) conn.lpop(getName());
}
while (true) {
// WATCH 命令监视列表的变化
conn.watch(getName());
// LINDEX 命令获取指定下标元素
V prev = (V) conn.lindex(getName(), index);
// LRANG 取出从 index+1 到列表末尾的所有元素。
List<Object> tail = conn.lrange(getName(), index + 1, size());
// 开启事务
conn.multi();
// LTRIM 命令保留 Redis List 从开始到 index-1 的元素 。 移除index及其后的元素
conn.ltrim(getName(), 0, index - 1);
// RPUSH 命令将之前获取的尾部元素重新添加到 Redis List中
conn.rpush(getName(), tail.toArray());
// 返回的结果大小为 2 则为事务执行成功,返回上一个元素的值
if (conn.exec().size() == 2) {
return prev;
}
}
} finally {
connectionManager.release(conn);
}
}
@Override
public int indexOf(Object o) {
if (isEmpty()) {
return -1;
}
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 计算总批次
int to = div(size(), batchSize);
for (int i = 0; i < to; i++) {
// LRANG 命令取出一批数据,判断是否在这一批数据里面
List<Object> range = conn.lrange(getName(), i*batchSize, i*batchSize + batchSize - 1);
int index = range.indexOf(o);
if (index != -1) {
return index + i*batchSize;
}
}
return -1;
} finally {
connectionManager.release(conn);
}
}
@Override
public int lastIndexOf(Object o) {
if (isEmpty()) {
return -1;
}
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 拿到 Redis List 总元素大小
int size = size();
// 计算要处理的总批次
int to = div(size, batchSize);
// 循环处理
for (int i = 1; i <= to; i++) {
int startIndex = -i*batchSize;
// LRANGE命令获取当前批次的元素
List<Object> range = conn.lrange(getName(), startIndex, size - (i-1)*batchSize);
// 从 Redis 获取的数据范围内查找元素的最后一个索引
int index = range.lastIndexOf(o);
if (index != -1) {
// 找到了该元素,计算其在整个列表中的实际索引并返回
return Math.max(size + startIndex, 0) + index;
}
}
return -1;
} finally {
connectionManager.release(conn);
}
}
@Override
public ListIterator<V> listIterator() {
return listIterator(0);
}
// 迭代器实现
// 支持双向遍历和修改
@Override
public ListIterator<V> listIterator(final int ind) {
return new ListIterator<V>() {
// 当前迭代器指向的索引,初始化为ind - 1,因为 next() 会先增加索引
private int currentIndex = ind - 1;
// 是否执行过remove 操作,防止重复删除
private boolean removeExecuted;
@Override
public boolean hasNext() {
// 检查是否存在下一个元素,确保 currentIndex+1 小于列表大小且列表非空
int size = size();
return currentIndex+1 < size && size > 0;
}
@Override
public V next() {
// 返回下一个元素,如果没有下一个元素则抛出NoSuchElementException
if (!hasNext()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
// 更新 currentIndex 并重置 removeExecuted 状态
currentIndex++;
removeExecuted = false;
return RedissonList.this.get(currentIndex);
}
@Override
public void remove() {
// 已执行过remove()则抛出IllegalStateException。
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// 调用 remove 方法 更新 currentIndex 和 removeExecuted
RedissonList.this.remove(currentIndex);
currentIndex--;
removeExecuted = true;
}
@Override
public boolean hasPrevious() {
int size = size();
return currentIndex-1 < size && size > 0 && currentIndex >= 0;
}
@Override
public V previous() {
if (!hasPrevious()) {
throw new NoSuchElementException("No such element at index " + currentIndex);
}
removeExecuted = false;
V res = RedissonList.this.get(currentIndex);
currentIndex--;
return res;
}
@Override
public int nextIndex() {
return currentIndex + 1;
}
@Override
public int previousIndex() {
return currentIndex;
}
@Override
public void set(V e) {
if (currentIndex >= size()-1) {
throw new IllegalStateException();
}
RedissonList.this.set(currentIndex, e);
}
@Override
public void add(V e) {
RedissonList.this.add(currentIndex+1, e);
currentIndex++;
}
};
}
@Override
public List<V> subList(int fromIndex, int toIndex) {
int size = size();
if (fromIndex < 0 || toIndex > size) {
throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size);
}
if (fromIndex > toIndex) {
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
}
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// LRANGE 命令获取指定范围的元素
return (List<V>) conn.lrange(getName(), fromIndex, toIndex - 1);
} finally {
connectionManager.release(conn);
}
}
public String toString() {
Iterator<V> it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
V e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
}
5. 分布式 Map
分布式 Map 对象主要是利用了 Redis 的 Hash 类型存储数据。
5.1 RMap 接口定义
分布式 Map 实现,也没有很特殊的地方。 实现了 java.util.concurrent.ConcurrentMap 跟 java.util.Map 接口
package org.redisson.core;
import java.util.concurrent.ConcurrentMap;
public interface RMap<K, V> extends ConcurrentMap<K, V>, RObject {
}
5.2 RedissonMap 源码实现
分布式 Map 实现
package org.redisson;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RMap;
import com.lambdaworks.redis.RedisConnection;
//TODO implement watching by keys instead of map name
public class RedissonMap<K, V> extends RedissonObject implements RMap<K, V> {
private final ConnectionManager connectionManager;
RedissonMap(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
@Override
public int size() {
RedisConnection<String, Object> connection = connectionManager.connection();
try {
// HLEN 命令查询 hash 总元素个数
return connection.hlen(getName()).intValue();
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean containsKey(Object key) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HEXISTS 命令查看元素是否存在
return connection.hexists(getName(), key);
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean containsValue(Object value) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HVALS 命令查看 value 是否存在
return connection.hvals(getName()).contains(value);
} finally {
connectionManager.release(connection);
}
}
@Override
public V get(Object key) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HGET 命令查询 key
return (V) connection.hget(getName(), key);
} finally {
connectionManager.release(connection);
}
}
@Override
public V put(K key, V value) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HGET 命令查询旧的值
V prev = (V) connection.hget(getName(), key);
// HSET 命令设置新的值
connection.hset(getName(), key, value);
return prev;
} finally {
connectionManager.release(connection);
}
}
@Override
public V remove(Object key) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HGET 命令查询旧的值
V prev = (V) connection.hget(getName(), key);
// HDEL 移除元素
connection.hdel(getName(), key);
return prev;
} finally {
connectionManager.release(connection);
}
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HMSET 设置一批 hash 值
connection.hmset(getName(), (Map<Object, Object>) map);
} finally {
connectionManager.release(connection);
}
}
@Override
public void clear() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HDEL 命令删除元素
connection.del(getName());
} finally {
connectionManager.release(connection);
}
}
@Override
public Set<K> keySet() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HKEYS 返回所有的 hash key
return new LinkedHashSet<K>((Collection<? extends K>) connection.hkeys(getName()));
} finally {
connectionManager.release(connection);
}
}
@Override
public Collection<V> values() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HVALS 返回所有的 hash value
return (Collection<V>) connection.hvals(getName());
} finally {
connectionManager.release(connection);
}
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// HKEYS 命令获取 Redis 哈希表中所有的键s, 并将其转换为 LinkedHashSet 保持顺序
Map<Object, Object> map = connection.hgetall(getName());
Map<K, V> result = new HashMap<K, V>();
for (java.util.Map.Entry<Object, Object> entry : map.entrySet()) {
result.put((K)entry.getKey(), (V)entry.getValue());
}
return result.entrySet();
} finally {
connectionManager.release(connection);
}
}
@Override
public V putIfAbsent(K key, V value) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
while (true) {
// HSETNX 命令设置键值对,如果键不存在则设置成功
Boolean res = connection.hsetnx(getName(), key, value);
// 设置失败,说明键已存在,返回当前键的值
if (!res) {
V result = get(key);
if (result != null) {
return result;
}
} else {
return null;
}
}
} finally {
connectionManager.release(connection);
}
}
private boolean isEquals(RedisConnection<Object, Object> connection, Object key, Object value) {
Object val = connection.hget(getName(), key);
return (value != null && value.equals(val)) || (value == null && val == null);
}
@Override
public boolean remove(Object key, Object value) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
while (true) {
// watch命令监视键
connection.watch(getName());
// 检查键是否存在并且值又匹配
if (connection.hexists(getName(), key)
&& isEquals(connection, key, value)) {
// 开启事务 > 删除键 > 判断事务执行成功
connection.multi();
connection.hdel(getName(), key);
if (connection.exec().size() == 1) {
return true;
}
} else {
return false;
}
}
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
while (true) {
// watch命令监视键
connection.watch(getName());
// 检查键是否存在并且值又匹配
if (connection.hexists(getName(), key)
&& isEquals(connection, key, oldValue)) {
// 开启事务 > 设置键 > 判断事务执行成功
connection.multi();
connection.hset(getName(), key, newValue);
if (connection.exec().size() == 1) {
return true;
}
} else {
return false;
}
}
} finally {
connectionManager.release(connection);
}
}
@Override
public V replace(K key, V value) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
while (true) {
// watch命令监视键
connection.watch(getName());
// 检查键是否存在并且值又匹配
if (connection.hexists(getName(), key)) {
// 获取旧的值
V prev = (V) connection.hget(getName(), key);
// 开始事务 > 设置值 > 检查事务执行是否成功
connection.multi();
connection.hset(getName(), key, value);
if (connection.exec().size() == 1) {
// 返回旧的值
return prev;
}
}
return null;
}
} finally {
connectionManager.release(connection);
}
}
}
6. 分布式 Queue
分布式 Queue 对象主要是利用了 Redis 的 List 类型存储数据。
6.1 RQueue 接口定义
分布式队列接口,实现了 java.util.List 接口。
package org.redisson.core;
import java.util.Queue;
public interface RQueue<V> extends Queue<V>, RObject {
}
6.1 RedissonQueue 源码实现
分布式队列实现。 总体来说实现还是比较简单,主要是利用了 Redis List 的相关 API 实现。
package org.redisson;
import java.util.NoSuchElementException;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RQueue;
import com.lambdaworks.redis.RedisConnection;
public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
RedissonQueue(ConnectionManager connectionManager, String name) {
super(connectionManager, name);
}
// 将元素添加到队列中
@Override
public boolean offer(V e) {
return add(e);
}
// 获取队列的第一个元素
public V getFirst() {
RedisConnection<String, Object> connection = getConnectionManager().connection();
try {
// LINDEX 命令读取 Redis List 的第一个元素
V value = (V) connection.lindex(getName(), 0);
if (value == null) {
// 元素不存在,抛出异常
throw new NoSuchElementException();
}
return value;
} finally {
getConnectionManager().release(connection);
}
}
// 移除并返回队列的第一个元素
public V removeFirst() {
RedisConnection<String, Object> connection = getConnectionManager().connection();
try {
// lpop 命令从 Redis List 中弹出第一个元素
V value = (V) connection.lpop(getName());
if (value == null) {
// 元素不存在,抛出异常
throw new NoSuchElementException();
}
return value;
} finally {
getConnectionManager().release(connection);
}
}
@Override
public V remove() {
return removeFirst();
}
@Override
public V poll() {
RedisConnection<String, Object> connection = getConnectionManager().connection();
try {
// 尝试从队列中移除 并返回第一个元素
return (V) connection.lpop(getName());
} finally {
getConnectionManager().release(connection);
}
}
// 返回队列的第一个元素
@Override
public V element() {
return getFirst();
}
// 返回队列的第一个元素 但不移除
@Override
public V peek() {
if (isEmpty()) {
return null;
}
return get(0);
}
}
7. 分布式 Set
分布式 List 对象主要是利用了 Redis 的 Set 类型存储数据。
7.1 RSet 接口定义
分布式集合接口,实现了 java.util.Set 接口。
package org.redisson.core;
import java.util.Set;
public interface RSet<V> extends Set<V>, RObject {
}
7.2 RedissonSet 源码实现
java.util.Set 的分布式集合实现。
package org.redisson;
import java.util.Collection;
import java.util.Iterator;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSet;
import com.lambdaworks.redis.RedisConnection;
public class RedissonSet<V> extends RedissonObject implements RSet<V> {
private final ConnectionManager connectionManager;
RedissonSet(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
@Override
public int size() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 调用 scard 命令获取集合的大小
return connection.scard(getName()).intValue();
} finally {
connectionManager.release(connection);
}
}
// 集合是否为空
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 使用 Redis 的 sismember 命令进行判断包含该元素
return connection.sismember(getName(), o);
} finally {
connectionManager.release(connection);
}
}
@Override
public Iterator<V> iterator() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// TODO use SSCAN in case of usage Redis 2.8 后面用 SSCAN 命令来优化大集合的迭代
// 使用 Redis 的 smembers 命令获取集合中的所有元素
return (Iterator<V>) connection.smembers(getName()).iterator();
} finally {
connectionManager.release(connection);
}
}
@Override
public Object[] toArray() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 将集合中的元素转换为数组返回
return connection.smembers(getName()).toArray();
} finally {
connectionManager.release(connection);
}
}
@Override
public <T> T[] toArray(T[] a) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
return connection.smembers(getName()).toArray(a);
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean add(V e) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// sadd 命令添加元素 返回是否成功添加
return connection.sadd(getName(), e) > 0;
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean remove(Object o) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// srem 命令移除元素,并返回是否成功移除
return connection.srem(getName(), o) > 0;
} finally {
connectionManager.release(connection);
}
}
@Override
public boolean containsAll(Collection<?> c) {
// 循环执行 sismember 命令进行判断包含该元素
for (Object object : c) {
if (!contains(object)) {
return false;
}
}
return true;
}
@Override
public boolean addAll(Collection<? extends V> c) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 调用 sadd 命令添加所有元素
return connection.sadd(getName(), c.toArray()) > 0;
} finally {
connectionManager.release(connection);
}
}
// 保留传入集合中的这些元素,其他的删除掉
@Override
public boolean retainAll(Collection<?> c) {
boolean changed = false;
for (Iterator<V> iterator = iterator(); iterator.hasNext();) {
V object = iterator.next();
// 循环调用 sismember 命令进行判断包含该元素
if (!c.contains(object)) {
// 调用删除命令
iterator.remove();
changed = true;
}
}
return changed;
}
@Override
public boolean removeAll(Collection<?> c) {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 使用 srem 命令移除指定的元素
return connection.srem(getName(), c.toArray()) > 0;
} finally {
connectionManager.release(connection);
}
}
@Override
public void clear() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 使用 Redis 的 del 命令删除集合
connection.del(getName());
} finally {
connectionManager.release(connection);
}
}
}
8. 分布式 Topic
分布式 List 对象主要是利用了 Redis 的发布订阅机制,Redisson 中很多功能都会利用 Topic 机制去实现。
8.1 Topic MessageListener 接口定义
用于消息监听的接口,这个接口实现了 java.util.EventListener 接口,表明这是一个事件监听器。 EventListener 接口本身是个空接口,不包含任务方法定义。
package org.redisson.core;
import java.util.EventListener;
public interface MessageListener<M> extends EventListener {
// 订阅的 Redis topic 接收到新消息时,onMessage 方法会被调用
void onMessage(M msg);
}
8.2 RTopic 接口定义
分布式 Topic 接口,实现了发布-订阅模式,可以用来在分布式系统中进行消息传递。
package org.redisson.core;
// 消息会被传递到 Redis 集群中的所有消息监听器。
public interface RTopic<M> extends RObject {
// 向此 Topic 的所有订阅者发布消息
void publish(M message);
// 订阅此 Topic 。当任何消息发布到这个 Topic 的时候,会调用 MessageListener.onMessage() 。
int addListener(MessageListener<M> listener);
// 通过 listenerId 移除这个 Topic 的监听器。
void removeListener(int listenerId);
}
8.3 RedissonTopic 源码实现
分布式 Topic 的实现。
package org.redisson;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
private final CountDownLatch subscribeLatch = new CountDownLatch(1); // 确保订阅操作只执行一次
private final AtomicBoolean subscribeOnce = new AtomicBoolean(); // 标记是否已经订阅
private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>(); // 所有的监听器
private final ConnectionManager connectionManager;
private PubSubEntry pubSubEntry; // 订阅的条目
RedissonTopic(ConnectionManager connectionManager, String name) {
super(name);
this.connectionManager = connectionManager;
}
public void subscribe() {
// 保证只订阅一次
if (subscribeOnce.compareAndSet(false, true)) {
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
@Override
public void subscribed(String channel, long count) {
// 订阅成功时,调用 countDown 方法
if (channel.equals(getName())) {
subscribeLatch.countDown();
}
}
};
// 执行订阅操作
pubSubEntry = connectionManager.subscribe(listener, getName());
}
try {
// 确保订阅完成后再继续执行后续的逻辑
subscribeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 发布消息到 topic
@Override
public void publish(M message) {
RedisConnection<String, Object> conn = connectionManager.connection();
try {
// 使用 publish 命令发送消息
conn.publish(getName(), message);
} finally {
connectionManager.release(conn);
}
}
// 添加消息监听器
@Override
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
listeners.put(pubSubListener.hashCode(), pubSubListener);
pubSubEntry.addListener(pubSubListener);
return pubSubListener.hashCode();
}
// 移除监听器
@Override
public void removeListener(int listenerId) {
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
pubSubEntry.removeListener(pubSubListener);
}
// 取消订阅
@Override
public void close() {
// 使用 unsubscribe 命令取消订阅
connectionManager.unsubscribe(pubSubEntry, getName());
}
}
9. 分布式 Lock
这一个版本的分布式锁,主要是采用 Redis 的 SETNX 命令实现分布式锁的获取和释放。
9.1 RLock 接口定义
Java 标准库中 java.util.concurrent.locks.Lock 接口的分布式实现,可以用来实现分布式锁。
package org.redisson.core;
import java.util.concurrent.locks.Lock;
public interface RLock extends Lock, RObject {
// 强制解锁,不考虑锁的状态
void forceUnlock();
// 检查这个锁是否被 Redisson 集群中的任何线程锁定。 锁定返回 true,否则返回 false
boolean isLocked();
// 检查这个锁是否被当前线程持有。 当前线程持有则返回 true,否则返回 false
boolean isHeldByCurrentThread();
// 当前线程对这个锁的持有次数。如果这个锁没有被当前线程持有则返回 0
// 这个主要是支持重入特性,可以通过这个值来判断重入多少次
int getHoldCount();
}
9.2 RedissonLock 源码实现
java.util.concurrent.locks.Lock 的可重入分布式锁实现。 Redisson 用的最多的场景估计就是这个了,很多人都是从这个功能了解到 Redisson 的。
package org.redisson;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.core.RLock;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
public class RedissonLock extends RedissonObject implements RLock {
public static class LockValue implements Serializable {
private static final long serialVersionUID = -8895632286065689476L;
private UUID id; // 锁的唯一ID
private Long threadId; // 线程ID
// need for reentrant support
private int counter; // 计数器,支持重入锁
public LockValue() {
}
public LockValue(UUID id, Long threadId) {
super();
this.id = id;
this.threadId = threadId;
}
public void decCounter() {
counter--;
}
public void incCounter() {
counter++;
}
public int getCounter() {
return counter;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
result = prime * result + ((threadId == null) ? 0 : threadId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LockValue other = (LockValue) obj;
if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;
if (threadId == null) {
if (other.threadId != null)
return false;
} else if (!threadId.equals(other.threadId))
return false;
return true;
}
}
private final ConnectionManager connectionManager;
private final UUID id; // 锁的唯一ID
private final String groupName = "redisson_lock_"; // 锁的组名前缀,所有锁的名称都将以此为前缀
private static final Integer unlockMessage = 0; // 发布订阅机制中的解锁消息值
private final CountDownLatch subscribeLatch = new CountDownLatch(1); // 确保订阅操作完成后再进行其他操作
private final AtomicBoolean subscribeOnce = new AtomicBoolean(); // 确保订阅操作只执行一次的标记
private final Semaphore msg = new Semaphore(1); // 信号量控制锁的获取和释放,初始许可数为1。
private PubSubEntry pubSubEntry;
RedissonLock(ConnectionManager connectionManager, String name, UUID id) {
super(name);
this.connectionManager = connectionManager;
this.id = id;
}
// 订阅 Redis channel ,接收解锁消息
public void subscribe() {
// 保证订阅操作只执行一次
if (subscribeOnce.compareAndSet(false, true)) {
// 获取信号量,确保在订阅完成之前不进行锁操作
msg.acquireUninterruptibly();
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override
public void subscribed(String channel, long count) {
// 减少 subscribeLatch 的计数
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
@Override
public void message(String channel, Integer message) {
// 接收到解锁消息时释放信号量
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
msg.release();
}
}
};
// 订阅指定 channel
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
}
try {
// 等待订阅完成
subscribeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 支持打断的锁获取
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
// 如果线程在等待获取锁时被中断,捕获异常并设置线程的中断状态
Thread.currentThread().interrupt();
return;
}
}
private String getKeyName() {
return groupName + getName();
}
private String getChannelName() {
return groupName + getName();
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 循环调用 tryLock 方法直到成功拿到锁
while (!tryLock()) {
// 没拿到锁阻塞当前线程,一直到信号量可用。 这是通过订阅 Redis channel 来监听解锁消息实现
msg.acquire();
}
}
@Override
public boolean tryLock() {
// 表示当前线程的锁状态,并增加计数器
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 调用 setnx 加锁
Boolean res = connection.setnx(getKeyName(), currentLock);
// 锁已被其他线程持有
if (!res) {
LockValue lock = (LockValue) connection.get(getKeyName());
// 如果 lock 不为 null且等于 currentLock , 表示当前线程已持有锁 , 就增加计数器并更新锁状态
if (lock != null && lock.equals(currentLock)) {
lock.incCounter();
connection.set(getKeyName(), lock);
return true;
}
}
return res;
} finally {
connectionManager.release(connection);
}
}
// 带超时时间获取锁,过了超时时间就算获取锁失败
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
time = unit.toMillis(time);
while (!tryLock()) {
if (time <= 0) {
return false;
}
long current = System.currentTimeMillis();
// waiting for message
msg.tryAcquire(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current;
time -= elapsed;
}
return true;
}
// 解锁流程
@Override
public void unlock() {
// 当前线程的锁状态
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 查看当前连接的锁信息
LockValue lock = (LockValue) connection.get(getKeyName());
// 如果 lock 不为 null 且等于 currentLock , 说明已经被当前线程持有了
if (lock != null && lock.equals(currentLock)) {
// 计数器大于0,说明其他地方还要用这个锁
if (lock.getCounter() > 1) {
lock.decCounter();
connection.set(getKeyName(), lock);
} else {
// 释放锁
unlock(connection);
}
} else {
// 锁不属于当前线程,抛出异常
throw new IllegalMonitorStateException("Attempt to unlock lock, not locked by current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
} finally {
connectionManager.release(connection);
}
}
// 实际执行锁的释放,从 redis 删除锁
private void unlock(RedisConnection<Object, Object> connection) {
int counter = 0;
while (counter < 5) {
// 开启事务 > 删除 redis 锁 > 发布解锁消息到 channel > 判断事务执行是否成功
connection.multi();
connection.del(getKeyName());
connection.publish(getChannelName(), unlockMessage);
if (connection.exec().size() == 2) {
return;
}
// 如果事务失败,重试最多5次. 5次尝试后仍未成功抛出异常
counter++;
}
throw new IllegalStateException("Can't unlock lock after 5 attempts. Current id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
@Override
public Condition newCondition() {
// TODO implement
throw new UnsupportedOperationException();
}
// 关闭锁的订阅
@Override
public void close() {
connectionManager.unsubscribe(pubSubEntry, getChannelName());
}
// 强制结果
@Override
public void forceUnlock() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 循环解锁
// 这里看着会导致无限循环,应该在某个条件下退出循环。
while (true) {
LockValue lock = (LockValue) connection.get(getKeyName());
// 如果锁存在,则调用 unlock() 方法释放锁
if (lock != null) {
unlock(connection);
}
}
} finally {
connectionManager.release(connection);
}
}
// 锁是否被持有
@Override
public boolean isLocked() {
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 获取当前锁的值
LockValue lock = (LockValue) connection.get(getKeyName());
// 锁存在返回true
return lock != null;
} finally {
connectionManager.release(connection);
}
}
// 检查锁是否被当前线程持有
@Override
public boolean isHeldByCurrentThread() {
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
// 获取当前锁的值
LockValue lock = (LockValue) connection.get(getKeyName());
// 如果锁存在且等于currentLock 返回true
return lock != null && lock.equals(currentLock);
} finally {
connectionManager.release(connection);
}
}
// 返回当前线程持有锁的次数
@Override
public int getHoldCount() {
// 当前线程的锁状态
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connection();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
// 如果锁存在且等于currentLock,则返回计数器的值
if (lock != null && lock.equals(currentLock)) {
return lock.getCounter();
}
return 0;
} finally {
connectionManager.release(connection);
}
}
}
重点看下这个锁的实现机制:
- 核心是利用 Redis SETNX 命令来实现分布式锁的获取和释放。
- 获取锁的时候,SETNX 命令尝试设置一个KEY(通常是锁的标识符)。如果 KEY 不存在,SETNX 会成功设置 KEY 并返回 true,表示锁获取成功。KEY 已经存在,SETNX 命令会返回 false,表示锁已经被其他客户端持有。
- 释放锁的时候,只有持有锁的客户端可以释放锁。这个是通过检查持有锁时设置的唯一标识来确保锁可以安全释放。释放锁是通过 Redis 的 DEL 命令删除元素。
- 锁是支持重入的。
这个版本的锁问题还是挺多。比如没有设置超时时间的,比如像客户端在持有锁的过程中关闭了,这样锁就会一直不释放。 通过设置 Redis 元素的过期时间,就可以达到自动释放锁的目的。
10. 对外核心类工具类 Redisson
这个类提供了访问各种 Redisson 分布式对象和功能的入口点,我们平时使用 Redisson 通常也就是直接使用这个工具类。
这个工具类没啥特别,主要是以下几个点:
- 获取各种 Redisson 分布式对象时,都是先检查缓存,缓存中有就直接获取缓存对象。如果不存在则创建新实例,然后返回。这里说的缓存,是指的 Redisson 分布式对象,不是说的 Redis 的数据缓存。
- 对象缓存采用了 Key Value 的引用类型,Key 都是使用的是强引用,Value 是软引用。引用主要是用到了 ReferenceMap 这个工具类。这样做的目的是在内存不足时自动回收不再使用的对象,减少内存泄漏的风险。软引用可以在内存不足的时候回收掉,强引用则不允许被回收。
package org.redisson;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RList;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RQueue;
import org.redisson.core.RSet;
import org.redisson.core.RTopic;
import org.redisson.misc.ReferenceMap;
import org.redisson.misc.ReferenceMap.ReferenceType;
import org.redisson.misc.ReferenceMap.RemoveValueListener;
public class Redisson {
RemoveValueListener listener = new RemoveValueListener() {
@Override
public void onRemove(Object value) {
if (value instanceof RedissonObject) {
((RedissonObject)value).close();
}
}
};
// 存储各种 Redisson 对象的线程安全映射
// 用 ReferenceMap 来管理对象的生命周期
private final ConcurrentMap<String, RedissonCountDownLatch> latchesMap = new ReferenceMap<String, RedissonCountDownLatch>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonTopic> topicsMap = new ReferenceMap<String, RedissonTopic>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonLock> locksMap = new ReferenceMap<String, RedissonLock>(ReferenceType.STRONG, ReferenceType.SOFT, listener);
private final ConcurrentMap<String, RedissonAtomicLong> atomicLongsMap = new ReferenceMap<String, RedissonAtomicLong>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonQueue> queuesMap = new ReferenceMap<String, RedissonQueue>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonSet> setsMap = new ReferenceMap<String, RedissonSet>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonList> listsMap = new ReferenceMap<String, RedissonList>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConcurrentMap<String, RedissonMap> mapsMap = new ReferenceMap<String, RedissonMap>(ReferenceType.STRONG, ReferenceType.SOFT);
private final ConnectionManager connectionManager; // 连接管理
private final Config config; // Redisson 配置
private final UUID id = UUID.randomUUID(); // 生成唯一标识
Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = new ConnectionManager(configCopy);
}
// 默认连接到 127.0.0.1:6379
public static Redisson create() {
Config config = new Config();
config.addAddress("127.0.0.1:6379");
return create(config);
}
public static Redisson create(Config config) {
return new Redisson(config);
}
// 下面的 getxx() 都是用于获取各种 Redisson 分布式对象
// 逻辑都是类似的,先检查缓存,如果不存在则创建新实例,然后返回。
public <V> RList<V> getList(String name) {
RedissonList<V> list = listsMap.get(name);
if (list == null) {
list = new RedissonList<V>(connectionManager, name);
RedissonList<V> oldList = listsMap.putIfAbsent(name, list);
if (oldList != null) {
list = oldList;
}
}
return list;
}
public <K, V> RMap<K, V> getMap(String name) {
RedissonMap<K, V> map = mapsMap.get(name);
if (map == null) {
map = new RedissonMap<K, V>(connectionManager, name);
RedissonMap<K, V> oldMap = mapsMap.putIfAbsent(name, map);
if (oldMap != null) {
map = oldMap;
}
}
return map;
}
public RLock getLock(String name) {
RedissonLock lock = locksMap.get(name);
if (lock == null) {
lock = new RedissonLock(connectionManager, name, id);
RedissonLock oldLock = locksMap.putIfAbsent(name, lock);
if (oldLock != null) {
lock = oldLock;
}
}
lock.subscribe();
return lock;
}
public <V> RSet<V> getSet(String name) {
RedissonSet<V> set = setsMap.get(name);
if (set == null) {
set = new RedissonSet<V>(connectionManager, name);
RedissonSet<V> oldSet = setsMap.putIfAbsent(name, set);
if (oldSet != null) {
set = oldSet;
}
}
return set;
}
public <M> RTopic<M> getTopic(String name) {
RedissonTopic<M> topic = topicsMap.get(name);
if (topic == null) {
topic = new RedissonTopic<M>(connectionManager, name);
RedissonTopic<M> oldTopic = topicsMap.putIfAbsent(name, topic);
if (oldTopic != null) {
topic = oldTopic;
}
}
topic.subscribe();
return topic;
}
public <V> RQueue<V> getQueue(String name) {
RedissonQueue<V> queue = queuesMap.get(name);
if (queue == null) {
queue = new RedissonQueue<V>(connectionManager, name);
RedissonQueue<V> oldQueue = queuesMap.putIfAbsent(name, queue);
if (oldQueue != null) {
queue = oldQueue;
}
}
return queue;
}
public RAtomicLong getAtomicLong(String name) {
RedissonAtomicLong atomicLong = atomicLongsMap.get(name);
if (atomicLong == null) {
atomicLong = new RedissonAtomicLong(connectionManager, name);
RedissonAtomicLong oldAtomicLong = atomicLongsMap.putIfAbsent(name, atomicLong);
if (oldAtomicLong != null) {
atomicLong = oldAtomicLong;
}
}
return atomicLong;
}
public RCountDownLatch getCountDownLatch(String name) {
RedissonCountDownLatch latch = latchesMap.get(name);
if (latch == null) {
latch = new RedissonCountDownLatch(connectionManager, name);
RedissonCountDownLatch oldLatch = latchesMap.putIfAbsent(name, latch);
if (oldLatch != null) {
latch = oldLatch;
}
}
latch.subscribe();
return latch;
}
// 关闭 Redisson 实例 但不会关闭 Redis 服务器
public void shutdown() {
connectionManager.shutdown();
}
public Config getConfig() {
return config;
}
}