server.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package gb28181
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/ghettovoice/gosip"
  9. "github.com/ghettovoice/gosip/log"
  10. "github.com/ghettovoice/gosip/sip"
  11. "github.com/logrusorgru/aurora/v4"
  12. "go.uber.org/zap"
  13. . "m7s.live/engine/v4"
  14. "m7s.live/plugin/gb28181/v4/utils"
  15. )
  16. var srv gosip.Server
  17. const MaxRegisterCount = 3
  18. func FindChannel(deviceId string, channelId string) (c *Channel) {
  19. if v, ok := Devices.Load(deviceId); ok {
  20. d := v.(*Device)
  21. if v, ok := d.channelMap.Load(channelId); ok {
  22. return v.(*Channel)
  23. }
  24. }
  25. return
  26. }
  27. var levelMap = map[string]log.Level{
  28. "trace": log.TraceLevel,
  29. "debug": log.DebugLevel,
  30. "info": log.InfoLevel,
  31. "warn": log.WarnLevel,
  32. "error": log.ErrorLevel,
  33. "fatal": log.FatalLevel,
  34. "panic": log.PanicLevel,
  35. }
  36. func GetSipServer(transport string) gosip.Server {
  37. return srv
  38. }
  39. var sn = 0
  40. func CreateRequest(exposedId string, Method sip.RequestMethod, recipient *sip.Address, netAddr string) (req sip.Request) {
  41. sn++
  42. callId := sip.CallID(utils.RandNumString(10))
  43. userAgent := sip.UserAgentHeader("Monibuca")
  44. cseq := sip.CSeq{
  45. SeqNo: uint32(sn),
  46. MethodName: Method,
  47. }
  48. port := sip.Port(conf.SipPort)
  49. serverAddr := sip.Address{
  50. //DisplayName: sip.String{Str: d.config.Serial},
  51. Uri: &sip.SipUri{
  52. FUser: sip.String{Str: exposedId},
  53. FHost: conf.SipIP,
  54. FPort: &port,
  55. },
  56. Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
  57. }
  58. req = sip.NewRequest(
  59. "",
  60. Method,
  61. recipient.Uri,
  62. "SIP/2.0",
  63. []sip.Header{
  64. serverAddr.AsFromHeader(),
  65. recipient.AsToHeader(),
  66. &callId,
  67. &userAgent,
  68. &cseq,
  69. serverAddr.AsContactHeader(),
  70. },
  71. "",
  72. nil,
  73. )
  74. req.SetTransport(conf.SipNetwork)
  75. req.SetDestination(netAddr)
  76. //fmt.Printf("构建请求参数:%s", *&req)
  77. // requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
  78. // if err2 != nil {
  79. // return nil
  80. // }
  81. //intranet ip , let's resolve it with public ip
  82. // var deviceIp, deviceSourceIP net.IP
  83. // switch addr := requestMsg.DestAdd.(type) {
  84. // case *net.UDPAddr:
  85. // deviceIp = addr.IP
  86. // case *net.TCPAddr:
  87. // deviceIp = addr.IP
  88. // }
  89. // switch addr2 := d.SourceAddr.(type) {
  90. // case *net.UDPAddr:
  91. // deviceSourceIP = addr2.IP
  92. // case *net.TCPAddr:
  93. // deviceSourceIP = addr2.IP
  94. // }
  95. // if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
  96. // requestMsg.DestAdd = d.SourceAddr
  97. // }
  98. return
  99. }
  100. func RequestForResponse(transport string, request sip.Request,
  101. options ...gosip.RequestWithContextOption) (sip.Response, error) {
  102. return (GetSipServer(transport)).RequestWithContext(context.Background(), request, options...)
  103. }
  104. func (c *GB28181Config) startServer() {
  105. addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
  106. logger := utils.NewZapLogger(GB28181Plugin.Logger, "GB SIP Server", nil)
  107. logger.SetLevel(uint32(levelMap[EngineConfig.LogLevel]))
  108. // logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
  109. srvConf := gosip.ServerConfig{}
  110. if c.SipIP != "" {
  111. srvConf.Host = c.SipIP
  112. }
  113. srv = gosip.NewServer(srvConf, nil, nil, logger)
  114. srv.OnRequest(sip.REGISTER, c.OnRegister)
  115. srv.OnRequest(sip.MESSAGE, c.OnMessage)
  116. srv.OnRequest(sip.NOTIFY, c.OnNotify)
  117. srv.OnRequest(sip.BYE, c.OnBye)
  118. err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
  119. if err != nil {
  120. GB28181Plugin.Logger.Error("gb28181 server listen", zap.Error(err))
  121. } else {
  122. GB28181Plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
  123. }
  124. if c.MediaNetwork == "tcp" {
  125. c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
  126. } else {
  127. c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
  128. }
  129. go c.startJob()
  130. }
  131. // func queryCatalog(config *transaction.Config) {
  132. // t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
  133. // for range t.C {
  134. // Devices.Range(func(key, value interface{}) bool {
  135. // device := value.(*Device)
  136. // if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
  137. // Devices.Delete(key)
  138. // } else if device.Channels != nil {
  139. // go device.Catalog()
  140. // }
  141. // return true
  142. // })
  143. // }
  144. // }
  145. // 定时任务
  146. func (c *GB28181Config) startJob() {
  147. statusTick := time.NewTicker(c.HeartbeatInterval / 2)
  148. banTick := time.NewTicker(c.RemoveBanInterval)
  149. linkTick := time.NewTicker(time.Millisecond * 100)
  150. GB28181Plugin.Debug("start job")
  151. for {
  152. select {
  153. case <-banTick.C:
  154. if c.Username != "" || c.Password != "" {
  155. c.removeBanDevice()
  156. }
  157. case <-statusTick.C:
  158. c.statusCheck()
  159. case <-linkTick.C:
  160. RecordQueryLink.cleanTimeout()
  161. }
  162. }
  163. }
  164. func (c *GB28181Config) removeBanDevice() {
  165. DeviceRegisterCount.Range(func(key, value interface{}) bool {
  166. if value.(int) > MaxRegisterCount {
  167. DeviceRegisterCount.Delete(key)
  168. }
  169. return true
  170. })
  171. }
  172. // statusCheck
  173. // - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
  174. // - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
  175. // UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
  176. func (c *GB28181Config) statusCheck() {
  177. Devices.Range(func(key, value any) bool {
  178. d := value.(*Device)
  179. if time.Since(d.UpdateTime) > c.RegisterValidity {
  180. Devices.Delete(key)
  181. GB28181Plugin.Info("Device register timeout",
  182. zap.String("id", d.ID),
  183. zap.Time("registerTime", d.RegisterTime),
  184. zap.Time("updateTime", d.UpdateTime),
  185. )
  186. } else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
  187. d.Status = DeviceOfflineStatus
  188. d.channelMap.Range(func(key, value any) bool {
  189. ch := value.(*Channel)
  190. ch.Status = ChannelOffStatus
  191. return true
  192. })
  193. GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
  194. }
  195. return true
  196. })
  197. }