RedissonV1.0源码分析-工具类篇
RedissonV1.0源码分析-工具类篇
这一篇主要是针对 Redisson 的几个工具类源码做下分析,从类型上划分不太好跟其他源码混一起。
但是这几个工具类又是源码中比较重要的一部分。
1. 发布订阅包装工具类 RedisPubSubTopicListenerWrapper
这个类是用于包装 Redis 发布/订阅 监听器的一个工具类。 通过这个类 Redisson 可以在不暴露底层 Redis 客户端库细节的情况下,提供发布/订阅功能。
package org.redisson;
import org.redisson.core.MessageListener;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K, V> {
private final MessageListener<V> listener; // 实际的消息监听器
private final K name; // 订阅的 channel 名称
public RedisPubSubTopicListenerWrapper(MessageListener<V> listener, K name) {
super();
this.listener = listener;
this.name = name;
}
@Override
public void message(K channel, V message) {
// 当收到消息时,先检查 channel 名是否匹配,如果匹配则调用实际监听器的 onMessage 方法
// 允许一个监听器可以订阅多个 channel
if (name.equals(channel)) {
listener.onMessage(message);
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((listener == null) ? 0 : listener.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedisPubSubTopicListenerWrapper other = (RedisPubSubTopicListenerWrapper) obj;
if (listener == null) {
if (other.listener != null)
return false;
} else if (!listener.equals(other.listener))
return false;
return true;
}
}
2. 可重复开关的线程门栓实现 ReclosableLatch
可重复开关的线程门栓实现, 基于 Java 并发包中的 java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS) 类实现。
特点是可以创建一个可以反复打开和关闭的闭锁,这个工具类跟 Redis 没有任何关系,也不会往 Redis 读写数据等。ReclosableLatch 目前主要是用在 RedissonCountDownLatch 这个对象里面。
package org.redisson.misc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class ReclosableLatch extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1744280161777661090l;
// AQS 中使用的状态变量
// OPEN_STATE = 0 表示开启状态
// CLOSED_STATE = 1 表示关闭状态
private static final int OPEN_STATE = 0, CLOSED_STATE = 1;
public ReclosableLatch() {
// 默认关闭状态
setState(CLOSED_STATE);
}
public ReclosableLatch(boolean defaultOpen) {
setState(defaultOpen ? OPEN_STATE : CLOSED_STATE);
}
// 重写 AQS 的 tryAcquireShared 方法
@Override
public final int tryAcquireShared(int ignored) {
// 如果状态是 OPEN_STATE,返回 1 (允许获取共享锁) 。否则返回 -1 (不允许获取)。
return getState() == OPEN_STATE ? 1 : -1;
}
// 重写 AQS 的 tryReleaseShared 方法
@Override
public final boolean tryReleaseShared(int state) {
// 设置新的状态 总是返回 true,表示释放成功
setState(state);
return true;
}
public final void open() {
// 警告不要直接使用 setState(),因为这不会通知被阻塞的线程。
// 调用 AQS 的 releaseShared 方法,传入 OPEN_STATE
releaseShared(OPEN_STATE);
}
public final void close() {
// 警告不要直接使用 setState(),因为这不会通知被阻塞的线程。
// 调用 AQS 的 releaseShared 方法,传入 CLOSED_STATE
releaseShared(CLOSED_STATE);
}
// 是否处于开启状态
public boolean isOpened() {
return getState() == OPEN_STATE;
}
// await 方法使当前线程等待,直到门闩开启。
public final void await() throws InterruptedException {
// 调用 AQS 的 acquireSharedInterruptibly 方法,可以被中断
// 传入的 1 是一个不会被使用的虚拟值
acquireSharedInterruptibly(1);
}
// 调用 AQS 的 tryAcquireSharedNanos 方法,实现超时等待。
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// 传入的 1 是一个不会被使用的虚拟值
return tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return "ReclosableLatch [State = " + s + ", " + q + "empty queue]";
}
}
这里设计到 Java 的几个知识点。 Latch 是一种同步机制,翻译有好多种。 有的翻译成 闭锁、门闩等等。但是说的都是一个东西,就是用于阻塞线程一直到某个条件满足,堵塞的线程就会被唤醒。
跟 JDK 内置的 CountDownLatch 区别在于:
- CountDownLatch 是一次性使用的,计数器到 0 之后,就不能重置或者重新使用。
- ReclosableLatch 可以反复的打开关闭,跟开关电灯一样,开完就可以关。
跟 CyclicBarrier 就没什么可对比性了,CyclicBarrier 本来就是为了控制多个线程进行同步的场景。
可以看下面这个例子:
public class ReclosableLatchExample {
public static void main(String[] args) throws InterruptedException {
// 实例化这个对象
final ReclosableLatch latch = new ReclosableLatch();
// 创建并启动一个异步的线程,观察线程
final Thread waitingThread = new Thread(() -> {
try {
// 这里等待闭锁打开
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
waitingThread.start();
// 模拟一些业务耗时操作
Thread.sleep(2000);
// 打开闭锁, 上面这个线程里面的 await() 会被唤醒
latch.open();
// 等待上面这个线程完成,线程会被销毁掉
waitingThread.join();
// =========================== 下面开启第二次测试 ==============================================
// 重新关闭闭锁
latch.close();
// 新启动另一个线程,等待闭锁再次打开
final Thread anotherWaitingThread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
anotherWaitingThread.start();
// 再次模拟一些操作,然后打开闭锁
Thread.sleep(2000);
latch.open();
// 等待上面线程完成 然后被销毁
anotherWaitingThread.join();
}
}
3. 支持引用的并发 HashMap 实现 ReferenceMap
这是一个并发 HashMap 的实现,支持对 key 和 value 使用软引用或弱引用。 类还是比较复杂的,里面好几个作者一起编写的代码。
总结下主要是作用就是通过使用软引用和弱引用,能够在内存压力下自动释放不再需要的对象。 同时保持线程安全和高性能, 这种实现特别适用于需要大量缓存但又不想耗尽内存的场景,Redisson 在某个场景下就会有这种情况。
为了更好的分析这个类的代码,把这个类的代码拆分成几部分分析,不然这个类太长。同时不用太纠结具体的代码实现,知道个大概意思即可。
3.1 引用类型 ReferenceType
这个枚举中定义的是 Java 中几种不同的引用类型,这些引用类型可以控制对象的声明周期跟垃圾回收时的一个行为。
public enum ReferenceType {
// 普通的强引用,不会被垃圾回收掉
STRONG,
// 软引用,在内存不足时可能被回收,内存充足时不会回收
SOFT,
// 弱引用,在下一次垃圾回收时会被回收
WEAK,
// 幻引用。 列出来了,但是好像没用上
PHANTOM,
}
这几个枚举背后涉及的类:
- 软引用 java.lang.ref.SoftReference
- 弱引用 java.lang.ref.WeakReference
- 幻引用 java.lang.ref.PhantomReference
3.2 内部引用接口以及实现
下面这几个接口都是为了在内存不足时,通过软引用或者弱引用的方式去区分,让垃圾收集器回收这些对象。
这样应用就不会因为内存不足而崩溃。 软引用比弱引用强一点,只有在内存即将耗尽时才会被回收,而弱引用在下一次垃圾收集时就可能被回收。
3.2.1 内部引用接口
interface InternalReference {
void finalizeReferent();
Object get();
}
3.2.2 软引用的键 SoftKeyReference
import java.lang.ref.SoftReference;
class SoftKeyReference extends SoftReference<Object> implements InternalReference {
final int hashCode;
SoftKeyReference(Object key) {
super(key, FinalizableReferenceQueue.getInstance());
this.hashCode = System.identityHashCode(key);
}
public void finalizeReferent() {
// 值被回收时从 delegate 中移除自身
delegate.remove(this);
}
@Override
public int hashCode() {
return this.hashCode;
}
@Override
public boolean equals(Object o) {
return referenceEquals(this, o);
}
}
3.2.3 弱引用的键 WeakKeyReference
import java.lang.ref.WeakReference;
class WeakKeyReference extends WeakReference<Object> implements InternalReference {
final int hashCode;
WeakKeyReference(Object key) {
super(key, FinalizableReferenceQueue.getInstance());
this.hashCode = System.identityHashCode(key);
}
public void finalizeReferent() {
// 值被回收时从 delegate 中移除自身
delegate.remove(this);
}
@Override
public int hashCode() {
return this.hashCode;
}
@Override
public boolean equals(Object o) {
return referenceEquals(this, o);
}
}
3.2.4 软引用的值 SoftValueReference
import java.lang.ref.SoftReference;
class SoftValueReference extends SoftReference<Object> implements InternalReference {
final Object keyReference;
SoftValueReference(Object keyReference, Object value) {
super(value, FinalizableReferenceQueue.getInstance());
this.keyReference = keyReference;
}
public void finalizeReferent() {
// 值被回收时从 delegate 中移除对应的键值对
delegate.remove(keyReference, this);
}
@Override
public boolean equals(Object obj) {
return referenceEquals(this, obj);
}
}
3.2.5 弱引用的值 WeakValueReference
import java.lang.ref.WeakReference;
class WeakValueReference extends WeakReference<Object> implements InternalReference {
final Object keyReference;
WeakValueReference(Object keyReference, Object value) {
super(value, FinalizableReferenceQueue.getInstance());
this.keyReference = keyReference;
}
public void finalizeReferent() {
// 值被回收时从 delegate 中移除对应的键值对
delegate.remove(keyReference, this);
}
@Override
public boolean equals(Object obj) {
return referenceEquals(this, obj);
}
}
3.3 构造函数实现
构造函数设置了 key 跟 value 的引用类型,同时初始化了内部的 ConcurrentHashMap。这个 ConcurrentHashMap 就是用来存放数据的。
// 指定 key value 的引用类型
// 传入 RemoveValueListener 用于在值被移除时触发回调
// 创建了自定义的 ConcurrentHashMap,重写了 remove 方法以支持移除监听。
public ReferenceMap(ReferenceType keyReferenceType, ReferenceType valueReferenceType, final RemoveValueListener<V> removeValueListener) {
if ((keyReferenceType == null) || (valueReferenceType == null)) {
throw new IllegalArgumentException("References types can not be null");
}
if (keyReferenceType == ReferenceType.PHANTOM || valueReferenceType == ReferenceType.PHANTOM) {
throw new IllegalArgumentException("Phantom references not supported");
}
this.delegate = new ConcurrentHashMap<Object, Object>() {
// 重写移除方法
@Override
public Object remove(Object key) {
Object res = super.remove(key);
if (res != null && removeValueListener != null) {
// 调用传入的监听器方法
removeValueListener.onRemove((V)res);
}
return res;
}
@Override
public boolean remove(Object key, Object value) {
boolean res = super.remove(key, value);
if (res && removeValueListener != null) {
// 调用传入的监听器方法
removeValueListener.onRemove((V)value);
}
return res;
}
};
this.keyReferenceType = keyReferenceType;
this.valueReferenceType = valueReferenceType;
}
3.4 创建引用
创建引用就是使用 SoftKeyReference 、WeakKeyReference 等包装后的引用。 如果是强引用的话就直接返回原对象。
Object referenceKey(K key) {
switch (keyReferenceType) {
case STRONG:
return key;
case SOFT:
return new SoftKeyReference(key);
case WEAK:
return new WeakKeyReference(key);
default:
throw new AssertionError();
}
}
Object referenceValue(Object keyReference, Object value) {
switch (valueReferenceType) {
case STRONG:
return value;
case SOFT:
return new SoftValueReference(keyReference, value);
case WEAK:
return new WeakValueReference(keyReference, value);
default:
throw new AssertionError();
}
}
3.4 解析引用
解析引用就是返回原始的 key 跟 value 对象。
K dereferenceKey(Object o) {
return (K) dereference(keyReferenceType, o);
}
V dereferenceValue(Object o) {
if (o == null) {
return null;
}
Object value = dereference(valueReferenceType, o);
if (o instanceof InternalReference) {
InternalReference reference = (InternalReference) o;
if (value == null) {
reference.finalizeReferent(); // old value was garbage collected
}
}
return (V) value;
}
3.5 后台线程清理垃圾回收引用
这个类主要是创建一个异步的线程去处理被垃圾回收器回收的引用。
static class FinalizableReferenceQueue extends ReferenceQueue<Object> {
private FinalizableReferenceQueue() {
}
void cleanUp(Reference reference) {
try {
((InternalReference) reference).finalizeReferent();
} catch (Throwable t) {
throw new IllegalStateException("Unable to clean up after reference", t);
}
}
void start() {
// 启动线程,循环的去调用清理方法
Thread thread = new Thread("FinalizableReferenceQueue") {
@Override
@SuppressWarnings({ "InfiniteLoopStatement" })
public void run() {
while (true) {
try {
cleanUp(remove());
} catch (InterruptedException iex) { /* ignore */
}
}
}
};
thread.setDaemon(true);
thread.start();
}
static final ReferenceQueue<Object> instance = createAndStart();
static FinalizableReferenceQueue createAndStart() {
FinalizableReferenceQueue queue = new FinalizableReferenceQueue();
queue.start();
return queue;
}
/**
* Gets instance.
*/
public static ReferenceQueue<Object> getInstance() {
return instance;
}
}