main.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package ps
  2. import (
  3. "fmt"
  4. "net"
  5. "net/http"
  6. "os"
  7. "path/filepath"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/gobwas/ws"
  13. "github.com/gobwas/ws/wsutil"
  14. "github.com/pion/rtp"
  15. "go.uber.org/zap"
  16. . "m7s.live/engine/v4"
  17. "m7s.live/engine/v4/config"
  18. "m7s.live/engine/v4/lang"
  19. "m7s.live/engine/v4/track"
  20. "m7s.live/engine/v4/util"
  21. )
  22. type PSStream struct {
  23. Flag bool
  24. *PSPublisher
  25. net.Conn
  26. }
  27. type PSConfig struct {
  28. config.HTTP
  29. config.Publish
  30. config.Subscribe
  31. RelayMode int `desc:"转发模式" enum:"0:只转协议,1:只转发,2:转协议+转发"` // 转发模式,0:转协议+不转发,1:不转协议+转发,2:转协议+转发
  32. streams sync.Map
  33. shareTCP sync.Map
  34. shareUDP sync.Map
  35. }
  36. var conf = &PSConfig{}
  37. var PSPlugin = InstallPlugin(conf)
  38. func (c *PSConfig) OnEvent(event any) {
  39. switch event.(type) {
  40. case FirstConfig:
  41. lang.Merge("zh", map[string]string{
  42. "start receive ps stream from": "开始接收PS流来自",
  43. "stop receive ps stream from": "停止接收PS流来自",
  44. "ssrc not found": "未找到ssrc",
  45. })
  46. }
  47. }
  48. func (c *PSConfig) ServeTCP(conn net.Conn) {
  49. startTime := time.Now()
  50. reader := TCPRTP{
  51. Conn: conn,
  52. }
  53. tcpAddr := zap.String("tcp", conn.LocalAddr().String())
  54. var puber *PSPublisher
  55. var psStream *PSStream
  56. var cache net.Buffers
  57. err := reader.Start(func(data util.Buffer) (err error) {
  58. if psStream == nil {
  59. var rtpPacket rtp.Packet
  60. if err = rtpPacket.Unmarshal(data); err != nil {
  61. PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err))
  62. }
  63. ssrc := rtpPacket.SSRC
  64. stream, loaded := conf.streams.LoadOrStore(ssrc, &PSStream{
  65. Conn: conn,
  66. })
  67. psStream = stream.(*PSStream)
  68. if loaded {
  69. if psStream.Conn != nil {
  70. return fmt.Errorf("ssrc conflict")
  71. }
  72. }
  73. return
  74. }
  75. if puber == nil {
  76. if psStream.PSPublisher != nil {
  77. puber = psStream.PSPublisher
  78. puber.Info("start receive ps stream from", tcpAddr)
  79. for _, buf := range cache {
  80. puber.PushPS(buf)
  81. }
  82. puber.PushPS(data)
  83. return
  84. } else {
  85. PSPlugin.Warn("publisher not found", zap.Uint32("ssrc", psStream.SSRC))
  86. cache = append(cache, append([]byte(nil), data...))
  87. if time.Since(startTime) > time.Second*5 {
  88. return fmt.Errorf("publisher not found")
  89. }
  90. }
  91. } else {
  92. puber.PushPS(data)
  93. }
  94. return
  95. })
  96. if puber != nil {
  97. puber.Stop(zap.Error(err))
  98. puber.Info("stop receive ps stream from", tcpAddr)
  99. }
  100. }
  101. func (c *PSConfig) ServeUDP(conn *net.UDPConn) {
  102. bufUDP := make([]byte, 1024*1024)
  103. udpAddr := zap.String("udp", conn.LocalAddr().String())
  104. var rtpPacket rtp.Packet
  105. PSPlugin.Info("start receive ps stream from", udpAddr)
  106. defer PSPlugin.Info("stop receive ps stream from", udpAddr)
  107. var lastSSRC uint32
  108. var lastPubber *PSPublisher
  109. for {
  110. // conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  111. n, _, err := conn.ReadFromUDP(bufUDP)
  112. if err != nil {
  113. return
  114. }
  115. if err := rtpPacket.Unmarshal(bufUDP[:n]); err != nil {
  116. PSPlugin.Error("gb28181 decode rtp error:", zap.Error(err))
  117. }
  118. ssrc := rtpPacket.SSRC
  119. if lastSSRC != ssrc {
  120. if v, ok := conf.streams.Load(ssrc); ok {
  121. lastSSRC = ssrc
  122. lastPubber = v.(*PSStream).PSPublisher
  123. } else {
  124. PSPlugin.Error("ssrc not found", zap.Uint32("ssrc", ssrc))
  125. continue
  126. }
  127. }
  128. lastPubber.Packet = rtpPacket
  129. lastPubber.pushPS()
  130. }
  131. }
  132. func (c *PSConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  133. streamPath := strings.TrimPrefix(r.URL.Path, "/")
  134. if r.URL.RawQuery != "" {
  135. streamPath += "?" + r.URL.RawQuery
  136. }
  137. conn, _, _, err := ws.UpgradeHTTP(r, w)
  138. if err != nil {
  139. return
  140. }
  141. var suber PSSubscriber
  142. suber.SetIO(conn)
  143. suber.SetParentCtx(r.Context())
  144. suber.ID = r.RemoteAddr
  145. if err = PSPlugin.Subscribe(streamPath, &suber); err != nil {
  146. http.Error(w, err.Error(), http.StatusBadRequest)
  147. return
  148. }
  149. var b []byte
  150. b, err = wsutil.ReadClientBinary(conn)
  151. var rtpPacket rtp.Packet
  152. if err == nil {
  153. dc := track.NewDataTrack[[]byte]("voice")
  154. dc.Attach(suber.Stream)
  155. for err == nil {
  156. err = rtpPacket.Unmarshal(b)
  157. if err == nil {
  158. dc.Push(rtpPacket.Payload)
  159. }
  160. b, err = wsutil.ReadClientBinary(conn)
  161. }
  162. }
  163. suber.Stop(zap.Error(err))
  164. }
  165. // Deprecated: 请使用PSPublisher的Receive
  166. func Receive(streamPath, dump, port string, ssrc uint32, reuse bool) (err error) {
  167. if PSPlugin.Disabled {
  168. return fmt.Errorf("ps plugin is disabled")
  169. }
  170. var pubber PSPublisher
  171. stream, loaded := conf.streams.LoadOrStore(ssrc, &PSStream{Flag: true})
  172. psStream := stream.(*PSStream)
  173. if loaded {
  174. if psStream.Flag {
  175. return fmt.Errorf("ssrc %d already exists", ssrc)
  176. }
  177. psStream.Flag = true
  178. }
  179. if dump != "" {
  180. dump = filepath.Join(dump, streamPath)
  181. os.MkdirAll(filepath.Dir(dump), 0766)
  182. pubber.dump, err = os.OpenFile(dump, os.O_CREATE|os.O_WRONLY, 0644)
  183. if err != nil {
  184. return
  185. }
  186. }
  187. if err = PSPlugin.Publish(streamPath, &pubber); err == nil {
  188. psStream.PSPublisher = &pubber
  189. protocol, listenaddr, _ := strings.Cut(port, ":")
  190. if !strings.Contains(listenaddr, ":") {
  191. listenaddr = ":" + listenaddr
  192. }
  193. // TODO: 暂时通过streamPath来判断是否是录像流
  194. tmp := strings.Split(pubber.Stream.StreamName, "/")
  195. if len(tmp) > 1 {
  196. pubber.Stream.DelayCloseTimeout = time.Second * 10
  197. pubber.Stream.IdleTimeout = time.Second * 10
  198. }
  199. switch protocol {
  200. case "tcp":
  201. var tcpConf config.TCP
  202. tcpConf.ListenAddr = listenaddr
  203. if reuse {
  204. if _, ok := conf.shareTCP.LoadOrStore(listenaddr, &tcpConf); ok {
  205. } else {
  206. go func() {
  207. tcpConf.ListenTCP(PSPlugin, conf)
  208. conf.shareTCP.Delete(listenaddr)
  209. }()
  210. }
  211. } else {
  212. tcpConf.ListenNum = 1
  213. go tcpConf.ListenTCP(pubber, &pubber)
  214. }
  215. case "udp":
  216. if reuse {
  217. var udpConf struct {
  218. *net.UDPConn
  219. }
  220. if _, ok := conf.shareUDP.LoadOrStore(listenaddr, &udpConf); ok {
  221. } else {
  222. udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
  223. if err != nil {
  224. PSPlugin.Error("udp listen error", zap.Error(err))
  225. return err
  226. }
  227. udpConf.UDPConn = udpConn
  228. go func() {
  229. conf.ServeUDP(udpConn)
  230. conf.shareUDP.Delete(listenaddr)
  231. }()
  232. }
  233. } else {
  234. udpConn, err := util.ListenUDP(listenaddr, 1024*1024)
  235. if err != nil {
  236. pubber.Stop()
  237. return err
  238. } else {
  239. go pubber.ServeUDP(udpConn)
  240. }
  241. }
  242. }
  243. }
  244. return
  245. }
  246. // 收流
  247. func (c *PSConfig) API_receive(w http.ResponseWriter, r *http.Request) {
  248. query := r.URL.Query()
  249. dump := query.Get("dump")
  250. streamPath := query.Get("streamPath")
  251. ssrc := query.Get("ssrc")
  252. port := query.Get("port")
  253. reuse := query.Get("reuse") // 是否复用端口
  254. if _ssrc, err := strconv.ParseInt(ssrc, 10, 0); err == nil {
  255. if err := Receive(streamPath, dump, port, uint32(_ssrc), reuse != ""); err != nil {
  256. http.Error(w, err.Error(), http.StatusInternalServerError)
  257. } else {
  258. w.Write([]byte("ok"))
  259. }
  260. } else {
  261. http.Error(w, err.Error(), http.StatusInternalServerError)
  262. }
  263. }
  264. func (c *PSConfig) API_replay(w http.ResponseWriter, r *http.Request) {
  265. dump := r.URL.Query().Get("dump")
  266. streamPath := r.URL.Query().Get("streamPath")
  267. if dump == "" {
  268. dump = "dump/ps"
  269. }
  270. f, err := os.OpenFile(dump, os.O_RDONLY, 0644)
  271. if err != nil {
  272. http.Error(w, err.Error(), http.StatusInternalServerError)
  273. } else {
  274. if streamPath == "" {
  275. if strings.HasPrefix(dump, "/") {
  276. streamPath = "replay" + dump
  277. } else {
  278. streamPath = "replay/" + dump
  279. }
  280. }
  281. var pub PSPublisher
  282. pub.SetIO(f)
  283. if err = PSPlugin.Publish(streamPath, &pub); err == nil {
  284. go pub.Replay(f)
  285. w.Write([]byte("ok"))
  286. } else {
  287. http.Error(w, err.Error(), http.StatusInternalServerError)
  288. }
  289. }
  290. }