main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package webtransport
  2. import (
  3. "net/http"
  4. "strconv"
  5. "github.com/quic-go/quic-go"
  6. . "m7s.live/engine/v4"
  7. "m7s.live/engine/v4/config"
  8. )
  9. type WebTransportConfig struct {
  10. ListenAddr string `default:":4433" desc:"监听地址端口(IP:PORT)IP可省略"`
  11. CertFile string `desc:"证书文件"`
  12. KeyFile string `desc:"密钥文件"`
  13. }
  14. func (c *WebTransportConfig) OnEvent(event any) {
  15. switch event.(type) {
  16. case FirstConfig:
  17. // if c.CertFile != "" {
  18. // _, err := os.Stat(c.CertFile)
  19. // if err != nil {
  20. // plugin.Error("need certfile", zap.Error(err))
  21. // plugin.Disabled = true
  22. // return
  23. // }
  24. // }
  25. // if c.KeyFile != "" {
  26. // _, err := os.Stat(c.KeyFile)
  27. // if err != nil {
  28. // plugin.Error("need keyfile", zap.Error(err))
  29. // plugin.Disabled = true
  30. // return
  31. // }
  32. // }
  33. mux := http.NewServeMux()
  34. mux.HandleFunc("/play/", func(w http.ResponseWriter, r *http.Request) {
  35. streamPath := r.URL.Path[len("/play/"):]
  36. session := r.Body.(*Session)
  37. session.AcceptSession()
  38. defer session.CloseSession()
  39. // TODO: 多路
  40. s, err := session.AcceptStream()
  41. if err != nil {
  42. return
  43. }
  44. // buf := make([]byte, 1024)
  45. // n, err := s.Read(buf)
  46. // if err != nil {
  47. // return
  48. // }
  49. sub := &WebTransportSubscriber{}
  50. sub.RemoteAddr = r.RemoteAddr
  51. sub.SetIO(s)
  52. sub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
  53. plugin.SubscribeBlock(streamPath, sub, SUBTYPE_FLV)
  54. })
  55. mux.HandleFunc("/push/", func(w http.ResponseWriter, r *http.Request) {
  56. streamPath := r.URL.Path[len("/push/"):]
  57. session := r.Body.(*Session)
  58. session.AcceptSession()
  59. defer session.CloseSession()
  60. // TODO: 多路
  61. s, err := session.AcceptStream()
  62. if err != nil {
  63. return
  64. }
  65. // buf := make([]byte, 1024)
  66. // n, err := s.Read(buf)
  67. // if err != nil {
  68. // return
  69. // }
  70. pub := &WebTransportPublisher{}
  71. pub.SetIO(s)
  72. pub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
  73. if plugin.Publish(streamPath, pub) == nil {
  74. }
  75. })
  76. c.Run(mux)
  77. }
  78. }
  79. func (c *WebTransportConfig) Run(mux http.Handler) {
  80. s := &Server{
  81. Handler: mux,
  82. ListenAddr: c.ListenAddr,
  83. TLSCert: CertFile{Path: c.CertFile, Data: config.LocalCert},
  84. TLSKey: CertFile{Path: c.KeyFile, Data: config.LocalKey},
  85. }
  86. if s.QuicConfig == nil {
  87. s.QuicConfig = &QuicConfig{}
  88. }
  89. s.QuicConfig.EnableDatagrams = true
  90. listener, err := quic.ListenAddr(c.ListenAddr, s.generateTLSConfig(), (*quic.Config)(s.QuicConfig))
  91. if err != nil {
  92. plugin.Disabled = true
  93. return
  94. }
  95. go func() {
  96. <-plugin.Done()
  97. listener.Close()
  98. }()
  99. go func() {
  100. for {
  101. sess, err := listener.Accept(plugin)
  102. if err != nil {
  103. return
  104. }
  105. go s.handleSession(plugin, sess)
  106. }
  107. }()
  108. }
  109. var plugin = InstallPlugin(&WebTransportConfig{})