remote.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package config
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/tls"
  6. "encoding/json"
  7. "io"
  8. "net"
  9. "net/http"
  10. "strings"
  11. "time"
  12. "github.com/quic-go/quic-go"
  13. "m7s.live/engine/v4/log"
  14. )
  15. type myResponseWriter2 struct {
  16. quic.Stream
  17. myResponseWriter
  18. }
  19. type myResponseWriter3 struct {
  20. handshake bool
  21. myResponseWriter2
  22. quic.Connection
  23. }
  24. func (w *myResponseWriter3) Write(b []byte) (int, error) {
  25. if !w.handshake {
  26. w.handshake = true
  27. return len(b), nil
  28. }
  29. println(string(b))
  30. return w.Stream.Write(b)
  31. }
  32. func (w *myResponseWriter3) Hijack() (net.Conn, *bufio.ReadWriter, error) {
  33. return net.Conn(w), bufio.NewReadWriter(bufio.NewReader(w), bufio.NewWriter(w)), nil
  34. }
  35. func (cfg *Engine) WtRemote(ctx context.Context) {
  36. retryDelay := [...]int{2, 3, 5, 8, 13}
  37. for i := 0; ctx.Err() == nil; i++ {
  38. connected, err := cfg.Remote(ctx)
  39. if err == nil {
  40. //不需要重试了,服务器返回了错误
  41. return
  42. }
  43. if Global.LogLang == "zh" {
  44. log.Error("连接到控制台服务器", cfg.Server, "失败", err)
  45. } else {
  46. log.Error("connect to console server ", cfg.Server, " ", err)
  47. }
  48. if connected {
  49. i = 0
  50. } else if i >= 5 {
  51. i = 4
  52. }
  53. time.Sleep(time.Second * time.Duration(retryDelay[i]))
  54. }
  55. }
  56. func (cfg *Engine) Remote(ctx context.Context) (wasConnected bool, err error) {
  57. tlsConf := &tls.Config{
  58. InsecureSkipVerify: true,
  59. NextProtos: []string{"monibuca"},
  60. }
  61. conn, err := quic.DialAddr(ctx, cfg.Server, tlsConf, &quic.Config{
  62. KeepAlivePeriod: time.Second * 10,
  63. EnableDatagrams: true,
  64. })
  65. wasConnected = err == nil
  66. if stream := quic.Stream(nil); err == nil {
  67. if stream, err = conn.OpenStreamSync(ctx); err == nil {
  68. _, err = stream.Write(append([]byte{1}, (cfg.Secret + "\n")...))
  69. if msg := []byte(nil); err == nil {
  70. if msg, err = bufio.NewReader(stream).ReadSlice(0); err == nil {
  71. var rMessage map[string]any
  72. if err = json.Unmarshal(msg[:len(msg)-1], &rMessage); err == nil {
  73. if rMessage["code"].(float64) != 0 {
  74. if Global.LogLang == "zh" {
  75. log.Error("控制台服务器", cfg.Server, "返回错误", rMessage["msg"])
  76. } else {
  77. log.Error("response from console server ", cfg.Server, " ", rMessage["msg"])
  78. }
  79. return false, nil
  80. } else {
  81. cfg.reportStream = stream
  82. if Global.LogLang == "zh" {
  83. log.Info("连接到控制台服务器", cfg.Server, "成功", rMessage)
  84. } else {
  85. log.Info("response from console server ", cfg.Server, " success ", rMessage)
  86. }
  87. if v, ok := rMessage["enableReport"]; ok {
  88. cfg.enableReport = v.(bool)
  89. }
  90. if v, ok := rMessage["instanceId"]; ok {
  91. cfg.instanceId = v.(string)
  92. }
  93. }
  94. }
  95. }
  96. }
  97. }
  98. }
  99. for err == nil {
  100. var s quic.Stream
  101. if s, err = conn.AcceptStream(ctx); err == nil {
  102. go cfg.ReceiveRequest(s, conn)
  103. }
  104. }
  105. return wasConnected, err
  106. }
  107. func (cfg *Engine) ReceiveRequest(s quic.Stream, conn quic.Connection) error {
  108. defer s.Close()
  109. wr := &myResponseWriter2{Stream: s}
  110. reader := bufio.NewReader(s)
  111. var req *http.Request
  112. url, _, err := reader.ReadLine()
  113. if err == nil {
  114. ctx, cancel := context.WithCancel(s.Context())
  115. defer cancel()
  116. req, err = http.NewRequestWithContext(ctx, "GET", string(url), reader)
  117. for err == nil {
  118. var h []byte
  119. if h, _, err = reader.ReadLine(); len(h) > 0 {
  120. if b, a, f := strings.Cut(string(h), ": "); f {
  121. req.Header.Set(b, a)
  122. }
  123. } else {
  124. break
  125. }
  126. }
  127. if err == nil {
  128. h, _ := cfg.mux.Handler(req)
  129. if req.Header.Get("Accept") == "text/event-stream" {
  130. go h.ServeHTTP(wr, req)
  131. } else if req.Header.Get("Upgrade") == "websocket" {
  132. var writer myResponseWriter3
  133. writer.Stream = s
  134. writer.Connection = conn
  135. req.Host = req.Header.Get("Host")
  136. if req.Host == "" {
  137. req.Host = req.URL.Host
  138. }
  139. if req.Host == "" {
  140. req.Host = "m7s.live"
  141. }
  142. h.ServeHTTP(&writer, req) //建立websocket连接,握手
  143. } else {
  144. h.ServeHTTP(wr, req)
  145. }
  146. }
  147. io.ReadAll(s)
  148. }
  149. if err != nil {
  150. log.Error("read console server error:", err)
  151. }
  152. return err
  153. }