Netty快速入门系列(七)-案例总结
Netty快速入门系列(七)-案例总结
最后一个章节将通过一个小案例串联前面的知识点,作为一个结束的章节。
本章节将会涉及前面的知识点:
Netty 服务端以及客户端的初始化。
ChannelHandler 的编写。
ChannelPipeline 的配置。
编解码器的编写。
粘包拆包的处理。
1. 案例背景
在平时生活中,家里的智能电表会可能通过网络请求上报一些窃电事件、电量等信息给电力系统。案例就是模拟这种场景,电表作为客户端定时推送数据给电力系统。
大概的通信流程如下:
电表上报的数据结构如下:
服务端应答给电表端的数据结构如下:
2. 代码编写
编写服务端跟客户端代码之前,先将公用的对象先建立好。
新建电表数据请求对象 MeterReportData :
public class MeterReportData {
/**
* 当前系统激活的协议的版本号
*/
public static final byte SYSTEM_ACTIVE_VERSION = 1;
/**
* 当前消息的版本
*/
private byte version;
/**
* 消息的类型
*/
private byte messageType;
/**
* 智能电表编号
*/
private String meterId;
/**
* 读数
*/
private double reading;
/**
* 上报的时间戳
*/
private long timestamp;
// set get 方法
}
数据上报请求应答对象 MeterReportResponse:
public class MeterReportResponse {
/**
* 电表ID
*/
private String meterId;
/**
* 确认状态
*/
private byte ackStatus;
/**
* 确认消息
*/
private String message;
// set get
}
2.1 服务端实现
服务端的代码编写顺序如下:
2.1.1 编写 MeterReportDataDecoder
import com.suny.netty.MeterReportData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class MeterReportDataDecoder extends ByteToMessageDecoder {
private static final byte EXPECTED_VERSION = 1;
// 版本(1) + 消息长度(4) + 消息类型(1)
private static final int HEADER_LENGTH = 6;
private static final int METER_ID_LENGTH = 10;
// 电表ID(10) + 读数(8) + 时间戳(8)
private static final int FIXED_CONTENT_LENGTH = METER_ID_LENGTH + 8 + 8;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 数据不足,等待更多数据
if (in.readableBytes() < HEADER_LENGTH + FIXED_CONTENT_LENGTH) {
return;
}
in.markReaderIndex();
// 读取消息版本
byte version = in.readByte();
if (version != EXPECTED_VERSION) {
in.resetReaderIndex();
throw new IllegalStateException("Unexpected version: " + version);
}
// 读取长度域
int messageLength = in.readInt();
if (messageLength != FIXED_CONTENT_LENGTH + 1) {
in.resetReaderIndex();
throw new IllegalStateException("Invalid message length: " + messageLength);
}
// 读取消息类型
byte messageType = in.readByte();
// 读取电表ID
final byte[] meterIdBytes = new byte[METER_ID_LENGTH];
in.readBytes(meterIdBytes);
final String meterId = new String(meterIdBytes, StandardCharsets.UTF_8).trim();
// 读取数据读数
double reading = in.readDouble();
// 读取时间戳
long timestamp = in.readLong();
final MeterReportData meterData = new MeterReportData();
meterData.setVersion(version);
meterData.setMessageType(messageType);
meterData.setMeterId(meterId);
meterData.setReading(reading);
meterData.setTimestamp(timestamp);
out.add(meterData);
}
}
2.1.2 编写 MeterReportDataHandler
package com.suny.netty.server;
import com.suny.netty.MeterReportData;
import com.suny.netty.MeterReportResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MeterReportDataHandler extends SimpleChannelInboundHandler<MeterReportData> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MeterReportData msg) {
System.out.println("Received meter data: " + msg);
// todo 这里可以添加更多的数据处理逻辑,比如存储到数据库等
// 创建并发送响应给电表
final MeterReportResponse response = new MeterReportResponse();
response.setMeterId(msg.getMeterId());
response.setAckStatus((byte) 10);
response.setMessage("Data received successfully");
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2.1.3 编写 MeterResponseEncoder
package com.suny.netty.server;
import com.suny.netty.MeterReportResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class MeterResponseEncoder extends MessageToByteEncoder<MeterReportResponse> {
private static final byte VERSION = 1;
private static final int METER_ID_LENGTH = 10;
@Override
protected void encode(ChannelHandlerContext ctx, MeterReportResponse meterReportResponse, ByteBuf out) throws Exception {
final byte[] messageBytes = meterReportResponse.getMessage().getBytes(StandardCharsets.UTF_8);
// 消息类型(1) + 电表ID(10) + 状态(1) + 消息长度
final int messageLength = 1 + METER_ID_LENGTH + 1 + messageBytes.length;
out.writeByte(VERSION);
out.writeInt(messageLength);
// 假设2是响应消息的类型
out.writeByte((byte) 2);
out.writeBytes(padRight(meterReportResponse.getMeterId(), METER_ID_LENGTH).getBytes(StandardCharsets.UTF_8));
out.writeByte(meterReportResponse.getAckStatus());
out.writeBytes(messageBytes);
}
private String padRight(String s, int n) {
return String.format("%-" + n + "s", s);
}
}
2.1.4 编写 MeterDataServer
package com.suny.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MeterDataServer {
private final int port;
public MeterDataServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port = 8080;
new MeterDataServer(port).start();
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline p = ch.pipeline();
p.addLast(new MeterReportDataDecoder());
p.addLast(new MeterResponseEncoder());
p.addLast(new MeterReportDataHandler());
}
});
ChannelFuture f = b.bind(port).sync();
System.out.println("MeterDataServer started on port " + port);
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
2.3. 客户端实现
2.3.1 编写 MeterReportDataEncoder
package com.suny.netty.client;
import com.suny.netty.MeterReportData;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class MeterReportDataEncoder extends MessageToByteEncoder<MeterReportData> {
private static final byte VERSION = 1;
private static final int METER_ID_LENGTH = 10;
// 消息类型(1) + 电表ID(10) + 读数(8) + 时间戳(8)
private static final int MESSAGE_LENGTH = 1 + METER_ID_LENGTH + 8 + 8;
@Override
protected void encode(ChannelHandlerContext ctx, MeterReportData msg, ByteBuf out) throws Exception {
out.writeByte(VERSION);
out.writeInt(MESSAGE_LENGTH);
out.writeByte(msg.getMessageType());
final byte[] meterIdBytes = padRight(msg.getMeterId(), METER_ID_LENGTH).getBytes(StandardCharsets.UTF_8);
out.writeBytes(meterIdBytes);
out.writeDouble(msg.getReading());
out.writeLong(msg.getTimestamp());
}
private String padRight(String s, int n) {
return String.format("%-" + n + "s", s);
}
}
2.3.2 编写 MeterReportResponseDecoder
package com.suny.netty.client;
import com.suny.netty.MeterReportResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class MeterReportResponseDecoder extends ByteToMessageDecoder {
private static final byte EXPECTED_VERSION = 1;
// 版本(1) + 消息长度(4) + 消息类型(1)
private static final int HEADER_LENGTH = 6;
private static final int METER_ID_LENGTH = 10;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 数据不足,等待更多数据
if (in.readableBytes() < HEADER_LENGTH) {
return;
}
in.markReaderIndex();
// 读取版本号
final byte version = in.readByte();
if (version != EXPECTED_VERSION) {
in.resetReaderIndex();
throw new IllegalStateException("Unexpected version: " + version);
}
// 读取整个消息长度
final int contentLength = in.readInt();
if (in.readableBytes() < contentLength) {
in.resetReaderIndex();
return; // 数据不足,等待更多数据
}
// 读取一个字节的消息类型,这里假设上报数据应答的数据类型为2
final byte messageType = in.readByte();
if (messageType != 2) {
in.resetReaderIndex();
throw new IllegalStateException("Unexpected message type: " + messageType);
}
// 读取电表ID
final byte[] meterIdBytes = new byte[METER_ID_LENGTH];
in.readBytes(meterIdBytes);
final String meterId = new String(meterIdBytes, StandardCharsets.UTF_8).trim();
// 读取 ack 状态
final byte status = in.readByte();
// 减去消息类型、电表ID和状态的长度
final int messageContentLength = contentLength - 1 - METER_ID_LENGTH - 1;
// 读取应答消息
final byte[] messageBytes = new byte[messageContentLength];
in.readBytes(messageBytes);
final String message = new String(messageBytes, StandardCharsets.UTF_8);
final MeterReportResponse response = new MeterReportResponse();
response.setMeterId(meterId);
response.setAckStatus(status);
response.setMessage(message);
out.add(response);
}
}
2.3.3 编写 MeterClientHandler
package com.suny.netty.client;
import com.suny.netty.MeterReportResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MeterClientHandler extends SimpleChannelInboundHandler<MeterReportResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MeterReportResponse msg) {
System.out.println("==================================");
System.out.println("Received response from server:");
System.out.println("Meter ID: " + msg.getMeterId());
System.out.println("Ack Status: " + msg.getAckStatus());
System.out.println("Message: " + msg.getMessage());
System.out.println("==================================\n");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2.3.4 编写 MockMeterClient
package com.suny.netty.client;
import com.suny.netty.MeterReportData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Random;
public class MockMeterClient {
private final String host;
private final int port;
public MockMeterClient(String host, int port) {
this.host = host;
this.port = port;
}
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
new MockMeterClient(host, port).run();
}
public void run() throws Exception {
final EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MeterReportDataEncoder());
ch.pipeline().addLast(new MeterReportResponseDecoder());
ch.pipeline().addLast(new MeterClientHandler());
}
});
final ChannelFuture f = b.connect(host, port).sync();
final Channel channel = f.channel();
// 公用一个 tcp 连接. 模拟多个电表上报数据
for (int i = 0; i < 1; i++) {
sendMeterData(channel, "meter10621", new Random().nextDouble());
sendMeterData(channel, "meter10622", new Random().nextDouble());
sendMeterData(channel, "meter10623", new Random().nextDouble());
}
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private void sendMeterData(Channel channel, String meterId, double reading) {
final MeterReportData meterData = new MeterReportData();
meterData.setVersion(MeterReportData.SYSTEM_ACTIVE_VERSION);
meterData.setMessageType((byte) 1);
meterData.setMeterId(meterId);
meterData.setReading(reading);
meterData.setTimestamp(System.currentTimeMillis());
meterData.setMessageType((byte) 1);
// 发送数据给对端
channel.writeAndFlush(meterData);
System.out.println("Sent meter data: " + meterData);
}
}
3. 程序测试
先运行服务端程序,控制台将会输出:
MeterDataServer started on port 8080
再运行客户端程序,控制台输出:
Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10621, reading=0.9, timestamp=xxxxxxxxx)
Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10622, reading=0.5, timestamp=xxxxxxxxx)
Sent meter data: MeterReportData(version=1, messageType=1, meterId=meter10623, reading=0.6, timestamp=xxxxxxxxx)
==================================
Received response from server:
Meter ID: meter10621
Ack Status: 10
Message: Data received successfully
==================================
==================================
Received response from server:
Meter ID: meter10622
Ack Status: 10
Message: Data received successfully
==================================
==================================
Received response from server:
Meter ID: meter10623
Ack Status: 10
Message: Data received successfully
==================================
服务端控制台输出:
Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10621, reading=0.9, timestamp=xxxxxxxxx)
Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10622, reading=0.5, timestamp=xxxxxxxxx)
Received meter data: MeterReportData(version=1, messageType=1, meterId=meter10623, reading=0.6, timestamp=xxxxxxxxx)