device.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. package gb28181
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go.uber.org/zap"
  12. "m7s.live/engine/v4"
  13. "m7s.live/engine/v4/log"
  14. "m7s.live/plugin/gb28181/v4/utils"
  15. "github.com/ghettovoice/gosip/sip"
  16. myip "github.com/husanpao/ip"
  17. )
// TIME_LAYOUT is a Go reference-time layout: date and time of day joined by
// a literal "T", with no fractional seconds and no zone offset.
const TIME_LAYOUT = "2006-01-02T15:04:05"
  19. // Record 录像
  20. type Record struct {
  21. DeviceID string
  22. Name string
  23. FilePath string
  24. Address string
  25. StartTime string
  26. EndTime string
  27. Secrecy int
  28. Type string
  29. }
  30. func (r *Record) GetPublishStreamPath() string {
  31. return fmt.Sprintf("%s/%s", r.DeviceID, r.StartTime)
  32. }
var (
	// Devices holds the known devices as *Device values keyed by device ID.
	Devices sync.Map
	// DeviceNonce stores nonces, to guard against forged devices.
	DeviceNonce sync.Map
	// DeviceRegisterCount stores the number of registrations per device.
	DeviceRegisterCount sync.Map
)

// DeviceStatus is the type of Device.Status.
type DeviceStatus string

// Device status values. They are declared as untyped string constants, so
// they can be assigned to a DeviceStatus as well as to a plain string.
const (
	DeviceRegisterStatus = "REGISTER"
	DeviceRecoverStatus  = "RECOVER"
	DeviceOnlineStatus   = "ONLINE"
	DeviceOfflineStatus  = "OFFLINE"
	DeviceAlarmedStatus  = "ALARMED"
)
// Device describes one GB28181 device known to the plugin, together with its
// channels and the addressing information used to send SIP requests to it.
type Device struct {
	ID           string
	Name         string
	Manufacturer string
	Model        string
	Owner        string
	RegisterTime time.Time
	// UpdateTime is refreshed by StoreDevice and RecoverDevice; ReadDevices
	// compares it with conf.RegisterValidity when reloading saved devices.
	UpdateTime      time.Time
	LastKeepaliveAt time.Time
	Status          DeviceStatus
	// SN is incremented by CreateRequest, which uses it as the CSeq number;
	// it is also passed to the XML body builders.
	SN int
	// Addr is built from the From header of the device's request. It is
	// excluded from JSON and YAML output.
	Addr sip.Address `json:"-" yaml:"-"`
	// SipIP is the server IP on the network interface that faces the device.
	SipIP string
	// MediaIP is the server IP on the network interface that faces the
	// device; it equals SipIP unless the MediaIP config value is set.
	MediaIP string
	// NetAddr is the source address of the device's latest request and the
	// destination used by CreateRequest.
	NetAddr string
	// channelMap maps a channel's DeviceID to its *Channel.
	channelMap sync.Map
	// subscriber tracks the current subscription: CallID is set after a
	// SUBSCRIBE is answered with 200 and cleared on any other status.
	subscriber struct {
		CallID  string
		Timeout time.Time
	}
	lastSyncTime time.Time
	GpsTime      time.Time // GPS time
	Longitude    string    // longitude
	Latitude     string    // latitude
	*log.Logger  `json:"-" yaml:"-"`
}
  72. func (d *Device) MarshalJSON() ([]byte, error) {
  73. type Alias Device
  74. data := &struct {
  75. Channels []*Channel
  76. *Alias
  77. }{
  78. Alias: (*Alias)(d),
  79. }
  80. d.channelMap.Range(func(key, value interface{}) bool {
  81. data.Channels = append(data.Channels, value.(*Channel))
  82. return true
  83. })
  84. return json.Marshal(data)
  85. }
  86. func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
  87. from, _ := req.From()
  88. d.Addr = sip.Address{
  89. DisplayName: from.DisplayName,
  90. Uri: from.Address,
  91. }
  92. deviceIp := req.Source()
  93. servIp := req.Recipient().Host()
  94. //根据网卡ip获取对应的公网ip
  95. sipIP := c.routes[servIp]
  96. //如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
  97. if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
  98. if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
  99. sipIP = servIp
  100. }
  101. }
  102. //如果用户配置过则使用配置的
  103. if c.SipIP != "" {
  104. sipIP = c.SipIP
  105. } else if sipIP == "" {
  106. sipIP = myip.InternalIPv4()
  107. }
  108. mediaIp := sipIP
  109. if c.MediaIP != "" {
  110. mediaIp = c.MediaIP
  111. }
  112. d.Info("RecoverDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
  113. d.Status = DeviceRegisterStatus
  114. d.SipIP = sipIP
  115. d.MediaIP = mediaIp
  116. d.NetAddr = deviceIp
  117. d.UpdateTime = time.Now()
  118. }
  119. func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
  120. from, _ := req.From()
  121. deviceAddr := sip.Address{
  122. DisplayName: from.DisplayName,
  123. Uri: from.Address,
  124. }
  125. deviceIp := req.Source()
  126. if _d, loaded := Devices.Load(id); loaded {
  127. d = _d.(*Device)
  128. d.UpdateTime = time.Now()
  129. d.NetAddr = deviceIp
  130. d.Addr = deviceAddr
  131. d.Debug("UpdateDevice", zap.String("netaddr", d.NetAddr))
  132. } else {
  133. servIp := req.Recipient().Host()
  134. //根据网卡ip获取对应的公网ip
  135. sipIP := c.routes[servIp]
  136. //如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
  137. if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
  138. if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
  139. sipIP = servIp
  140. }
  141. }
  142. //如果用户配置过则使用配置的
  143. if c.SipIP != "" {
  144. sipIP = c.SipIP
  145. } else if sipIP == "" {
  146. sipIP = myip.InternalIPv4()
  147. }
  148. mediaIp := sipIP
  149. if c.MediaIP != "" {
  150. mediaIp = c.MediaIP
  151. }
  152. d = &Device{
  153. ID: id,
  154. RegisterTime: time.Now(),
  155. UpdateTime: time.Now(),
  156. Status: DeviceRegisterStatus,
  157. Addr: deviceAddr,
  158. SipIP: sipIP,
  159. MediaIP: mediaIp,
  160. NetAddr: deviceIp,
  161. Logger: GB28181Plugin.With(zap.String("id", id)),
  162. }
  163. d.Info("StoreDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
  164. Devices.Store(id, d)
  165. c.SaveDevices()
  166. }
  167. return
  168. }
  169. func (c *GB28181Config) ReadDevices() {
  170. if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
  171. defer f.Close()
  172. var items []*Device
  173. if err = json.NewDecoder(f).Decode(&items); err == nil {
  174. for _, item := range items {
  175. if time.Since(item.UpdateTime) < conf.RegisterValidity {
  176. item.Status = "RECOVER"
  177. item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
  178. Devices.Store(item.ID, item)
  179. }
  180. }
  181. }
  182. }
  183. }
  184. func (c *GB28181Config) SaveDevices() {
  185. var item []any
  186. Devices.Range(func(key, value any) bool {
  187. item = append(item, value)
  188. return true
  189. })
  190. if f, err := os.OpenFile("devices.json", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
  191. defer f.Close()
  192. encoder := json.NewEncoder(f)
  193. encoder.SetIndent("", " ")
  194. encoder.Encode(item)
  195. }
  196. }
  197. func (d *Device) addOrUpdateChannel(info ChannelInfo) (c *Channel) {
  198. if old, ok := d.channelMap.Load(info.DeviceID); ok {
  199. c = old.(*Channel)
  200. c.ChannelInfo = info
  201. } else {
  202. c = &Channel{
  203. Device: d,
  204. ChannelInfo: info,
  205. Logger: d.Logger.With(zap.String("channel", info.DeviceID)),
  206. }
  207. if s := engine.Streams.Get(fmt.Sprintf("%s/%s/rtsp", c.Device.ID, c.DeviceID)); s != nil {
  208. c.LiveSubSP = s.Path
  209. } else {
  210. c.LiveSubSP = ""
  211. }
  212. d.channelMap.Store(info.DeviceID, c)
  213. }
  214. return
  215. }
// deleteChannel removes the channel stored under DeviceID from this device's
// channel map. Deleting an ID that is not present does nothing.
func (d *Device) deleteChannel(DeviceID string) {
	d.channelMap.Delete(DeviceID)
}
  219. func (d *Device) UpdateChannels(list ...ChannelInfo) {
  220. for _, c := range list {
  221. if _, ok := conf.ignores[c.DeviceID]; ok {
  222. continue
  223. }
  224. //当父设备非空且存在时、父设备节点增加通道
  225. if c.ParentID != "" {
  226. path := strings.Split(c.ParentID, "/")
  227. parentId := path[len(path)-1]
  228. //如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。
  229. // 暂时不考虑级联目录的实现
  230. if d.ID != parentId {
  231. if v, ok := Devices.Load(parentId); ok {
  232. parent := v.(*Device)
  233. parent.addOrUpdateChannel(c)
  234. continue
  235. } else {
  236. c.Model = "Directory " + c.Model
  237. c.Status = "NoParent"
  238. }
  239. }
  240. }
  241. //本设备增加通道
  242. channel := d.addOrUpdateChannel(c)
  243. if conf.InviteMode == INVIDE_MODE_AUTO {
  244. channel.TryAutoInvite(&InviteOptions{})
  245. }
  246. if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
  247. channel.LiveSubSP = s.Path
  248. } else {
  249. channel.LiveSubSP = ""
  250. }
  251. }
  252. }
  253. func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
  254. d.SN++
  255. callId := sip.CallID(utils.RandNumString(10))
  256. userAgent := sip.UserAgentHeader("Monibuca")
  257. maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
  258. cseq := sip.CSeq{
  259. SeqNo: uint32(d.SN),
  260. MethodName: Method,
  261. }
  262. port := sip.Port(conf.SipPort)
  263. serverAddr := sip.Address{
  264. //DisplayName: sip.String{Str: d.config.Serial},
  265. Uri: &sip.SipUri{
  266. FUser: sip.String{Str: conf.Serial},
  267. FHost: d.SipIP,
  268. FPort: &port,
  269. },
  270. Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
  271. }
  272. req = sip.NewRequest(
  273. "",
  274. Method,
  275. d.Addr.Uri,
  276. "SIP/2.0",
  277. []sip.Header{
  278. serverAddr.AsFromHeader(),
  279. d.Addr.AsToHeader(),
  280. &callId,
  281. &userAgent,
  282. &cseq,
  283. &maxForwards,
  284. serverAddr.AsContactHeader(),
  285. },
  286. "",
  287. nil,
  288. )
  289. req.SetTransport(conf.SipNetwork)
  290. req.SetDestination(d.NetAddr)
  291. //fmt.Printf("构建请求参数:%s", *&req)
  292. // requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
  293. // if err2 != nil {
  294. // return nil
  295. // }
  296. //intranet ip , let's resolve it with public ip
  297. // var deviceIp, deviceSourceIP net.IP
  298. // switch addr := requestMsg.DestAdd.(type) {
  299. // case *net.UDPAddr:
  300. // deviceIp = addr.IP
  301. // case *net.TCPAddr:
  302. // deviceIp = addr.IP
  303. // }
  304. // switch addr2 := d.SourceAddr.(type) {
  305. // case *net.UDPAddr:
  306. // deviceSourceIP = addr2.IP
  307. // case *net.TCPAddr:
  308. // deviceSourceIP = addr2.IP
  309. // }
  310. // if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
  311. // requestMsg.DestAdd = d.SourceAddr
  312. // }
  313. return
  314. }
  315. func (d *Device) Subscribe() int {
  316. request := d.CreateRequest(sip.SUBSCRIBE)
  317. if d.subscriber.CallID != "" {
  318. callId := sip.CallID(utils.RandNumString(10))
  319. request.AppendHeader(&callId)
  320. }
  321. expires := sip.Expires(3600)
  322. d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
  323. contentType := sip.ContentType("Application/MANSCDP+xml")
  324. request.AppendHeader(&contentType)
  325. request.AppendHeader(&expires)
  326. request.SetBody(BuildCatalogXML(d.SN, d.ID), true)
  327. response, err := d.SipRequestForResponse(request)
  328. if err == nil && response != nil {
  329. if response.StatusCode() == http.StatusOK {
  330. callId, _ := request.CallID()
  331. d.subscriber.CallID = string(*callId)
  332. } else {
  333. d.subscriber.CallID = ""
  334. }
  335. return int(response.StatusCode())
  336. }
  337. return http.StatusRequestTimeout
  338. }
  339. func (d *Device) Catalog() int {
  340. //os.Stdout.Write(debug.Stack())
  341. request := d.CreateRequest(sip.MESSAGE)
  342. expires := sip.Expires(3600)
  343. d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
  344. contentType := sip.ContentType("Application/MANSCDP+xml")
  345. request.AppendHeader(&contentType)
  346. request.AppendHeader(&expires)
  347. request.SetBody(BuildCatalogXML(d.SN, d.ID), true)
  348. // 输出Sip请求设备通道信息信令
  349. GB28181Plugin.Sugar().Debugf("SIP->Catalog:%s", request)
  350. resp, err := d.SipRequestForResponse(request)
  351. if err == nil && resp != nil {
  352. GB28181Plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
  353. return int(resp.StatusCode())
  354. } else if err != nil {
  355. GB28181Plugin.Error("SIP<-Catalog error:", zap.Error(err))
  356. }
  357. return http.StatusRequestTimeout
  358. }
  359. func (d *Device) QueryDeviceInfo() {
  360. for i := time.Duration(5); i < 100; i++ {
  361. time.Sleep(time.Second * i)
  362. request := d.CreateRequest(sip.MESSAGE)
  363. contentType := sip.ContentType("Application/MANSCDP+xml")
  364. request.AppendHeader(&contentType)
  365. request.SetBody(BuildDeviceInfoXML(d.SN, d.ID), true)
  366. response, _ := d.SipRequestForResponse(request)
  367. if response != nil {
  368. // via, _ := response.ViaHop()
  369. // if via != nil && via.Params.Has("received") {
  370. // received, _ := via.Params.Get("received")
  371. // d.SipIP = received.String()
  372. // }
  373. d.Info("QueryDeviceInfo", zap.Uint16("status code", uint16(response.StatusCode())))
  374. if response.StatusCode() == http.StatusOK {
  375. break
  376. }
  377. }
  378. }
  379. }
// SipRequestForResponse sends request through the package-level srv and
// returns the response and error from srv.RequestWithContext. It passes
// context.Background(), so the call carries no deadline or cancellation.
func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error) {
	return srv.RequestWithContext(context.Background(), request)
}
  383. // MobilePositionSubscribe 移动位置订阅
  384. func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, interval time.Duration) (code int) {
  385. mobilePosition := d.CreateRequest(sip.SUBSCRIBE)
  386. if d.subscriber.CallID != "" {
  387. callId := sip.CallID(utils.RandNumString(10))
  388. mobilePosition.ReplaceHeaders(callId.Name(), []sip.Header{&callId})
  389. }
  390. expiresHeader := sip.Expires(expires / time.Second)
  391. d.subscriber.Timeout = time.Now().Add(expires)
  392. contentType := sip.ContentType("Application/MANSCDP+xml")
  393. mobilePosition.AppendHeader(&contentType)
  394. mobilePosition.AppendHeader(&expiresHeader)
  395. mobilePosition.SetBody(BuildDevicePositionXML(d.SN, id, int(interval/time.Second)), true)
  396. response, err := d.SipRequestForResponse(mobilePosition)
  397. if err == nil && response != nil {
  398. if response.StatusCode() == http.StatusOK {
  399. callId, _ := mobilePosition.CallID()
  400. d.subscriber.CallID = callId.String()
  401. } else {
  402. d.subscriber.CallID = ""
  403. }
  404. return int(response.StatusCode())
  405. }
  406. return http.StatusRequestTimeout
  407. }
  408. // UpdateChannelPosition 更新通道GPS坐标
  409. func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
  410. if v, ok := d.channelMap.Load(channelId); ok {
  411. c := v.(*Channel)
  412. c.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
  413. c.Longitude = lng
  414. c.Latitude = lat
  415. c.Debug("update channel position success")
  416. } else {
  417. //如果未找到通道,则更新到设备上
  418. d.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
  419. d.Longitude = lng
  420. d.Latitude = lat
  421. d.Debug("update device position success", zap.String("channelId", channelId))
  422. }
  423. }
  424. // UpdateChannelStatus 目录订阅消息处理:新增/移除/更新通道或者更改通道状态
  425. func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
  426. for _, v := range deviceList {
  427. switch v.Event {
  428. case "ON":
  429. d.Debug("receive channel online notify")
  430. d.channelOnline(v.DeviceID)
  431. case "OFF":
  432. d.Debug("receive channel offline notify")
  433. d.channelOffline(v.DeviceID)
  434. case "VLOST":
  435. d.Debug("receive channel video lost notify")
  436. d.channelOffline(v.DeviceID)
  437. case "DEFECT":
  438. d.Debug("receive channel video defect notify")
  439. d.channelOffline(v.DeviceID)
  440. case "ADD":
  441. d.Debug("receive channel add notify")
  442. channel := ChannelInfo{
  443. DeviceID: v.DeviceID,
  444. ParentID: v.ParentID,
  445. Name: v.Name,
  446. Manufacturer: v.Manufacturer,
  447. Model: v.Model,
  448. Owner: v.Owner,
  449. CivilCode: v.CivilCode,
  450. Address: v.Address,
  451. Port: v.Port,
  452. Parental: v.Parental,
  453. SafetyWay: v.SafetyWay,
  454. RegisterWay: v.RegisterWay,
  455. Secrecy: v.Secrecy,
  456. Status: ChannelStatus(v.Status),
  457. }
  458. d.addOrUpdateChannel(channel)
  459. case "DEL":
  460. //删除
  461. d.Debug("receive channel delete notify")
  462. d.deleteChannel(v.DeviceID)
  463. case "UPDATE":
  464. d.Debug("receive channel update notify")
  465. // 更新通道
  466. channel := ChannelInfo{
  467. DeviceID: v.DeviceID,
  468. ParentID: v.ParentID,
  469. Name: v.Name,
  470. Manufacturer: v.Manufacturer,
  471. Model: v.Model,
  472. Owner: v.Owner,
  473. CivilCode: v.CivilCode,
  474. Address: v.Address,
  475. Port: v.Port,
  476. Parental: v.Parental,
  477. SafetyWay: v.SafetyWay,
  478. RegisterWay: v.RegisterWay,
  479. Secrecy: v.Secrecy,
  480. Status: ChannelStatus(v.Status),
  481. }
  482. d.UpdateChannels(channel)
  483. }
  484. }
  485. }
  486. func (d *Device) channelOnline(DeviceID string) {
  487. if v, ok := d.channelMap.Load(DeviceID); ok {
  488. c := v.(*Channel)
  489. c.Status = ChannelOnStatus
  490. c.Debug("channel online", zap.String("channelId", DeviceID))
  491. } else {
  492. d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
  493. }
  494. }
  495. func (d *Device) channelOffline(DeviceID string) {
  496. if v, ok := d.channelMap.Load(DeviceID); ok {
  497. c := v.(*Channel)
  498. c.Status = ChannelOffStatus
  499. c.Debug("channel offline", zap.String("channelId", DeviceID))
  500. } else {
  501. d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
  502. }
  503. }