123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- package rtmp
- import (
- "bufio"
- "encoding/binary"
- "errors"
- "io"
- "net"
- "runtime"
- "sync/atomic"
- "go.uber.org/zap"
- "m7s.live/engine/v4/util"
- )
// Human-readable labels for the RTMP messages this package can send.
//
// The values are plain strings; nothing in this section of the file refers to
// them, so how they are consumed must be checked against the callers.
const (
	// Protocol control messages.
	SEND_CHUNK_SIZE_MESSAGE         = "Send Chunk Size Message"
	SEND_ACK_MESSAGE                = "Send Acknowledgement Message"
	SEND_ACK_WINDOW_SIZE_MESSAGE    = "Send Window Acknowledgement Size Message"
	SEND_SET_PEER_BANDWIDTH_MESSAGE = "Send Set Peer Bandwidth Message"

	// User control messages.
	//
	// NOTE(review): the text of SEND_SET_BUFFER_LENGTH_MESSAGE is spelled
	// "Lengh". It is kept byte-for-byte because the value may be matched or
	// logged elsewhere.
	SEND_STREAM_BEGIN_MESSAGE       = "Send Stream Begin Message"
	SEND_SET_BUFFER_LENGTH_MESSAGE  = "Send Set Buffer Lengh Message"
	SEND_STREAM_IS_RECORDED_MESSAGE = "Send Stream Is Recorded Message"
	SEND_PING_REQUEST_MESSAGE       = "Send Ping Request Message"
	SEND_PING_RESPONSE_MESSAGE      = "Send Ping Response Message"

	// Command messages and their responses.
	SEND_CONNECT_MESSAGE            = "Send Connect Message"
	SEND_CONNECT_RESPONSE_MESSAGE   = "Send Connect Response Message"
	SEND_CREATE_STREAM_MESSAGE      = "Send Create Stream Message"
	SEND_PLAY_MESSAGE               = "Send Play Message"
	SEND_PLAY_RESPONSE_MESSAGE      = "Send Play Response Message"
	SEND_PUBLISH_RESPONSE_MESSAGE   = "Send Publish Response Message"
	SEND_PUBLISH_START_MESSAGE      = "Send Publish Start Message"
	SEND_UNPUBLISH_RESPONSE_MESSAGE = "Send Unpublish Response Message"

	// Media messages.
	//
	// NOTE(review): the identifier SEND_FULL_VDIEO_MESSAGE is misspelled
	// ("VDIEO"). It is kept as-is because renaming it would break any code
	// that refers to it.
	SEND_AUDIO_MESSAGE      = "Send Audio Message"
	SEND_FULL_AUDIO_MESSAGE = "Send Full Audio Message"
	SEND_VIDEO_MESSAGE      = "Send Video Message"
	SEND_FULL_VDIEO_MESSAGE = "Send Full Video Message"
)
- type NetConnection struct {
- *bufio.Reader `json:"-" yaml:"-"`
- net.Conn `json:"-" yaml:"-"`
- bandwidth uint32
- readSeqNum uint32 // 当前读的字节
- writeSeqNum uint32 // 当前写的字节
- totalWrite uint32 // 总共写了多少字节
- totalRead uint32 // 总共读了多少字节
- writeChunkSize int
- readChunkSize int
- incommingChunks map[uint32]*Chunk
- objectEncoding float64
- appName string
- tmpBuf util.Buffer //用来接收/发送小数据,复用内存
- chunkHeader util.Buffer
- bytePool util.BytesPool
- writing atomic.Bool // false 可写,true 不可写
- }
- func NewNetConnection(conn net.Conn) *NetConnection {
- return &NetConnection{
- Conn: conn,
- Reader: bufio.NewReader(conn),
- writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
- readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
- incommingChunks: make(map[uint32]*Chunk),
- bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
- tmpBuf: make(util.Buffer, 4),
- chunkHeader: make(util.Buffer, 0, 16),
- bytePool: make(util.BytesPool, 17),
- }
- }
- func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
- n, err = io.ReadFull(conn.Reader, buf)
- if err == nil {
- conn.readSeqNum += uint32(n)
- }
- return
- }
- func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) {
- return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID})
- }
- func (conn *NetConnection) SendUserControl(eventType uint16) error {
- return conn.SendMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: eventType})
- }
- func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) error {
- m := &ResponseCreateStreamMessage{}
- m.CommandName = Response_Result
- m.TransactionId = tid
- m.StreamId = streamID
- return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
- }
- // func (conn *NetConnection) SendCommand(message string, args any) error {
- // switch message {
- // // case SEND_SET_BUFFER_LENGTH_MESSAGE:
- // // if args != nil {
- // // return errors.New(SEND_SET_BUFFER_LENGTH_MESSAGE + ", The parameter is nil")
- // // }
- // // m := new(SetBufferMessage)
- // // m.EventType = RTMP_USER_SET_BUFFLEN
- // // m.Millisecond = 100
- // // m.StreamID = conn.streamID
- // // return conn.writeMessage(RTMP_MSG_USER_CONTROL, m)
- // }
- // return errors.New("send message no exist")
- // }
- func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
- head, err := conn.ReadByte()
- if err != nil {
- return nil, err
- }
- conn.readSeqNum++
- ChunkStreamID := uint32(head & 0x3f) // 0011 1111
- ChunkType := head >> 6 // 1100 0000
- // 如果块流ID为0,1的话,就需要计算.
- ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID)
- if err != nil {
- return nil, errors.New("get chunk stream id error :" + err.Error())
- }
- // println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType)
- chunk, ok := conn.incommingChunks[ChunkStreamID]
- if ChunkType != 3 && ok && chunk.AVData.Length > 0 {
- // 如果块类型不为3,那么这个rtmp的body应该为空.
- return nil, errors.New("incompleteRtmpBody error")
- }
- if !ok {
- chunk = &Chunk{}
- conn.incommingChunks[ChunkStreamID] = chunk
- }
- if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
- return nil, errors.New("get chunk type error :" + err.Error())
- }
- msgLen := int(chunk.MessageLength)
- needRead := conn.readChunkSize
- if unRead := msgLen - chunk.AVData.ByteLength; unRead < needRead {
- needRead = unRead
- }
- mem := conn.bytePool.Get(needRead)
- if n, err := conn.ReadFull(mem.Value); err != nil {
- mem.Recycle()
- return nil, err
- } else {
- conn.readSeqNum += uint32(n)
- }
- if chunk.AVData.Push(mem); chunk.AVData.ByteLength == msgLen {
- chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp
- msg = chunk
- switch chunk.MessageTypeID {
- case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
- default:
- err = GetRtmpMessage(msg, msg.AVData.ToBytes())
- msg.AVData.Recycle()
- }
- conn.incommingChunks[ChunkStreamID] = &Chunk{
- ChunkHeader: chunk.ChunkHeader,
- }
- }
- return
- }
- func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32, err error) {
- chunkStreamID = csid
- switch csid {
- case 0:
- {
- u8, err := conn.ReadByte()
- conn.readSeqNum++
- if err != nil {
- return 0, err
- }
- chunkStreamID = 64 + uint32(u8)
- }
- case 1:
- {
- u16_0, err1 := conn.ReadByte()
- if err1 != nil {
- return 0, err1
- }
- u16_1, err1 := conn.ReadByte()
- if err1 != nil {
- return 0, err1
- }
- conn.readSeqNum += 2
- chunkStreamID = 64 + uint32(u16_0) + (uint32(u16_1) << 8)
- }
- }
- return chunkStreamID, nil
- }
- func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) {
- conn.tmpBuf.Reset()
- b4 := conn.tmpBuf.Malloc(4)
- b3 := b4[:3]
- if chunkType == 3 {
- // 3个字节的时间戳
- } else {
- // Timestamp 3 bytes
- if _, err = conn.ReadFull(b3); err != nil {
- return err
- }
- util.GetBE(b3, &h.Timestamp)
- if chunkType != 2 {
- if _, err = conn.ReadFull(b3); err != nil {
- return err
- }
- util.GetBE(b3, &h.MessageLength)
- // Message Type ID 1 bytes
- if h.MessageTypeID, err = conn.ReadByte(); err != nil {
- return err
- }
- conn.readSeqNum++
- if chunkType == 0 {
- // Message Stream ID 4bytes
- if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID
- return err
- }
- h.MessageStreamID = binary.LittleEndian.Uint32(b4)
- }
- }
- }
- // ExtendTimestamp 4 bytes
- if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
- if _, err = conn.ReadFull(b4); err != nil {
- return err
- }
- util.GetBE(b4, &h.Timestamp)
- }
- if chunkType == 0 {
- h.ExtendTimestamp = h.Timestamp
- h.Timestamp = 0
- }
- return nil
- }
- func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
- if conn.readSeqNum >= conn.bandwidth {
- conn.totalRead += conn.readSeqNum
- conn.readSeqNum = 0
- err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalRead))
- }
- for msg == nil && err == nil {
- if msg, err = conn.readChunk(); msg != nil && err == nil {
- switch msg.MessageTypeID {
- case RTMP_MSG_CHUNK_SIZE:
- conn.readChunkSize = int(msg.MsgData.(Uint32Message))
- RTMPPlugin.Info("msg read chunk size", zap.Int("readChunkSize", conn.readChunkSize))
- case RTMP_MSG_ABORT:
- delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message)))
- case RTMP_MSG_ACK, RTMP_MSG_EDGE:
- case RTMP_MSG_USER_CONTROL:
- if _, ok := msg.MsgData.(*PingRequestMessage); ok {
- conn.SendUserControl(RTMP_USER_PING_RESPONSE)
- }
- case RTMP_MSG_ACK_SIZE:
- conn.bandwidth = uint32(msg.MsgData.(Uint32Message))
- case RTMP_MSG_BANDWIDTH:
- conn.bandwidth = msg.MsgData.(*SetPeerBandwidthMessage).AcknowledgementWindowsize
- case RTMP_MSG_AMF0_COMMAND, RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
- return msg, err
- }
- }
- }
- return
- }
- func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
- if conn == nil {
- return errors.New("connection is nil")
- }
- if conn.writeSeqNum > conn.bandwidth {
- conn.totalWrite += conn.writeSeqNum
- conn.writeSeqNum = 0
- err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
- err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0)
- }
- for !conn.writing.CompareAndSwap(false, true) {
- runtime.Gosched()
- }
- defer conn.writing.Store(false)
- conn.tmpBuf.Reset()
- amf := util.AMF{conn.tmpBuf}
- if conn.objectEncoding == 0 {
- msg.Encode(&amf)
- } else {
- amf := util.AMF3{AMF: amf}
- msg.Encode(&amf)
- }
- conn.tmpBuf = amf.Buffer
- head := newChunkHeader(t)
- head.MessageLength = uint32(conn.tmpBuf.Len())
- if sid, ok := msg.(HaveStreamID); ok {
- head.MessageStreamID = sid.GetStreamID()
- }
- head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader)
- for _, chunk := range conn.tmpBuf.Split(conn.writeChunkSize) {
- conn.sendChunk(chunk)
- }
- return nil
- }
- func (conn *NetConnection) sendChunk(writeBuffer ...[]byte) error {
- if n, err := conn.Write(conn.chunkHeader); err != nil {
- return err
- } else {
- conn.writeSeqNum += uint32(n)
- }
- buf := net.Buffers(writeBuffer)
- n, err := buf.WriteTo(conn)
- conn.writeSeqNum += uint32(n)
- return err
- }
|