publisher.go 10 KB


  1. package ps
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "time"
  9. "github.com/pion/rtp"
  10. "github.com/yapingcat/gomedia/go-mpeg2"
  11. "go.uber.org/zap"
  12. . "m7s.live/engine/v4"
  13. "m7s.live/engine/v4/codec"
  14. "m7s.live/engine/v4/codec/mpegts"
  15. "m7s.live/engine/v4/config"
  16. . "m7s.live/engine/v4/track"
  17. "m7s.live/engine/v4/util"
  18. "m7s.live/plugin/ps/v4/mpegps"
  19. )
  20. type cacheItem struct {
  21. Seq uint16
  22. *util.ListItem[util.Buffer]
  23. }
  24. type PSPublisher struct {
  25. Publisher
  26. relayTrack *PSTrack
  27. rtp.Packet `json:"-" yaml:"-"`
  28. DisableReorder bool //是否禁用rtp重排序,TCP模式下应当禁用
  29. // mpegps.MpegPsStream `json:"-" yaml:"-"`
  30. // *mpegps.PSDemuxer `json:"-" yaml:"-"`
  31. mpegps.DecPSPackage `json:"-" yaml:"-"`
  32. reorder util.RTPReorder[*cacheItem]
  33. pool util.BytesPool
  34. lastSeq uint16
  35. lastReceive time.Time
  36. dump *os.File
  37. dumpLen []byte
  38. }
  39. func (p *PSPublisher) OnEvent(event any) {
  40. switch event.(type) {
  41. case IPublisher:
  42. p.dumpLen = make([]byte, 6)
  43. if conf.RelayMode != 0 {
  44. p.relayTrack = NewPSTrack(p.Stream)
  45. }
  46. case SEclose, SEKick:
  47. conf.streams.Delete(p.Header.SSRC)
  48. }
  49. p.Publisher.OnEvent(event)
  50. }
  51. func (p *PSPublisher) ServeTCP(conn net.Conn) {
  52. reader := TCPRTP{
  53. Conn: conn,
  54. }
  55. p.SetIO(conn)
  56. defer p.Stop()
  57. tcpAddr := zap.String("tcp", conn.LocalAddr().String())
  58. p.Info("start receive ps stream from", tcpAddr)
  59. defer p.Info("stop receive ps stream from", tcpAddr)
  60. reader.Start(p.PushPS)
  61. }
  62. func (p *PSPublisher) ServeUDP(conn *net.UDPConn) {
  63. p.SetIO(conn)
  64. defer p.Stop()
  65. bufUDP := make([]byte, 1024*1024)
  66. udpAddr := zap.String("udp", conn.LocalAddr().String())
  67. p.Info("start receive ps stream from", udpAddr)
  68. defer p.Info("stop receive ps stream from", udpAddr)
  69. for {
  70. conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  71. n, _, err := conn.ReadFromUDP(bufUDP)
  72. if err != nil {
  73. return
  74. }
  75. p.PushPS(bufUDP[:n])
  76. }
  77. }
  78. func (p *PSPublisher) PushPS(ps util.Buffer) (err error) {
  79. if err = p.Unmarshal(ps); err != nil {
  80. p.Error("gb28181 decode rtp error:", zap.Error(err))
  81. } else if !p.IsClosed() {
  82. p.writeDump(ps)
  83. }
  84. p.pushPS()
  85. return
  86. }
  87. func (p *PSPublisher) pushRelay() {
  88. item := p.pool.Get(len(p.Packet.Payload))
  89. copy(item.Value, p.Packet.Payload)
  90. p.relayTrack.Push(item)
  91. }
  92. // 解析rtp封装 https://www.ietf.org/rfc/rfc2250.txt
  93. func (p *PSPublisher) pushPS() {
  94. if p.Stream == nil {
  95. return
  96. }
  97. if p.pool == nil {
  98. // p.PSDemuxer = mpegps.NewPSDemuxer()
  99. // p.PSDemuxer.OnPacket = p.OnPacket
  100. // p.PSDemuxer.OnFrame = p.OnFrame
  101. p.EsHandler = p
  102. p.lastSeq = p.SequenceNumber - 1
  103. p.pool = make(util.BytesPool, 17)
  104. }
  105. if conf.RelayMode == 1 && p.relayTrack.PSM != nil {
  106. p.pushRelay()
  107. return
  108. }
  109. if p.DisableReorder {
  110. p.Feed(p.Packet.Payload)
  111. p.lastSeq = p.SequenceNumber
  112. if conf.RelayMode != 0 {
  113. p.pushRelay()
  114. }
  115. } else {
  116. item := p.pool.Get(len(p.Packet.Payload))
  117. copy(item.Value, p.Packet.Payload)
  118. for rtpPacket := p.reorder.Push(p.SequenceNumber, &cacheItem{p.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() {
  119. if rtpPacket.Seq != p.lastSeq+1 {
  120. p.Debug("drop", zap.Uint16("seq", rtpPacket.Seq), zap.Uint16("lastSeq", p.lastSeq))
  121. p.Reset()
  122. if p.VideoTrack != nil {
  123. p.SetLostFlag()
  124. }
  125. }
  126. p.Feed(rtpPacket.Value)
  127. p.lastSeq = rtpPacket.Seq
  128. if conf.RelayMode != 0 {
  129. p.relayTrack.Push(rtpPacket.ListItem)
  130. } else {
  131. rtpPacket.Recycle()
  132. }
  133. }
  134. }
  135. }
  136. func (p *PSPublisher) OnFrame(frame []byte, cid mpeg2.PS_STREAM_TYPE, pts uint64, dts uint64) {
  137. switch cid {
  138. case mpeg2.PS_STREAM_AAC:
  139. if p.AudioTrack != nil {
  140. p.AudioTrack.WriteADTS(uint32(pts), util.ReuseBuffer{frame})
  141. } else {
  142. p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
  143. }
  144. case mpeg2.PS_STREAM_G711A:
  145. if p.AudioTrack != nil {
  146. p.AudioTrack.WriteRawBytes(uint32(pts), util.ReuseBuffer{frame})
  147. } else {
  148. p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
  149. }
  150. case mpeg2.PS_STREAM_G711U:
  151. if p.AudioTrack != nil {
  152. p.AudioTrack.WriteRawBytes(uint32(pts), util.ReuseBuffer{frame})
  153. } else {
  154. p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
  155. }
  156. case mpeg2.PS_STREAM_H264:
  157. if p.VideoTrack != nil {
  158. // p.WriteNalu(uint32(pts), uint32(dts), frame)
  159. p.WriteAnnexB(uint32(pts), uint32(dts), frame)
  160. } else {
  161. p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
  162. }
  163. case mpeg2.PS_STREAM_H265:
  164. if p.VideoTrack != nil {
  165. // p.WriteNalu(uint32(pts), uint32(dts), frame)
  166. p.WriteAnnexB(uint32(pts), uint32(dts), frame)
  167. } else {
  168. p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
  169. }
  170. }
  171. }
  172. func (p *PSPublisher) OnPacket(pkg mpeg2.Display, decodeResult error) {
  173. // switch value := pkg.(type) {
  174. // case *mpeg2.PSPackHeader:
  175. // // fd3.WriteString("--------------PS Pack Header--------------\n")
  176. // if decodeResult == nil {
  177. // // value.PrettyPrint(fd3)
  178. // } else {
  179. // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
  180. // }
  181. // case *mpeg2.System_header:
  182. // // fd3.WriteString("--------------System Header--------------\n")
  183. // if decodeResult == nil {
  184. // // value.PrettyPrint(fd3)
  185. // } else {
  186. // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
  187. // }
  188. // case *mpeg2.Program_stream_map:
  189. // // fd3.WriteString("--------------------PSM-------------------\n")
  190. // if decodeResult == nil {
  191. // // value.PrettyPrint(fd3)
  192. // } else {
  193. // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
  194. // }
  195. // case *mpeg2.PesPacket:
  196. // // fd3.WriteString("-------------------PES--------------------\n")
  197. // if decodeResult == nil {
  198. // // value.PrettyPrint(fd3)
  199. // } else {
  200. // // fd3.WriteString(fmt.Sprintf("Decode Ps Packet Failed %s\n", decodeResult.Error()))
  201. // }
  202. // }
  203. }
  204. func (p *PSPublisher) ReceiveVideo(es mpegps.MpegPsEsStream) {
  205. if !conf.PubVideo || conf.RelayMode == 1 {
  206. return
  207. }
  208. if p.VideoTrack == nil {
  209. switch es.Type {
  210. case mpegts.STREAM_TYPE_H264:
  211. p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
  212. case mpegts.STREAM_TYPE_H265:
  213. p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
  214. default:
  215. //推测编码类型
  216. var maybe264 codec.H264NALUType
  217. maybe264 = maybe264.Parse(es.Buffer[4])
  218. switch maybe264 {
  219. case codec.NALU_Non_IDR_Picture,
  220. codec.NALU_IDR_Picture,
  221. codec.NALU_SEI,
  222. codec.NALU_SPS,
  223. codec.NALU_PPS,
  224. codec.NALU_Access_Unit_Delimiter:
  225. p.VideoTrack = NewH264(p.Publisher.Stream, p.pool)
  226. default:
  227. p.Info("maybe h265", zap.Uint8("type", maybe264.Byte()))
  228. p.VideoTrack = NewH265(p.Publisher.Stream, p.pool)
  229. }
  230. }
  231. }
  232. payload, pts, dts := es.Buffer, es.PTS, es.DTS
  233. if dts == 0 {
  234. dts = pts
  235. }
  236. // if binary.BigEndian.Uint32(payload) != 1 {
  237. // panic("not annexb")
  238. // }
  239. p.WriteAnnexB(pts, dts, payload)
  240. }
  241. func (p *PSPublisher) ReceiveAudio(es mpegps.MpegPsEsStream) {
  242. if !conf.PubAudio || conf.RelayMode == 1 {
  243. return
  244. }
  245. ts, payload := es.PTS, es.Buffer
  246. if p.AudioTrack == nil {
  247. switch es.Type {
  248. case mpegts.STREAM_TYPE_G711A:
  249. p.AudioTrack = NewG711(p.Publisher.Stream, true, p.pool)
  250. case mpegts.STREAM_TYPE_G711U:
  251. p.AudioTrack = NewG711(p.Publisher.Stream, false, p.pool)
  252. case mpegts.STREAM_TYPE_AAC:
  253. p.AudioTrack = NewAAC(p.Publisher.Stream, p.pool)
  254. p.WriteADTS(ts, util.ReuseBuffer{payload})
  255. case 0: //推测编码类型
  256. if payload[0] == 0xff && payload[1]>>4 == 0xf {
  257. p.AudioTrack = NewAAC(p.Publisher.Stream)
  258. p.WriteADTS(ts, util.ReuseBuffer{payload})
  259. }
  260. default:
  261. p.Error("audio type not supported yet", zap.Uint8("type", es.Type))
  262. }
  263. } else if es.Type == mpegts.STREAM_TYPE_AAC {
  264. p.WriteADTS(ts, util.ReuseBuffer{payload})
  265. } else {
  266. p.WriteRawBytes(ts, util.ReuseBuffer{payload})
  267. }
  268. }
  269. func (p *PSPublisher) writeDump(ps util.Buffer) {
  270. if p.dump != nil {
  271. util.PutBE(p.dumpLen[:4], ps.Len())
  272. if p.lastReceive.IsZero() {
  273. util.PutBE(p.dumpLen[4:], 0)
  274. } else {
  275. util.PutBE(p.dumpLen[4:], uint16(time.Since(p.lastReceive).Milliseconds()))
  276. }
  277. p.lastReceive = time.Now()
  278. p.dump.Write(p.dumpLen)
  279. p.dump.Write(ps)
  280. }
  281. }
  282. func (p *PSPublisher) Replay(f *os.File) (err error) {
  283. defer f.Close()
  284. var t uint16
  285. for l := make([]byte, 6); !p.IsClosed(); time.Sleep(time.Millisecond * time.Duration(t)) {
  286. _, err = f.Read(l)
  287. if err != nil {
  288. return
  289. }
  290. payload := make([]byte, util.ReadBE[int](l[:4]))
  291. t = util.ReadBE[uint16](l[4:])
  292. _, err = f.Read(payload)
  293. if err != nil {
  294. return
  295. }
  296. p.PushPS(payload)
  297. }
  298. return
  299. }
  300. func (p *PSPublisher) ReceivePSM(buf util.Buffer, hasAudio bool, hasVideo bool) {
  301. if p.relayTrack != nil {
  302. p.relayTrack.PSM = buf.Clone()
  303. }
  304. p.Config.PubAudio = hasAudio
  305. p.Config.PubVideo = hasVideo
  306. }
  307. func (p *PSPublisher) Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) {
  308. if PSPlugin.Disabled {
  309. return fmt.Errorf("ps plugin is disabled")
  310. }
  311. stream, loaded := conf.streams.LoadOrStore(ssrc, &PSStream{Flag: true})
  312. psStream := stream.(*PSStream)
  313. if loaded {
  314. if psStream.Flag {
  315. return fmt.Errorf("ssrc %d already exists", ssrc)
  316. }
  317. psStream.Flag = true
  318. }
  319. if dump != "" {
  320. dump = filepath.Join(dump, streamPath)
  321. os.MkdirAll(filepath.Dir(dump), 0766)
  322. p.dump, err = os.OpenFile(dump, os.O_CREATE|os.O_WRONLY, 0644)
  323. if err != nil {
  324. return
  325. }
  326. }
  327. if err = PSPlugin.Publish(streamPath, p); err == nil {
  328. psStream.PSPublisher = p
  329. protocol, listenaddr, _ := strings.Cut(port, ":")
  330. if !strings.Contains(listenaddr, ":") {
  331. listenaddr = ":" + listenaddr
  332. }
  333. switch protocol {
  334. case "tcp":
  335. var tcpConf config.TCP
  336. tcpConf.ListenAddr = listenaddr
  337. if reuse {
  338. if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok {
  339. } else {
  340. go func() {
  341. tcpConf.ListenTCP(PSPlugin, conf)
  342. conf.shareTCP.Delete(listenaddr)
  343. }()
  344. }
  345. } else {
  346. tcpConf.ListenNum = 1
  347. go tcpConf.ListenTCP(p, p)
  348. }
  349. case "udp":
  350. if reuse {
  351. var udpConf struct {
  352. *net.UDPConn
  353. }
  354. if _, ok := conf.shareUDP.LoadOrStore(listenaddr, &udpConf); ok {
  355. } else {
  356. udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
  357. if err != nil {
  358. PSPlugin.Error("udp listen error", zap.Error(err))
  359. return err
  360. }
  361. udpConf.UDPConn = udpConn
  362. go func() {
  363. conf.ServeUDP(udpConn)
  364. conf.shareUDP.Delete(listenaddr)
  365. }()
  366. }
  367. } else {
  368. udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
  369. if err != nil {
  370. p.Stop()
  371. return err
  372. } else {
  373. go p.ServeUDP(udpConn)
  374. }
  375. }
  376. }
  377. }
  378. return
  379. }