Netty Quick Start Series (7): Case Study Summary

This final chapter closes the series with a small case study that ties together the topics covered so far.

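It draws on the following topics from earlier chapters:

  • Initializing a Netty server and client.

  • Writing a ChannelHandler.

  • Configuring the ChannelPipeline.

  • Writing encoders and decoders.

  • Handling sticky and half packets (TCP packet coalescing and splitting).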

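1. Case background

In everyday life, a household smart meter may report events such as electricity theft, along with consumption readings, to the power system over the network. This case simulates that scenario: the meter acts as a client and periodically pushes data to the power system.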

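The communication flow is roughly as follows:

[Figure: case communication flow]

The data structure reported by the meter is as follows:

[Figure: structure of the meter report protocol]

The data structure the server sends back to the meter is as follows:

[Figure: structure of the server response]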
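Since the figures are not reproduced here, the report frame layout can be read off the encoder and decoder in section 2: version (1 byte) + length field (4 bytes) + message type (1 byte) + meter ID (10 bytes, space-padded) + reading (8-byte double) + timestamp (8-byte long), where the length field counts everything after itself. The sketch below rebuilds that layout with java.nio.ByteBuffer instead of Netty's ByteBuf so it runs without dependencies. The class name ReportFrameSketch and its encode method are illustrative and not part of the original project, and meter IDs are assumed to fit in 10 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch only: mirrors the layout used by MeterReportDataEncoder in section 2.
class ReportFrameSketch {
    static final int METER_ID_LENGTH = 10;
    // message type (1) + meter ID (10) + reading (8) + timestamp (8) = 27
    static final int BODY_LENGTH = 1 + METER_ID_LENGTH + 8 + 8;

    static byte[] encode(byte version, byte messageType, String meterId,
                         double reading, long timestamp) {
        byte[] raw = meterId.getBytes(StandardCharsets.UTF_8);
        if (raw.length > METER_ID_LENGTH) {
            // Assumption of this sketch: over-long IDs are rejected rather than truncated
            throw new IllegalArgumentException("meter ID longer than " + METER_ID_LENGTH + " bytes");
        }
        byte[] id = new byte[METER_ID_LENGTH];
        Arrays.fill(id, (byte) ' ');              // pad on the right with spaces
        System.arraycopy(raw, 0, id, 0, raw.length);

        // version (1) + length field (4) + body (27) = 32 bytes; big-endian, like ByteBuf's default
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + BODY_LENGTH);
        buf.put(version);
        buf.putInt(BODY_LENGTH);
        buf.put(messageType);
        buf.put(id);
        buf.putDouble(reading);
        buf.putLong(timestamp);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 1, (byte) 1, "meter10621", 0.9, 1700000000000L);
        ByteBuffer in = ByteBuffer.wrap(frame);
        System.out.println("total bytes  = " + frame.length);
        System.out.println("version      = " + in.get());
        System.out.println("length field = " + in.getInt());
        System.out.println("message type = " + in.get());
    }
}
```

The response frame in section 2 follows the same pattern, except that the body is message type (1) + meter ID (10) + ack status (1) + a variable-length UTF-8 message, which is why the client-side decoder has to rely on the length field rather than on a fixed size.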

2. Writing the code

Before writing the server and client, create the objects they share.

Create the meter report request object MeterReportData:

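public class MeterReportData {
    /**
     * Protocol version currently active in the system
     */
    public static final byte SYSTEM_ACTIVE_VERSION = 1;

    /**
     * Version of this message
     */
    private byte version;

    /**
     * Message type
     */
    private byte messageType;

    /**
     * Smart meter ID
     */
    private String meterId;

    /**
     * Meter reading
     */
    private double reading;

    /**
     * Timestamp of the report
     */
    private long timestamp;

    // getters, setters and toString() omitted (the output in section 3 relies on toString())
}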

The response object for a report request, MeterReportResponse:

public class MeterReportResponse {
    /**
     * Meter ID
     */
    private String meterId;

    /**
     * Acknowledgement status
     */
    private byte ackStatus;

    /**
     * Acknowledgement message
     */
    private String message;

    // getters and setters omitted
}

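2.1 Server implementation

The server code is written in the following order:

[Figure: order in which the server classes are written]

2.1.1 Writing MeterReportDataDecoder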

package com.suny.netty.server;

import com.suny.netty.MeterReportData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class MeterReportDataDecoder extends ByteToMessageDecoder {
    private static final byte EXPECTED_VERSION = 1;

    // version (1) + length field (4) + message type (1)
    private static final int HEADER_LENGTH = 6;
    private static final int METER_ID_LENGTH = 10;

    // meter ID (10) + reading (8) + timestamp (8)
    private static final int FIXED_CONTENT_LENGTH = METER_ID_LENGTH + 8 + 8;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // A report frame has a fixed size; wait until a whole frame has arrived
        if (in.readableBytes() < HEADER_LENGTH + FIXED_CONTENT_LENGTH) {
            return;
        }

        in.markReaderIndex();

        // Read the message version
        byte version = in.readByte();
        if (version != EXPECTED_VERSION) {
            in.resetReaderIndex();
            throw new IllegalStateException("Unexpected version: " + version);
        }

        // Read the length field; it counts the message type plus the fixed content
        int messageLength = in.readInt();
        if (messageLength != FIXED_CONTENT_LENGTH + 1) {
            in.resetReaderIndex();
            throw new IllegalStateException("Invalid message length: " + messageLength);
        }

        // Read the message type
        byte messageType = in.readByte();

        // Read the meter ID and strip the padding spaces
        final byte[] meterIdBytes = new byte[METER_ID_LENGTH];
        in.readBytes(meterIdBytes);
        final String meterId = new String(meterIdBytes, StandardCharsets.UTF_8).trim();

        // Read the meter reading
        double reading = in.readDouble();
        // Read the timestamp
        long timestamp = in.readLong();

        final MeterReportData meterData = new MeterReportData();
        meterData.setVersion(version);
        meterData.setMessageType(messageType);
        meterData.setMeterId(meterId);
        meterData.setReading(reading);
        meterData.setTimestamp(timestamp);

        out.add(meterData);
    }
}

2.1.2 Writing MeterReportDataHandler

package com.suny.netty.server;

import com.suny.netty.MeterReportData;
import com.suny.netty.MeterReportResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class MeterReportDataHandler extends SimpleChannelInboundHandler<MeterReportData> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MeterReportData msg) {
        System.out.println("Received meter data: " + msg);


        // TODO: add further processing here, for example storing the data in a database

        // Build the response and send it back to the meter
        final MeterReportResponse response = new MeterReportResponse();
        response.setMeterId(msg.getMeterId());
        response.setAckStatus((byte) 10);
        response.setMessage("Data received successfully");

        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.1.3 Writing MeterResponseEncoder

package com.suny.netty.server;

import com.suny.netty.MeterReportResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

public class MeterResponseEncoder extends MessageToByteEncoder<MeterReportResponse> {
    private static final byte VERSION = 1;
    private static final int METER_ID_LENGTH = 10;

    @Override
    protected void encode(ChannelHandlerContext ctx, MeterReportResponse meterReportResponse, ByteBuf out) throws Exception {
        final byte[] messageBytes = meterReportResponse.getMessage().getBytes(StandardCharsets.UTF_8);

        // message type (1) + meter ID (10) + status (1) + message bytes
        final int messageLength = 1 + METER_ID_LENGTH + 1 + messageBytes.length;

        out.writeByte(VERSION);
        out.writeInt(messageLength);

        // Assume 2 is the message type for a response
        out.writeByte((byte) 2);
        out.writeBytes(padRight(meterReportResponse.getMeterId(), METER_ID_LENGTH).getBytes(StandardCharsets.UTF_8));
        out.writeByte(meterReportResponse.getAckStatus());
        out.writeBytes(messageBytes);
    }

    private String padRight(String s, int n) {
        return String.format("%-" + n + "s", s);
    }
}
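One caveat about the padRight helper above: String.format("%-10s", s) pads to at least 10 characters, but it neither truncates longer strings nor accounts for multi-byte UTF-8 characters, so the meter ID field is only guaranteed to be 10 bytes for ASCII IDs of at most 10 characters. A possible byte-level alternative is sketched below. FixedWidthField and toFixedWidth are hypothetical names, and rejecting over-long IDs (rather than truncating them) is an assumption of this sketch, not something the original protocol specifies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative helper, not part of the original project.
class FixedWidthField {
    // Encodes s as UTF-8 and returns exactly `width` bytes, space-padded on the right.
    static byte[] toFixedWidth(String s, int width) {
        byte[] raw = s.getBytes(StandardCharsets.UTF_8);
        if (raw.length > width) {
            throw new IllegalArgumentException(
                    "field needs " + raw.length + " bytes, limit is " + width);
        }
        byte[] out = Arrays.copyOf(raw, width);           // tail is zero-filled here
        Arrays.fill(out, raw.length, width, (byte) ' ');  // overwrite the tail with spaces
        return out;
    }

    public static void main(String[] args) {
        // String.format does not truncate: a 13-character ID stays 13 characters long
        String formatted = String.format("%-10s", "meter-1234567");
        System.out.println("String.format length = " + formatted.length());

        // The byte-level helper always yields exactly 10 bytes for IDs that fit
        System.out.println("toFixedWidth length  = " + toFixedWidth("m1", 10).length);
    }
}
```

In the encoders, out.writeBytes(toFixedWidth(id, METER_ID_LENGTH)) could then take the place of the padRight(...).getBytes(...) call, keeping the frame size consistent with the length field.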

2.1.4 Writing MeterDataServer

package com.suny.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class MeterDataServer {
    private final int port;

    public MeterDataServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new MeterDataServer(port).start();
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            final ChannelPipeline p = ch.pipeline();
                            p.addLast(new MeterReportDataDecoder());
                            p.addLast(new MeterResponseEncoder());
                            p.addLast(new MeterReportDataHandler());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            System.out.println("MeterDataServer started on port " + port);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

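2.3 Client implementation

The client code is written in the following order:

[Figure: order in which the client classes are written]

2.3.1 Writing MeterReportDataEncoder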

package com.suny.netty.client;

import com.suny.netty.MeterReportData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

public class MeterReportDataEncoder extends MessageToByteEncoder<MeterReportData> {
    private static final byte VERSION = 1;
    private static final int METER_ID_LENGTH = 10;

    // message type (1) + meter ID (10) + reading (8) + timestamp (8)
    private static final int MESSAGE_LENGTH = 1 + METER_ID_LENGTH + 8 + 8;

    @Override
    protected void encode(ChannelHandlerContext ctx, MeterReportData msg, ByteBuf out) throws Exception {
        out.writeByte(VERSION);
        out.writeInt(MESSAGE_LENGTH);
        out.writeByte(msg.getMessageType());

        final byte[] meterIdBytes = padRight(msg.getMeterId(), METER_ID_LENGTH).getBytes(StandardCharsets.UTF_8);
        out.writeBytes(meterIdBytes);

        out.writeDouble(msg.getReading());
        out.writeLong(msg.getTimestamp());
    }

    private String padRight(String s, int n) {
        return String.format("%-" + n + "s", s);
    }
}

2.3.2 Writing MeterReportResponseDecoder

package com.suny.netty.client;

import com.suny.netty.MeterReportResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class MeterReportResponseDecoder extends ByteToMessageDecoder {
    private static final byte EXPECTED_VERSION = 1;
    // version (1) + length field (4) + message type (1)
    private static final int HEADER_LENGTH = 6;
    private static final int METER_ID_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not enough data for the header yet; wait for more
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }

        in.markReaderIndex();

        // Read the version
        final byte version = in.readByte();
        if (version != EXPECTED_VERSION) {
            in.resetReaderIndex();
            throw new IllegalStateException("Unexpected version: " + version);
        }

        // Read the length field: the number of bytes that follow it
        // (message type + meter ID + status + message)
        final int contentLength = in.readInt();
        // A length too small for the fixed fields would give a negative array size below
        if (contentLength < 1 + METER_ID_LENGTH + 1) {
            in.resetReaderIndex();
            throw new IllegalStateException("Invalid message length: " + contentLength);
        }
        if (in.readableBytes() < contentLength) {
            in.resetReaderIndex();
            return; // incomplete frame; wait for more data
        }

        // Read the one-byte message type; a report response is assumed to be type 2
        final byte messageType = in.readByte();
        if (messageType != 2) {
            in.resetReaderIndex();
            throw new IllegalStateException("Unexpected message type: " + messageType);
        }

        // Read the meter ID and strip the padding spaces
        final byte[] meterIdBytes = new byte[METER_ID_LENGTH];
        in.readBytes(meterIdBytes);
        final String meterId = new String(meterIdBytes, StandardCharsets.UTF_8).trim();

        // Read the ack status
        final byte status = in.readByte();

        // Subtract the lengths of the message type, meter ID and status
        final int messageContentLength = contentLength - 1 - METER_ID_LENGTH - 1;

        // Read the response message
        final byte[] messageBytes = new byte[messageContentLength];
        in.readBytes(messageBytes);
        final String message = new String(messageBytes, StandardCharsets.UTF_8);

        final MeterReportResponse response = new MeterReportResponse();
        response.setMeterId(meterId);
        response.setAckStatus(status);
        response.setMessage(message);

        out.add(response);
    }
}
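The mark/reset pattern in this decoder is what handles sticky and half packets: ByteToMessageDecoder keeps calling decode while bytes remain, a complete frame is consumed, and an incomplete one is left in the buffer until more data arrives. The sketch below reproduces that logic on a plain java.nio.ByteBuffer so it can be run without Netty. LengthPrefixedSplitter, extractFrames and frame are illustrative names, and the frames carry arbitrary bodies rather than real responses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of length-prefixed framing; not part of the original project.
class LengthPrefixedSplitter {
    static final int PREFIX_LENGTH = 1 + 4; // version (1) + length field (4)

    // Builds a frame: version byte, big-endian length, then the body.
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(PREFIX_LENGTH + body.length)
                .put((byte) 1).putInt(body.length).put(body).array();
    }

    // Consumes every complete frame from buf and leaves a trailing partial frame unread.
    static List<byte[]> extractFrames(ByteBuffer buf) {
        List<byte[]> bodies = new ArrayList<>();
        while (buf.remaining() >= PREFIX_LENGTH) {
            buf.mark();                   // same role as in.markReaderIndex()
            buf.get();                    // version (not validated in this sketch)
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("Invalid message length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset();              // half packet: rewind and wait for more data
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        byte[] a = frame(new byte[] {2, 10});      // 7 bytes in total
        byte[] b = frame(new byte[] {2, 20, 30});  // 8 bytes in total

        // Two complete frames stuck together, followed by a third that lacks its last byte
        ByteBuffer stream = ByteBuffer.allocate(64);
        stream.put(a).put(b).put(a, 0, a.length - 1);
        stream.flip();

        List<byte[]> bodies = extractFrames(stream);
        System.out.println("complete frames = " + bodies.size());
        System.out.println("bytes kept for the next read = " + stream.remaining());
    }
}
```

Netty also ships io.netty.handler.codec.LengthFieldBasedFrameDecoder, which can do this splitting ahead of a message decoder. For this protocol it would presumably be configured with a length field at offset 1 and of length 4, though the original article writes the logic by hand.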

2.3.3 Writing MeterClientHandler

package com.suny.netty.client;

import com.suny.netty.MeterReportResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MeterClientHandler extends SimpleChannelInboundHandler<MeterReportResponse> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MeterReportResponse msg) {
        System.out.println("==================================");
        System.out.println("Received response from server:");
        System.out.println("Meter ID: " + msg.getMeterId());
        System.out.println("Ack Status: " + msg.getAckStatus());
        System.out.println("Message: " + msg.getMessage());
        System.out.println("==================================\n");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.3.4 Writing MockMeterClient

package com.suny.netty.client;

import com.suny.netty.MeterReportData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Random;

public class MockMeterClient {
    private final String host;
    private final int port;

    public MockMeterClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        new MockMeterClient(host, port).run();
    }

    public void run() throws Exception {
        final EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MeterReportDataEncoder());
                            ch.pipeline().addLast(new MeterReportResponseDecoder());
                            ch.pipeline().addLast(new MeterClientHandler());
                        }
                    });

            final ChannelFuture f = b.connect(host, port).sync();
            final Channel channel = f.channel();

            // Share a single TCP connection to simulate several meters reporting data
            final Random random = new Random();
            for (int i = 0; i < 1; i++) {
                sendMeterData(channel, "meter10621", random.nextDouble());
                sendMeterData(channel, "meter10622", random.nextDouble());
                sendMeterData(channel, "meter10623", random.nextDouble());
            }

            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private void sendMeterData(Channel channel, String meterId, double reading) {
        final MeterReportData meterData = new MeterReportData();
        meterData.setVersion(MeterReportData.SYSTEM_ACTIVE_VERSION);
        meterData.setMessageType((byte) 1);
        meterData.setMeterId(meterId);
        meterData.setReading(reading);
        meterData.setTimestamp(System.currentTimeMillis());

        // Send the data to the peer
        channel.writeAndFlush(meterData);

        System.out.println("Sent meter data: " + meterData);
    }
}

3. Testing the program

Start the server first. Its console prints:

MeterDataServer started on port 8080

Then start the client. Its console prints:

Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10621, reading=0.9, timestamp=xxxxxxxxx)
Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10622, reading=0.5, timestamp=xxxxxxxxx)
Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10623, reading=0.6, timestamp=xxxxxxxxx)


==================================
Received response from server:
Meter ID: meter10621
Ack Status: 10
Message: Data received successfully
==================================

==================================
Received response from server:
Meter ID: meter10622
Ack Status: 10
Message: Data received successfully
==================================

==================================
Received response from server:
Meter ID: meter10623
Ack Status: 10
Message: Data received successfully
==================================


The server console prints:

Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10621, reading=0.9, timestamp=xxxxxxxxx)
Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10622, reading=0.5, timestamp=xxxxxxxxx)
Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10623, reading=0.6, timestamp=xxxxxxxxx)