Netty-13-TCP粘包和拆包

Netty-13-TCP粘包和拆包

前言

  • TCP是面向连接的,面向流的,提供高可靠性服务。
  • 收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。
  • 这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界

通常的解决方案

  1. 发送端每发送一次消息,就需要在消息的内容之前携带消息的长度

  2. 这样,接收方每次先接受消息的长度,再根据长度去读取消息剩余的元素

  3. 如果 socket 中还有没有读取的内容,也只能放在下一次读取事件中读取

1. 拆包、粘包的图解

mark

假设客户端同时发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,固可能存在以下四种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包

  2. 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包

  3. 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包

  4. 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

2. TCP 拆包、粘包

本实例主要演示出现拆包和粘包的场景。

客户端:

我们将使用循环连续发送10个String类型的字符串。这里相当于发送了10次。

1
2
3
4
5
6
7
8
9
10
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据,hello,server
for (int i = 0; i < 10; i++) {
String msg = "server" + i + " ";
System.out.println("发送消息 " + msg);
ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
ctx.writeAndFlush(byteBuf);
}
}

服务端:

我们接受客户端发过来的字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);

//将buffer转成字符串
String message = new String(bytes, CharsetUtil.UTF_8);

System.out.println("服务器接收到数据 " + message);
System.out.println("服务器接收到消息量 = " + (++this.count));

//服务器回送数据到客户端,回送一个随机Id
ByteBuf response = Unpooled.copiedBuffer(UUID.randomUUID().toString() + "--", CharsetUtil.UTF_8);
ctx.writeAndFlush(response);
}

服务端输出结果如下:

mark

  • 可以看到,服务端直接一次就把我们客户端10次发送的内容读取完成了。
  • 这里也印证了我们开篇所说的,当数据量小且发送间隔短,如果我们客户端每次发送的都是不同的结果,这种情况下我们就不知道客户端返回了多少次结果以及每次结果究竟是什么。这就是我们本篇需要解决的问题。

3. TCP 问题解决方案

mark

  • 在数据包的前面加上一个固定字节数的数据长度,如加上一个 int (固定四个字节)类型的数据内容长度
  • 就算客户端同时发送两个数据包到服务端,当服务端接收时,也可以先读取四个字节的长度,然后根据长度获取相应消息的内容,这样就不会出现多读取或者少读取的情况了。

3.1 解决方案代码演示

  • 使用自定义协议 + 编解码器 来解决

  • 关键就是要解决 服务器端每次读取数据长度的问题

  • 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。

  1. 自定义协议(重要)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 自定义协议
public class Message {
private int len; //关键
private byte[] content;

public int getLen(){
return len;
}

public void setLen(int len) {
this.len = len;
}

public byte[] getContent(){
return content;
}

public void setContent(byte[] content){
this.content = content;
}
}
  1. 客户端引导器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); // 自定义初始化类


ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync(); // 监听关闭端口事件
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 关闭连接
group.shutdownGracefully();
}
}
}
  1. 客户端pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

// 加入多个handler
pipeline.addLast(new MyMessageEncoder()); // 加入编码器
pipeline.addLast(new MyMessageDecoder()); // 加入解码器
pipeline.addLast(new MyClientHandler()); // 加入自定义处理器
}
}
  1. 客户端Handler(重要)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

public class MyClientHandler extends SimpleChannelInboundHandler<Message> {
// 记录发送了几条消息
private int count;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客户端发送5条数据 "今天天气冷,吃火锅" 编号
for (int i = 0; i < 5; i++) {
String msg = "今天天气冷,吃火锅";

// 封装数据对象
byte[] content = msg.getBytes(Charset.forName("utf-8")); // 数据内容转换成字节数组
int length = msg.getBytes(Charset.forName("utf-8")).length; // 数据长度

// 封装成自定义数据包对象
Message message = new Message();
message.setLen(length);
message.setContent(content);

// 发送
ctx.writeAndFlush(message);
}
}

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {

}
}
  1. 服务端引导器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();


try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  1. 服务端pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

// 加入自定义处理器
pipeline.addLast(new MyMessageDecoder());
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyServerHandler());
}
}
  1. 服务端handler(重要)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.UUID;

public class MyServerHandler extends SimpleChannelInboundHandler<Message> {
// 统计接收的次数
private int count;

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {

// 接收到数据进行处理
int len = message.getLen();
byte[] content = message.getContent();

System.out.println("服务器接收到信息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到消息包数量=" + (++this.count));


// 回复客户端消息
String response = UUID.randomUUID().toString();
int resLen = response.getBytes("utf-8").length;
byte[] rescontent = response.getBytes("utf-8");

// 把回复的消息封装成一个message对象
Message message1 = new Message();
message1.setContent(rescontent);
message1.setLen(resLen);

// 发送给客户端
channelHandlerContext.writeAndFlush(message1);
}
}
  1. 自定义协议编码器
1
2
3
4
5
6
7
8
9
10
11
12
13
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<Message> {
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");

// 将编码数据写入ByteBuf中
byteBuf.writeInt(message.getLen());
byteBuf.writeBytes(message.getContent());
}
}
  1. 自定义协议解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MessageProtocol 数据包(对象)
int length = in.readInt();

byte[] content = new byte[length];
in.readBytes(content);

//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
Message messageProtocol = new Message();
messageProtocol.setLen(length);
messageProtocol.setContent(content);

out.add(messageProtocol);

}
}

本文档整理自 尚硅谷韩顺平Netty 相关课程。

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2019-2022 Zhuuu
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信