七的博客

MINA通信入门(四)-理解过滤器链IoFilter

网络通信电力Mina

MINA通信入门(四)-理解过滤器链IoFilter

1. IoFilter 是什么

IoFilter 是 Mina 的核心结构之一,主要是处理 I/O 操作的过滤器。如果你有写过 Java Servlet 的话,你可能会有一丝熟悉的感觉,这个过滤器跟 Java Servelt 的过滤器的作用是类似的,你可以自定义实现自己的 xxFilter 。比方说做拦截、过滤网络传输中的消息等,定位就是在没有进入业务处理器之前的操作。

IoFilter

2. IoFilter 的职责有哪些

在生产环境中常用来将字节流转换成 Java 对象、日志打印、非法请求拦截、数据加密、数据压缩等等,这是非常实用的一个接口。 比如一些典型的过滤器:

  • LoggingFilter logs all events and requests. 记录所有网络事件和数据请求的过滤器。帮助开发调试网络程序,记录每一步的详细信息来分析问题。
  • ProtocolCodecFilter converts an incoming ByteBuffer into message POJO and vice versa. 用在字节流和高级 Java 对象之间进行转换。
  • CompressionFilter compresses all data. 自动压缩和解压缩经过 IoSession 的所有数据。 比方说在电力业务场景下,电表可能是走 GPRS 通信的,为了节省流量费用,会对请求进行压缩节省通讯数据包大小。
  • SSLFilter adds SSL - TLS - StartTLS support. 添加了对 SSL/TLS 安全协议的支持。可以确保数据在客户端和服务器之间的传输安全。

这里列举下 Mina 官网列举的可以直接使用的过滤器列表:

过滤器名称 作用描述
Blacklist 阻止来自特定黑名单地址的连接请求,增强安全性。
Buffered Write 类似于计算机中的缓冲区,先暂存数据,再批量发送,提高效率。
Compression 压缩传输数据,减少网络负载和提升传输速度。
Connection Throttle 控制连接请求的速率,避免服务器被过多请求同时击垮。
Error Generating
Executor 线程池执行器,用于分离 IO线程以及业务线程。
FileRegion Write
KeepAlive 定期发送心跳包,保持连接不被自动断开。
Logging 记录所有网络事件,方便后续的分析和调试。
MDC Injection Log4j 和 Logback 提供的跨多线程条件下日志上下文的功能,主要用于调试使用。
Noop 空的过滤器,什么都不干。 猜测是类似于算法中的链表哑结点作用,减少空判断。
Profiler 对网络事件进行性能分析,帮助优化应用性能的过滤器。
Protocol Codec 数据编解码过滤器。
Proxy 支持代理服务器功能,可以在客户端和服务器之间转发数据。
Reference Counting 跟踪此过滤器的引用次数,用于资源管理。
Session Attribute Initializing 在会话开始时初始化设置,准备好会话所需的属性。
Stream Write 处理流数据写入,优化大量数据的发送过程。
SslFilter 为数据传输加入SSL/TLS安全层,保护数据不被窃听。
Write Request 管理和优化数据的发送请求,确保数据正确高效地输出。

这些 Filter 有些我也没用过,不过官网上面都有对应的例子,点进去一个个看一看大概就知道怎么用以及它的作用了。

3. IoFilter 接口的定义

接口定义如下:

public interface IoFilter {
    /**
     * Invoked by {@link ReferenceCountingFilter} when this filter
     * is added to a {@link IoFilterChain} at the first time, so you can
     * initialize shared resources.  Please note that this method is never
     * called if you don't wrap a filter with {@link ReferenceCountingFilter}.
     * 
     * @throws Exception If an error occurred while processing the event
     */
    // 用于初始化Filter实例自身所需的资源,比如启动线程池等。在Filter实例被使用之前调用一次。
    void init() throws Exception;

    /**
     * Invoked by {@link ReferenceCountingFilter} when this filter
     * is not used by any {@link IoFilterChain} anymore, so you can destroy
     * shared resources.  Please note that this method is never called if
     * you don't wrap a filter with {@link ReferenceCountingFilter}.
     * 
     * @throws Exception If an error occurred while processing the event
     */
    // 用于销毁Filter实例初始化的资源,如关闭线程池等。在Filter实例被丢弃前调用一次。
    void destroy() throws Exception;

    /**
     * Invoked before this filter is added to the specified <tt>parent</tt>.
     * Please note that this method can be invoked more than once if
     * this filter is added to more than one parents.  This method is not
     * invoked before {@link #init()} is invoked.
     *
     * @param parent the parent who called this method
     * @param name the name assigned to this filter
     * @param nextFilter the {@link NextFilter} for this filter.  You can reuse
     *                   this object until this filter is removed from the chain.
     * @throws Exception If an error occurred while processing the event
     */
    // 在将该Filter实例添加到指定的IoFilterChain时被调用。参数parent是添加的目标链,name是过滤器名称,nextFilter用于调用链中下一个Filter。
    void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;

    /**
     * Invoked after this filter is added to the specified <tt>parent</tt>.
     * Please note that this method can be invoked more than once if
     * this filter is added to more than one parents.  This method is not
     * invoked before {@link #init()} is invoked.
     *
     * @param parent the parent who called this method
     * @param name the name assigned to this filter
     * @param nextFilter the {@link NextFilter} for this filter.  You can reuse
     *                   this object until this filter is removed from the chain.
     * @throws Exception If an error occurred while processing the event
     */
    // 该Filter实例被添加到链之后调用,参数意义同onPreAdd。
    void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;

    /**
     * Invoked before this filter is removed from the specified <tt>parent</tt>.
     * Please note that this method can be invoked more than once if
     * this filter is removed from more than one parents.
     * This method is always invoked before {@link #destroy()} is invoked.
     *
     * @param parent the parent who called this method
     * @param name the name assigned to this filter
     * @param nextFilter the {@link NextFilter} for this filter.  You can reuse
     *                   this object until this filter is removed from the chain.
     * @throws Exception If an error occurred while processing the event
     */
    // 将要从指定链中移除该实例时调用,可以做一些清理工作。参数意义同onPreAdd。
    void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;

    /**
     * Invoked after this filter is removed from the specified <tt>parent</tt>.
     * Please note that this method can be invoked more than once if
     * this filter is removed from more than one parents.
     * This method is always invoked before {@link #destroy()} is invoked.
     *
     * @param parent the parent who called this method
     * @param name the name assigned to this filter
     * @param nextFilter the {@link NextFilter} for this filter.  You can reuse
     *                   this object until this filter is removed from the chain.
     * @throws Exception If an error occurred while processing the event
     */
    // 从链中移除该实例后调用,参数意义同onPreAdd。
    void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;

    /**
     * Filters {@link IoHandler#sessionCreated(IoSession)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @throws Exception If an error occurred while processing the event
     */
    // 当创建新的IoSession会话实例时调用该方法。
    void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception;

    /**
     * Filters {@link IoHandler#sessionOpened(IoSession)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @throws Exception If an error occurred while processing the event
     */
    // 会话实例打开连接后调用。
    void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception;

    /**
     * Filters {@link IoHandler#sessionClosed(IoSession)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @throws Exception If an error occurred while processing the event
     */
    // 会话实例关闭连接后调用。
    void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception;

    /**
     * Filters {@link IoHandler#sessionIdle(IoSession,IdleStatus)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @param status The {@link IdleStatus} type
     * @throws Exception If an error occurred while processing the event
     */
    // 会话实例进入空闲状态时调用,status指示空闲类型。
    void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception;

    /**
     * Filters {@link IoHandler#exceptionCaught(IoSession,Throwable)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @param cause The exception that cause this event to be received
     * @throws Exception If an error occurred while processing the event
     */
    // 会话实例发生异常时调用,cause为异常对象。
    void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception;

    /**
     * Filters {@link IoHandler#inputClosed(IoSession)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @throws Exception If an error occurred while processing the event
     */
    // 会话实例的输入流关闭后调用该方法。
    void inputClosed(NextFilter nextFilter, IoSession session) throws Exception;

    /**
     * Filters {@link IoHandler#messageReceived(IoSession,Object)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @param message The received message
     * @throws Exception If an error occurred while processing the event
     */
    // 从会话读取到消息报文后调用,message为消息对象。
    void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception;

    /**
     * Filters {@link IoHandler#messageSent(IoSession,Object)} event.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has received this event
     * @param writeRequest The {@link WriteRequest} that contains the sent message
     * @throws Exception If an error occurred while processing the event
     */
    // 消息报文写出会话缓冲区后调用,writeRequest包含了报文数据。
    void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception;

    /**
     * Filters {@link IoSession#closeNow()} or a {@link IoSession#closeOnFlush()} method invocations.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session
     *            The {@link IoSession} which has to process this method
     *            invocation
     * @throws Exception If an error occurred while processing the event
     */
    // 调用IoSession.write方法时回调该方法。
    void filterClose(NextFilter nextFilter, IoSession session) throws Exception;

    /**
     * Filters {@link IoSession#write(Object)} method invocation.
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has to process this invocation
     * @param writeRequest The {@link WriteRequest} to process
     * @throws Exception If an error occurred while processing the event
     */
    // 调用IoSession关闭方法如closeNow时回调。
    void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception;
    
    /**
     * Propagate an event up to the {@link IoHandler}
     * 
     * @param nextFilter
     *            the {@link NextFilter} for this filter. You can reuse this
     *            object until this filter is removed from the chain.
     * @param session The {@link IoSession} which has to process this invocation
     * @param event The event to propagate
     * @throws Exception If an error occurred while processing the event
     */
    // 处理其他自定义FilterEvent事件时调用。
    void event(NextFilter nextFilter, IoSession session, FilterEvent event) throws Exception;

    // 每个IoFilter在处理完事件后,都需要决定是继续传递事件给下一个Filter,还是终止事件的传递。NextFilter接口就提供了传递事件的方式。
    /**
     * Represents the next {@link IoFilter} in {@link IoFilterChain}.
     */
    interface NextFilter {
        /**
         * Forwards <tt>sessionCreated</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         */
        // 将sessionCreated事件传递给下一个Filter。当创建新的会话实例时,会依次调用每个Filter的sessionCreated方法。
        void sessionCreated(IoSession session);

        /**
         * Forwards <tt>sessionOpened</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         */
        // 将sessionOpened事件传给下一个Filter。当会话连接打开时,会依次调用每个Filter的sessionOpened方法。
        void sessionOpened(IoSession session);

        /**
         * Forwards <tt>sessionClosed</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         */
        // 将sessionClosed事件传给下一个Filter。当会话连接关闭时,会依次调用每个Filter的sessionClosed方法。
        void sessionClosed(IoSession session);

        /**
         * Forwards <tt>sessionIdle</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param status The {@link IdleStatus} type
         */
        // 将sessionIdle事件传给下一个Filter。当会话进入空闲状态时,会依次调用每个Filter的sessionIdle方法,status指示空闲的类型(读空闲、写空闲等)。
        void sessionIdle(IoSession session, IdleStatus status);

        /**
         * Forwards <tt>exceptionCaught</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param cause The exception that cause this event to be received
         */
        // 将exceptionCaught事件传给下一个Filter。当会话实例发生异常时,会依次调用每个Filter的exceptionCaught方法,cause是发生的异常对象。
        void exceptionCaught(IoSession session, Throwable cause);

        /**
         * 
         * @param session The {@link IoSession} which has to process this invocation
         */
        // 将inputClosed事件传给下一个Filter。当会话的输入流关闭时,会依次调用每个Filter的inputClosed方法。
        void inputClosed(IoSession session);

        /**
         * Forwards <tt>messageReceived</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param message The received message
         */
        // 将messageReceived事件传给下一个Filter。当从会话读取到消息报文时,会依次调用每个Filter的messageReceived方法,message为读取到的消息对象。
        void messageReceived(IoSession session, Object message);

        /**
         * Forwards <tt>messageSent</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param writeRequest The {@link WriteRequest} to process
         */
        // 将messageSent事件传给下一个Filter。当消息报文写出会话发送缓冲区后,会依次调用每个Filter的messageSent方法,writeRequest包含了发送的报文数据。
        void messageSent(IoSession session, WriteRequest writeRequest);

        /**
         * Forwards <tt>filterWrite</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param writeRequest The {@link WriteRequest} to process
         */
        // 将filterWrite事件传给下一个Filter。当应用程序调用会话的write方法发送数据时,会依次调用每个Filter的filterWrite方法。
        void filterWrite(IoSession session, WriteRequest writeRequest);

        /**
         * Forwards <tt>filterClose</tt> event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         */
        // 将filterClose事件传给下一个Filter。当应用程序调用会话的closeNow或closeOnFlush方法关闭会话时,会依次调用每个Filter的filterClose方法。
        void filterClose(IoSession session);

        /**
         * Forwards an event to next filter.
         * 
         * @param session The {@link IoSession} which has to process this invocation
         * @param event The event to propagate
         */
        // 将自定义事件传给下一个Filter。开发者可以触发自定义的FilterEvent事件,并通过该方法将事件传递给过滤器链中的下一个Filter进行处理。
        void event(IoSession session, FilterEvent event);
    }
}

比较常用的几个方法有:

  • initdestroy 方法分别在过滤器添加到过滤器链和从过滤器链移除时调用,可以用于初始化和清理资源。

  • onPreAddonPostAddonPreRemoveonPostRemove 方法在过滤器添加到过滤器链和从过滤器链移除的不同时刻调用,可以用于执行一些特定的操作。

  • sessionCreatedsessionOpenedsessionClosedsessionIdleexceptionCaught方法分别在会话创建、打开、关闭、空闲和异常时调用。

  • messageReceivedmessageSent 方法分别在消息接收和发送时调用,可以对消息进行拦截和处理。

  • filterWritefilterClose 方法分别在消息写入和会话关闭时调用,可以对写请求和关闭请求进行拦截和处理。

其中, NextFilter 参数表示过滤器链中的下一个过滤器,通过调用 nextFilter.xxx() 方法,可以将请求传递给下一个过滤器或 IoHandler 。

4. IoFilter 链的构成与顺序

在Mina中,多个 IoFilter 可以组成一个过滤器链( IoFilterChain ),对消息进行层层处理。当一个消息被接收或发送时,它会按照一定的顺序依次通过过滤器链中的每个过滤器。这样每一个 Filter 可以专注于干自己的活,不用堆一大堆的代码逻辑。

在 HelloWorld 例子中,我们其实就已经使用过 IoFilter 过滤器链了:

public class MinaServer {

    private static final int PORT = 8080;

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setHandler(new ServerHandler());

        // 这里获取了当前的过滤器链,然后添加了一个处理协议编解码的 ProtocolCodecFilter
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
    }
}    

过滤器链的构成和顺序可以通过 IoServicegetFilterChain() 方法获取和修改:

DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();
builder.addLast("logger", new LoggingFilter());
builder.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
builder.addLast("crypto", new MyCryptoFilter());

IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.setFilterChainBuilder(builder);

在上面的例子中,我们创建了一个 DefaultIoFilterChainBuilder 对象,并通过 addLast 方法依次添加了三个过滤器:日志过滤器、编解码过滤器和加密过滤器。然后,我们将这个过滤器链构建器设置给了IoAcceptor。上面的代码也可以改写成跟 HelloWorld 例子中类似的代码风格:

 IoAcceptor acceptor = new NioSocketAcceptor();
 final DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
 filterChain.addLast("logger", new LoggingFilter());
 filterChain.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
 filterChain.addLast("crypto", new MyCryptoFilter());

当有新的连接建立时, IoAcceptor 会根据这个构建器,为每个连接创建一个新的过滤器链实例。过滤器链中的过滤器会按照添加的顺序依次执行。

4.1 接收消息处理链路顺序

对于接收到的消息,过滤器链的执行顺序如下:

接收消息处理链路顺序

消息首先由 IoService 接收,然后依次通过日志过滤器、编解码过滤器、加密过滤器,最后到达 IoHandler 。

4.2 发送消息处理链路顺序

发送消息的时候,过滤器链的执行顺序相反:

发送消息处理链路顺序

消息首先由 IoHandler 发送,然后依次通过加密过滤器、编解码过滤器、日志过滤器,最后由 IoService 发送出去。

4. IoFilter 怎么去用

下面通过一个自定义 IoFilter 的例子,演示如何实现电表跟主站通信数据的加解密(具体的加解密逻辑忽略,不要纠结对不对)。大概流程如下:

IoFilter 流程

假设我们的电表通过 TCP 连接发送和接收加密后的数据,加密算法为简单的异或运算,密钥为一个固定的整数。我们可以定义一个 CryptoFilter 类,继承自 IoFilterAdapter , 对数据进行加解密:

public class CryptoFilter extends IoFilterAdapter {
    // 加解密的密钥,这里应该每一个电表一个密钥的。
    private final int key= 0xAA;

    @Override
    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
        // 首先判断接收到的消息是否为`IoBuffer`类型,如果是,则对其中的每个字节进行异或解密,然后再传递给下一个过滤器。
        if (message instanceof IoBuffer) {
            IoBuffer buffer = (IoBuffer) message;
            int remaining = buffer.remaining();
            for (int i = 0; i < remaining; i++) {
                byte b = buffer.get(i);
                b = (byte) (b ^ key);
                buffer.put(i, b);
            }
        }
        nextFilter.messageReceived(session, message);
    }

    @Override
    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
        // 对发送的消息进行类似的处理,对每个字节进行异或加密,然后再传递给下一个过滤器。
        Object message = writeRequest.getMessage();
        if (message instanceof IoBuffer) {
            IoBuffer buffer = (IoBuffer) message;
            int remaining = buffer.remaining();
            for (int i = 0; i < remaining; i++) {
                byte b = buffer.get(i);
                b = (byte) (b ^ key);
                buffer.put(i, b);
            }
        }
        nextFilter.messageSent(session, writeRequest);
    }
}

将其添加到过滤器链中即可:

DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();
builder.addLast("logger", new LoggingFilter());
builder.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
builder.addLast("crypto", new CryptoFilter(0xAA));

IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.setFilterChainBuilder(builder);

这样,电表发送的消息到达业务处理器 IoHandler 之前, 就会被自动解密; 同时消息被 IoHandler 发送之后,就会被自动加密。主站跟电表间都这样去加解密,达到通信安全的目的。

当然上面只是一个举例, 通过自定义 IoFilter , 我们可以在不修改业务处理器 IoHandler 代码的情况下,实现各种功能,如数据加密、压缩、签名验证等。

5. 小结

本章介绍了Mina框架中的过滤器链和 IoFilter 组件。

  • 过滤器链由多个 IoFilter 组成,对消息进行层层处理。过滤器链的构成和顺序可以通过 IoFilterChainBuilder 来配置。

  • IoFilter是一种拦截器,可以在消息的接收和发送过程中,对消息进行拦截、检查、修改或丢弃。通过实现IoFilter接口,我们可以自定义各种功能,如日志记录、数据压缩、加密解密、流量控制等。

在实际应用中,我们可以根据需要,选择合适的 IoFilter ,并按照适当的顺序组装成过滤器链。过滤器链可以显著提高代码的可重用性和可维护性,使得应用程序更加灵活和可扩展。

6. 参考链接