七的博客

MINA通信入门(八)-通信协议的编解码

网络通信电力Mina

MINA通信入门(八)-通信协议的编解码

前面的章节里面,我们还有一个比较重要的环节没仔细讲。 那就是一开始的编解码我们使用的是内置的 ProtocolCodecFilter 进行编解码,实际上的通讯中大部分时候是需要自己写编解码的逻辑的。

1. 什么是编解码

编解码就是指将应用程序中的数据对象转换为可以在网络上传输的格式(编码), 以及将接收到的网络数据转换为应用程序可以处理的数据对象(解码)的过程。

  • 编码的流程如下:

编码的流程

可以看出编码的过程其实就是将 Java 对象转换成可以在网络上传输的格式。如二进制、文本、JSON等。

  • 解码的流程如下:

解码的流程

解码的过程就是相反的,将对端传过来的网络数据转成 Java 对象,方便在业务中进行使用。

2. 编解码可以做什么

编解码在网络通信中是个重要的角色,主要有以下几个作用:

  • 数据格式转换。 编解码可以将应用程序中的数据对象转换为网络传输所需的格式,如二进制、文本、JSON等,并在接收端将其转换回应用程序可处理的数据对象。

  • 数据压缩和加密。 编码器可以对数据进行压缩和加密,减少网络传输的数据量和提高数据的安全性。解码器在接收端对数据进行解密和解压缩,恢复原始的数据对象。

  • 数据校验和完整性检查。 编码器可以在数据中添加校验码或完整性检查信息,以确保数据在传输过程中不会被篡改或损坏。解码器可以验证接收到的数据的完整性和正确性。

  • 协议解析。 编解码可以根据特定的通信协议对数据进行解析和封装。

简而言之,就是脏活累活都交给它干。

3. 怎么去实现编解码

在 Mina 中,实现编解码需要以下步骤:

  1. 定义数据对象:根据应用程序的需求,定义传输的数据对象,如电表读数、控制命令等。
  2. 实现编码器。继承 Mina 提供的编码器接口,如 ProtocolEncoderAdapter,并重写 encode方法。在该方法中,将数据对象转换为指定的网络传输格式。
  3. 实现解码器。继承 Mina 提供的解码器接口,如 ProtocolDecoderAdapter,并重写 decode方法。在该方法中,将接收到的网络数据转换为应用程序的数据对象。
  4. 注册编解码器。 在 Mina 中注册自定义的编码器和解码器,以便在数据传输时自动进行编解码。

4. 编解码案例

在实际应用中,电表通常使用各种自定义的通信协议与服务器进行数据交互。这些协议可能基于TCP、UDP、串口等不同的通信方式,数据格式也各不相同。

假设我们自定义一个协议,协议数据格式如下 ( 参考的 DLT645-2007 协议):

协议数据格式

  • 起始码: 固定为0x68

  • 地址码: 6个字节,表示电表的通信地址。

  • 控制码: 1个字节,表示命令类型,如读数据、写数据等。

  • 数据域: N个字节,表示命令的参数或返回的数据。

  • 校验码: 1个字节,用于检验数据的正确性。

  • 结束码: 固定为0x16

例如, 一个读取电表地址为 123456 的电压数据的命令如下:

68 12 34 56 00 00 00 11 04 33 33 34 34 C3 16

其中, 68 为起始码, 123456000000 为地址码, 11 为控制码(读数据), 04333334 为数据标识(电压), C3 为校验码, 16 为结束码。

我们就按照这个协议来写一个不算特别正规的编解码器例子,首先需要实现 ProtocolEncoderProtocolDecoder 接口,分别用于编码和解码数据。然后,将它们封装成一个 ProtocolCodecFilter ,添加到过滤器链中即可,这几步都是之前章节中讲述过的内容。

4.1 编写编码器

定义一个DLT645Encoder类,实现ProtocolEncoder接口,用于将Java对象编码为字节数组:

public class DLT645Encoder implements ProtocolEncoder {
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        DLT645Message msg = (DLT645Message) message;
        IoBuffer buffer = IoBuffer.allocate(16);
        buffer.put((byte) 0x68);
        buffer.put(msg.getAddress());
        buffer.put(msg.getControl());
        buffer.put(msg.getData());
        buffer.put(getChecksum(buffer));
        buffer.put((byte) 0x16);
        buffer.flip();
        out.write(buffer);
    }

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    private byte getChecksum(IoBuffer buffer) {
        byte[] array = buffer.array();
        int sum = 0;
        for (int i = 1; i < array.length - 2; i++) {
            sum += array[i] & 0xFF;
        }
        return (byte) (sum & 0xFF);
    }
}

encode 方法中,我们首先将Java对象转换为 DLT645Message 类型,然后按照协议格式,依次写入起始码、地址码、控制码、数据域和校验码。校验码通过 getChecksum方法计算得到。最后,我们写入结束码,并将 IoBuffer 翻转,写入输出流。

4.2 编写解码器

定义一个DLT645Decoder类,实现ProtocolDecoder接口,用于将字节数组解码为Java对象:

public class DLT645Decoder implements ProtocolDecoder {
    @Override
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() < 12) {
            return;
        }
        in.mark();
        byte start = in.get();
        if (start != 0x68) {
            in.reset();
            return;
        }
        byte[] address = new byte[6];
        in.get(address);
        byte control = in.get();
        int length = in.getUnsigned();
        if (in.remaining() < length + 2) {
            in.reset();
            return;
        }
        byte[] data = new byte[length];
        in.get(data);
        byte checksum = in.get();
        byte end = in.get();
        if (end != 0x16) {
            in.reset();
            return;
        }
        if (getChecksum(address, control, data) != checksum) {
            in.reset();
            return;
        }
        DLT645Message message = new DLT645Message();
        message.setAddress(address);
        message.setControl(control);
        message.setData(data);
        out.write(message);
    }

    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    private byte getChecksum(byte[] address, byte control, byte[] data) {
        int sum = 0;
        for (byte b : address) {
            sum += b & 0xFF;
        }
        sum += control & 0xFF;
        for (byte b : data) {
            sum += b & 0xFF;
        }
        return (byte) (sum & 0xFF);
    }
}

decode 方法中:

  • 首先判断缓冲区中是否有足够的数据,如果没有,则直接返回,等待下次解码。
  • 然后按照协议格式,依次读取起始码、地址码、控制码、数据长度、数据域、校验码和结束码。
  • 如果起始码、结束码或校验码不正确,则重置缓冲区,丢弃当前的数据包。
  • 否则创建一个 DLT645Message 对象,设置相应的字段,并保存起来,丢给后续的过滤器或者是 IoHandler 使用。

4.3 注册到过滤器链中

定义好编码器和解码器后,我们可以将它们封装成一个ProtocolCodecFilter:

ProtocolCodecFilter filter = new ProtocolCodecFilter(new DLT645Encoder(), new DLT645Decoder());
acceptor.getFilterChain().addLast("codec", filter);

这样,当有新的数据包到达时, Mina 会自动调用解码器进行解码,将字节数组转换为 Java 对象,传递给 IoHandler 处理。

在发送数据时, Mina 会自动调用编码器进行编码,将 Java 对象转换为字节数组,发送给客户端。

5. 处理粘包和断包问题

这里额外提一下, TCP的通信中,由于TCP是面向流的协议,数据包之间没有明确的边界,因此可能出现粘包和断包的问题。

5.1 什么是粘包

粘包是指多个数据包被合并成一个大的数据包发送,导致接收方无法区分每个数据包的边界。例如,发送方依次发送了两个数据包 ABCDEF ,接收方可能一次性收到 ABCDEF ,无法分辨它们原本是两个独立的数据包。

什么是粘包

5.2 什么是粘包

断包是指一个大的数据包被拆分成多个小的数据包发送,导致接收方需要等待所有的小包都到达才能组装出完整的数据。例如,发送方发送了一个数据包 ABCDEF ,接收方可能先收到 ABC ,然后再收到 DEF ,需要将它们拼接起来才能得到原始的数据包。

什么是粘包

5.3 怎么处理粘包粘包问题

为了解决粘包和断包问题,需要在协议中引入一些机制,明确标识每个数据包的边界。常见的方法有以下几种:

  • 固定长度: 每个数据包的长度都是固定的,接收方可以根据长度来划分数据包的边界。 这种有点浪费空间,一般用的不多。

  • 特殊字符: 在每个数据包的开头或结尾加入特殊的字符,如\r\n$等,接收方可以根据这些字符来划分数据包的边界。

  • 长度字段: 在每个数据包的开头加入一个表示数据包长度的字段,接收方可以根据这个字段来划分数据包的边界。 这种最常用,在生产环境中也是用的比较广泛的一种。

  • 超时机制: 如果一段时间内没有收到新的数据,就认为当前的数据包已经结束,将已收到的数据作为一个完整的数据包处理。

5.4 实战案例

以上面的协议为例, 它使用了特殊字符和长度字段两种方式来解决粘包问题:

  • 每个数据包以 0x68 开头,以 0x16 结尾,接收方可以根据这两个特殊字符来划分数据包的边界。

  • 每个数据包的第 8 个字节表示数据域的长度,接收方可以根据这个长度字段来判断数据包是否接收完整。

在解码器中,我们需要根据协议的这些特点,正确地处理粘包和断包问题。 以下是一个修改后的 DLT645Decoder ,用于处理粘包问题:

public class DLT645Decoder implements ProtocolDecoder {
    @Override
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        while (in.remaining() >= 12) {
            in.mark();
            byte start = in.get();
            if (start != 0x68) {
                in.reset();
                in.get(); // 跳过一个字节
                continue;
            }
            byte[] address = new byte[6];
            in.get(address);
            byte control = in.get();
            int length = in.getUnsigned();
            if (in.remaining() < length + 2) {
                in.reset();
                break;
            }
            byte[] data = new byte[length];
            in.get(data);
            byte checksum = in.get();
            byte end = in.get();
            if (end != 0x16) {
                continue;
            }
            if (getChecksum(address, control, data) != checksum) {
                continue;
            }
            DLT645Message message = new DLT645Message();
            message.setAddress(address);
            message.setControl(control);
            message.setData(data);
            out.write(message);
        }
    }
}

与之前的解码器相比,这个解码器有以下几点不同:

  • 使用 while 循环,不断地从缓冲区中读取数据,直到缓冲区中的数据不足 12 个字节(一个完整的数据包至少需要 12 个字节)。

  • 如果第一个字节不是 0x68 ,则跳过这个字节,继续查找下一个 0x68

  • 如果数据域的长度超过了缓冲区中剩余的字节数,则中断循环,等待更多的数据到达。

  • 如果最后一个字节不是0x16,或者校验码不正确,则跳过这个数据包,继续查找下一个数据包。

通过这样的处理,解码器可以正确地处理粘包问题,从一个大的缓冲区中解析出多个完整的数据包。

6. 总结

编解码是 Mina 通信中非常重要的一部分, 它允许我们将应用程序的数据对象与网络传输的数据格式进行转换。通过实现自定义的编码器和解码器,我们可以根据特定的通信协议和数据格式,实现电表和服务端之间的数据交换。

在实际应用中, 根据具体的协议和性能要求, 我们可以选择合适的编解码方式,如二进制、文本、JSON等。 同时,还可以考虑数据压缩,减少通信的流量耗费等。