MINA通信入门(四)-理解过滤器链IoFilter
MINA通信入门(四)-理解过滤器链IoFilter
1. IoFilter 是什么
IoFilter 是 Mina 的核心结构之一,主要是处理 I/O 操作的过滤器。如果你有写过 Java Servlet 的话,你可能会有一丝熟悉的感觉,这个过滤器跟 Java Servelt 的过滤器的作用是类似的,你可以自定义实现自己的 xxFilter 。比方说做拦截、过滤网络传输中的消息等,定位就是在没有进入业务处理器之前的操作。
2. IoFilter 的职责有哪些
在生产环境中常用来将字节流转换成 Java 对象、日志打印、非法请求拦截、数据加密、数据压缩等等,这是非常实用的一个接口。 比如一些典型的过滤器:
- LoggingFilter logs all events and requests. 记录所有网络事件和数据请求的过滤器。帮助开发调试网络程序,记录每一步的详细信息来分析问题。
- ProtocolCodecFilter converts an incoming ByteBuffer into message POJO and vice versa. 用在字节流和高级 Java 对象之间进行转换。
- CompressionFilter compresses all data. 自动压缩和解压缩经过
IoSession
的所有数据。 比方说在电力业务场景下,电表可能是走 GPRS 通信的,为了节省流量费用,会对请求进行压缩节省通讯数据包大小。 - SSLFilter adds SSL - TLS - StartTLS support. 添加了对 SSL/TLS 安全协议的支持。可以确保数据在客户端和服务器之间的传输安全。
这里列举下 Mina 官网列举的可以直接使用的过滤器列表:
过滤器名称 | 作用描述 |
---|---|
Blacklist | 阻止来自特定黑名单地址的连接请求,增强安全性。 |
Buffered Write | 类似于计算机中的缓冲区,先暂存数据,再批量发送,提高效率。 |
Compression | 压缩传输数据,减少网络负载和提升传输速度。 |
Connection Throttle | 控制连接请求的速率,避免服务器被过多请求同时击垮。 |
Error Generating | |
Executor | 线程池执行器,用于分离 IO线程以及业务线程。 |
FileRegion Write | |
KeepAlive | 定期发送心跳包,保持连接不被自动断开。 |
Logging | 记录所有网络事件,方便后续的分析和调试。 |
MDC Injection | Log4j 和 Logback 提供的跨多线程条件下日志上下文的功能,主要用于调试使用。 |
Noop | 空的过滤器,什么都不干。 猜测是类似于算法中的链表哑结点作用,减少空判断。 |
Profiler | 对网络事件进行性能分析,帮助优化应用性能的过滤器。 |
Protocol Codec | 数据编解码过滤器。 |
Proxy | 支持代理服务器功能,可以在客户端和服务器之间转发数据。 |
Reference Counting | 跟踪此过滤器的引用次数,用于资源管理。 |
Session Attribute Initializing | 在会话开始时初始化设置,准备好会话所需的属性。 |
Stream Write | 处理流数据写入,优化大量数据的发送过程。 |
SslFilter | 为数据传输加入SSL/TLS安全层,保护数据不被窃听。 |
Write Request | 管理和优化数据的发送请求,确保数据正确高效地输出。 |
这些 Filter 有些我也没用过,不过官网上面都有对应的例子,点进去一个个看一看大概就知道怎么用以及它的作用了。
3. IoFilter 接口的定义
接口定义如下:
public interface IoFilter {
/**
* Invoked by {@link ReferenceCountingFilter} when this filter
* is added to a {@link IoFilterChain} at the first time, so you can
* initialize shared resources. Please note that this method is never
* called if you don't wrap a filter with {@link ReferenceCountingFilter}.
*
* @throws Exception If an error occurred while processing the event
*/
// 用于初始化Filter实例自身所需的资源,比如启动线程池等。在Filter实例被使用之前调用一次。
void init() throws Exception;
/**
* Invoked by {@link ReferenceCountingFilter} when this filter
* is not used by any {@link IoFilterChain} anymore, so you can destroy
* shared resources. Please note that this method is never called if
* you don't wrap a filter with {@link ReferenceCountingFilter}.
*
* @throws Exception If an error occurred while processing the event
*/
// 用于销毁Filter实例初始化的资源,如关闭线程池等。在Filter实例被丢弃前调用一次。
void destroy() throws Exception;
/**
* Invoked before this filter is added to the specified <tt>parent</tt>.
* Please note that this method can be invoked more than once if
* this filter is added to more than one parents. This method is not
* invoked before {@link #init()} is invoked.
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
* @param nextFilter the {@link NextFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
* @throws Exception If an error occurred while processing the event
*/
// 在将该Filter实例添加到指定的IoFilterChain时被调用。参数parent是添加的目标链,name是过滤器名称,nextFilter用于调用链中下一个Filter。
void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;
/**
* Invoked after this filter is added to the specified <tt>parent</tt>.
* Please note that this method can be invoked more than once if
* this filter is added to more than one parents. This method is not
* invoked before {@link #init()} is invoked.
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
* @param nextFilter the {@link NextFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
* @throws Exception If an error occurred while processing the event
*/
// 该Filter实例被添加到链之后调用,参数意义同onPreAdd。
void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;
/**
* Invoked before this filter is removed from the specified <tt>parent</tt>.
* Please note that this method can be invoked more than once if
* this filter is removed from more than one parents.
* This method is always invoked before {@link #destroy()} is invoked.
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
* @param nextFilter the {@link NextFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
* @throws Exception If an error occurred while processing the event
*/
// 将要从指定链中移除该实例时调用,可以做一些清理工作。参数意义同onPreAdd。
void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;
/**
* Invoked after this filter is removed from the specified <tt>parent</tt>.
* Please note that this method can be invoked more than once if
* this filter is removed from more than one parents.
* This method is always invoked before {@link #destroy()} is invoked.
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
* @param nextFilter the {@link NextFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
* @throws Exception If an error occurred while processing the event
*/
// 从链中移除该实例后调用,参数意义同onPreAdd。
void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception;
/**
* Filters {@link IoHandler#sessionCreated(IoSession)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @throws Exception If an error occurred while processing the event
*/
// 当创建新的IoSession会话实例时调用该方法。
void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception;
/**
* Filters {@link IoHandler#sessionOpened(IoSession)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @throws Exception If an error occurred while processing the event
*/
// 会话实例打开连接后调用。
void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception;
/**
* Filters {@link IoHandler#sessionClosed(IoSession)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @throws Exception If an error occurred while processing the event
*/
// 会话实例关闭连接后调用。
void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception;
/**
* Filters {@link IoHandler#sessionIdle(IoSession,IdleStatus)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @param status The {@link IdleStatus} type
* @throws Exception If an error occurred while processing the event
*/
// 会话实例进入空闲状态时调用,status指示空闲类型。
void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception;
/**
* Filters {@link IoHandler#exceptionCaught(IoSession,Throwable)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @param cause The exception that cause this event to be received
* @throws Exception If an error occurred while processing the event
*/
// 会话实例发生异常时调用,cause为异常对象。
void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception;
/**
* Filters {@link IoHandler#inputClosed(IoSession)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @throws Exception If an error occurred while processing the event
*/
// 会话实例的输入流关闭后调用该方法。
void inputClosed(NextFilter nextFilter, IoSession session) throws Exception;
/**
* Filters {@link IoHandler#messageReceived(IoSession,Object)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @param message The received message
* @throws Exception If an error occurred while processing the event
*/
// 从会话读取到消息报文后调用,message为消息对象。
void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception;
/**
* Filters {@link IoHandler#messageSent(IoSession,Object)} event.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has received this event
* @param writeRequest The {@link WriteRequest} that contains the sent message
* @throws Exception If an error occurred while processing the event
*/
// 消息报文写出会话缓冲区后调用,writeRequest包含了报文数据。
void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception;
/**
* Filters {@link IoSession#closeNow()} or a {@link IoSession#closeOnFlush()} method invocations.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session
* The {@link IoSession} which has to process this method
* invocation
* @throws Exception If an error occurred while processing the event
*/
// 调用IoSession.write方法时回调该方法。
void filterClose(NextFilter nextFilter, IoSession session) throws Exception;
/**
* Filters {@link IoSession#write(Object)} method invocation.
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has to process this invocation
* @param writeRequest The {@link WriteRequest} to process
* @throws Exception If an error occurred while processing the event
*/
// 调用IoSession关闭方法如closeNow时回调。
void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception;
/**
* Propagate an event up to the {@link IoHandler}
*
* @param nextFilter
* the {@link NextFilter} for this filter. You can reuse this
* object until this filter is removed from the chain.
* @param session The {@link IoSession} which has to process this invocation
* @param event The event to propagate
* @throws Exception If an error occurred while processing the event
*/
// 处理其他自定义FilterEvent事件时调用。
void event(NextFilter nextFilter, IoSession session, FilterEvent event) throws Exception;
// 每个IoFilter在处理完事件后,都需要决定是继续传递事件给下一个Filter,还是终止事件的传递。NextFilter接口就提供了传递事件的方式。
/**
* Represents the next {@link IoFilter} in {@link IoFilterChain}.
*/
interface NextFilter {
/**
* Forwards <tt>sessionCreated</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
*/
// 将sessionCreated事件传递给下一个Filter。当创建新的会话实例时,会依次调用每个Filter的sessionCreated方法。
void sessionCreated(IoSession session);
/**
* Forwards <tt>sessionOpened</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
*/
// 将sessionOpened事件传给下一个Filter。当会话连接打开时,会依次调用每个Filter的sessionOpened方法。
void sessionOpened(IoSession session);
/**
* Forwards <tt>sessionClosed</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
*/
// 将sessionClosed事件传给下一个Filter。当会话连接关闭时,会依次调用每个Filter的sessionClosed方法。
void sessionClosed(IoSession session);
/**
* Forwards <tt>sessionIdle</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param status The {@link IdleStatus} type
*/
// 将sessionIdle事件传给下一个Filter。当会话进入空闲状态时,会依次调用每个Filter的sessionIdle方法,status指示空闲的类型(读空闲、写空闲等)。
void sessionIdle(IoSession session, IdleStatus status);
/**
* Forwards <tt>exceptionCaught</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param cause The exception that cause this event to be received
*/
// 将exceptionCaught事件传给下一个Filter。当会话实例发生异常时,会依次调用每个Filter的exceptionCaught方法,cause是发生的异常对象。
void exceptionCaught(IoSession session, Throwable cause);
/**
*
* @param session The {@link IoSession} which has to process this invocation
*/
// 将inputClosed事件传给下一个Filter。当会话的输入流关闭时,会依次调用每个Filter的inputClosed方法。
void inputClosed(IoSession session);
/**
* Forwards <tt>messageReceived</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param message The received message
*/
// 将messageReceived事件传给下一个Filter。当从会话读取到消息报文时,会依次调用每个Filter的messageReceived方法,message为读取到的消息对象。
void messageReceived(IoSession session, Object message);
/**
* Forwards <tt>messageSent</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param writeRequest The {@link WriteRequest} to process
*/
// 将messageSent事件传给下一个Filter。当消息报文写出会话发送缓冲区后,会依次调用每个Filter的messageSent方法,writeRequest包含了发送的报文数据。
void messageSent(IoSession session, WriteRequest writeRequest);
/**
* Forwards <tt>filterWrite</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param writeRequest The {@link WriteRequest} to process
*/
// 将filterWrite事件传给下一个Filter。当应用程序调用会话的write方法发送数据时,会依次调用每个Filter的filterWrite方法。
void filterWrite(IoSession session, WriteRequest writeRequest);
/**
* Forwards <tt>filterClose</tt> event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
*/
// 将filterClose事件传给下一个Filter。当应用程序调用会话的closeNow或closeOnFlush方法关闭会话时,会依次调用每个Filter的filterClose方法。
void filterClose(IoSession session);
/**
* Forwards an event to next filter.
*
* @param session The {@link IoSession} which has to process this invocation
* @param event The event to propagate
*/
// 将自定义事件传给下一个Filter。开发者可以触发自定义的FilterEvent事件,并通过该方法将事件传递给过滤器链中的下一个Filter进行处理。
void event(IoSession session, FilterEvent event);
}
}
比较常用的几个方法有:
init
和destroy
方法分别在过滤器添加到过滤器链和从过滤器链移除时调用,可以用于初始化和清理资源。onPreAdd
、onPostAdd
、onPreRemove
和onPostRemove
方法在过滤器添加到过滤器链和从过滤器链移除的不同时刻调用,可以用于执行一些特定的操作。sessionCreated
、sessionOpened
、sessionClosed
、sessionIdle
和exceptionCaught
方法分别在会话创建、打开、关闭、空闲和异常时调用。messageReceived
和messageSent
方法分别在消息接收和发送时调用,可以对消息进行拦截和处理。filterWrite
和filterClose
方法分别在消息写入和会话关闭时调用,可以对写请求和关闭请求进行拦截和处理。
其中, NextFilter
参数表示过滤器链中的下一个过滤器,通过调用 nextFilter.xxx()
方法,可以将请求传递给下一个过滤器或 IoHandler 。
4. IoFilter 链的构成与顺序
在Mina中,多个 IoFilter 可以组成一个过滤器链( IoFilterChain ),对消息进行层层处理。当一个消息被接收或发送时,它会按照一定的顺序依次通过过滤器链中的每个过滤器。这样每一个 Filter 可以专注于干自己的活,不用堆一大堆的代码逻辑。
在 HelloWorld 例子中,我们其实就已经使用过 IoFilter 过滤器链了:
public class MinaServer {
private static final int PORT = 8080;
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.setHandler(new ServerHandler());
// 这里获取了当前的过滤器链,然后添加了一个处理协议编解码的 ProtocolCodecFilter
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
}
}
过滤器链的构成和顺序可以通过 IoService
的 getFilterChain()
方法获取和修改:
DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();
builder.addLast("logger", new LoggingFilter());
builder.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
builder.addLast("crypto", new MyCryptoFilter());
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.setFilterChainBuilder(builder);
在上面的例子中,我们创建了一个 DefaultIoFilterChainBuilder
对象,并通过 addLast
方法依次添加了三个过滤器:日志过滤器、编解码过滤器和加密过滤器。然后,我们将这个过滤器链构建器设置给了IoAcceptor。上面的代码也可以改写成跟 HelloWorld 例子中类似的代码风格:
IoAcceptor acceptor = new NioSocketAcceptor();
final DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
filterChain.addLast("logger", new LoggingFilter());
filterChain.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
filterChain.addLast("crypto", new MyCryptoFilter());
当有新的连接建立时, IoAcceptor 会根据这个构建器,为每个连接创建一个新的过滤器链实例。过滤器链中的过滤器会按照添加的顺序依次执行。
4.1 接收消息处理链路顺序
对于接收到的消息,过滤器链的执行顺序如下:
消息首先由 IoService 接收,然后依次通过日志过滤器、编解码过滤器、加密过滤器,最后到达 IoHandler 。
4.2 发送消息处理链路顺序
发送消息的时候,过滤器链的执行顺序相反:
消息首先由 IoHandler 发送,然后依次通过加密过滤器、编解码过滤器、日志过滤器,最后由 IoService 发送出去。
4. IoFilter 怎么去用
下面通过一个自定义 IoFilter 的例子,演示如何实现电表跟主站通信数据的加解密(具体的加解密逻辑忽略,不要纠结对不对)。大概流程如下:
假设我们的电表通过 TCP 连接发送和接收加密后的数据,加密算法为简单的异或运算,密钥为一个固定的整数。我们可以定义一个 CryptoFilter
类,继承自 IoFilterAdapter
, 对数据进行加解密:
public class CryptoFilter extends IoFilterAdapter {
// 加解密的密钥,这里应该每一个电表一个密钥的。
private final int key= 0xAA;
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
// 首先判断接收到的消息是否为`IoBuffer`类型,如果是,则对其中的每个字节进行异或解密,然后再传递给下一个过滤器。
if (message instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) message;
int remaining = buffer.remaining();
for (int i = 0; i < remaining; i++) {
byte b = buffer.get(i);
b = (byte) (b ^ key);
buffer.put(i, b);
}
}
nextFilter.messageReceived(session, message);
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
// 对发送的消息进行类似的处理,对每个字节进行异或加密,然后再传递给下一个过滤器。
Object message = writeRequest.getMessage();
if (message instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) message;
int remaining = buffer.remaining();
for (int i = 0; i < remaining; i++) {
byte b = buffer.get(i);
b = (byte) (b ^ key);
buffer.put(i, b);
}
}
nextFilter.messageSent(session, writeRequest);
}
}
将其添加到过滤器链中即可:
DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();
builder.addLast("logger", new LoggingFilter());
builder.addLast("codec", new ProtocolCodecFilter(new MyCodecFactory()));
builder.addLast("crypto", new CryptoFilter(0xAA));
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.setFilterChainBuilder(builder);
这样,电表发送的消息到达业务处理器 IoHandler 之前, 就会被自动解密; 同时消息被 IoHandler 发送之后,就会被自动加密。主站跟电表间都这样去加解密,达到通信安全的目的。
当然上面只是一个举例, 通过自定义 IoFilter , 我们可以在不修改业务处理器 IoHandler 代码的情况下,实现各种功能,如数据加密、压缩、签名验证等。
5. 小结
本章介绍了Mina框架中的过滤器链和 IoFilter 组件。
过滤器链由多个 IoFilter 组成,对消息进行层层处理。过滤器链的构成和顺序可以通过 IoFilterChainBuilder 来配置。
IoFilter是一种拦截器,可以在消息的接收和发送过程中,对消息进行拦截、检查、修改或丢弃。通过实现IoFilter接口,我们可以自定义各种功能,如日志记录、数据压缩、加密解密、流量控制等。
在实际应用中,我们可以根据需要,选择合适的 IoFilter ,并按照适当的顺序组装成过滤器链。过滤器链可以显著提高代码的可重用性和可维护性,使得应用程序更加灵活和可扩展。
6. 参考链接
- Mina Filters Introduction https://mina.apache.org/mina-project/userguide/ch5-filters/ch5-filters.html#introduction