七的博客

MINA通信入门(五)-理解业务逻辑处理器 IoHandler

网络通信电力Mina

MINA通信入门(五)-理解业务逻辑处理器 IoHandler

1. IoHandler 是什么

IoHandler 是 Mina 框架中的核心组件之一,它负责处理连接的I/O事件和业务逻辑。

当一个连接建立、接收到数据、发生异常或关闭时, Mina 会触发相应的 I/O 事件,并调用 IoHandler 的对应方法来处理这些事件。通过实现 IoHandler 接口,我们可以自定义连接的行为和业务逻辑。

IoHandler 是什么

2. IoHandler 接口定义

public interface IoHandler {
    /**
     * Invoked from an I/O processor thread when a new connection has been created.
     * Because this method is supposed to be called from the same thread that
     * handles I/O of multiple sessions, please implement this method to perform
     * tasks that consumes minimal amount of time such as socket parameter
     * and user-defined session attribute initialization.
     * 
     * @param session The session being created
     * @throws Exception If we get an exception while processing the create event
     */
    // 当新连接创建时,从 I/O 处理器线程调用。应执行耗时最少的任务,如初始化会话属性等。
    void sessionCreated(IoSession session) throws Exception;

    /**
     * Invoked when a connection has been opened.  This method is invoked after
     * {@link #sessionCreated(IoSession)}.  The biggest difference from
     * {@link #sessionCreated(IoSession)} is that it's invoked from other thread
     * than an I/O processor thread once thread model is configured properly.
     * 
     * @param session The session being opened
     * @throws Exception If we get an exception while processing the open event
     */
    // 当连接打开时调用,在 sessionCreated 之后调用。
    void sessionOpened(IoSession session) throws Exception;

    /**
     * Invoked when a connection is closed.
     * 
     * @param session The session being closed
     * @throws Exception If we get an exception while processing the close event
     */
    // 当连接关闭时调用。
    void sessionClosed(IoSession session) throws Exception;

    /**
     * Invoked with the related {@link IdleStatus} when a connection becomes idle.
     * This method is not invoked if the transport type is UDP; it's a known bug,
     * and will be fixed in 2.0.
     * 
     * @param session The idling session 
     * @param status The session's status
     * @throws Exception If we get an exception while processing the idle event
     */
    // 当连接空闲时,与相关的 IdleStatus 一起调用。对于 UDP 传输类型,不会调用此方法(已知问题,将在 2.0 中修复)。
    void sessionIdle(IoSession session, IdleStatus status) throws Exception;

    /**
     * Invoked when any exception is thrown by user {@link IoHandler}
     * implementation or by MINA.  If <code>cause</code> is an instance of
     * {@link IOException}, MINA will close the connection automatically.
     * 
     * @param session The session for which we have got an exception
     * @param cause The exception that has been caught
     * @throws Exception If we get an exception while processing the caught exception
     */
    // 当用户 IoHandler 实现或 Mina 抛出任何异常时调用。如果 cause 是 IOException 的实例,Mina 将自动关闭连接。
    void exceptionCaught(IoSession session, Throwable cause) throws Exception;

    /**
     * Invoked when a message is received.
     * 
     * @param session The session that is receiving a message
     * @param message The received message
     * @throws Exception If we get an exception while processing the received message
     */
    // 当接收到消息时调用。
    void messageReceived(IoSession session, Object message) throws Exception;

    /**
     * Invoked when a message written by {@link IoSession#write(Object)} is
     * sent out.
     * 
     * @param session The session that has sent a full message
     * @param message The sent message
     * @throws Exception If we get an exception while processing the sent message 
     */
    // 当通过 IoSession#write(Object) 写入的消息被发送出去时调用。
    void messageSent(IoSession session, Object message) throws Exception;

    /**
     * Handle the closure of an half-duplex TCP channel
     * 
     * @param session The session which input is being closed
     * @throws Exception If we get an exception while closing the input
     */
    // 处理半双工 TCP 通道的关闭。
    void inputClosed(IoSession session) throws Exception;
    
    /**
     * Invoked when a filter event is fired. Each filter might sent a different event,
     * this is very application specific.
     * 
     * @param session The session for which we have an event to process
     * @param event The event to process
     * @throws Exception If we get an exception while processing the event 
     */
    // 当过滤器事件被触发时调用。
    void event(IoSession session, FilterEvent event) throws Exception;
}

3. IoHandler 接口方法使用场景

可以稍微注意下整个生命周期的流向,对于理解下面的方法应用场景会有一定帮助。

3.1 sessionCreated

建立新的客户端连接时会触发。常见的业务处理:

  • 记录客户端的 IP 地址和端口号等信息用于日志或统计。
  • 初始化会话的属性,如认证状态、用户信息等。
  • 根据需要分配资源,如数据库连接、缓存等。
  • IM 应用可能会发送欢迎消息或初始化指令给客户端。

3.2 sessionOpened

连接已建立并准备好进行通信时触发。常见的业务处理:

  • 向客户端发送握手或认证请求。
  • 触发连接建立的事件或通知,如更新在线用户列表。

3.3 sessionClosed

客户端连接关闭时触发。常见的业务处理:

  • 释放会话占用的资源,如数据库连接、缓存等。

  • 保存连接的离线在线状态或历史数据。

  • 触发连接关闭的事件或通知,如更新在线用户列表。

  • 记录会话的持续时间、传输的数据量等信息用于统计或分析。

3.4 sessionIdle

连接空闲时,用于检测和处理非活动连接。常见的业务处理:

  • 发送心跳包以保持连接的活跃状态。
  • 检查客户端的响应,如果长时间无响应,可能需要关闭连接。
  • 触发空闲事件,以便应用程序可以执行自定义的空闲处理逻辑。

3.5 exceptionCaught

处理连接过程中发生的异常。常见的业务处理:

  • 记录异常的详细信息,如异常类型、消息和堆栈跟踪,用于错误分析和调试。

  • 根据异常类型执行不同的处理逻辑,如关闭连接、尝试重连等。

  • 向客户端发送错误消息或通知,以便其能够采取相应的措施。

3.6 messageReceived

接收到客户端发送的消息时触发,这也是我们最常用的一个方法。 这里就是我们程序收到了对端的数据,然后做一些业务处理,并给对端应答数据等,比较贴近业务,每种业务场景下的业务处理方式不一样。

3.7 messageSent

消息已成功发送给客户端时触发。常见的业务处理:

  • 记录已发送消息的相关信息,如消息类型、大小、时间戳等,用于日志或统计。

  • 更新会话的统计数据,如已发送的消息数量、字节数等。

  • 触发消息发送完成的事件或回调,以便执行后续的操作。

3.8 inputClosed

半双工 TCP 通道的输入关闭时触发,这个平时用的也不多。常见的业务处理:

  • 处理客户端主动关闭输入流的情况,如结束文件上传等。

  • 根据需要关闭会话或执行其他清理操作。

3.9 event

过滤器事件触发时,用于处理自定义的过滤器事件。常见的业务处理:

  • 解析和处理特定的过滤器事件,如执行数据转换、压缩、加密等。
  • 根据事件类型执行相应的业务逻辑,如权限检查、数据校验等。
  • 触发相关的事件或通知,以便其他组件能够响应过滤器事件。

IoHandler 接口中的方法大部分都是比较常用的,务必重点掌握。

4. IoHandler 怎么去用

4.1 引入 IoHandlerAdapter

实现 IoHandler 接口就需要实现所有的方法,即使我们不需要处理某些事件,也必须提供空的实现。为了简化开发, Mina 提供了一个IoHandlerAdapter 适配器类,它是 IoHandler 的默认实现,提供了所有方法的空实现。

我们可以继承 IoHandlerAdapter 类,只重写我们感兴趣的方法,而不必实现所有方法。下面是一个简单的例子:

public class MyHandler extends IoHandlerAdapter {
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("Session created: " + session.getId());
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = (String) message;
        System.out.println("Received message: " + str);
        session.write("Echo: " + str);
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

在这个例子中 , 我们重写了sessionCreatedmessageReceivedexceptionCaught 方法,分别在连接创建、接收到消息和发生异常时打印日志。当接收到消息时,我们将其原样发送回客户端。

4.2 举例处理电表上报的数据

下面我们以电表数据处理为例,演示如何自定义IoHandler。

假设我们的电表通过TCP连接发送数据,我们自定义数据格式如下:

image-处理电表上报的数据

每个数据包由 18 个字节组成,前 4 个字节是电表ID, 接下来是电压、电流、功率,最后2个字节是校验码。

我们可以定义一个 DlmsMeterData类来表示电表数据:

public class DlmsMeterData {
    private int meterId;
    private float voltage;
    private float current;
    private float power;
    
    // 构造函数、getter和setter方法省略
}

然后,我们可以自定义一个DlmsMeterHandler类,继承IoHandlerAdapter,处理电表数据:

public class DlmsMeterHandler extends IoHandlerAdapter {
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        IoBuffer buffer = (IoBuffer) message;
        while (buffer.hasRemaining()) {
            if (buffer.remaining() < 18) {
                break;
            }
            MeterData data = new MeterData();
            data.setMeterId(buffer.getInt());
            data.setVoltage(buffer.getFloat());
            data.setCurrent(buffer.getFloat());
            data.setPower(buffer.getFloat());
            short crc = buffer.getShort();
            // 这里跳过检查 crc 校验码,直接开始处理数据
            processData(data);
        }
    }
    
    private void processData(MeterData data) {
        db.saveMeterDate(data.meterId(),data.voltage,data.current,data.power);
        
        
        // TODO: 处理电表数据,如存储到数据库、发送报警等
        System.out.println("Received meter data: " + data);
    }
}

messageReceived 方法中,我们首先将接收到的消息转换为 IoBuffer 对象,然后循环处理其中的数据包。对于每个数据包,我们读取电表ID、电压、电流、功率和校验码,创建一个 DlmsMeterData 对象,然后调用 processData 方法进行后续处理。

注意:由于TCP是基于流的协议,一次接收到的数据可能不完整,也可能包含多个数据包。因此,我们需要根据数据包的长度来判断是否需要继续读取数据。上面只是简单的例子,用来了解生产环境下的一个大概用法。

5. IoHandler 线程安全避坑

IoHandler 是单例的,所有的连接共享同一个 IoHandler 实例。这意味着, IoHandler 必须是线程安全的,因为它可能被多个线程同时调用。

在编写 IoHandler 的实现时,需要注意以下几点:

  • 避免使用实例变量,因为它们可能被多个线程同时访问和修改。如果必须使用实例变量,应该使用线程安全的数据结构,如ConcurrentHashMap、AtomicInteger 等。

  • 避免在IoHandler中执行耗时的操作,如数据库查询、文件 I/O 等,因为这会阻塞其他连接的处理。如果必须执行耗时操作,应该将其放到单独的线程中异步执行。

  • 不要在 IoHandler 中保存连接特定的状态,因为这可能导致多个连接之间的状态干扰。如果需要保存连接状态,应该将其存储在 IoSession 的属性中。

下面是一个线程安全的 IoHandler 实现示例:

public class MyHandler extends IoHandlerAdapter {
    private final ConcurrentMap<Long, String> sessionData = new ConcurrentHashMap<>();

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        sessionData.put(session.getId(), "New Session");
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String str = (String) message;
        String data = sessionData.get(session.getId());
        System.out.println("Received message from " + data + ": " + str);
        session.write("Echo: " + str);
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        sessionData.remove(session.getId());
    }
}

在这个例子中,我们使用 ConcurrentHashMap 存储每个连接的特定数据,在连接创建时初始化数据,在连接关闭时移除数据。在处理消息时,我们从Map中获取连接数据,打印日志,并发送响应消息。

由于 ConcurrentHashMap 是线程安全的,多个线程可以同时访问和修改连接数据,而不会导致竞态条件或数据不一致。

5. 总结

本章介绍了Mina框架中的IoHandler组件, 包括它的生命周期、如何自定义 IoHandler 处理特定格式的数据。

  • IoHandler是Mina框架的核心组件之一,负责处理连接的I/O事件和业务逻辑。我们可以通过继承IoHandlerAdapter类,并重写感兴趣的事件方法,来自定义IoHandler的行为。

  • 在处理具体的业务数据时,我们需要根据数据格式,编写相应的解码逻辑。由于TCP是基于流的协议,我们需要注意数据的边界问题,避免粘包和拆包。

掌握 IoHandler 对于我们开发实际的业务逻辑处理非常重要,务必要重点掌握。

6. 参考链接