Message消息模块的定义
Message消息模块封装了放在Request中的消息数据,记录消息的ID和数据长度,以便按长度拆分字节流、处理黏包问题
- 新增imessage.go接口文件
// IMessage is the abstract view of one framed message: an ID, a payload and
// the payload length recorded alongside it.
type IMessage interface {
	// GetMsgID returns the message ID.
	GetMsgID() uint32
	// GetMsgLength returns the recorded payload length.
	GetMsgLength() uint32
	// GetMsgData returns the payload bytes.
	GetMsgData() []byte

	// SetMsgID replaces the message ID.
	SetMsgID(id uint32)
	// SetMsgLength replaces the recorded payload length.
	SetMsgLength(length uint32)
	// SetMsgData replaces the payload bytes.
	SetMsgData(data []byte)
}
- 新增message.go结构体文件
// Message is the concrete message type: an ID, a payload, and the payload
// length that is recorded next to it.
type Message struct {
	ID         uint32 // message identifier
	DataLength uint32 // recorded payload length in bytes
	Data       []byte // payload
}

// NewMessage builds a Message for the given ID and payload. DataLength is
// derived from len(data); the payload slice is stored as-is, not copied.
func NewMessage(id uint32, data []byte) *Message {
	msg := new(Message)
	msg.ID = id
	msg.Data = data
	msg.DataLength = uint32(len(data))
	return msg
}

// GetMsgID returns the message ID.
func (msg *Message) GetMsgID() uint32 { return msg.ID }

// GetMsgLength returns the recorded payload length.
func (msg *Message) GetMsgLength() uint32 { return msg.DataLength }

// GetMsgData returns the payload slice held by the message.
func (msg *Message) GetMsgData() []byte { return msg.Data }

// SetMsgID replaces the message ID.
func (msg *Message) SetMsgID(id uint32) { msg.ID = id }

// SetMsgLength replaces the recorded payload length. It does not touch Data.
func (msg *Message) SetMsgLength(length uint32) { msg.DataLength = length }

// SetMsgData replaces the payload. It does not update DataLength.
func (msg *Message) SetMsgData(data []byte) { msg.Data = data }
封包拆包过程的实现
由于在包体前增加了包头(数据长度和消息ID),为了方便使用,新增字节流与消息包相互转换的封包、拆包文件
- 新增idatapack.go接口文件
// IDataPack converts between messages and their byte-stream representation.
type IDataPack interface {
	// GetHeadLength returns the size of the packet header in bytes.
	GetHeadLength() int

	// Pack serializes a message into bytes ready to be written to a stream.
	Pack(msg IMessage) ([]byte, error)

	// UnPack decodes bytes read from a stream back into a message.
	UnPack(data []byte) (IMessage, error)
}
- 新增datapack.go结构体文件
// DataPack packs and unpacks messages. It carries no state.
type DataPack struct{}

// NewDataPack returns a ready-to-use DataPack.
func NewDataPack() *DataPack {
	return new(DataPack)
}

// GetHeadLength returns the fixed header size in bytes: a uint32 payload
// length followed by a uint32 message ID.
func (d *DataPack) GetHeadLength() int {
	const (
		lengthFieldSize = 4 // uint32 payload length
		idFieldSize     = 4 // uint32 message ID
	)
	return lengthFieldSize + idFieldSize
}
// Pack serializes message into a single byte slice.
//
// Wire layout (little-endian):
//
//	| payload length (uint32) | message ID (uint32) | payload bytes |
//
// The length written is whatever message.GetMsgLength() reports; it is not
// recomputed from the payload.
func (d *DataPack) Pack(message ziface.IMessage) ([]byte, error) {
	var out bytes.Buffer

	// Header, part 1: payload length.
	if err := binary.Write(&out, binary.LittleEndian, message.GetMsgLength()); err != nil {
		return nil, err
	}
	// Header, part 2: message ID.
	if err := binary.Write(&out, binary.LittleEndian, message.GetMsgID()); err != nil {
		return nil, err
	}
	// Body: raw payload bytes.
	if err := binary.Write(&out, binary.LittleEndian, message.GetMsgData()); err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}
// UnPack decodes the packet header found at the start of data and returns a
// message with DataLength and ID populated (both little-endian uint32, in
// that order).
//
// Only the header is decoded: Data is left nil and any bytes after the header
// are ignored. Callers are expected to read GetMsgLength() further bytes from
// the stream themselves and attach them to the message.
//
// An error from binary.Read (io.EOF or io.ErrUnexpectedEOF) is returned when
// data is shorter than the header.
func (d *DataPack) UnPack(data []byte) (ziface.IMessage, error) {
	reader := bytes.NewReader(data)
	message := &Message{}

	if err := binary.Read(reader, binary.LittleEndian, &message.DataLength); err != nil {
		return nil, err
	}
	if err := binary.Read(reader, binary.LittleEndian, &message.ID); err != nil {
		return nil, err
	}

	// No attempt is made to decode the payload here. The previous
	// binary.Read into the nil Data slice consumed zero bytes and therefore
	// never filled it; the body is read by the caller once the length is known.
	return message, nil
}
对封包拆包进行单元测试
- 新建packdata_test.go文件(测试文件名必须以_test.go结尾,测试函数名必须以Test开头,且形参必须为t *testing.T)
- 临时创建服务器用于测试接收黏包消息
- 临时创建客户端,连接到服务器中,用于发送黏包消息
// TestDataPack checks that two packed messages sent back-to-back in a single
// TCP write ("sticky packets") are split back into the original messages by
// reading a fixed-size header, decoding it with UnPack, and then reading
// exactly the announced number of payload bytes.
func TestDataPack(t *testing.T) {
	dp := NewDataPack()

	want := []*Message{
		{ID: 1, DataLength: 5, Data: []byte{'h', 'e', 'l', 'l', 'o'}},
		{ID: 2, DataLength: 7, Data: []byte{'q', 'w', 'e', 'r', 't', 'y', 'u'}},
	}

	// Concatenate both packets so they arrive as one contiguous byte stream.
	var stream []byte
	for _, msg := range want {
		packed, err := dp.Pack(msg)
		if err != nil {
			t.Fatalf("pack message %d: %v", msg.ID, err)
		}
		stream = append(stream, packed...)
	}

	/*
		Server: listens on an ephemeral loopback port so the test cannot
		collide with another process, and reports every decoded message (or
		the first error) on a channel.
	*/
	listener, err := net.Listen("tcp4", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen: %v", err)
	}
	defer listener.Close()

	type received struct {
		msg *Message
		err error
	}
	// Buffered for every possible send so the server goroutine can always
	// finish, even if the test body has already failed.
	results := make(chan received, len(want))

	go func() {
		conn, err := listener.Accept()
		if err != nil {
			results <- received{err: err}
			return
		}
		defer conn.Close()

		serverDP := NewDataPack()
		for range want {
			// Step 1: read exactly one header.
			headData := make([]byte, serverDP.GetHeadLength())
			if _, err := io.ReadFull(conn, headData); err != nil {
				results <- received{err: err}
				return
			}
			msgHead, err := serverDP.UnPack(headData)
			if err != nil {
				results <- received{err: err}
				return
			}
			msg, ok := msgHead.(*Message)
			if !ok {
				results <- received{err: fmt.Errorf("unexpected message type %T", msgHead)}
				return
			}
			// Step 2: read exactly the payload announced by the header.
			if msg.GetMsgLength() > 0 {
				msg.Data = make([]byte, msg.GetMsgLength())
				if _, err := io.ReadFull(conn, msg.Data); err != nil {
					results <- received{err: err}
					return
				}
			}
			results <- received{msg: msg}
		}
	}()

	/*
		Client: sends both packets in one write, then closes the connection
		so a server that expects more bytes fails with an EOF error instead
		of blocking forever.
	*/
	conn, err := net.Dial("tcp", listener.Addr().String())
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	_, err = conn.Write(stream)
	closeErr := conn.Close()
	if err != nil {
		t.Fatalf("write: %v", err)
	}
	if closeErr != nil {
		t.Fatalf("close client connection: %v", closeErr)
	}

	for i, w := range want {
		r := <-results
		if r.err != nil {
			t.Fatalf("message %d: server error: %v", i, r.err)
		}
		t.Logf("Recv MsgID:%d ,DataLen:%d Data:%s", r.msg.ID, r.msg.DataLength, string(r.msg.Data))
		if r.msg.ID != w.ID {
			t.Errorf("message %d: ID = %d, want %d", i, r.msg.ID, w.ID)
		}
		if r.msg.DataLength != w.DataLength {
			t.Errorf("message %d: DataLength = %d, want %d", i, r.msg.DataLength, w.DataLength)
		}
		if string(r.msg.Data) != string(w.Data) {
			t.Errorf("message %d: Data = %q, want %q", i, r.msg.Data, w.Data)
		}
	}
}
将上述功能集成到框架中
- 对Request结构体进行重构
- 对Connection结构体中的StartReader方法进行重构
- 修改读取包的代码块
- 新增完善发送消息SendMsg方法
// SendMsg packs data under the given message ID and writes the resulting
// packet to the connection's TCP socket.
//
// It returns an error without writing anything if the connection is already
// marked closed. Pack and write failures are printed and then returned.
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
	if c.isClosed {
		return errors.New("connection closed when sendMsg")
	}

	// Header + payload in one slice so the packet goes out in a single Write.
	packet, err := NewDataPack().Pack(NewMessage(msgID, data))
	if err != nil {
		fmt.Println(err)
		return err
	}

	if _, err = c.GetTcpConnection().Write(packet); err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}
对功能进行测试
项目的整体结构
- 服务端
// PingRouter is the demo router: it embeds znet.BaseRouter and overrides
// Handle.
type PingRouter struct {
	znet.BaseRouter
}

// Handle prints the ID and payload of the incoming request, then answers on
// the same connection with a message of ID 1. A send failure is printed and
// otherwise ignored.
func (p *PingRouter) Handle(request ziface.IRequest) {
	msgID := request.GetMsgID()
	payload := string(request.GetData())
	fmt.Println("服务器接收到客户端的消息,MsgID:", msgID, "MsgData:", payload)

	if err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping...")); err != nil {
		fmt.Println("Handle Error:", err)
	}
}
// main is the server entry point: it creates the server, registers the
// PingRouter and starts serving.
func main() {
	server := znet.NewServer("[zinxV0.5]ServerApp")
	server.AddRouter(new(PingRouter))
	server.Serve()
}
- 客户端
// main is the demo client. It connects to 127.0.0.1:8080, starts a goroutine
// that decodes and prints every message received from the server, and sends
// one message with ID 0 per second from the main goroutine until a pack or
// write error occurs.
func main() {
	fmt.Println("Client Start...")
	// NOTE(review): presumably gives the server time to start listening — confirm.
	time.Sleep(time.Second)
	conn, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		fmt.Println("Client Start Error", err)
		return
	}
	// Release the socket when main returns.
	defer conn.Close()

	// Reader goroutine: header first, then exactly the announced payload.
	go func() {
		dp := znet.NewDataPack()
		for {
			headData := make([]byte, dp.GetHeadLength())
			if _, err := io.ReadFull(conn, headData); err != nil {
				fmt.Println(err)
				return
			}
			msg, err := dp.UnPack(headData)
			if err != nil {
				fmt.Println(err)
				return
			}
			// A zero-length payload is a valid message: skip the body read
			// and keep receiving instead of silently stopping the reader.
			if bodyLen := msg.GetMsgLength(); bodyLen > 0 {
				body := make([]byte, bodyLen)
				if _, err := io.ReadFull(conn, body); err != nil {
					fmt.Println(err)
					return
				}
				msg.SetMsgData(body)
			}
			fmt.Println("接收服务器的消息,MsgID:", msg.GetMsgID(), "MsgData:", string(msg.GetMsgData()))
		}
	}()

	// The main goroutine stays in this loop, writing one message per second.
	dp := znet.NewDataPack()
	for {
		buffer, err := dp.Pack(znet.NewMessage(0, []byte("你好啊服务器!!!")))
		if err != nil {
			fmt.Println(err)
			return
		}
		if _, err = conn.Write(buffer); err != nil {
			fmt.Println(err)
			return
		}
		time.Sleep(time.Second)
	}
}