123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- package ps
import (
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/pion/rtp"
	"github.com/yapingcat/gomedia/go-mpeg2"
	"go.uber.org/zap"
	. "m7s.live/engine/v4"
	"m7s.live/engine/v4/codec"
	"m7s.live/engine/v4/codec/mpegts"
	"m7s.live/engine/v4/config"
	. "m7s.live/engine/v4/track"
	"m7s.live/engine/v4/util"
	"m7s.live/plugin/ps/v4/mpegps"
)
- type cacheItem struct {
- Seq uint16
- *util.ListItem[util.Buffer]
- }
- type PSPublisher struct {
- Publisher
- relayTrack *PSTrack
- rtp.Packet `json:"-" yaml:"-"`
- DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
- // mpegps.MpegPsStream `json:"-" yaml:"-"`
- // *mpegps.PSDemuxer `json:"-" yaml:"-"`
- mpegps.DecPSPackage `json:"-" yaml:"-"`
- reorder util.RTPReorder[*cacheItem]
- pool util.BytesPool
- lastSeq uint16
- lastReceive time.Time
- dump *os.File
- dumpLen []byte
- }
- func (p *PSPublisher) OnEvent(event any) {
- switch event.(type) {
- case IPublisher:
- p.dumpLen = make([]byte, 6)
- if conf.RelayMode != 0 {
- p.relayTrack = NewPSTrack(p.Stream)
- }
- case SEclose, SEKick:
- conf.streams.Delete(p.Header.SSRC)
- }
- p.Publisher.OnEvent(event)
- }
- func (p *PSPublisher) ServeTCP(conn net.Conn) {
- reader := TCPRTP{
- Conn: conn,
- }
- p.SetIO(conn)
- defer p.Stop()
- tcpAddr := zap.String("tcp", conn.LocalAddr().String())
- p.Info("start receive ps stream from", tcpAddr)
- defer p.Info("stop receive ps stream from", tcpAddr)
- reader.Start(p.PushPS)
- }
- func (p *PSPublisher) ServeUDP(conn *net.UDPConn) {
- p.SetIO(conn)
- defer p.Stop()
- bufUDP := make([]byte, 1024*1024)
- udpAddr := zap.String("udp", conn.LocalAddr().String())
- p.Info("start receive ps stream from", udpAddr)
- defer p.Info("stop receive ps stream from", udpAddr)
- for {
- conn.SetReadDeadline(time.Now().Add(time.Second * 10))
- n, _, err := conn.ReadFromUDP(bufUDP)
- if err != nil {
- return
- }
- p.PushPS(bufUDP[:n])
- }
- }
- func (p *PSPublisher) PushPS(ps util.Buffer) (err error) {
- if err = p.Unmarshal(ps); err != nil {
- p.Error("gb28181 decode rtp error:", zap.Error(err))
- } else if !p.IsClosed() {
- p.writeDump(ps)
- }
- p.pushPS()
- return
- }
- func (p *PSPublisher) pushRelay() {
- item := p.pool.Get(len(p.Packet.Payload))
- copy(item.Value, p.Packet.Payload)
- p.relayTrack.Push(item)
- }
- // 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
- func (p *PSPublisher) pushPS() {
- if p.Stream == nil {
- return
- }
- if p.pool == nil {
- // p.PSDemuxer = mpegps.NewPSDemuxer()
- // p.PSDemuxer.OnPacket = p.OnPacket
- // p.PSDemuxer.OnFrame = p.OnFrame
- p.EsHandler = p
- p.lastSeq = p.SequenceNumber - 1
- p.pool = make(util.BytesPool, 17)
- }
- if conf.RelayMode == 1 && p.relayTrack.PSM != nil {
- p.pushRelay()
- return
- }
- if p.DisableReorder {
- p.Feed(p.Packet.Payload)
- p.lastSeq = p.SequenceNumber
- if conf.RelayMode != 0 {
- p.pushRelay()
- }
- } else {
- item := p.pool.Get(len(p.Packet.Payload))
- copy(item.Value, p.Packet.Payload)
- for rtpPacket := p.reorder.Push(p.SequenceNumber, &cacheItem{p.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() {
- if rtpPacket.Seq != p.lastSeq+1 {
- p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq))
- p.Reset()
- if p.VideoTrack != nil {
- p.SetLostFlag()
- }
- }
- p.Feed(rtpPacket.Value)
- p.lastSeq = rtpPacket.Seq
- if conf.RelayMode != 0 {
- p.relayTrack.Push(rtpPacket.ListItem)
- } else {
- rtpPacket.Recycle()
- }
- }
- }
- }
- func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) {
- switch cid {
- case mpeg2.PS_STREAM_AAC:
- if p.AudioTrack != nil {
- p.AudioTrack.WriteADTS(uint32(pts), util.ReuseBuffer{frame})
- } else {
- p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
- }
- case mpeg2.PS_STREAM_G711A:
- if p.AudioTrack != nil {
- p.AudioTrack.WriteRawBytes(uint32(pts), util.ReuseBuffer{frame})
- } else {
- p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
- }
- case mpeg2.PS_STREAM_G711U:
- if p.AudioTrack != nil {
- p.AudioTrack.WriteRawBytes(uint32(pts), util.ReuseBuffer{frame})
- } else {
- p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
- }
- case mpeg2.PS_STREAM_H264:
- if p.VideoTrack != nil {
- // p.WriteNalu(uint32(pts), uint32(dts), frame)
- p.WriteAnnexB(uint32(pts), uint32(dts), frame)
- } else {
- p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
- }
- case mpeg2.PS_STREAM_H265:
- if p.VideoTrack != nil {
- // p.WriteNalu(uint32(pts), uint32(dts), frame)
- p.WriteAnnexB(uint32(pts), uint32(dts), frame)
- } else {
- p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
- }
- }
- }
- func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
- // switch value := pkg.(type) {
- // case *mpeg2.PSPackHeader:
- // // fd3.WriteString("--------------PS Pack Header--------------\n")
- // if decodeResult == nil {
- // // value.PrettyPrint(fd3)
- // } else {
- // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
- // }
- // case *mpeg2.System_header:
- // // fd3.WriteString("--------------System Header--------------\n")
- // if decodeResult == nil {
- // // value.PrettyPrint(fd3)
- // } else {
- // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
- // }
- // case *mpeg2.Program_stream_map:
- // // fd3.WriteString("--------------------PSM-------------------\n")
- // if decodeResult == nil {
- // // value.PrettyPrint(fd3)
- // } else {
- // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
- // }
- // case *mpeg2.PesPacket:
- // // fd3.WriteString("-------------------PES--------------------\n")
- // if decodeResult == nil {
- // // value.PrettyPrint(fd3)
- // } else {
- // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
- // }
- // }
- }
- func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
- if !conf.PubVideo || conf.RelayMode == 1 {
- return
- }
- if p.VideoTrack == nil {
- switch es.Type {
- case mpegts.STREAM_TYPE_H264:
- p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
- case mpegts.STREAM_TYPE_H265:
- p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
- default:
- //推测编码类型
- var maybe264 codec.H264NALUType
- maybe264 = maybe264.Parse(es.Buffer[4])
- switch maybe264 {
- case codec.NALU_Non_IDR_Picture,
- codec.NALU_IDR_Picture,
- codec.NALU_SEI,
- codec.NALU_SPS,
- codec.NALU_PPS,
- codec.NALU_Access_Unit_Delimiter:
- p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
- default:
- p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
- p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
- }
- }
- }
- payload, pts, dts := es.Buffer, es.PTS, es.DTS
- if dts == 0 {
- dts = pts
- }
- // if binary.BigEndian.Uint32(payload) != 1 {
- // panic("not annexb")
- // }
- p.WriteAnnexB(pts, dts, payload)
- }
- func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
- if !conf.PubAudio || conf.RelayMode == 1 {
- return
- }
- ts, payload := es.PTS, es.Buffer
- if p.AudioTrack == nil {
- switch es.Type {
- case mpegts.STREAM_TYPE_G711A:
- p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
- case mpegts.STREAM_TYPE_G711U:
- p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
- case mpegts.STREAM_TYPE_AAC:
- p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
- p.WriteADTS(ts, util.ReuseBuffer{payload})
- case 0: //推测编码类型
- if payload[0] == 0xff && payload[1]>>4 == 0xf {
- p.AudioTrack = NewAAC(p.Publisher.Stream)
- p.WriteADTS(ts, util.ReuseBuffer{payload})
- }
- default:
- p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
- }
- } else if es.Type == mpegts.STREAM_TYPE_AAC {
- p.WriteADTS(ts, util.ReuseBuffer{payload})
- } else {
- p.WriteRawBytes(ts, util.ReuseBuffer{payload})
- }
- }
- func (p *PSPublisher) writeDump(ps util.Buffer) {
- if p.dump != nil {
- util.PutBE(p.dumpLen[:4], ps.Len())
- if p.lastReceive.IsZero() {
- util.PutBE(p.dumpLen[4:], 0)
- } else {
- util.PutBE(p.dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
- }
- p.lastReceive = time.Now()
- p.dump.Write(p.dumpLen)
- p.dump.Write(ps)
- }
- }
- func (p *PSPublisher) Replay(f *os.File) (err error) {
- defer f.Close()
- var t uint16
- for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
- _, err = f.Read(l)
- if err != nil {
- return
- }
- payload := make([]byte, util.ReadBE[int](l[:4]))
- t = util.ReadBE[uint16](l[4:])
- _, err = f.Read(payload)
- if err != nil {
- return
- }
- p.PushPS(payload)
- }
- return
- }
- func (p *PSPublisher) ReceivePSM(buf util.Buffer, hasAudio bool, hasVideo bool) {
- if p.relayTrack != nil {
- p.relayTrack.PSM = buf.Clone()
- }
- p.Config.PubAudio = hasAudio
- p.Config.PubVideo = hasVideo
- }
- func (p *PSPublisher) Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) {
- if PSPlugin.Disabled {
- return fmt.Errorf("ps plugin is disabled")
- }
-
- stream, loaded := conf.streams.LoadOrStore(ssrc, &PSStream{Flag: true})
- psStream := stream.(*PSStream)
- if loaded {
- if psStream.Flag {
- return fmt.Errorf("ssrc %d already exists", ssrc)
- }
- psStream.Flag = true
- }
- if dump != "" {
- dump = filepath.Join(dump, streamPath)
- os.MkdirAll(filepath.Dir(dump), 0766)
- p.dump, err = os.OpenFile(dump, os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- return
- }
- }
- if err = PSPlugin.Publish(streamPath, p); err == nil {
- psStream.PSPublisher = p
- protocol, listenaddr, _ := strings.Cut(port, ":")
- if !strings.Contains(listenaddr, ":") {
- listenaddr = ":" + listenaddr
- }
- switch protocol {
- case "tcp":
- var tcpConf config.TCP
- tcpConf.ListenAddr = listenaddr
- if reuse {
- if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok {
- } else {
- go func() {
- tcpConf.ListenTCP(PSPlugin, conf)
- conf.shareTCP.Delete(listenaddr)
- }()
- }
- } else {
- tcpConf.ListenNum = 1
- go tcpConf.ListenTCP(p, p)
- }
- case "udp":
- if reuse {
- var udpConf struct {
- *net.UDPConn
- }
- if _, ok := conf.shareUDP.LoadOrStore(listenaddr, &udpConf); ok {
- } else {
- udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
- if err != nil {
- PSPlugin.Error("udp listen error", zap.Error(err))
- return err
- }
- udpConf.UDPConn = udpConn
- go func() {
- conf.ServeUDP(udpConn)
- conf.shareUDP.Delete(listenaddr)
- }()
- }
- } else {
- udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
- if err != nil {
- p.Stop()
- return err
- } else {
- go p.ServeUDP(udpConn)
- }
- }
- }
- }
- return
- }
|