rtsp拉流接收数据包

注:当 SDP 中同时包含音频和视频时,需要发送两次 SETUP(每个媒体轨道各一次),并且每次 SETUP 请求的路径必须与 SDP 中对应轨道的 a=control 属性一致(例如 a=control:trackID=0)。
在这里插入图片描述

/*	
 * auth    : dujingning
 * date    : 2021.09.12
 * license : MIT
*/
package main

import (
	"encoding/binary"
	"flag"
	"fmt"
	"log"
	"net"
	"regexp"
	"strings"
	"time"
)

// Protocol version token and the fixed header fragments that the request
// builders in this file concatenate into RTSP requests.
const (
	// version is the protocol token that terminates every request line.
	version   = "RTSP/1.0"
	// UserAgent is the complete User-Agent header line (without CRLF).
	UserAgent = "User-Agent: djav1.0.0"
	// rtspUrl is a sample stream URL; it is not referenced by the code in this file.
	rtspUrl   = "rtsp://192.168.1.61:554/test"
	// cseq is the CSeq header prefix; each builder appends its own sequence number.
	cseq      = "CSeq: "
	// Accept is the header line DESCRIBE uses to ask for an SDP description.
	Accept    = "Accept: application/sdp"
	// Transport is the header line SETUP uses: RTP over the RTSP TCP connection,
	// interleaved on channels 0 and 1.
	Transport = "Transport: RTP/AVP/TCP;unicast;interleaved=0-1"
	// Range is the header line PLAY uses to start from the beginning of the stream.
	Range     = "Range: npt=0.000-"
	// Session is the Session header prefix; the session identifier is appended to it.
	Session   = "Session: "
)

// RTSP method names as they appear at the start of a request line.
// Only OPTIONS, DESCRIBE, SETUP, PLAY and TEARDOWN are used by the request
// builders in this file.
const (
	DESCRIBE      = "DESCRIBE"
	ANNOUNCE      = "ANNOUNCE"
	GET_PARAMETER = "GET_PARAMETER"
	OPTIONS       = "OPTIONS"
	PAUSE         = "PAUSE"
	PLAY          = "PLAY"
	RECORD        = "RECORD"
	REDIRECT      = "REDIRECT"
	SETUP         = "SETUP"
	SET_PARAMETER = "SET_PARAMETER"
	TEARDOWN      = "TEARDOWN"
)

// rtspUrlIn is the RTSP URL placed in the request line by every request
// builder below; main assigns it from the -url flag or from interactive input
// before any request is built.
var rtspUrlIn string

// option builds the OPTIONS request (CSeq 1) for rtspUrlIn.
// The result is the complete request text, terminated by an empty line.
func option() string {
	const crlf = "\r\n"

	requestLine := OPTIONS + " " + rtspUrlIn + " " + version
	seqLine := cseq + "1"

	return requestLine + crlf + seqLine + crlf + UserAgent + crlf + crlf
}

// describe builds the DESCRIBE request (CSeq 2) for rtspUrlIn, asking the
// server for an SDP description via the Accept header.
func describe() string {
	const crlf = "\r\n"

	req := DESCRIBE + " " + rtspUrlIn + " " + version + crlf
	req += cseq + "2" + crlf
	req += UserAgent + crlf
	req += Accept + crlf + crlf
	return req
}

func setup() string {
	return SETUP + " " + rtspUrlIn + "/streamid=0 " + version + "\r\n" +
		cseq + "3" + "\r\n" +
		UserAgent + "\r\n" +
		Transport + "\r\n\r\n"
}

// play builds the PLAY request (CSeq 4) for rtspUrlIn within the session
// identified by sessionid, starting at the beginning of the stream.
func play(sessionid string) string {
	const crlf = "\r\n"

	req := PLAY + " " + rtspUrlIn + " " + version + crlf
	req += cseq + "4" + crlf
	req += UserAgent + crlf
	req += Session + sessionid + crlf
	req += Range + crlf + crlf
	return req
}

// teardown builds the TEARDOWN request for rtspUrlIn that ends the session
// identified by sessionid.
//
// It uses CSeq 5 so that its sequence number does not collide with PLAY
// (CSeq 4): a request that repeats an earlier CSeq can be taken by the server
// for a retransmission of that earlier request.
func teardown(sessionid string) string {
	return TEARDOWN + " " + rtspUrlIn + " " + version + "\r\n" +
		cseq + "5" + "\r\n" +
		UserAgent + "\r\n" +
		Session + sessionid + "\r\n\r\n"
}

// sendAndRecv writes the RTSP request trimmedInput to conn, echoes it to
// stdout, then reads one complete RTSP response and returns it.
//
// The response is read until the blank line that ends the header section and,
// if a Content-Length header is present, until that many body bytes have
// arrived. This keeps a long DESCRIBE answer (headers + SDP) from being cut
// off or from leaking into the next response. Only the bytes actually
// received are returned; bytes that arrive in the same read after the end of
// the response are included in the returned string as well.
//
// On any write or read error the error is printed and "" is returned.
func sendAndRecv(conn net.Conn, trimmedInput string) string {
	// Send the request.
	if _, err := conn.Write([]byte(trimmedInput)); err != nil {
		fmt.Printf("write failed , err : %v\n", err)
		return ""
	}
	fmt.Println(trimmedInput)

	const (
		headerEnd = "\r\n\r\n"
		// maxReply bounds how much is buffered while looking for the end of
		// the header section, so a peer that never sends it cannot exhaust memory.
		maxReply = 1 << 20
	)

	// Receive the response.
	var (
		reply []byte
		chunk = make([]byte, 4096)
		want  = -1 // total response length; unknown until the headers are complete
	)
	for want < 0 || len(reply) < want {
		n, err := conn.Read(chunk)
		reply = append(reply, chunk[:n]...)
		if err != nil {
			fmt.Printf("Read failed , err : %v\n", err)
			return ""
		}
		if want >= 0 {
			continue
		}
		if end := strings.Index(string(reply), headerEnd); end >= 0 {
			want = end + len(headerEnd) + rtspContentLength(string(reply[:end]))
		} else if len(reply) > maxReply {
			fmt.Printf("Read failed , err : %v\n", "response header too large")
			return ""
		}
	}

	text := string(reply)
	fmt.Println(text)
	return text
}

// rtspContentLength returns the value of the Content-Length header found in
// head (the header section of a response, without the terminating blank
// line). It returns 0 when the header is missing, malformed or negative, and
// clamps the value to 1 MiB.
func rtspContentLength(head string) int {
	const maxBody = 1 << 20
	for _, line := range strings.Split(head, "\r\n") {
		colon := strings.Index(line, ":")
		if colon < 0 || !strings.EqualFold(strings.TrimSpace(line[:colon]), "Content-Length") {
			continue
		}
		var n int
		if _, err := fmt.Sscan(strings.TrimSpace(line[colon+1:]), &n); err != nil || n < 0 {
			return 0
		}
		if n > maxBody {
			return maxBody
		}
		return n
	}
	return 0
}

// Recv reads RTSP-interleaved binary frames from conn until a read fails
// (timeout, EOF or any other error), logging a short summary of every
// RTP/RTCP packet it receives.
//
// Each interleaved frame is a 4-byte header ('$', channel, 16-bit big-endian
// length) followed by exactly <length> bytes of RTP or RTCP data. TCP is a
// byte stream, so one read may deliver several frames or only part of one;
// frames are therefore read one at a time with exact-length reads instead of
// assuming that every read starts on a frame boundary.
//
// A 10 second read deadline is renewed before every frame. When the loop ends
// a throughput summary is logged. The return value is the content of the
// receive buffer for the last packet that was received completely (it may be
// partially overwritten if the final read failed in the middle of a packet).
func Recv(conn net.Conn) string {
	const readTimeout = 10 * time.Second

	var (
		header   [4]byte                // interleaved frame header
		recvData = make([]byte, 0xFFFF) // large enough for any 16-bit frame length
		sumData  uint64                 // total bytes consumed from conn
		lastLen  int                    // length of the last complete packet
	)

	timeStart := time.Now()
	defer func() {
		elapsed := time.Since(timeStart).Seconds()
		kb := float64(sumData) / float64(1024)
		rate := 0.0
		if elapsed > 0 {
			rate = kb / elapsed
		}
		log.Println("Time Out, Fully Recved size :", kb/float64(1024), "MB / ", kb, "KB   ", rate, "  KB/s", "   timeSpend:", elapsed)
		if p := recover(); p != nil {
			fmt.Printf("panic: %v\n", p)
		}
	}()

	for {
		if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
			log.Printf("SetReadDeadline failed , err : %v\n", err)
			break
		}

		n, err := recvFrameHeader(conn, header[:])
		sumData += uint64(n)
		if err != nil {
			log.Printf("Read failed , err : %v\n", err)
			break
		}

		rtpLen := int(binary.BigEndian.Uint16(header[2:]))
		packet := recvData[:rtpLen]
		n, err = recvFull(conn, packet)
		sumData += uint64(n)
		if err != nil {
			log.Printf("Read failed , err : %v\n", err)
			break
		}
		lastLen = rtpLen

		rtpInfo := ParseRTP(packet)
		if rtpInfo == nil {
			// ParseRTP rejects packets shorter than the fixed RTP header.
			log.Printf("channel %d: %d-byte packet is too short to parse, skipped\n", header[1], rtpLen)
			continue
		}

		log.Println("version:", packet[0]>>6, " pt:", packet[1]&0x7f, " X", (packet[0]>>4)&0x01, " rtpPayload: ", len(rtpInfo.Payload))

		// An RTCP sender report has packet type 200; read through the RTP
		// header layout its low 7 bits show up as payload type 72.
		if 72 == rtpInfo.PayloadType {
			log.Println("Sender Report----------  : ", rtpInfo.PayloadType)
		}
	}

	return string(recvData[:lastLen])
}

// recvFull reads from conn until buf is full or a read fails, returning the
// number of bytes stored in buf.
func recvFull(conn net.Conn, buf []byte) (int, error) {
	total := 0
	for total < len(buf) {
		n, err := conn.Read(buf[total:])
		total += n
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

// recvFrameHeader fills hdr (4 bytes) with the next interleaved frame header.
// Bytes that precede a '$' marker (for example the tail of an RTSP text
// response) are skipped one at a time. It returns the number of bytes
// consumed from conn. A '$' inside skipped data can still be mistaken for a
// marker; the stream then re-synchronizes on a later frame.
func recvFrameHeader(conn net.Conn, hdr []byte) (int, error) {
	consumed, err := recvFull(conn, hdr)
	for err == nil && hdr[0] != 0x24 {
		copy(hdr, hdr[1:])
		var n int
		n, err = recvFull(conn, hdr[len(hdr)-1:])
		consumed += n
	}
	return consumed, err
}

// Request is a parsed RTSP request.
type Request struct {
	Method  string            // request method, e.g. "OPTIONS"
	URL     string            // request URL from the request line
	Version string            // protocol token from the request line, e.g. "RTSP/1.0"
	Header  map[string]string // header fields, keyed by the name exactly as received
	Content string            // the original, unmodified request text
	Body    string            // everything after the blank line that ends the headers
}

// Separators used by NewRequest, compiled once instead of on every call.
var (
	rtspFieldSep  = regexp.MustCompile(`\s+`)  // between the parts of the request line
	rtspHeaderSep = regexp.MustCompile(`:\s+`) // between a header name and its value
)

// NewRequest parses content as an RTSP request.
//
// The first line must consist of at least three whitespace-separated fields
// (method, URL, version) and the version must start with "RTSP"; otherwise nil
// is returned. Header lines are split on the first ": " and lines without
// that separator are ignored. Header parsing stops at the first blank line:
// the remainder becomes Body, so body lines of the form "name: value" (as in
// GET_PARAMETER / SET_PARAMETER) are not mistaken for headers.
func NewRequest(content string) *Request {
	head, body := strings.TrimSpace(content), ""
	if i := strings.Index(head, "\r\n\r\n"); i >= 0 {
		head, body = head[:i], head[i+len("\r\n\r\n"):]
	}

	// strings.Split always yields at least one element, so lines[0] is safe.
	lines := strings.Split(head, "\r\n")
	items := rtspFieldSep.Split(strings.TrimSpace(lines[0]), -1)
	if len(items) < 3 {
		return nil
	}
	if !strings.HasPrefix(items[2], "RTSP") {
		log.Printf("invalid rtsp request, line[0] %s", lines[0])
		return nil
	}

	header := make(map[string]string, len(lines)-1)
	for _, raw := range lines[1:] {
		kv := rtspHeaderSep.Split(strings.TrimSpace(raw), 2)
		if len(kv) < 2 {
			continue
		}
		header[kv[0]] = kv[1]
	}

	return &Request{
		Method:  items[0],
		URL:     items[1],
		Version: items[2],
		Header:  header,
		Content: content,
		Body:    body,
	}
}

// getIpFromUrl extracts the "host:port" dial address from an RTSP URL such as
// "rtsp://user:pass@192.168.1.61:554/test".
//
// The scheme, any user-info and everything from the first '/', '?' or '#'
// after the authority are dropped. When the URL carries no port, the RTSP
// default port 554 is appended so the result can be passed directly to
// net.Dial. The address is logged before it is returned. If no host can be
// found, "" is returned instead of panicking.
func getIpFromUrl(url string) string {
	const defaultRTSPPort = "554"

	authority := strings.TrimSpace(url)
	if i := strings.Index(authority, "://"); i >= 0 {
		authority = authority[i+len("://"):]
	}
	if i := strings.IndexAny(authority, "/?#"); i >= 0 {
		authority = authority[:i]
	}
	if i := strings.LastIndex(authority, "@"); i >= 0 {
		authority = authority[i+1:]
	}
	if authority == "" {
		log.Printf("no host found in rtsp url %q", url)
		return ""
	}

	host, port, err := net.SplitHostPort(authority)
	if err != nil {
		// No port present (or a bare IPv6 literal): treat it all as the host.
		host, port = strings.Trim(authority, "[]"), defaultRTSPPort
	} else if port == "" {
		port = defaultRTSPPort
	}

	addr := net.JoinHostPort(host, port)
	log.Println(addr)
	return addr
}

const (
	// RTP_FIXED_HEADER_LENGTH is the size in bytes of the fixed part of an RTP
	// header (everything before the CSRC list).
	RTP_FIXED_HEADER_LENGTH = 12
)

// RTPInfo holds the fields decoded from one RTP packet.
type RTPInfo struct {
	Version        int    // top two bits of the first byte
	Padding        bool   // P bit
	Extension      bool   // X bit
	CSRCCnt        int    // number of CSRC identifiers after the fixed header
	Marker         bool   // M bit
	PayloadType    int    // low seven bits of the second byte
	SequenceNumber int    // 16-bit sequence number
	Timestamp      int    // 32-bit timestamp
	SSRC           int    // 32-bit synchronization source identifier
	Payload        []byte // packet data after the headers, without padding; aliases the input slice
}

// ParseRTP decodes the RTP header at the start of rtpBytes and locates the
// payload.
//
// It returns nil when rtpBytes is shorter than the fixed header. The CSRC
// list, the header extension and the trailing padding are each skipped only
// if the packet is long enough to contain them; otherwise that part is left
// inside the payload rather than treated as an error.
func ParseRTP(rtpBytes []byte) *RTPInfo {
	total := len(rtpBytes)
	if total < RTP_FIXED_HEADER_LENGTH {
		return nil
	}

	b0, b1 := rtpBytes[0], rtpBytes[1]

	info := new(RTPInfo)
	info.Version = int(b0 >> 6)
	info.Padding = b0&0x20 != 0
	info.Extension = b0&0x10 != 0
	info.CSRCCnt = int(b0 & 0x0f)
	info.Marker = b1&0x80 != 0
	info.PayloadType = int(b1 & 0x7f)
	info.SequenceNumber = int(binary.BigEndian.Uint16(rtpBytes[2:4]))
	info.Timestamp = int(binary.BigEndian.Uint32(rtpBytes[4:8]))
	info.SSRC = int(binary.BigEndian.Uint32(rtpBytes[8:12]))

	// start/stop delimit the payload; they are narrowed step by step.
	start, stop := RTP_FIXED_HEADER_LENGTH, total

	// CSRC identifiers, four bytes each.
	if csrcBytes := 4 * info.CSRCCnt; stop-start >= csrcBytes {
		start += csrcBytes
	}

	// Header extension: four-byte extension header whose second half is the
	// extension length in 32-bit words, followed by that many words.
	if info.Extension && stop-start >= 4 {
		extBytes := 4 * int(binary.BigEndian.Uint16(rtpBytes[start+2:start+4]))
		start += 4
		if stop-start >= extBytes {
			start += extBytes
		}
	}

	// Padding: the last byte holds the number of padding bytes, itself included.
	if info.Padding && stop > start {
		if pad := int(rtpBytes[stop-1]); stop-start >= pad {
			stop -= pad
		}
	}

	info.Payload = rtpBytes[start:stop]
	return info
}

var m_rtspUrl = flag.String("url", "rtsp url", "rtsp 地址,如:rtsp://127.0.0.1/stream")

/* go run .\rtsp.go --url=rtsp://192.168.1.61:554/test1 */
func main() {
	flag.Parse()
	var inputRtspUrl, serverIp string
	if "rtsp url" == *m_rtspUrl {
		fmt.Print("rtsp addr:")
		fmt.Scan(&inputRtspUrl)

		serverIp = getIpFromUrl(inputRtspUrl)
		rtspUrlIn = inputRtspUrl
	} else {
		serverIp = getIpFromUrl(*m_rtspUrl)
		rtspUrlIn = *m_rtspUrl
	}

	log.SetPrefix("[rtsp] ")
	log.SetFlags(log.Lshortfile | log.LstdFlags)

	//conn, err := net.Dial("tcp", "192.168.1.61:554")
	conn, err := net.Dial("tcp", serverIp)
	if err != nil {
		fmt.Printf("connect failed, err : %v\n", err.Error())
		return
	}
	defer conn.Close()

	// OPTIONS
	recvdata := sendAndRecv(conn, option())

	/*
		req := NewRequest(option())
		log.Println(req.Header)
		log.Println(req.URL)
		log.Println(req.Content)
		log.Println(req.Method)
	*/

	// 获取 Session ID
	s := strings.Split(recvdata, "\r\n")
	var Sessionid string
	for i := range s {
		if strings.Contains(s[i], "Session") {
			s = strings.Split(s[i], " ")
			//fmt.Println(s[1])
			Sessionid = s[1]
			break
		}
	}

	// DESCRIBE
	if strings.Contains(sendAndRecv(conn, describe()), "404") {
		log.Println("流不存在\r\n")
		sendAndRecv(conn, teardown(Sessionid))
		//time.Sleep(3 * time.Second)
		return
	}

	// SETUP
	sendAndRecv(conn, setup())

	// PLAY
	sendAndRecv(conn, play(Sessionid))

	// TEARDOWN
	defer sendAndRecv(conn, teardown(Sessionid))

	// rtp data
	Recv(conn)

	//time.Sleep(5 * time.Second)

	//conn.SetReadDeadline(time.Now().Add(time.Duration(2) * time.Second))

}