rtsp拉流接收数据包
注:setup包含音视频时要setup 2次,且setup 的路径要对的上a=control:trackID=0
/*
* auth : dujingning
* date : 2021.09.12
* license : MIT
*/
package main
import (
"encoding/binary"
"flag"
"fmt"
"log"
"net"
"regexp"
"strings"
"time"
)
const (
version = "RTSP/1.0"
UserAgent = "User-Agent: djav1.0.0"
rtspUrl = "rtsp://192.168.1.61:554/test"
cseq = "CSeq: "
Accept = "Accept: application/sdp"
Transport = "Transport: RTP/AVP/TCP;unicast;interleaved=0-1"
Range = "Range: npt=0.000-"
Session = "Session: "
)
const (
DESCRIBE = "DESCRIBE"
ANNOUNCE = "ANNOUNCE"
GET_PARAMETER = "GET_PARAMETER"
OPTIONS = "OPTIONS"
PAUSE = "PAUSE"
PLAY = "PLAY"
RECORD = "RECORD"
REDIRECT = "REDIRECT"
SETUP = "SETUP"
SET_PARAMETER = "SET_PARAMETER"
TEARDOWN = "TEARDOWN"
)
var rtspUrlIn string
func option() string {
return OPTIONS + " " + rtspUrlIn + " " + version + "\r\n" +
cseq + "1" + "\r\n" +
UserAgent + "\r\n\r\n"
}
func describe() string {
return DESCRIBE + " " + rtspUrlIn + " " + version + "\r\n" +
cseq + "2" + "\r\n" +
UserAgent + "\r\n" +
Accept + "\r\n\r\n"
}
func setup() string {
return SETUP + " " + rtspUrlIn + "/streamid=0 " + version + "\r\n" +
cseq + "3" + "\r\n" +
UserAgent + "\r\n" +
Transport + "\r\n\r\n"
}
func play(sessionid string) string {
return PLAY + " " + rtspUrlIn + " " + version + "\r\n" +
cseq + "4" + "\r\n" +
UserAgent + "\r\n" +
Session + sessionid + "\r\n" +
Range + "\r\n\r\n"
}
func teardown(sessionid string) string {
return TEARDOWN + " " + rtspUrlIn + " " + version + "\r\n" +
cseq + "4" + "\r\n" +
UserAgent + "\r\n" +
Session + sessionid + "\r\n\r\n"
}
func sendAndRecv(conn net.Conn, trimmedInput string) string {
//发送数据
if _, err := conn.Write([]byte(trimmedInput)); err != nil {
fmt.Printf("write failed , err : %v\n", err)
return ""
}
fmt.Println(trimmedInput)
//接受数据
var recvData = make([]byte, 1024)
if _, err := conn.Read(recvData); err != nil {
fmt.Printf("Read failed , err : %v\n", err)
return ""
}
fmt.Println(string(recvData))
return string(recvData)
}
func Recv(conn net.Conn) string {
//接受数据
var recvData = make([]byte, 1024*1024)
var sumData uint64 = 0
timeStart := time.Now().Unix()
defer func() {
log.Println("Time Out, Fully Recved size :", float64(sumData)/float64(1024*1024), "MB / ", float64(sumData)/float64(1024), "KB ", float64(sumData)/float64(1024)/float64((time.Now().Unix()-timeStart)), " KB/s", " timeSpend:", time.Now().Unix()-timeStart)
if p := recover(); p != nil {
fmt.Printf("panic: %s\n", p)
}
}()
var rtpLen, counter int
rtpLen = 0
counter = 0
for {
/* 读取会出现粘包,多个rtp包在同一个rtsp头下面 */
if readSize, err := conn.Read(recvData); err != nil {
conn.SetReadDeadline(time.Now().Add(time.Duration(3) * time.Second))
log.Printf("Read failed , err : %v\n", err)
//break
} else {
sumData += uint64(readSize)
conn.SetReadDeadline(time.Now().Add(time.Duration(10) * time.Second))
//log.Println("Read OK, Recv size :", readSize)
if 0x24 == recvData[0] {
//log.Println("rtp data") // rtsp协议头 4 字节
contentLen := make([]byte, 2)
contentLen[0] = recvData[2]
contentLen[1] = recvData[3]
rtpLen = int(binary.BigEndian.Uint16(contentLen))
rtpInfo := ParseRTP(recvData[4 : rtpLen+4]) // rtp协议头 16 字节
/*
The RTP header has the following format:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P|X| CC |M| PT | sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| synchronization source (SSRC) identifier |
+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
| contributing source (CSRC) identifiers |
| .... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
log.Println("version:", recvData[4]>>6, " pt:", (recvData[5]<<1)>>1, " X", (recvData[4]>>4)&0x01, " rtpPayload: ", len(rtpInfo.Payload))
//log.Println("version:", rtpInfo.Version, " payloadType:", rtpInfo.PayloadType, " timestamp:", rtpInfo.Timestamp)
if 72 == rtpInfo.PayloadType {
log.Println("Sender Report---------- : ", rtpInfo.PayloadType)
}
if 97 == rtpInfo.PayloadType {
contentLen := make([]byte, 2)
contentLen[0] = recvData[0]
contentLen[1] = recvData[1]
//result := int(binary.BigEndian.Uint16(contentLen))
//log.Println("rtp payload audio ~~~~~~~~~~~~ : ", rtpInfo.PayloadType, " rtpPayload: ", len(rtpInfo.Payload), " ADTS:", result)
}
//log.Println("rtp data == rtpLen:", rtpLen, "\treadSize-4:", readSize, "\tequal?", rtpLen == readSize-4, " rtpVersion: ", rtpInfo.Version, " rtpExtension: ", rtpInfo.Extension, " rtpPayloadType: ", rtpInfo.PayloadType," rtpPayload: ", len(rtpInfo.Payload) )
}
if (rtpLen + 4) == readSize {
//log.Println("same size~~~")
} else {
if 0x24 == recvData[rtpLen+4] {
//log.Println("rtp data 粘包了了了了了了了") // rtsp协议头 4 字节
contentLen := make([]byte, 2)
contentLen[0] = recvData[rtpLen+4+2]
contentLen[1] = recvData[rtpLen+4+3]
rtpLen2 := int(binary.BigEndian.Uint16(contentLen))
rtpInfo := ParseRTP(recvData[rtpLen+4+4 : rtpLen+4+rtpLen2+4]) // rtp协议头 16 字节, 分片时注意每次rtsp都会占用4个字节
if 72 == rtpInfo.PayloadType {
log.Println("Sender Report~~~~~~~~~~~~ : ", rtpInfo.PayloadType)
}
//log.Println("rtp data == rtpLen2:", rtpLen2, "\treadSize-4:", readSize -rtpLen-4 , " rtpVersion: ", rtpInfo.Version, " rtpExtension: ", rtpInfo.Extension, " rtpPayloadType: ", rtpInfo.PayloadType," rtpPayload: ", len(rtpInfo.Payload) )
}
//log.Println("not same size~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~=============================================================================================")
}
counter += readSize - 4
if rtpLen == counter {
//log.Println("a rtp pack Read OK, Recv sum size :", rtpLen)
rtpLen = 0
counter = 0
}
var size int = 0
for size > readSize {
if 0x24 == recvData[size] {
contentLen := make([]byte, 2)
contentLen[0] = recvData[size+2]
contentLen[1] = recvData[size+3]
rtpLen = int(binary.BigEndian.Uint16(contentLen))
log.Println("rtp data??? rtpLen:", rtpLen, "\treadSize-4:", readSize-4, "\tequal?", rtpLen == readSize-4)
}
size += 1
}
/*
if 0xc8 == recvData[5] {
log.Println("Sender Report from Server")
}
if 28 == int16(recvData[3]) {
log.Println("Read OK, Recv size :", readSize, " 首字节:", uint8(recvData[0]), " channel:", uint8(recvData[1]), " package type:", uint16(recvData[3]))
}
*/
}
}
return string(recvData)
}
type Request struct {
Method string
URL string
Version string
Header map[string]string
Content string
Body string
}
func NewRequest(content string) *Request {
lines := strings.Split(strings.TrimSpace(content), "\r\n")
if len(lines) == 0 {
return nil
}
items := regexp.MustCompile("\\s+").Split(strings.TrimSpace(lines[0]), -1) /* \s+:匹配任意多个上面的字符 */
if len(items) < 3 {
return nil
}
if !strings.HasPrefix(items[2], "RTSP") {
log.Printf("invalid rtsp request, line[0] %s", lines[0])
return nil
}
header := make(map[string]string)
for i := 1; i < len(lines); i++ {
line := strings.TrimSpace(lines[i])
headerItems := regexp.MustCompile(":\\s+").Split(line, 2)
if len(headerItems) < 2 {
continue
}
header[headerItems[0]] = headerItems[1]
}
return &Request{
Method: items[0],
URL: items[1],
Version: items[2],
Header: header,
Content: content,
Body: "",
}
}
func getIpFromUrl(url string) string {
m_ip := strings.Split(url, "/")
log.Println(m_ip[2])
return m_ip[2]
}
const (
RTP_FIXED_HEADER_LENGTH = 12
)
type RTPInfo struct {
Version int
Padding bool
Extension bool
CSRCCnt int
Marker bool
PayloadType int
SequenceNumber int
Timestamp int
SSRC int
Payload []byte
}
func ParseRTP(rtpBytes []byte) *RTPInfo {
if len(rtpBytes) < RTP_FIXED_HEADER_LENGTH {
return nil
}
firstByte := rtpBytes[0]
secondByte := rtpBytes[1]
info := &RTPInfo{
Version: int(firstByte >> 6),
Padding: (firstByte>>5)&1 == 1,
Extension: (firstByte>>4)&1 == 1,
CSRCCnt: int(firstByte & 0x0f),
Marker: secondByte>>7 == 1,
PayloadType: int(secondByte & 0x7f),
SequenceNumber: int(binary.BigEndian.Uint16(rtpBytes[2:])),
Timestamp: int(binary.BigEndian.Uint32(rtpBytes[4:])),
SSRC: int(binary.BigEndian.Uint32(rtpBytes[8:])),
}
offset := RTP_FIXED_HEADER_LENGTH
end := len(rtpBytes)
if end-offset >= 4*info.CSRCCnt {
offset += 4 * info.CSRCCnt
}
if info.Extension && end-offset >= 4 {
extLen := 4 * int(binary.BigEndian.Uint16(rtpBytes[offset+2:]))
offset += 4
if end-offset >= extLen {
offset += extLen
}
}
if info.Padding && end-offset > 0 {
paddingLen := int(rtpBytes[end-1])
if end-offset >= paddingLen {
end -= paddingLen
}
}
info.Payload = rtpBytes[offset:end]
return info
}
var m_rtspUrl = flag.String("url", "rtsp url", "rtsp 地址,如:rtsp://127.0.0.1/stream")
/* go run .\rtsp.go --url=rtsp://192.168.1.61:554/test1 */
func main() {
flag.Parse()
var inputRtspUrl, serverIp string
if "rtsp url" == *m_rtspUrl {
fmt.Print("rtsp addr:")
fmt.Scan(&inputRtspUrl)
serverIp = getIpFromUrl(inputRtspUrl)
rtspUrlIn = inputRtspUrl
} else {
serverIp = getIpFromUrl(*m_rtspUrl)
rtspUrlIn = *m_rtspUrl
}
log.SetPrefix("[rtsp] ")
log.SetFlags(log.Lshortfile | log.LstdFlags)
//conn, err := net.Dial("tcp", "192.168.1.61:554")
conn, err := net.Dial("tcp", serverIp)
if err != nil {
fmt.Printf("connect failed, err : %v\n", err.Error())
return
}
defer conn.Close()
// OPTIONS
recvdata := sendAndRecv(conn, option())
/*
req := NewRequest(option())
log.Println(req.Header)
log.Println(req.URL)
log.Println(req.Content)
log.Println(req.Method)
*/
// 获取 Session ID
s := strings.Split(recvdata, "\r\n")
var Sessionid string
for i := range s {
if strings.Contains(s[i], "Session") {
s = strings.Split(s[i], " ")
//fmt.Println(s[1])
Sessionid = s[1]
break
}
}
// DESCRIBE
if strings.Contains(sendAndRecv(conn, describe()), "404") {
log.Println("流不存在\r\n")
sendAndRecv(conn, teardown(Sessionid))
//time.Sleep(3 * time.Second)
return
}
// SETUP
sendAndRecv(conn, setup())
// PLAY
sendAndRecv(conn, play(Sessionid))
// TEARDOWN
defer sendAndRecv(conn, teardown(Sessionid))
// rtp data
Recv(conn)
//time.Sleep(5 * time.Second)
//conn.SetReadDeadline(time.Now().Add(time.Duration(2) * time.Second))
}