一、概述
前面已经完成了一个完美的多并发CS模型,但美中不足的是没有解决粘包问题。
1.1 什么是粘包问题?
在网络传输中,数据都是通过数据流来传输的,也就是以比特来传输。传输的过程中我们可能会遇到各种各样的问题导致数据传输异常,最常见的就是网络发送时延。网络时延会导致服务端此时收到的数据的时间有偏差,然后就导致数据接收数据的时间不一致。
可以看一个例子,修改上篇的服务端和客户端为以下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for {
data := make([]byte, 2048)
n, err := conn.Read(data)
if n == 0{
fmt.Printf("%s has disconnect", conn.RemoteAddr())
break
}
if err != nil{
fmt.Println(err)
continue
}
//fmt.Printf("Receive data [%s] from [%s]", string(data[:n]), conn.RemoteAddr())
//修改上句为下面的
fmt.Printf("Receive %d byte data : %s", n, string(data[:n]))
//程序睡眠1ns,模拟网络时延
time.Sleep(time.Nanosecond)
}
客户端改为以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main(){
conn, err := net.Dial("tcp", ":8899")
if err != nil{
fmt.Println(err)
return
}
for i := 0; i < 100; i++{
data := fmt.Sprintf("{"index":%d, "name":"maqian", "age":21, "company":"intely"}", i + 1)
n, err := conn.Write([]byte(data))
if err != nil{
fmt.Println(err)
continue
}
fmt.Printf("Send %d byte data : %s", n, data)
}
}
以上我们发送了100条json到服务端,按照预想服务端将会输出100行json,但是实际上并不是:
这个现象产生的原因是因为服务端每次读取数据之后将会休眠1ns,但是对于客户端来说,这1ns它还在一直传输数据,1ns的时间可能 发送了1条,也可能是2条,这个数量我们不知道是多少,也无法控制。于是就导致数据堆积,服务端再读取就会出问题了。与此同时,由于缓冲区有限,一次最多读取2048个字节,堆积的字节超过2048的也无法读取,只能留到下次读取,这种现象就是粘包问题。
二、解决办法
上面抛出了粘包的问题后,现在就要开始想办法处理了,怎么处理呢?这里就需要用到协议了,协议就是双方约定好的数据包格式, 让服务端知道从哪里开始读,读到哪里结束,这样就不会出错了。实现这个协议最简单的办法就是加上一个协议头和一个数据包长度 。
[0x11, 0x22, 0x33][0xaa, 0xbb][0xaa, 0xbb, 0x03, 0x11, 0x22, 0x33]
[0xaa, 0xbb]3
三、实现
指定好了协议之后就可以开始实现了,为了方便,直接把这里写成一个对象:
1
2
3
type SocketUtil struct {
Coon net.Conn
}
包头的定义:
1
2
3
4
type PkgHeader struct {
HeaderFlag [2]byte
DataLength uint32
}
包头包括协议头和数据长度,共六个字节。
3.1 数据发送时的封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (fd *SocketUtil) WritePkg(data []byte)(int, error){
if fd == nil {
return -1, errors.New(SOCKET_ERROR_SERVER_CLOSED)
}
if len(data) == 0{
return 0, nil
}
buff := bytes.NewBuffer([]byte{})
binary.Write(buff, binary.BigEndian, []byte{0xaa, 0xbb}) //添加协议头
binary.Write(buff, binary.BigEndian, uint32(len(data))) //添加长度
binary.Write(buff, binary.BigEndian, data) //数据部分
allBytes := buff.Bytes()
return fd.writeNByte(allBytes)
}
writeByte()
1
2
3
4
5
6
7
8
func (fd *SocketUtil) writeNByte(data []byte)(int, error){
n, err := fd.Coon.Write(data)
if err != nil{
return -1, err
}else{
return n, nil
}
}
3.2 接收数据时解包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (fd *SocketUtil) ReadPkg()([]byte, error){
if fd == nil || fd.Coon == nil{
return nil, errors.New(SOCKET_ERROR_SERVER_CLOSED)
}
head, err := fd.readHead() //先读取数据头
if err != nil{
return nil, err
}
//数据头和约定不一样,报错
if head.HeaderFlag != [2]byte{0xaa, 0xbb}{
return nil, errors.New("Head package error")
}
//读取指定长度的数据
datas, err := fd.readNByte(head.DataLength)
if err != nil{
return nil, err
}
return datas, nil
}
readHead()
1
2
3
4
5
6
7
8
9
10
11
func (fd *SocketUtil) readHead()(*PkgHeader, error){
data, err := fd.readNByte(HeaderLength)
if err != nil{
return nil, err
}
h := PkgHeader{}
buff := bytes.NewBuffer(data)
binary.Read(buff, binary.BigEndian, &h.HeaderFlag) //读取0xaa 0xbb连个字节
binary.Read(buff, binary.BigEndian, &h.DataLength) //读取四个字节的长度
return &h, nil
}
readNByte()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (fd * SocketUtil) readNByte(n uint32)([]byte, error){
data := make([]byte, n)
for x := 0; x < int(n) ;{
length, err := fd.Coon.Read(data[x:]) //有数据则读,没有则阻塞
if length == 0{
return nil, errors.New(SOCKET_ERROR_CLIENT_CLOSED)
}
if err != nil{
return nil, err
}
x += length
}
return data, nil
}
3.3 完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package common
import (
"net"
"errors"
"bytes"
"encoding/binary"
)
type PkgHeader struct {
HeaderFlag [2]byte
DataLength uint32
}
const(
HeaderLength = 6
)
const(
SOCKET_ERROR_CLIENT_CLOSED = "Client has been closed"
SOCKET_ERROR_SERVER_CLOSED = "Server has been closed"
SOCKET_ERROR_TIMEOUT = "Timeout"
)
type SocketUtil struct {
Conn net.Conn
}
func (fd *SocketUtil) Init(conn net.Conn){
fd.Conn = conn
}
func (fd *SocketUtil) WritePkg(data []byte)(int, error){
if fd == nil {
return -1, errors.New(SOCKET_ERROR_SERVER_CLOSED)
}
if len(data) == 0{
return 0, nil
}
buff := bytes.NewBuffer([]byte{})
binary.Write(buff, binary.BigEndian, []byte{0xaa, 0xbb})
binary.Write(buff, binary.BigEndian, uint32(len(data)))
binary.Write(buff, binary.BigEndian, data)
allBytes := buff.Bytes()
return fd.writeNByte(allBytes)
}
func (fd *SocketUtil) ReadPkg()([]byte, error){
if fd == nil || fd.Conn == nil{
return nil, errors.New(SOCKET_ERROR_SERVER_CLOSED)
}
head, err := fd.readHead()
if err != nil{
return nil, err
}
if head.HeaderFlag != [2]byte{0xaa, 0xbb}{
return nil, errors.New("Head package error")
}
datas, err := fd.readNByte(head.DataLength)
if err != nil{
return nil, err
}
return datas, nil
}
func (fd *SocketUtil) readHead()(*PkgHeader, error){
data, err := fd.readNByte(HeaderLength)
if err != nil{
return nil, err
}
h := PkgHeader{}
buff := bytes.NewBuffer(data)
binary.Read(buff, binary.BigEndian, &h.HeaderFlag)
binary.Read(buff, binary.BigEndian, &h.DataLength)
return &h, nil
}
func (fd * SocketUtil) readNByte(n uint32)([]byte, error){
data := make([]byte, n)
for x := 0; x < int(n) ;{
length, err := fd.Conn.Read(data[x:])
if length == 0{
return nil, errors.New(SOCKET_ERROR_CLIENT_CLOSED)
}
if err != nil{
return nil, err
}
x += length
}
return data, nil
}
func (fd *SocketUtil) writeNByte(data []byte)(int, error){
n, err := fd.Conn.Write(data)
if err != nil{
return -1, err
}else{
return n, nil
}
}
func (fd *SocketUtil) Close(){
fd.Conn.Close()
}
四、服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
"net"
"fmt"
"网络编程/并发/common"
)
func handle(conn net.Conn){
defer conn.Close()
fmt.Println("Connect :", conn.RemoteAddr())
fd := common.SocketUtil{conn}
for {
data, err := fd.ReadPkg() //读取数据
if err != nil{
fmt.Println(err)
break
}
fmt.Println(string(data))
}
}
func main(){
listener, err := net.Listen("tcp", ":8899")
if err != nil{
fmt.Println(err)
return
}
fmt.Println("Start listen localhost:8899")
for {
conn, err := listener.Accept()
if err != nil{
fmt.Println(err)
return
}
go handle(conn)
}
}
五、客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import (
"net"
"fmt"
"网络编程/并发/common"
)
func main(){
conn, err := net.Dial("tcp", ":8899")
if err != nil{
fmt.Println(err)
return
}
clntFd := common.SocketUtil{conn}
for i := 0; i < 100; i++{
data := fmt.Sprintf("{"index":%d, "name":"maqian", "age":21, "company":"intely"}", i + 1)
n, err := clntFd.WritePkg([]byte(data))
if err != nil{
fmt.Println(err)
return
}
fmt.Printf("Send %d byte data : %s", n, data)
}
}
六、运行
运行服务端再运行客户端就会发现,已经不和之前的一样了,整整齐齐,perfect!