七的博客

RedissonV1.0源码分析-工具类篇

源码分析

RedissonV1.0源码分析-工具类篇

这一篇主要是针对 Redisson 的几个工具类源码做下分析,从类型上划分不太好跟其他源码混一起。

但是这几个工具类又是源码中比较重要的一部分。

1. 发布订阅包装工具类 RedisPubSubTopicListenerWrapper

这个类是用于包装 Redis 发布/订阅 监听器的一个工具类。 通过这个类 Redisson 可以在不暴露底层 Redis 客户端库细节的情况下,提供发布/订阅功能。

package org.redisson;

import org.redisson.core.MessageListener;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;

public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K, V> {

    private final MessageListener<V> listener;  //  实际的消息监听器
    private final K name;  // 订阅的 channel 名称

    public RedisPubSubTopicListenerWrapper(MessageListener<V> listener, K name) {
        super();
        this.listener = listener;
        this.name = name;
    }

    @Override
    public void message(K channel, V message) {
        // 当收到消息时,先检查 channel 名是否匹配,如果匹配则调用实际监听器的 onMessage 方法
        // 允许一个监听器可以订阅多个  channel 
        if (name.equals(channel)) {
            listener.onMessage(message);
        }
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((listener == null) ? 0 : listener.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        RedisPubSubTopicListenerWrapper other = (RedisPubSubTopicListenerWrapper) obj;
        if (listener == null) {
            if (other.listener != null)
                return false;
        } else if (!listener.equals(other.listener))
            return false;
        return true;
    }



}

2. 可重复开关的线程门栓实现 ReclosableLatch

可重复开关的线程门栓实现, 基于 Java 并发包中的 java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS) 类实现。

特点是可以创建一个可以反复打开和关闭的闭锁,这个工具类跟 Redis 没有任何关系,也不会往 Redis 读写数据等。ReclosableLatch 目前主要是用在 RedissonCountDownLatch 这个对象里面。

package org.redisson.misc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class ReclosableLatch extends AbstractQueuedSynchronizer {

   private static final long serialVersionUID = 1744280161777661090l;

   // AQS 中使用的状态变量
   // OPEN_STATE = 0 表示开启状态
   // CLOSED_STATE = 1 表示关闭状态
   private static final int OPEN_STATE = 0, CLOSED_STATE = 1;

   public ReclosableLatch() {
   	  // 默认关闭状态
      setState(CLOSED_STATE);
   }

   public ReclosableLatch(boolean defaultOpen) {
      setState(defaultOpen ? OPEN_STATE : CLOSED_STATE);
   }

   // 重写 AQS 的 tryAcquireShared 方法
   @Override
   public final int tryAcquireShared(int ignored) {
   	  // 如果状态是 OPEN_STATE,返回 1 (允许获取共享锁) 。否则返回 -1 (不允许获取)。
      return getState() == OPEN_STATE ? 1 : -1;
   }

   // 重写 AQS 的 tryReleaseShared 方法
   @Override
   public final boolean tryReleaseShared(int state) {
   	  // 设置新的状态   总是返回 true,表示释放成功
      setState(state);
      return true;
   }

   public final void open() {
   	  // 警告不要直接使用 setState(),因为这不会通知被阻塞的线程。
   	  // 调用 AQS 的 releaseShared 方法,传入 OPEN_STATE
      releaseShared(OPEN_STATE);
   }

   public final void close() {
   	  // 警告不要直接使用 setState(),因为这不会通知被阻塞的线程。
      // 调用 AQS 的 releaseShared 方法,传入 CLOSED_STATE
      releaseShared(CLOSED_STATE);
   }

   // 是否处于开启状态
   public boolean isOpened() {
      return getState() == OPEN_STATE;
   }

   // await 方法使当前线程等待,直到门闩开启。
   public final void await() throws InterruptedException {
   	 // 调用 AQS 的 acquireSharedInterruptibly 方法,可以被中断
   	 // 传入的 1 是一个不会被使用的虚拟值
      acquireSharedInterruptibly(1);
   }

   // 调用 AQS 的 tryAcquireSharedNanos 方法,实现超时等待。
   public final boolean await(long time, TimeUnit unit) throws InterruptedException {
   	   // 传入的 1 是一个不会被使用的虚拟值
      return tryAcquireSharedNanos(1, unit.toNanos(time)); 
   }

   @Override
   public String toString() {
      int s = getState();
      String q = hasQueuedThreads() ? "non" : "";
      return "ReclosableLatch [State = " + s + ", " + q + "empty queue]";
   }
}



这里设计到 Java 的几个知识点。 Latch 是一种同步机制,翻译有好多种。 有的翻译成 闭锁、门闩等等。但是说的都是一个东西,就是用于阻塞线程一直到某个条件满足,堵塞的线程就会被唤醒。

跟 JDK 内置的 CountDownLatch 区别在于:

  • CountDownLatch 是一次性使用的,计数器到 0 之后,就不能重置或者重新使用。
  • ReclosableLatch 可以反复的打开关闭,跟开关电灯一样,开完就可以关。

跟 CyclicBarrier 就没什么可对比性了,CyclicBarrier 本来就是为了控制多个线程进行同步的场景。

可以看下面这个例子:

public class ReclosableLatchExample {

    public static void main(String[] args) throws InterruptedException {
        // 实例化这个对象
        final ReclosableLatch latch = new ReclosableLatch();

        // 创建并启动一个异步的线程,观察线程
        final Thread waitingThread = new Thread(() -> {
            try {
                // 这里等待闭锁打开
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waitingThread.start();

        // 模拟一些业务耗时操作
        Thread.sleep(2000);

        // 打开闭锁, 上面这个线程里面的 await() 会被唤醒
        latch.open();

        // 等待上面这个线程完成,线程会被销毁掉
        waitingThread.join();


        // ===========================  下面开启第二次测试    ==============================================

        // 重新关闭闭锁
        latch.close();

        // 新启动另一个线程,等待闭锁再次打开
        final Thread anotherWaitingThread = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        anotherWaitingThread.start();

        // 再次模拟一些操作,然后打开闭锁
        Thread.sleep(2000);
        latch.open();

        // 等待上面线程完成  然后被销毁
        anotherWaitingThread.join();
    }
}

3. 支持引用的并发 HashMap 实现 ReferenceMap

这是一个并发 HashMap 的实现,支持对 key 和 value 使用软引用或弱引用。 类还是比较复杂的,里面好几个作者一起编写的代码。

总结下主要是作用就是通过使用软引用和弱引用,能够在内存压力下自动释放不再需要的对象。 同时保持线程安全和高性能, 这种实现特别适用于需要大量缓存但又不想耗尽内存的场景,Redisson 在某个场景下就会有这种情况。

为了更好的分析这个类的代码,把这个类的代码拆分成几部分分析,不然这个类太长。同时不用太纠结具体的代码实现,知道个大概意思即可。

3.1 引用类型 ReferenceType

这个枚举中定义的是 Java 中几种不同的引用类型,这些引用类型可以控制对象的声明周期跟垃圾回收时的一个行为。

public enum ReferenceType {
  		// 普通的强引用,不会被垃圾回收掉
        STRONG,
        // 软引用,在内存不足时可能被回收,内存充足时不会回收
        SOFT,
        // 弱引用,在下一次垃圾回收时会被回收
        WEAK,
        // 幻引用。    列出来了,但是好像没用上
        PHANTOM,
}

这几个枚举背后涉及的类:

  • 软引用 java.lang.ref.SoftReference
  • 弱引用 java.lang.ref.WeakReference
  • 幻引用 java.lang.ref.PhantomReference

3.2 内部引用接口以及实现

下面这几个接口都是为了在内存不足时,通过软引用或者弱引用的方式去区分,让垃圾收集器回收这些对象。

这样应用就不会因为内存不足而崩溃。 软引用比弱引用强一点,只有在内存即将耗尽时才会被回收,而弱引用在下一次垃圾收集时就可能被回收。

3.2.1 内部引用接口

interface InternalReference {

   void finalizeReferent();

   Object get();
}

3.2.2 软引用的键 SoftKeyReference

import java.lang.ref.SoftReference;

class SoftKeyReference extends SoftReference<Object> implements InternalReference {
        final int hashCode;

        SoftKeyReference(Object key) {
            super(key, FinalizableReferenceQueue.getInstance());
            this.hashCode = System.identityHashCode(key);
        }

        public void finalizeReferent() {
            // 值被回收时从 delegate 中移除自身
            delegate.remove(this);
        }

        @Override
        public int hashCode() {
            return this.hashCode;
        }

        @Override
        public boolean equals(Object o) {
            return referenceEquals(this, o);
        }
    }

3.2.3 弱引用的键 WeakKeyReference

import java.lang.ref.WeakReference;

class WeakKeyReference extends WeakReference<Object> implements InternalReference {
        final int hashCode;

        WeakKeyReference(Object key) {
            super(key, FinalizableReferenceQueue.getInstance());
            this.hashCode = System.identityHashCode(key);
        }

        public void finalizeReferent() {
            // 值被回收时从 delegate 中移除自身
            delegate.remove(this);
        }

        @Override
        public int hashCode() {
            return this.hashCode;
        }

        @Override
        public boolean equals(Object o) {
            return referenceEquals(this, o);
        }
    }

3.2.4 软引用的值 SoftValueReference

import java.lang.ref.SoftReference;

class SoftValueReference extends SoftReference<Object> implements InternalReference {
        final Object keyReference;

        SoftValueReference(Object keyReference, Object value) {
            super(value, FinalizableReferenceQueue.getInstance());
            this.keyReference = keyReference;
        }

        public void finalizeReferent() {
             // 值被回收时从 delegate 中移除对应的键值对
            delegate.remove(keyReference, this);
        }

        @Override
        public boolean equals(Object obj) {
            return referenceEquals(this, obj);
        }
    }

3.2.5 弱引用的值 WeakValueReference

import java.lang.ref.WeakReference;

class WeakValueReference extends WeakReference<Object> implements InternalReference {
        final Object keyReference;

        WeakValueReference(Object keyReference, Object value) {
            super(value, FinalizableReferenceQueue.getInstance());
            this.keyReference = keyReference;
        }

        public void finalizeReferent() {
             // 值被回收时从 delegate 中移除对应的键值对
            delegate.remove(keyReference, this);
        }

        @Override
        public boolean equals(Object obj) {
            return referenceEquals(this, obj);
        }
    }

3.3 构造函数实现

构造函数设置了 key 跟 value 的引用类型,同时初始化了内部的 ConcurrentHashMap。这个 ConcurrentHashMap 就是用来存放数据的。

     // 指定 key value 的引用类型
    // 传入 RemoveValueListener 用于在值被移除时触发回调
    // 创建了自定义的 ConcurrentHashMap,重写了 remove 方法以支持移除监听。
    public ReferenceMap(ReferenceType keyReferenceType, ReferenceType valueReferenceType, final RemoveValueListener<V> removeValueListener) {
        if ((keyReferenceType == null) || (valueReferenceType == null)) {
            throw new IllegalArgumentException("References types can not be null");
        }
        if (keyReferenceType == ReferenceType.PHANTOM || valueReferenceType == ReferenceType.PHANTOM) {
            throw new IllegalArgumentException("Phantom references not supported");
        }
        this.delegate = new ConcurrentHashMap<Object, Object>() {
        	// 重写移除方法
            @Override
            public Object remove(Object key) {
                Object res = super.remove(key);
                if (res != null && removeValueListener != null) {
                    // 调用传入的监听器方法
                    removeValueListener.onRemove((V)res);
                }
                return res;
            }

            @Override
            public boolean remove(Object key, Object value) {
                boolean res = super.remove(key, value);
                if (res && removeValueListener != null) {
                     // 调用传入的监听器方法
                    removeValueListener.onRemove((V)value);
                }
                return res;
            }

        };
        this.keyReferenceType = keyReferenceType;
        this.valueReferenceType = valueReferenceType;
}

3.4 创建引用

创建引用就是使用 SoftKeyReference 、WeakKeyReference 等包装后的引用。 如果是强引用的话就直接返回原对象。

Object referenceKey(K key) {
    switch (keyReferenceType) {
    case STRONG:
        return key;
    case SOFT:
        return new SoftKeyReference(key);
    case WEAK:
        return new WeakKeyReference(key);
    default:
        throw new AssertionError();
    }
}

Object referenceValue(Object keyReference, Object value) {
    switch (valueReferenceType) {
    case STRONG:
        return value;
    case SOFT:
        return new SoftValueReference(keyReference, value);
    case WEAK:
        return new WeakValueReference(keyReference, value);
    default:
        throw new AssertionError();
    }
}

3.4 解析引用

解析引用就是返回原始的 key 跟 value 对象。

K dereferenceKey(Object o) {
    return (K) dereference(keyReferenceType, o);
}

V dereferenceValue(Object o) {
    if (o == null) {
        return null;
    }
    Object value = dereference(valueReferenceType, o);
    if (o instanceof InternalReference) {
        InternalReference reference = (InternalReference) o;
        if (value == null) {
            reference.finalizeReferent(); // old value was garbage collected
        }
    }
    return (V) value;
}

3.5 后台线程清理垃圾回收引用

这个类主要是创建一个异步的线程去处理被垃圾回收器回收的引用。

static class FinalizableReferenceQueue extends ReferenceQueue<Object> {

        private FinalizableReferenceQueue() {
        }

        void cleanUp(Reference reference) {
            try {
                ((InternalReference) reference).finalizeReferent();
            } catch (Throwable t) {
                throw new IllegalStateException("Unable to clean up after reference", t);
            }
        }

        void start() {
            // 启动线程,循环的去调用清理方法
            Thread thread = new Thread("FinalizableReferenceQueue") {
                @Override
                @SuppressWarnings({ "InfiniteLoopStatement" })
                public void run() {
                    while (true) {
                        try {
                            cleanUp(remove());
                        } catch (InterruptedException iex) { /* ignore */
                        }
                    }
                }
            };
            thread.setDaemon(true);
            thread.start();
        }

        static final ReferenceQueue<Object> instance = createAndStart();

        static FinalizableReferenceQueue createAndStart() {
            FinalizableReferenceQueue queue = new FinalizableReferenceQueue();
            queue.start();
            return queue;
        }

        /**
         * Gets instance.
         */
        public static ReferenceQueue<Object> getInstance() {
            return instance;
        }
}