MINA通信入门(五)-理解业务逻辑处理器 IoHandler
MINA通信入门(五)-理解业务逻辑处理器 IoHandler
1. IoHandler 是什么
IoHandler 是 Mina 框架中的核心组件之一,它负责处理连接的I/O事件和业务逻辑。
当一个连接建立、接收到数据、发生异常或关闭时, Mina 会触发相应的 I/O 事件,并调用 IoHandler 的对应方法来处理这些事件。通过实现 IoHandler 接口,我们可以自定义连接的行为和业务逻辑。
2. IoHandler 接口定义
public interface IoHandler {
/**
* Invoked from an I/O processor thread when a new connection has been created.
* Because this method is supposed to be called from the same thread that
* handles I/O of multiple sessions, please implement this method to perform
* tasks that consumes minimal amount of time such as socket parameter
* and user-defined session attribute initialization.
*
* @param session The session being created
* @throws Exception If we get an exception while processing the create event
*/
// 当新连接创建时,从 I/O 处理器线程调用。应执行耗时最少的任务,如初始化会话属性等。
void sessionCreated(IoSession session) throws Exception;
/**
* Invoked when a connection has been opened. This method is invoked after
* {@link #sessionCreated(IoSession)}. The biggest difference from
* {@link #sessionCreated(IoSession)} is that it's invoked from other thread
* than an I/O processor thread once thread model is configured properly.
*
* @param session The session being opened
* @throws Exception If we get an exception while processing the open event
*/
// 当连接打开时调用,在 sessionCreated 之后调用。
void sessionOpened(IoSession session) throws Exception;
/**
* Invoked when a connection is closed.
*
* @param session The session being closed
* @throws Exception If we get an exception while processing the close event
*/
// 当连接关闭时调用。
void sessionClosed(IoSession session) throws Exception;
/**
* Invoked with the related {@link IdleStatus} when a connection becomes idle.
* This method is not invoked if the transport type is UDP; it's a known bug,
* and will be fixed in 2.0.
*
* @param session The idling session
* @param status The session's status
* @throws Exception If we get an exception while processing the idle event
*/
// 当连接空闲时,与相关的 IdleStatus 一起调用。对于 UDP 传输类型,不会调用此方法(已知问题,将在 2.0 中修复)。
void sessionIdle(IoSession session, IdleStatus status) throws Exception;
/**
* Invoked when any exception is thrown by user {@link IoHandler}
* implementation or by MINA. If <code>cause</code> is an instance of
* {@link IOException}, MINA will close the connection automatically.
*
* @param session The session for which we have got an exception
* @param cause The exception that has been caught
* @throws Exception If we get an exception while processing the caught exception
*/
// 当用户 IoHandler 实现或 Mina 抛出任何异常时调用。如果 cause 是 IOException 的实例,Mina 将自动关闭连接。
void exceptionCaught(IoSession session, Throwable cause) throws Exception;
/**
* Invoked when a message is received.
*
* @param session The session that is receiving a message
* @param message The received message
* @throws Exception If we get an exception while processing the received message
*/
// 当接收到消息时调用。
void messageReceived(IoSession session, Object message) throws Exception;
/**
* Invoked when a message written by {@link IoSession#write(Object)} is
* sent out.
*
* @param session The session that has sent a full message
* @param message The sent message
* @throws Exception If we get an exception while processing the sent message
*/
// 当通过 IoSession#write(Object) 写入的消息被发送出去时调用。
void messageSent(IoSession session, Object message) throws Exception;
/**
* Handle the closure of an half-duplex TCP channel
*
* @param session The session which input is being closed
* @throws Exception If we get an exception while closing the input
*/
// 处理半双工 TCP 通道的关闭。
void inputClosed(IoSession session) throws Exception;
/**
* Invoked when a filter event is fired. Each filter might sent a different event,
* this is very application specific.
*
* @param session The session for which we have an event to process
* @param event The event to process
* @throws Exception If we get an exception while processing the event
*/
// 当过滤器事件被触发时调用。
void event(IoSession session, FilterEvent event) throws Exception;
}
3. IoHandler 接口方法使用场景
可以稍微注意下整个生命周期的流向,对于理解下面的方法应用场景会有一定帮助。
3.1 sessionCreated
建立新的客户端连接时会触发。常见的业务处理:
- 记录客户端的 IP 地址和端口号等信息用于日志或统计。
- 初始化会话的属性,如认证状态、用户信息等。
- 根据需要分配资源,如数据库连接、缓存等。
- IM 应用可能会发送欢迎消息或初始化指令给客户端。
3.2 sessionOpened
连接已建立并准备好进行通信时触发。常见的业务处理:
- 向客户端发送握手或认证请求。
- 触发连接建立的事件或通知,如更新在线用户列表。
3.3 sessionClosed
客户端连接关闭时触发。常见的业务处理:
释放会话占用的资源,如数据库连接、缓存等。
保存连接的离线在线状态或历史数据。
触发连接关闭的事件或通知,如更新在线用户列表。
记录会话的持续时间、传输的数据量等信息用于统计或分析。
3.4 sessionIdle
连接空闲时,用于检测和处理非活动连接。常见的业务处理:
- 发送心跳包以保持连接的活跃状态。
- 检查客户端的响应,如果长时间无响应,可能需要关闭连接。
- 触发空闲事件,以便应用程序可以执行自定义的空闲处理逻辑。
3.5 exceptionCaught
处理连接过程中发生的异常。常见的业务处理:
记录异常的详细信息,如异常类型、消息和堆栈跟踪,用于错误分析和调试。
根据异常类型执行不同的处理逻辑,如关闭连接、尝试重连等。
向客户端发送错误消息或通知,以便其能够采取相应的措施。
3.6 messageReceived
接收到客户端发送的消息时触发,这也是我们最常用的一个方法。 这里就是我们程序收到了对端的数据,然后做一些业务处理,并给对端应答数据等,比较贴近业务,每种业务场景下的业务处理方式不一样。
3.7 messageSent
消息已成功发送给客户端时触发。常见的业务处理:
记录已发送消息的相关信息,如消息类型、大小、时间戳等,用于日志或统计。
更新会话的统计数据,如已发送的消息数量、字节数等。
触发消息发送完成的事件或回调,以便执行后续的操作。
3.8 inputClosed
半双工 TCP 通道的输入关闭时触发,这个平时用的也不多。常见的业务处理:
处理客户端主动关闭输入流的情况,如结束文件上传等。
根据需要关闭会话或执行其他清理操作。
3.9 event
过滤器事件触发时,用于处理自定义的过滤器事件。常见的业务处理:
- 解析和处理特定的过滤器事件,如执行数据转换、压缩、加密等。
- 根据事件类型执行相应的业务逻辑,如权限检查、数据校验等。
- 触发相关的事件或通知,以便其他组件能够响应过滤器事件。
IoHandler 接口中的方法大部分都是比较常用的,务必重点掌握。
4. IoHandler 怎么去用
4.1 引入 IoHandlerAdapter
实现 IoHandler 接口就需要实现所有的方法,即使我们不需要处理某些事件,也必须提供空的实现。为了简化开发, Mina 提供了一个IoHandlerAdapter 适配器类,它是 IoHandler 的默认实现,提供了所有方法的空实现。
我们可以继承 IoHandlerAdapter 类,只重写我们感兴趣的方法,而不必实现所有方法。下面是一个简单的例子:
public class MyHandler extends IoHandlerAdapter {
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("Session created: " + session.getId());
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String str = (String) message;
System.out.println("Received message: " + str);
session.write("Echo: " + str);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
在这个例子中 , 我们重写了sessionCreated
、 messageReceived
和 exceptionCaught
方法,分别在连接创建、接收到消息和发生异常时打印日志。当接收到消息时,我们将其原样发送回客户端。
4.2 举例处理电表上报的数据
下面我们以电表数据处理为例,演示如何自定义IoHandler。
假设我们的电表通过TCP连接发送数据,我们自定义数据格式如下:
每个数据包由 18 个字节组成,前 4 个字节是电表ID, 接下来是电压、电流、功率,最后2个字节是校验码。
我们可以定义一个 DlmsMeterData
类来表示电表数据:
public class DlmsMeterData {
private int meterId;
private float voltage;
private float current;
private float power;
// 构造函数、getter和setter方法省略
}
然后,我们可以自定义一个DlmsMeterHandler
类,继承IoHandlerAdapter
,处理电表数据:
public class DlmsMeterHandler extends IoHandlerAdapter {
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
IoBuffer buffer = (IoBuffer) message;
while (buffer.hasRemaining()) {
if (buffer.remaining() < 18) {
break;
}
MeterData data = new MeterData();
data.setMeterId(buffer.getInt());
data.setVoltage(buffer.getFloat());
data.setCurrent(buffer.getFloat());
data.setPower(buffer.getFloat());
short crc = buffer.getShort();
// 这里跳过检查 crc 校验码,直接开始处理数据
processData(data);
}
}
private void processData(MeterData data) {
db.saveMeterDate(data.meterId(),data.voltage,data.current,data.power);
// TODO: 处理电表数据,如存储到数据库、发送报警等
System.out.println("Received meter data: " + data);
}
}
在 messageReceived
方法中,我们首先将接收到的消息转换为 IoBuffer
对象,然后循环处理其中的数据包。对于每个数据包,我们读取电表ID、电压、电流、功率和校验码,创建一个 DlmsMeterData
对象,然后调用 processData
方法进行后续处理。
注意:由于TCP是基于流的协议,一次接收到的数据可能不完整,也可能包含多个数据包。因此,我们需要根据数据包的长度来判断是否需要继续读取数据。上面只是简单的例子,用来了解生产环境下的一个大概用法。
5. IoHandler 线程安全避坑
IoHandler 是单例的,所有的连接共享同一个 IoHandler 实例。这意味着, IoHandler 必须是线程安全的,因为它可能被多个线程同时调用。
在编写 IoHandler 的实现时,需要注意以下几点:
避免使用实例变量,因为它们可能被多个线程同时访问和修改。如果必须使用实例变量,应该使用线程安全的数据结构,如ConcurrentHashMap、AtomicInteger 等。
避免在IoHandler中执行耗时的操作,如数据库查询、文件 I/O 等,因为这会阻塞其他连接的处理。如果必须执行耗时操作,应该将其放到单独的线程中异步执行。
不要在 IoHandler 中保存连接特定的状态,因为这可能导致多个连接之间的状态干扰。如果需要保存连接状态,应该将其存储在 IoSession 的属性中。
下面是一个线程安全的 IoHandler 实现示例:
public class MyHandler extends IoHandlerAdapter {
private final ConcurrentMap<Long, String> sessionData = new ConcurrentHashMap<>();
@Override
public void sessionCreated(IoSession session) throws Exception {
sessionData.put(session.getId(), "New Session");
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String str = (String) message;
String data = sessionData.get(session.getId());
System.out.println("Received message from " + data + ": " + str);
session.write("Echo: " + str);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
sessionData.remove(session.getId());
}
}
在这个例子中,我们使用 ConcurrentHashMap 存储每个连接的特定数据,在连接创建时初始化数据,在连接关闭时移除数据。在处理消息时,我们从Map中获取连接数据,打印日志,并发送响应消息。
由于 ConcurrentHashMap 是线程安全的,多个线程可以同时访问和修改连接数据,而不会导致竞态条件或数据不一致。
5. 总结
本章介绍了Mina框架中的IoHandler组件, 包括它的生命周期、如何自定义 IoHandler 处理特定格式的数据。
IoHandler是Mina框架的核心组件之一,负责处理连接的I/O事件和业务逻辑。我们可以通过继承IoHandlerAdapter类,并重写感兴趣的事件方法,来自定义IoHandler的行为。
在处理具体的业务数据时,我们需要根据数据格式,编写相应的解码逻辑。由于TCP是基于流的协议,我们需要注意数据的边界问题,避免粘包和拆包。
掌握 IoHandler 对于我们开发实际的业务逻辑处理非常重要,务必要重点掌握。