server.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package gb28181
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/ghettovoice/gosip"
  9. "github.com/ghettovoice/gosip/log"
  10. "github.com/ghettovoice/gosip/sip"
  11. "github.com/logrusorgru/aurora/v4"
  12. "go.uber.org/zap"
  13. . "m7s.live/engine/v4"
  14. "m7s.live/plugin/gb28181/v4/utils"
  15. )
  16. var srv gosip.Server
  // Registration limits.
  const (
  	// MaxRegisterCount is the threshold that removeBanDevice compares the
  	// per-device counters in DeviceRegisterCount against: counters strictly
  	// greater than this value are cleared.
  	MaxRegisterCount = 3
  )
  18. func FindChannel(deviceId string, channelId string) (c *Channel) {
  19. //fmt.Println("find channel11111111111111111:",deviceId,channelId)
  20. if deviceId == LiveRoom && len(channelId) == 20 {
  21. // 表示为gb28181 来的实时流
  22. if true{
  23. deviceId = channelId
  24. }
  25. if false{
  26. deviceId = getDeviceIdByChannelId(channelId)
  27. fmt.Println("设备编号为:",deviceId)
  28. if deviceId == ""{
  29. deviceId = channelId
  30. }
  31. }
  32. }
  33. if v, ok := Devices.Load(deviceId); ok {
  34. d := v.(*Device)
  35. if v, ok := d.channelMap.Load(channelId); ok {
  36. return v.(*Channel)
  37. }else{
  38. fmt.Println("没找到通道:",channelId)
  39. }
  40. }else{
  41. fmt.Println("没找到设备:",deviceId)
  42. }
  43. return
  44. }
  45. var levelMap = map[string]log.Level{
  46. "trace": log.TraceLevel,
  47. "debug": log.DebugLevel,
  48. "info": log.InfoLevel,
  49. "warn": log.WarnLevel,
  50. "error": log.ErrorLevel,
  51. "fatal": log.FatalLevel,
  52. "panic": log.PanicLevel,
  53. }
  54. func GetSipServer(transport string) gosip.Server {
  55. return srv
  56. }
  // sn is the running sequence counter; CreateRequest increments it once per
  // request and uses the new value as the CSeq sequence number.
  var sn int
  58. func CreateRequest(exposedId string, Method sip.RequestMethod, recipient *sip.Address, netAddr string) (req sip.Request) {
  59. sn++
  60. callId := sip.CallID(utils.RandNumString(10))
  61. userAgent := sip.UserAgentHeader("Monibuca")
  62. cseq := sip.CSeq{
  63. SeqNo: uint32(sn),
  64. MethodName: Method,
  65. }
  66. port := sip.Port(conf.SipPort)
  67. serverAddr := sip.Address{
  68. //DisplayName: sip.String{Str: d.config.Serial},
  69. Uri: &sip.SipUri{
  70. FUser: sip.String{Str: exposedId},
  71. FHost: conf.SipIP,
  72. FPort: &port,
  73. },
  74. Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
  75. }
  76. req = sip.NewRequest(
  77. "",
  78. Method,
  79. recipient.Uri,
  80. "SIP/2.0",
  81. []sip.Header{
  82. serverAddr.AsFromHeader(),
  83. recipient.AsToHeader(),
  84. &callId,
  85. &userAgent,
  86. &cseq,
  87. serverAddr.AsContactHeader(),
  88. },
  89. "",
  90. nil,
  91. )
  92. req.SetTransport(conf.SipNetwork)
  93. req.SetDestination(netAddr)
  94. //fmt.Printf("构建请求参数:%s", *&req)
  95. // requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
  96. // if err2 != nil {
  97. // return nil
  98. // }
  99. //intranet ip , let's resolve it with public ip
  100. // var deviceIp, deviceSourceIP net.IP
  101. // switch addr := requestMsg.DestAdd.(type) {
  102. // case *net.UDPAddr:
  103. // deviceIp = addr.IP
  104. // case *net.TCPAddr:
  105. // deviceIp = addr.IP
  106. // }
  107. // switch addr2 := d.SourceAddr.(type) {
  108. // case *net.UDPAddr:
  109. // deviceSourceIP = addr2.IP
  110. // case *net.TCPAddr:
  111. // deviceSourceIP = addr2.IP
  112. // }
  113. // if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
  114. // requestMsg.DestAdd = d.SourceAddr
  115. // }
  116. return
  117. }
  118. func RequestForResponse(transport string, request sip.Request,
  119. options ...gosip.RequestWithContextOption) (sip.Response, error) {
  120. return (GetSipServer(transport)).RequestWithContext(context.Background(), request, options...)
  121. }
  122. func (c *GB28181Config) startServer() {
  123. //fmt.Println("START server :",*c)
  124. addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
  125. //fmt.Println("START server addr:",addr)
  126. logger := utils.NewZapLogger(GB28181Plugin.Logger, "GB SIP Server", nil)
  127. logger.SetLevel(uint32(levelMap[EngineConfig.LogLevel]))
  128. // logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
  129. srvConf := gosip.ServerConfig{}
  130. if c.SipIP != "" {
  131. srvConf.Host = c.SipIP
  132. }
  133. //fmt.Println("START server host:",srvConf.Host)
  134. srv = gosip.NewServer(srvConf, nil, nil, logger)
  135. srv.OnRequest(sip.REGISTER, c.OnRegister)
  136. srv.OnRequest(sip.MESSAGE, c.OnMessage)
  137. srv.OnRequest(sip.NOTIFY, c.OnNotify)
  138. srv.OnRequest(sip.BYE, c.OnBye)
  139. err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
  140. if err != nil {
  141. GB28181Plugin.Logger.Error("gb28181 server listen", zap.Error(err))
  142. } else {
  143. GB28181Plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
  144. }
  145. if c.MediaNetwork == "tcp" {
  146. c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
  147. } else {
  148. c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
  149. }
  150. go c.startJob()
  151. }
  152. // func queryCatalog(config *transaction.Config) {
  153. // t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
  154. // for range t.C {
  155. // Devices.Range(func(key, value interface{}) bool {
  156. // device := value.(*Device)
  157. // if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
  158. // Devices.Delete(key)
  159. // } else if device.Channels != nil {
  160. // go device.Catalog()
  161. // }
  162. // return true
  163. // })
  164. // }
  165. // }
  166. // 定时任务
  167. func (c *GB28181Config) startJob() {
  168. statusTick := time.NewTicker(c.HeartbeatInterval / 2)
  169. banTick := time.NewTicker(c.RemoveBanInterval)
  170. linkTick := time.NewTicker(time.Millisecond * 100)
  171. GB28181Plugin.Debug("start job")
  172. for {
  173. select {
  174. case <-banTick.C:
  175. if c.Username != "" || c.Password != "" {
  176. c.removeBanDevice()
  177. }
  178. case <-statusTick.C:
  179. c.statusCheck()
  180. case <-linkTick.C:
  181. RecordQueryLink.cleanTimeout()
  182. }
  183. }
  184. }
  185. func (c *GB28181Config) removeBanDevice() {
  186. DeviceRegisterCount.Range(func(key, value interface{}) bool {
  187. if value.(int) > MaxRegisterCount {
  188. DeviceRegisterCount.Delete(key)
  189. }
  190. return true
  191. })
  192. }
  193. // statusCheck
  194. // - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
  195. // - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
  196. // UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
  197. func (c *GB28181Config) statusCheck() {
  198. Devices.Range(func(key, value any) bool {
  199. d := value.(*Device)
  200. if time.Since(d.UpdateTime) > c.RegisterValidity {
  201. // TODO 这里把注释掉了,不删除
  202. Devices.Delete(key)
  203. GB28181Plugin.Info("Device register timeout",
  204. zap.String("id", d.ID),
  205. zap.Time("registerTime", d.RegisterTime),
  206. zap.Time("updateTime", d.UpdateTime),
  207. )
  208. } else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
  209. preStatus := d.Status
  210. d.Status = DeviceOfflineStatus
  211. if preStatus != DeviceOfflineStatus{
  212. go d.UpdateDeviceStatus()
  213. }
  214. // TODO 更新为离线
  215. d.channelMap.Range(func(key, value any) bool {
  216. ch := value.(*Channel)
  217. preStatus := ch.Status
  218. ch.Status = ChannelOffStatus
  219. if preStatus != ChannelOffStatus{
  220. ch.UpdateChanelStatus()
  221. }
  222. return true
  223. })
  224. GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
  225. }
  226. return true
  227. })
  228. }