一般情况下,如果要实现聊天即时通讯,都要借助公网服务器作为中继节点对消息进行转发。
例如用户A和用户B进行即时通讯的具体步骤如下所示
首先用户A和B需要和公网服务器建立长连接
ClientA ====> (建立长连接) ===> 公网服务器
ClientB ====> (建立长连接) ===> 公网服务器
紧接着用户A如果想发送消息给用户B,就会采用转发的形式
ClientA => 公网服务器(消息转发) => ClientB
但是我们从中可以看到,如果用户之间进行的是语音视频通话,所有流量将会从中继服务器中经过。这将会给中继服务器带来巨大挑战。
那么是否可以存在一种方式可以抛除中继服务器的存在,让用户A和用户B进行直连通信呢?
我们知道用户A和用户B都在各自的内网下,双方都不知道彼此的地址,那么如何进行通信成了问题。
二、P2P 通信与NAT类型
紧接上文,其实用户A在给中继服务器发送长连接请求后,中继服务器就能获取到运营商给用户A开放的公网IP和端口。
那么如果用户B知道了用户A所在的公网IP和端口,是否就能脱离中继服务器的限制,直接发送请求给用户A所在的IP和端口呢?
答案是,在一定情况下是可以的。这要求用户A所在的 NAT 是完全锥形。
NAT 的作用是会将内网主机的IP地址映射为一个公网IP,由于 IPV4 地址池不够用的情况下,运营商不会给每个接入互联网的用户分配公网 IP ,而是多个用户,或者一整个小区公用一个公网 IP 出口。
当用户发送网络请求时, NAT 会将用户的内网 IP 转换为公网 IP,并且分配一个公网端口。当用户的请求结束,一段时间后该这些公共资源将会被回收。
Server S1 Server S2
18.181.0.31:1235 138.76.29.7:1235
| |
| |
+----------------------+----------------------+
|
^ Session 1 (A-S1) ^ | ^ Session 2 (A-S2) ^
| 18.181.0.31:1235 | | | 138.76.29.7:1235 |
v 155.99.25.11:62000 v | v 155.99.25.11:62000 v
|
Cone NAT
155.99.25.11
|
^ Session 1 (A-S1) ^ | ^ Session 2 (A-S2) ^
| 18.181.0.31:1235 | | | 138.76.29.7:1235 |
v 10.0.0.1:1234 v | v 10.0.0.1:1234 v
|
用户内网
10.0.0.1:1234
基于这种特性,NAT一般情况被分为 4 类
- 完全圆锥型NAT (Full Cone NAT)把一个来自内部IP地址和端口的所有请求,始终映射到相同的外网IP地址和端口;同时,任意外部主机向该映射的外网IP地址和端口发送报文,都可以实现和内网主机进行通信,就像一个向外开口的圆锥形一样,故得名。
- 地址限制式锥形NAT(Address Restricted Cone NAT)地址限制式圆锥形NAT同样把一个来自内部IP地址和端口的所有请求,始终映射到相同的外网IP地址和端口;与完全圆锥型NAT不同的是,当内网主机向某公网主机发送过报文后,只有该公网主机才能向内网主机发送报文,故得名。相比完全锥形,增加了地址限制,也就是IP受限,而端口不受限。
- 端口限制式锥形NAT(Port Restricted Cone NAT)端口限制式圆锥形NAT更加严格,在上述条件下,只有该公网主机该端口才能向内网主机发送报文,故得名。相比地址限制锥形又增加了端口限制,也就是说IP、端口都受限。
- 对称式NAT(Symmetric NAT)对称式NAT把内网IP和端口到相同目的地址和端口的所有请求,都映射到同一个公网地址和端口;同一个内网主机,用相同的内网IP和端口向另外一个目的地址发送报文,则会用不同的映射(比如映射到不同的端口)。和端口限制式NAT不同的是,端口限制式NAT是所有请求映射到相同的公网IP地址和端口,而对称式NAT是为不同的请求建立不同的映射。它具有端口受限锥型的受限特性,内部地址每一次请求一个特定的外部地址,都可能会绑定到一个新的端口号。也就是请求不同的外部地址映射的端口号是可能不同的。这种类型基本上就告别 P2P 了。
一般情况下,家用 NAT 是NAT3,也就是 端口限制式锥形NAT。我们基于这一特性可以尝试让两台主机进行内网端对端直连。
请注意,P2P通信不意味着全程不需要服务器的介入。服务器的介入只是为了让双方节点都获取到各自穿透的公网 IP和端口,实现的具体流程请方法下图。
请注意这里使用到了端口复用技术。因为我们的端口不仅要监听一个服务,并且这个端口还能进行复用发送网络请求。
具体代码示例如下:
代码我把它托管到了 Github 上,并且有完整说明,链接如下
server.go
代码其实很简单,server.go 只做一件事,交换两个内网节点临时生成的公网 IP 和端口
package main
import (
"encoding/json"
"fmt"
"github.com/go-basic/uuid"
"github.com/libp2p/go-reuseport"
"net"
"time"
)
type Client struct {
UID string
Conn net.Conn
Address string
}
type Handler struct {
// 服务端句柄
Listener net.Listener
// 客户端句柄池
ClientPool map[string]*Client
}
func (s *Handler) Handle() {
for {
conn, err := s.Listener.Accept()
if err != nil {
fmt.Println("获取连接句柄失败", err.Error())
continue
}
id := uuid.New()
s.ClientPool[id] = &Client{
UID: id,
Conn: conn,
Address: conn.RemoteAddr().String(),
}
fmt.Println("一个客户端连接进去了,他的公网IP是", conn.RemoteAddr().String())
// 暂时只接受两个客户端,多余的不处理
if len(s.ClientPool) == 2 {
// 交换双方的公网地址
s.ExchangeAddress()
break
}
}
}
// ExchangeAddress 交换地址
func (s *Handler) ExchangeAddress() {
for uid, client := range s.ClientPool {
for id, c := range s.ClientPool {
// 自己不交换
if uid == id {
continue
}
var data = make(map[string]string)
data["dst_uid"] = client.UID // 对方的 UID
data["address"] = client.Address // 对方的公网地址
body, _ := json.Marshal(data)
if _, err := c.Conn.Write(body); err != nil {
fmt.Println("交换地址时出现了错误", err.Error())
}
}
}
}
func main() {
address := fmt.Sprintf("0.0.0.0:6999")
listener, err := reuseport.Listen("tcp", address)
if err != nil {
panic("服务端监听失败" + err.Error())
}
h := &Handler{Listener: listener, ClientPool: make(map[string]*Client)}
// 监听内网节点连接,交换彼此的公网 IP 和端口
h.Handle()
time.Sleep(time.Hour) // 防止主线程退出
}
client.go
客户端得到对方的临时生成的公网IP和端口后,尝试进行连接,并不停发送数据
package main
import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/libp2p/go-reuseport"
"math"
"math/big"
"net"
"time"
)
type Handler struct {
// 中继服务器的连接句柄
ServerConn net.Conn
// p2p 连接
P2PConn net.Conn
// 端口复用
LocalPort int
}
// WaitNotify 等待远程服务器发送通知告知我们另一个用户的公网IP
func (s *Handler) WaitNotify() {
buffer := make([]byte, 1024)
n, err := s.ServerConn.Read(buffer)
if err != nil {
panic("从服务器获取用户地址失败" + err.Error())
}
data := make(map[string]string)
if err := json.Unmarshal(buffer[:n], &data); err != nil {
panic("获取用户信息失败" + err.Error())
}
fmt.Println("客户端获取到了对方的地址:", data["address"])
// 断开服务器连接
defer s.ServerConn.Close()
// 请求用户的临时公网 IP
go s.DailP2PAndSayHello(data["address"], data["dst_uid"])
}
// DailP2PAndSayHello 连接对方临时的公网地址,并且不停的发送数据
func (s *Handler) DailP2PAndSayHello(address, uid string) {
var errCount = 1
var conn net.Conn
var err error
for {
// 重试三次
if errCount > 3 {
break
}
time.Sleep(time.Second)
conn, err = reuseport.Dial("tcp", fmt.Sprintf(":%d", s.LocalPort), address)
if err != nil {
fmt.Println("请求第", errCount, "次地址失败,用户地址:", address)
errCount++
continue
}
break
}
if errCount > 3 {
panic("客户端连接失败")
}
s.P2PConn = conn
go s.P2PRead()
go s.P2PWrite()
}
// P2PRead 读取 P2P 节点的数据
func (s *Handler) P2PRead() {
for {
buffer := make([]byte, 1024)
n, err := s.P2PConn.Read(buffer)
if err != nil {
fmt.Println("读取失败", err.Error())
time.Sleep(time.Second)
continue
}
body := string(buffer[:n])
fmt.Println("读取到的内容是:", body)
fmt.Println("来自地址", s.P2PConn.RemoteAddr())
fmt.Println("=============")
}
}
// P2PWrite 向远程 P2P 节点写入数据
func (s *Handler) P2PWrite() {
for {
if _, err := s.P2PConn.Write([]byte("你好呀~")); err != nil {
fmt.Println("客户端写入错误")
}
time.Sleep(time.Second)
}
}
func main() {
// 指定本地端口
localPort := RandPort(10000, 50000)
// 向 P2P 转发服务器注册自己的临时生成的公网 IP (请注意,Dial 这里拨号指定了自己临时生成的本地端口)
serverConn, err := reuseport.Dial("tcp", fmt.Sprintf(":%d", localPort), "你自己的公网服务器IP:6999")
if err != nil {
panic("请求远程服务器失败" + err.Error())
}
h := &Handler{ServerConn: serverConn, LocalPort: int(localPort)}
h.WaitNotify()
time.Sleep(time.Hour)
}
// RandPort 生成区间范围内的随机端口
func RandPort(min, max int64) int64 {
if min > max {
panic("the min is greater than max!")
}
if min < 0 {
f64Min := math.Abs(float64(min))
i64Min := int64(f64Min)
result, _ := rand.Int(rand.Reader, big.NewInt(max+1+i64Min))
return result.Int64() - i64Min
}
result, _ := rand.Int(rand.Reader, big.NewInt(max-min+1))
return min + result.Int64()
}