123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- package rtmp
- import (
- "errors"
- "net"
- "runtime"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- "m7s.live/engine/v4/common"
- )
- type AVSender struct {
- *RTMPSender
- ChunkHeader
- firstSent bool
- }
- func (av *AVSender) sendSequenceHead(seqHead []byte) {
- av.SetTimestamp(0)
- av.MessageLength = uint32(len(seqHead))
- for !av.writing.CompareAndSwap(false, true) {
- runtime.Gosched()
- }
- defer av.writing.Store(false)
- if av.firstSent {
- av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
- } else {
- av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
- }
- av.sendChunk(seqHead)
- }
- func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
- seq := frame.Sequence
- payloadLen := frame.AVCC.ByteLength
- if payloadLen == 0 {
- err := errors.New("payload is empty")
- av.Error("payload is empty", zap.Error(err))
- return err
- }
- if av.writeSeqNum > av.bandwidth {
- av.totalWrite += av.writeSeqNum
- av.writeSeqNum = 0
- av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
- av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
- }
- av.MessageLength = uint32(payloadLen)
- for !av.writing.CompareAndSwap(false, true) {
- runtime.Gosched()
- }
- defer av.writing.Store(false)
- // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
- // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
- // 当Chunk Type为0时(即Chunk12),
- if !av.firstSent {
- av.firstSent = true
- av.SetTimestamp(absTime)
- av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
- } else {
- av.SetTimestamp(frame.DeltaTime)
- av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
- }
- //数据被覆盖导致序号变了
- if seq != frame.Sequence {
- return errors.New("sequence is not equal")
- }
- r := frame.AVCC.NewReader()
- chunk := net.Buffers{av.chunkHeader}
- av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
- for r.CanRead() {
- item := av.bytePool.Get(16)
- defer item.Recycle()
- av.WriteTo(RTMP_CHUNK_HEAD_1, &item.Value)
- // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
- chunk = append(chunk, item.Value)
- av.writeSeqNum += uint32(item.Value.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
- }
- _, err = chunk.WriteTo(av.Conn)
- return nil
- }
- type RTMPSender struct {
- Subscriber
- NetStream
- audio, video AVSender
- }
- func (rtmp *RTMPSender) OnEvent(event any) {
- switch v := event.(type) {
- case SEwaitPublish:
- rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
- case SEpublish:
- rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
- case ISubscriber:
- rtmp.audio.RTMPSender = rtmp
- rtmp.video.RTMPSender = rtmp
- rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO
- rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO
- rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO
- rtmp.video.MessageTypeID = RTMP_MSG_VIDEO
- rtmp.audio.MessageStreamID = rtmp.StreamID
- rtmp.video.MessageStreamID = rtmp.StreamID
- case AudioDeConf:
- rtmp.audio.sendSequenceHead(v)
- case VideoDeConf:
- rtmp.video.sendSequenceHead(v)
- case AudioFrame:
- if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
- rtmp.Stop(zap.Error(err))
- }
- case VideoFrame:
- if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
- rtmp.Stop(zap.Error(err))
- }
- default:
- rtmp.Subscriber.OnEvent(event)
- }
- }
- func (r *RTMPSender) Response(tid uint64, code, level string) error {
- m := new(ResponsePlayMessage)
- m.CommandName = Response_OnStatus
- m.TransactionId = tid
- m.Infomation = map[string]any{
- "code": code,
- "level": level,
- "description": "",
- }
- m.StreamID = r.StreamID
- return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
- }
- type RTMPReceiver struct {
- Publisher
- NetStream
- }
- func (r *RTMPReceiver) OnEvent(event any) {
- switch event.(type) {
- case IPublisher:
- if r.AudioTrack != nil {
- r.AudioTrack.SetStuff(r.bytePool)
- }
- if r.VideoTrack != nil {
- r.VideoTrack.SetStuff(r.bytePool)
- }
- }
- r.Publisher.OnEvent(event)
- }
- func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
- m := new(ResponsePublishMessage)
- m.CommandName = Response_OnStatus
- m.TransactionId = tid
- m.Infomation = map[string]any{
- "code": code,
- "level": level,
- "description": "",
- }
- m.StreamID = r.StreamID
- return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
- }
- func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
- if r.AudioTrack == nil {
- r.WriteAVCCAudio(0, &msg.AVData, r.bytePool)
- return
- }
- r.AudioTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
- }
- func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
- if r.VideoTrack == nil {
- r.WriteAVCCVideo(0, &msg.AVData, r.bytePool)
- return
- }
- r.VideoTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
- }
|