123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package webtransport
- import (
- "net/http"
- "strconv"
- "github.com/quic-go/quic-go"
- . "m7s.live/engine/v4"
- "m7s.live/engine/v4/config"
- )
// WebTransportConfig holds the settings of the WebTransport plugin: the
// address the QUIC listener binds to and the TLS certificate/key file paths.
type WebTransportConfig struct {
	// ListenAddr is the listen address in IP:PORT form; the IP may be omitted.
	ListenAddr string `default:":4433" desc:"监听地址端口(IP:PORT)IP可省略"`
	// CertFile is the path of the certificate file.
	CertFile string `desc:"证书文件"`
	// KeyFile is the path of the key file.
	KeyFile string `desc:"密钥文件"`
}
- func (c *WebTransportConfig) OnEvent(event any) {
- switch event.(type) {
- case FirstConfig:
- // if c.CertFile != "" {
- // _, err := os.Stat(c.CertFile)
- // if err != nil {
- // plugin.Error("need certfile", zap.Error(err))
- // plugin.Disabled = true
- // return
- // }
- // }
- // if c.KeyFile != "" {
- // _, err := os.Stat(c.KeyFile)
- // if err != nil {
- // plugin.Error("need keyfile", zap.Error(err))
- // plugin.Disabled = true
- // return
- // }
- // }
- mux := http.NewServeMux()
- mux.HandleFunc("/play/", func(w http.ResponseWriter, r *http.Request) {
- streamPath := r.URL.Path[len("/play/"):]
- session := r.Body.(*Session)
- session.AcceptSession()
- defer session.CloseSession()
- // TODO: 多路
- s, err := session.AcceptStream()
- if err != nil {
- return
- }
- // buf := make([]byte, 1024)
- // n, err := s.Read(buf)
- // if err != nil {
- // return
- // }
- sub := &WebTransportSubscriber{}
- sub.RemoteAddr = r.RemoteAddr
- sub.SetIO(s)
- sub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
- plugin.SubscribeBlock(streamPath, sub, SUBTYPE_FLV)
- })
- mux.HandleFunc("/push/", func(w http.ResponseWriter, r *http.Request) {
- streamPath := r.URL.Path[len("/push/"):]
- session := r.Body.(*Session)
- session.AcceptSession()
- defer session.CloseSession()
- // TODO: 多路
- s, err := session.AcceptStream()
- if err != nil {
- return
- }
- // buf := make([]byte, 1024)
- // n, err := s.Read(buf)
- // if err != nil {
- // return
- // }
- pub := &WebTransportPublisher{}
- pub.SetIO(s)
- pub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
- if plugin.Publish(streamPath, pub) == nil {
- }
- })
- c.Run(mux)
- }
- }
- func (c *WebTransportConfig) Run(mux http.Handler) {
- s := &Server{
- Handler: mux,
- ListenAddr: c.ListenAddr,
- TLSCert: CertFile{Path: c.CertFile, Data: config.LocalCert},
- TLSKey: CertFile{Path: c.KeyFile, Data: config.LocalKey},
- }
- if s.QuicConfig == nil {
- s.QuicConfig = &QuicConfig{}
- }
- s.QuicConfig.EnableDatagrams = true
- listener, err := quic.ListenAddr(c.ListenAddr, s.generateTLSConfig(), (*quic.Config)(s.QuicConfig))
- if err != nil {
- plugin.Disabled = true
- return
- }
- go func() {
- <-plugin.Done()
- listener.Close()
- }()
- go func() {
- for {
- sess, err := listener.Accept(plugin)
- if err != nil {
- return
- }
- go s.handleSession(plugin, sess)
- }
- }()
- }
- var plugin = InstallPlugin(&WebTransportConfig{})
|