tcp.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package config
  2. import (
  3. "context"
  4. "crypto/tls"
  5. _ "embed"
  6. "net"
  7. "runtime"
  8. "time"
  9. "m7s.live/engine/v4/log"
  10. )
  11. //go:embed local.monibuca.com_bundle.pem
  12. var LocalCert []byte
  13. //go:embed local.monibuca.com.key
  14. var LocalKey []byte
  15. var _ TCPConfig = (*TCP)(nil)
  16. type TCPConfig interface {
  17. ListenTCP(context.Context, TCPPlugin) error
  18. }
  19. type TCP struct {
  20. ListenAddr string `desc:"监听地址,格式为ip:port,ip 可省略默认监听所有网卡"`
  21. ListenAddrTLS string `desc:"监听地址,格式为ip:port,ip 可省略默认监听所有网卡"`
  22. CertFile string `desc:"证书文件"`
  23. KeyFile string `desc:"私钥文件"`
  24. ListenNum int `desc:"同时并行监听数量,0为CPU核心数量"` //同时并行监听数量,0为CPU核心数量
  25. NoDelay bool `desc:"是否禁用Nagle算法"` //是否禁用Nagle算法
  26. }
  27. func (tcp *TCP) listen(l net.Listener, handler func(net.Conn)) {
  28. var tempDelay time.Duration
  29. for {
  30. conn, err := l.Accept()
  31. if err != nil {
  32. if ne, ok := err.(net.Error); ok && ne.Temporary() {
  33. if tempDelay == 0 {
  34. tempDelay = 5 * time.Millisecond
  35. } else {
  36. tempDelay *= 2
  37. }
  38. if max := 1 * time.Second; tempDelay > max {
  39. tempDelay = max
  40. }
  41. log.Warnf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay)
  42. time.Sleep(tempDelay)
  43. continue
  44. }
  45. return
  46. }
  47. var tcpConn *net.TCPConn
  48. switch v := conn.(type) {
  49. case *net.TCPConn:
  50. tcpConn = v
  51. case *tls.Conn:
  52. tcpConn = v.NetConn().(*net.TCPConn)
  53. }
  54. if !tcp.NoDelay {
  55. tcpConn.SetNoDelay(false)
  56. }
  57. tempDelay = 0
  58. go handler(conn)
  59. }
  60. }
  61. func (tcp *TCP) ListenTCP(ctx context.Context, plugin TCPPlugin) error {
  62. l, err := net.Listen("tcp", tcp.ListenAddr)
  63. if err != nil {
  64. if Global.LogLang == "zh" {
  65. log.Fatalf("%s: 监听失败: %v", tcp.ListenAddr, err)
  66. } else {
  67. log.Fatalf("%s: Listen error: %v", tcp.ListenAddr, err)
  68. }
  69. return err
  70. }
  71. count := tcp.ListenNum
  72. if count == 0 {
  73. count = runtime.NumCPU()
  74. }
  75. log.Infof("tcp listen %d at %s", count, tcp.ListenAddr)
  76. for i := 0; i < count; i++ {
  77. go tcp.listen(l, plugin.ServeTCP)
  78. }
  79. if tcp.ListenAddrTLS != "" {
  80. keyPair, _ := tls.X509KeyPair(LocalCert, LocalKey)
  81. if tcp.CertFile != "" || tcp.KeyFile != "" {
  82. keyPair, err = tls.LoadX509KeyPair(tcp.CertFile, tcp.KeyFile)
  83. }
  84. if err != nil {
  85. if Global.LogLang == "zh" {
  86. log.Fatalf("加载证书失败: %v", err)
  87. } else {
  88. log.Fatalf("LoadX509KeyPair error: %v", err)
  89. }
  90. return err
  91. }
  92. l, err = tls.Listen("tcp", tcp.ListenAddrTLS, &tls.Config{
  93. Certificates: []tls.Certificate{keyPair},
  94. })
  95. if err != nil {
  96. if Global.LogLang == "zh" {
  97. log.Fatalf("%s: 监听失败: %v", tcp.ListenAddrTLS, err)
  98. } else {
  99. log.Fatalf("%s: Listen error: %v", tcp.ListenAddrTLS, err)
  100. }
  101. return err
  102. }
  103. log.Infof("tls tcp listen %d at %s", count, tcp.ListenAddrTLS)
  104. for i := 0; i < count; i++ {
  105. go tcp.listen(l, plugin.ServeTCP)
  106. }
  107. }
  108. <-ctx.Done()
  109. return l.Close()
  110. }