client.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package rtmp
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "net/url"
  8. "strings"
  9. "go.uber.org/zap"
  10. "m7s.live/engine/v4"
  11. )
  12. func NewRTMPClient(addr string) (client *NetConnection, err error) {
  13. u, err := url.Parse(addr)
  14. if err != nil {
  15. RTMPPlugin.Error("connect url parse", zap.Error(err))
  16. return nil, err
  17. }
  18. ps := strings.Split(u.Path, "/")
  19. if len(ps) < 3 {
  20. RTMPPlugin.Error("illegal rtmp url", zap.String("url", addr))
  21. return nil, errors.New("illegal rtmp url")
  22. }
  23. isRtmps := u.Scheme == "rtmps"
  24. if strings.Count(u.Host, ":") == 0 {
  25. if isRtmps {
  26. u.Host += ":443"
  27. } else {
  28. u.Host += ":1935"
  29. }
  30. }
  31. var conn net.Conn
  32. if isRtmps {
  33. var tlsconn *tls.Conn
  34. tlsconn, err = tls.Dial("tcp", u.Host, &tls.Config{})
  35. conn = tlsconn
  36. } else {
  37. conn, err = net.Dial("tcp", u.Host)
  38. }
  39. if err != nil {
  40. RTMPPlugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err))
  41. return nil, err
  42. }
  43. defer func() {
  44. if err != nil || client == nil {
  45. conn.Close()
  46. }
  47. }()
  48. client = NewNetConnection(conn)
  49. err = client.ClientHandshake()
  50. if err != nil {
  51. RTMPPlugin.Error("handshake", zap.Error(err))
  52. return nil, err
  53. }
  54. client.appName = strings.Join(ps[1:len(ps)-1], "/")
  55. err = client.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(conf.ChunkSize))
  56. if err != nil {
  57. return
  58. }
  59. client.writeChunkSize = conf.ChunkSize
  60. path := u.Path
  61. if len(u.Query()) != 0 {
  62. path += "?" + u.RawQuery
  63. }
  64. err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{
  65. CommandMessage{"connect", 1},
  66. map[string]any{
  67. "app": client.appName,
  68. "flashVer": "monibuca/" + engine.Engine.Version,
  69. "swfUrl": addr,
  70. "tcUrl": strings.TrimSuffix(addr, path) + "/" + client.appName,
  71. },
  72. nil,
  73. })
  74. if err != nil {
  75. return
  76. }
  77. for {
  78. msg, err := client.RecvMessage()
  79. if err != nil {
  80. return nil, err
  81. }
  82. switch msg.MessageTypeID {
  83. case RTMP_MSG_AMF0_COMMAND:
  84. cmd := msg.MsgData.(Commander).GetCommand()
  85. switch cmd.CommandName {
  86. case "_result":
  87. response := msg.MsgData.(*ResponseMessage)
  88. if response.Infomation["code"] == NetConnection_Connect_Success {
  89. return client, nil
  90. } else {
  91. return nil, err
  92. }
  93. default:
  94. fmt.Println(cmd.CommandName)
  95. }
  96. }
  97. }
  98. }
// RTMPPusher publishes a stream to a remote RTMP server. It embeds RTMPSender
// for the RTMP wire side and engine.Pusher for the engine-facing side
// (presumably the source of RemoteURL, Args and SetIO — confirm against the
// engine package).
type RTMPPusher struct {
	RTMPSender
	engine.Pusher
}
  103. func (pusher *RTMPPusher) Connect() (err error) {
  104. if pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL); err == nil {
  105. pusher.SetIO(pusher.NetConnection.Conn)
  106. RTMPPlugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL))
  107. }
  108. return
  109. }
  110. func (pusher *RTMPPusher) Disconnect() {
  111. if pusher.NetConnection != nil {
  112. pusher.NetConnection.Close()
  113. }
  114. }
  115. func (pusher *RTMPPusher) Push() error {
  116. pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
  117. for {
  118. msg, err := pusher.RecvMessage()
  119. if err != nil {
  120. return err
  121. }
  122. switch msg.MessageTypeID {
  123. case RTMP_MSG_AMF0_COMMAND:
  124. cmd := msg.MsgData.(Commander).GetCommand()
  125. switch cmd.CommandName {
  126. case Response_Result, Response_OnStatus:
  127. if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
  128. pusher.StreamID = response.StreamId
  129. pusher.audio.MessageStreamID = pusher.StreamID
  130. pusher.video.MessageStreamID = pusher.StreamID
  131. URL, _ := url.Parse(pusher.RemoteURL)
  132. _, streamPath, _ := strings.Cut(URL.Path, "/")
  133. _, streamPath, _ = strings.Cut(streamPath, "/")
  134. pusher.Args = URL.Query()
  135. if len(pusher.Args) > 0 {
  136. streamPath += "?" + pusher.Args.Encode()
  137. }
  138. pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &PublishMessage{
  139. CURDStreamMessage{
  140. CommandMessage{
  141. "publish",
  142. 1,
  143. },
  144. response.StreamId,
  145. },
  146. streamPath,
  147. "live",
  148. })
  149. } else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok {
  150. if response.Infomation["code"] == NetStream_Publish_Start {
  151. go pusher.PlayRaw()
  152. } else {
  153. return errors.New(response.Infomation["code"].(string))
  154. }
  155. }
  156. }
  157. }
  158. }
  159. }
// RTMPPuller pulls a stream from a remote RTMP server. It embeds RTMPReceiver
// for the RTMP wire side and engine.Puller for the engine-facing side
// (presumably the source of RemoteURL, Args, SetIO and Stop — confirm against
// the engine package).
type RTMPPuller struct {
	RTMPReceiver
	engine.Puller
}
  164. func (puller *RTMPPuller) Connect() (err error) {
  165. if puller.NetConnection, err = NewRTMPClient(puller.RemoteURL); err == nil {
  166. puller.SetIO(puller.NetConnection.Conn)
  167. RTMPPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
  168. }
  169. return
  170. }
  171. func (puller *RTMPPuller) Disconnect() {
  172. if puller.NetConnection != nil {
  173. puller.NetConnection.Close()
  174. }
  175. }
  176. func (puller *RTMPPuller) Pull() (err error) {
  177. defer puller.Stop()
  178. err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
  179. for err == nil {
  180. msg, err := puller.RecvMessage()
  181. if err != nil {
  182. return err
  183. }
  184. switch msg.MessageTypeID {
  185. case RTMP_MSG_AUDIO:
  186. puller.ReceiveAudio(msg)
  187. case RTMP_MSG_VIDEO:
  188. puller.ReceiveVideo(msg)
  189. case RTMP_MSG_AMF0_COMMAND:
  190. cmd := msg.MsgData.(Commander).GetCommand()
  191. switch cmd.CommandName {
  192. case "_result":
  193. if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok {
  194. puller.StreamID = response.StreamId
  195. m := &PlayMessage{}
  196. m.StreamId = response.StreamId
  197. m.TransactionId = 4
  198. m.CommandMessage.CommandName = "play"
  199. URL, _ := url.Parse(puller.RemoteURL)
  200. ps := strings.Split(URL.Path, "/")
  201. puller.Args = URL.Query()
  202. m.StreamName = ps[len(ps)-1]
  203. if len(puller.Args) > 0 {
  204. m.StreamName += "?" + puller.Args.Encode()
  205. }
  206. puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
  207. // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok {
  208. // if response.Object["code"] == "NetStream.Play.Start" {
  209. // } else if response.Object["level"] == Level_Error {
  210. // return errors.New(response.Object["code"].(string))
  211. // }
  212. // } else {
  213. // return errors.New("pull faild")
  214. // }
  215. }
  216. }
  217. }
  218. }
  219. return
  220. }