device.go 19 KB


  1. package gb28181
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "go.uber.org/zap"
  12. "m7s.live/engine/v4"
  13. "m7s.live/engine/v4/log"
  14. "m7s.live/engine/v4/db"
  15. "m7s.live/plugin/gb28181/v4/utils"
  16. "github.com/ghettovoice/gosip/sip"
  17. myip "github.com/husanpao/ip"
  18. )
  // TIME_LAYOUT is the Go reference-time layout for second-precision
  // timestamps of the form "YYYY-MM-DDTHH:MM:SS" with no zone suffix.
  const TIME_LAYOUT = "2006-01-02T15:04:05"
  // Record describes one recording entry (video record).
  // Field meanings beyond their names are not established in this file;
  // DeviceID and StartTime are the two fields used to derive the publish path.
  type Record struct {
  	DeviceID  string
  	Name      string
  	FilePath  string
  	Address   string
  	StartTime string
  	EndTime   string
  	Secrecy   int
  	Type      string
  }

  // GetPublishStreamPath returns the stream path for this record in the
  // form "<DeviceID>/<StartTime>".
  func (r *Record) GetPublishStreamPath() string {
  	device, start := r.DeviceID, r.StartTime
  	return fmt.Sprintf("%s/%s", device, start)
  }
  // Package-level registries shared by the plugin.
  var (
  	// Devices holds the known devices; StoreDevice and ReadDevices store
  	// *Device values under the device ID.
  	Devices sync.Map
  	// DeviceNonce stores nonces to guard against forged devices.
  	DeviceNonce sync.Map
  	// DeviceRegisterCount tracks how many times a device has registered.
  	DeviceRegisterCount sync.Map
  	// DeviceRecordState is not used in this part of the file.
  	// NOTE(review): presumably per-device recording state; confirm against callers.
  	DeviceRecordState sync.Map
  )
  // DeviceStatus is the lifecycle state of a Device.
  type DeviceStatus string

  // Known device states. The constants are intentionally left untyped so
  // they can be compared with DeviceStatus values and also used wherever a
  // plain string is expected.
  const (
  	DeviceRegisterStatus = "REGISTER"
  	DeviceRecoverStatus  = "RECOVER"
  	DeviceOnlineStatus   = "ONLINE"
  	DeviceOfflineStatus  = "OFFLINE"
  	DeviceAlarmedStatus  = "ALARMED"
  )
  48. type Device struct {
  49. ID string
  50. Name string
  51. Manufacturer string
  52. Model string
  53. Owner string
  54. RegisterTime time.Time
  55. UpdateTime time.Time
  56. LastKeepaliveAt time.Time
  57. Status DeviceStatus
  58. SN int
  59. Addr sip.Address `json:"-" yaml:"-"`
  60. SipIP string //设备对应网卡的服务器ip
  61. MediaIP string //设备对应网卡的服务器ip
  62. NetAddr string
  63. channelMap sync.Map
  64. subscriber struct {
  65. CallID string
  66. Timeout time.Time
  67. }
  68. lastSyncTime time.Time
  69. GpsTime time.Time //gps时间
  70. Longitude string //经度
  71. Latitude string //纬度
  72. *log.Logger `json:"-" yaml:"-"`
  73. }
  74. // 更新设备信息
  75. func(d *Device) UpdateDeviceInfo(){
  76. //fmt.Println("更新设备信息:",*d)
  77. GB28181Plugin.Info("更新设备信息", zap.String("id", d.ID))
  78. database := db.DB()
  79. updateMap := map[string]interface{}{}
  80. if d.Status == DeviceOnlineStatus{
  81. updateMap["state"] = 1
  82. }else if d.Status == DeviceOfflineStatus{
  83. updateMap["state"] = 0
  84. }
  85. updateMap["name"] = d.Name
  86. updateMap["manufacturer"] = d.Manufacturer
  87. updateMap["model"] = d.Model
  88. updateMap["owner"] = d.Owner
  89. updateMap["register_time"] = d.RegisterTime
  90. updateMap["last_keepalive_at"] = d.LastKeepaliveAt
  91. updateMap["net_addr"] = d.NetAddr
  92. updateMap["longitude"] = d.Longitude
  93. updateMap["latitude"] = d.Latitude
  94. err := database.Table("t_sdp_delivery").Where("device_id=?",d.ID).Updates(updateMap).Error
  95. //err := database.Exec(`update t_sdp_channel set live_status=? where channel_id=?`,info.DeviceID,d.ID).Error
  96. if err != nil {
  97. GB28181Plugin.Error("更新设备信息失败:",zap.String("id", d.ID),zap.String("error", err.Error()))
  98. //fmt.Println("更新设备信息失败:",err)
  99. }
  100. }
  101. // 更新设备状态
  102. func(d *Device) UpdateDeviceStatus(){
  103. GB28181Plugin.Info("更新设备状态", zap.String("id", d.ID), zap.Any("state", d.Status))
  104. database := db.DB()
  105. status := 0
  106. updateMap := map[string]interface{}{}
  107. if d.Status == DeviceOnlineStatus{
  108. updateMap["state"] = 1
  109. status = 1
  110. }else if d.Status == DeviceOfflineStatus{
  111. updateMap["state"] = 0
  112. }else{
  113. return
  114. }
  115. err := database.Table("t_sdp_delivery").Where("device_id=?",d.ID).Updates(updateMap).Error
  116. if err != nil {
  117. GB28181Plugin.Error("更新设备状态失败:",zap.String("id", d.ID),zap.String("error", err.Error()))
  118. }
  119. SendMessageToRedis(d.ID,status)
  120. }
  121. // 检查设备是否出库
  122. func CheckDeviceDelivery(id string) bool {
  123. isDelivery := false
  124. database := db.DB()
  125. deviceId := 0
  126. //fmt.Println("设备注册校验:",id)
  127. err := database.Raw(`select id from t_sdp_delivery where device_id=?`,id).Find(&deviceId).Error
  128. if err == nil && deviceId != 0 {
  129. isDelivery = true
  130. }else{
  131. GB28181Plugin.Error("设备注册校验不通过",zap.String("id", id),zap.String("error", "设备未出库"))
  132. //fmt.Println("设备注册校验不通过(未出库设备):",id)
  133. }
  134. return isDelivery
  135. }
  136. func (d *Device) MarshalJSON() ([]byte, error) {
  137. type Alias Device
  138. data := &struct {
  139. Channels []*Channel
  140. *Alias
  141. }{
  142. Alias: (*Alias)(d),
  143. }
  144. d.channelMap.Range(func(key, value interface{}) bool {
  145. data.Channels = append(data.Channels, value.(*Channel))
  146. return true
  147. })
  148. return json.Marshal(data)
  149. }
  150. func (c *GB28181Config) RecoverDevice(d *Device, req sip.Request) {
  151. from, _ := req.From()
  152. d.Addr = sip.Address{
  153. DisplayName: from.DisplayName,
  154. Uri: from.Address,
  155. }
  156. deviceIp := req.Source()
  157. servIp := req.Recipient().Host()
  158. //根据网卡ip获取对应的公网ip
  159. sipIP := c.routes[servIp]
  160. //如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
  161. if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
  162. if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
  163. sipIP = servIp
  164. }
  165. }
  166. //如果用户配置过则使用配置的
  167. if c.SipIP != "" {
  168. sipIP = c.SipIP
  169. } else if sipIP == "" {
  170. sipIP = myip.InternalIPv4()
  171. }
  172. mediaIp := sipIP
  173. if c.MediaIP != "" {
  174. mediaIp = c.MediaIP
  175. }
  176. d.Info("RecoverDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
  177. d.Status = DeviceRegisterStatus
  178. d.SipIP = sipIP
  179. d.MediaIP = mediaIp
  180. d.NetAddr = deviceIp
  181. d.UpdateTime = time.Now()
  182. }
  183. func (c *GB28181Config) StoreDevice(id string, req sip.Request) (d *Device) {
  184. from, _ := req.From()
  185. deviceAddr := sip.Address{
  186. DisplayName: from.DisplayName,
  187. Uri: from.Address,
  188. }
  189. deviceIp := req.Source()
  190. if _d, loaded := Devices.Load(id); loaded {
  191. d = _d.(*Device)
  192. d.UpdateTime = time.Now()
  193. d.NetAddr = deviceIp
  194. d.Addr = deviceAddr
  195. d.UpdateDeviceInfo()
  196. d.Debug("UpdateDevice", zap.String("netaddr", d.NetAddr))
  197. } else {
  198. servIp := req.Recipient().Host()
  199. //根据网卡ip获取对应的公网ip
  200. sipIP := c.routes[servIp]
  201. //如果相等,则服务器是内网通道.海康摄像头不支持...自动获取
  202. if strings.LastIndex(deviceIp, ".") != -1 && strings.LastIndex(servIp, ".") != -1 {
  203. if servIp[0:strings.LastIndex(servIp, ".")] == deviceIp[0:strings.LastIndex(deviceIp, ".")] || sipIP == "" {
  204. sipIP = servIp
  205. }
  206. }
  207. //如果用户配置过则使用配置的
  208. if c.SipIP != "" {
  209. sipIP = c.SipIP
  210. } else if sipIP == "" {
  211. sipIP = myip.InternalIPv4()
  212. }
  213. mediaIp := sipIP
  214. if c.MediaIP != "" {
  215. mediaIp = c.MediaIP
  216. }
  217. d = &Device{
  218. ID: id,
  219. RegisterTime: time.Now(),
  220. UpdateTime: time.Now(),
  221. Status: DeviceRegisterStatus,
  222. Addr: deviceAddr,
  223. SipIP: sipIP,
  224. MediaIP: mediaIp,
  225. NetAddr: deviceIp,
  226. Logger: GB28181Plugin.With(zap.String("id", id)),
  227. }
  228. d.Info("StoreDevice", zap.String("deviceIp", deviceIp), zap.String("servIp", servIp), zap.String("sipIP", sipIP), zap.String("mediaIp", mediaIp))
  229. Devices.Store(id, d)
  230. c.SaveDevices()
  231. d.UpdateDeviceInfo()
  232. }
  233. return
  234. }
  235. func (c *GB28181Config) ReadDevices() {
  236. /*if false{
  237. database := db.DB()
  238. var items []Device
  239. err := database.Raw("select * from t_sdp_delivery where sip_id =? and type=2",conf.SipId).Find(&items).Error
  240. fmt.Println("items:",items,err)
  241. if err == nil {
  242. for _, item := range items {
  243. item.ID = item.DeviceId
  244. fmt.Println("ITEM device_id;",item.ID,item.DeviceId)
  245. item.UpdateTime = time.Now().Add(5* time.Minute)
  246. item.Status = DeviceRecoverStatus
  247. item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
  248. Devices.Store(item.ID, &item)
  249. }
  250. }
  251. }*/
  252. if true{
  253. //fmt.Println("recover 000000000000000000000000000000000000000000:")
  254. if f, err := os.OpenFile("devices.json", os.O_RDONLY, 0644); err == nil {
  255. defer f.Close()
  256. var items []*Device
  257. if err = json.NewDecoder(f).Decode(&items); err == nil {
  258. //fmt.Println("recover 111111111111111111111111111111111111:",items)
  259. for _, item := range items {
  260. // 注释掉了更新时间小于有效期
  261. /*if time.Since(item.UpdateTime) < conf.RegisterValidity {
  262. item.Status = DeviceRecoverStatus
  263. item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
  264. Devices.Store(item.ID, item)
  265. }*/
  266. if time.Since(item.UpdateTime) > conf.RegisterValidity {
  267. item.UpdateTime = time.Now().Add(5* time.Minute)
  268. }
  269. //fmt.Println("recover devvice111111111111111111111:",item.ID)
  270. item.Status = DeviceRecoverStatus
  271. item.Logger = GB28181Plugin.With(zap.String("id", item.ID))
  272. Devices.Store(item.ID, item)
  273. }
  274. }else{
  275. fmt.Println("err 111111111111",err)
  276. }
  277. }
  278. }
  279. }
  280. func (c *GB28181Config) SaveDevices() {
  281. var item []any
  282. Devices.Range(func(key, value any) bool {
  283. item = append(item, value)
  284. return true
  285. })
  286. if f, err := os.OpenFile("devices.json", os.O_WRONLY|os.O_CREATE, 0644); err == nil {
  287. defer f.Close()
  288. encoder := json.NewEncoder(f)
  289. encoder.SetIndent("", " ")
  290. encoder.Encode(item)
  291. }
  292. }
  293. func (d *Device) addOrUpdateChannel(info ChannelInfo) (c *Channel) {
  294. if old, ok := d.channelMap.Load(info.DeviceID); ok {
  295. c = old.(*Channel)
  296. if info.Name != c.ChannelInfo.Name || info.Address != c.ChannelInfo.Address {
  297. c.UpdateChanelInfo()
  298. }
  299. c.ChannelInfo = info
  300. } else {
  301. c = &Channel{
  302. Device: d,
  303. ChannelInfo: info,
  304. Logger: d.Logger.With(zap.String("channel", info.DeviceID)),
  305. }
  306. if s := engine.Streams.Get(fmt.Sprintf("%s/%s/rtsp", c.Device.ID, c.DeviceID)); s != nil {
  307. c.LiveSubSP = s.Path
  308. } else {
  309. c.LiveSubSP = ""
  310. }
  311. d.channelMap.Store(info.DeviceID, c)
  312. c.UpdateChanelInfo()
  313. }
  314. return
  315. }
  316. func (d *Device) deleteChannel(DeviceID string) {
  317. d.channelMap.Delete(DeviceID)
  318. }
  319. func (d *Device) UpdateChannels(list ...ChannelInfo) {
  320. for _, c := range list {
  321. if _, ok := conf.ignores[c.DeviceID]; ok {
  322. continue
  323. }
  324. //当父设备非空且存在时、父设备节点增加通道
  325. if c.ParentID != "" {
  326. path := strings.Split(c.ParentID, "/")
  327. parentId := path[len(path)-1]
  328. //如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。
  329. // 暂时不考虑级联目录的实现
  330. if d.ID != parentId {
  331. if v, ok := Devices.Load(parentId); ok {
  332. parent := v.(*Device)
  333. parent.addOrUpdateChannel(c)
  334. continue
  335. } else {
  336. c.Model = "Directory " + c.Model
  337. c.Status = "NoParent"
  338. }
  339. }
  340. }
  341. //本设备增加通道
  342. channel := d.addOrUpdateChannel(c)
  343. if GetInvistMode(conf.InviteMode,channel.Model,channel.DeviceID) == INVIDE_MODE_AUTO{
  344. //if conf.InviteMode == INVIDE_MODE_AUTO {
  345. //fmt.Println("UpdateChannels 11111111111111111111")
  346. channel.TryAutoInvite(&InviteOptions{})
  347. }
  348. if s := engine.Streams.Get("sub/" + c.DeviceID); s != nil {
  349. channel.LiveSubSP = s.Path
  350. } else {
  351. channel.LiveSubSP = ""
  352. }
  353. }
  354. }
  355. func (d *Device) CreateRequest(Method sip.RequestMethod) (req sip.Request) {
  356. d.SN++
  357. callId := sip.CallID(utils.RandNumString(10))
  358. userAgent := sip.UserAgentHeader("Monibuca")
  359. maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
  360. cseq := sip.CSeq{
  361. SeqNo: uint32(d.SN),
  362. MethodName: Method,
  363. }
  364. port := sip.Port(conf.SipPort)
  365. serverAddr := sip.Address{
  366. //DisplayName: sip.String{Str: d.config.Serial},
  367. Uri: &sip.SipUri{
  368. FUser: sip.String{Str: conf.Serial},
  369. FHost: d.SipIP,
  370. FPort: &port,
  371. },
  372. Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
  373. }
  374. req = sip.NewRequest(
  375. "",
  376. Method,
  377. d.Addr.Uri,
  378. "SIP/2.0",
  379. []sip.Header{
  380. serverAddr.AsFromHeader(),
  381. d.Addr.AsToHeader(),
  382. &callId,
  383. &userAgent,
  384. &cseq,
  385. &maxForwards,
  386. serverAddr.AsContactHeader(),
  387. },
  388. "",
  389. nil,
  390. )
  391. req.SetTransport(conf.SipNetwork)
  392. req.SetDestination(d.NetAddr)
  393. //fmt.Printf("构建请求参数:%s", *&req)
  394. // requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
  395. // if err2 != nil {
  396. // return nil
  397. // }
  398. //intranet ip , let's resolve it with public ip
  399. // var deviceIp, deviceSourceIP net.IP
  400. // switch addr := requestMsg.DestAdd.(type) {
  401. // case *net.UDPAddr:
  402. // deviceIp = addr.IP
  403. // case *net.TCPAddr:
  404. // deviceIp = addr.IP
  405. // }
  406. // switch addr2 := d.SourceAddr.(type) {
  407. // case *net.UDPAddr:
  408. // deviceSourceIP = addr2.IP
  409. // case *net.TCPAddr:
  410. // deviceSourceIP = addr2.IP
  411. // }
  412. // if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
  413. // requestMsg.DestAdd = d.SourceAddr
  414. // }
  415. return
  416. }
  417. func (d *Device) Subscribe() int {
  418. request := d.CreateRequest(sip.SUBSCRIBE)
  419. if d.subscriber.CallID != "" {
  420. callId := sip.CallID(utils.RandNumString(10))
  421. request.AppendHeader(&callId)
  422. }
  423. expires := sip.Expires(3600)
  424. d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
  425. contentType := sip.ContentType("Application/MANSCDP+xml")
  426. request.AppendHeader(&contentType)
  427. request.AppendHeader(&expires)
  428. request.SetBody(BuildCatalogXML(d.SN, d.ID), true)
  429. response, err := d.SipRequestForResponse(request)
  430. if err == nil && response != nil {
  431. if response.StatusCode() == http.StatusOK {
  432. callId, _ := request.CallID()
  433. d.subscriber.CallID = string(*callId)
  434. } else {
  435. d.subscriber.CallID = ""
  436. }
  437. return int(response.StatusCode())
  438. }
  439. return http.StatusRequestTimeout
  440. }
  441. func (d *Device) Catalog() int {
  442. //os.Stdout.Write(debug.Stack())
  443. request := d.CreateRequest(sip.MESSAGE)
  444. expires := sip.Expires(3600)
  445. d.subscriber.Timeout = time.Now().Add(time.Second * time.Duration(expires))
  446. contentType := sip.ContentType("Application/MANSCDP+xml")
  447. request.AppendHeader(&contentType)
  448. request.AppendHeader(&expires)
  449. request.SetBody(BuildCatalogXML(d.SN, d.ID), true)
  450. // 输出Sip请求设备通道信息信令
  451. GB28181Plugin.Sugar().Debugf("SIP->Catalog:%s", request)
  452. resp, err := d.SipRequestForResponse(request)
  453. if err == nil && resp != nil {
  454. GB28181Plugin.Sugar().Debugf("SIP<-Catalog Response: %s", resp.String())
  455. return int(resp.StatusCode())
  456. } else if err != nil {
  457. GB28181Plugin.Error("SIP<-Catalog error:", zap.Error(err))
  458. }
  459. return http.StatusRequestTimeout
  460. }
  461. func (d *Device) QueryDeviceInfo() {
  462. for i := time.Duration(5); i < 100; i++ {
  463. time.Sleep(time.Second * i)
  464. request := d.CreateRequest(sip.MESSAGE)
  465. contentType := sip.ContentType("Application/MANSCDP+xml")
  466. request.AppendHeader(&contentType)
  467. request.SetBody(BuildDeviceInfoXML(d.SN, d.ID), true)
  468. response, _ := d.SipRequestForResponse(request)
  469. if response != nil {
  470. // via, _ := response.ViaHop()
  471. // if via != nil && via.Params.Has("received") {
  472. // received, _ := via.Params.Get("received")
  473. // d.SipIP = received.String()
  474. // }
  475. d.Info("QueryDeviceInfo", zap.Uint16("status code", uint16(response.StatusCode())))
  476. if response.StatusCode() == http.StatusOK {
  477. break
  478. }
  479. }
  480. }
  481. }
  482. func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error) {
  483. return srv.RequestWithContext(context.Background(), request)
  484. }
  485. // MobilePositionSubscribe 移动位置订阅
  486. func (d *Device) MobilePositionSubscribe(id string, expires time.Duration, interval time.Duration) (code int) {
  487. mobilePosition := d.CreateRequest(sip.SUBSCRIBE)
  488. if d.subscriber.CallID != "" {
  489. callId := sip.CallID(utils.RandNumString(10))
  490. mobilePosition.ReplaceHeaders(callId.Name(), []sip.Header{&callId})
  491. }
  492. expiresHeader := sip.Expires(expires / time.Second)
  493. d.subscriber.Timeout = time.Now().Add(expires)
  494. contentType := sip.ContentType("Application/MANSCDP+xml")
  495. mobilePosition.AppendHeader(&contentType)
  496. mobilePosition.AppendHeader(&expiresHeader)
  497. mobilePosition.SetBody(BuildDevicePositionXML(d.SN, id, int(interval/time.Second)), true)
  498. response, err := d.SipRequestForResponse(mobilePosition)
  499. if err == nil && response != nil {
  500. if response.StatusCode() == http.StatusOK {
  501. callId, _ := mobilePosition.CallID()
  502. d.subscriber.CallID = callId.String()
  503. } else {
  504. d.subscriber.CallID = ""
  505. }
  506. return int(response.StatusCode())
  507. }
  508. return http.StatusRequestTimeout
  509. }
  510. // UpdateChannelPosition 更新通道GPS坐标
  511. func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng string, lat string) {
  512. if v, ok := d.channelMap.Load(channelId); ok {
  513. c := v.(*Channel)
  514. c.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
  515. c.Longitude = lng
  516. c.Latitude = lat
  517. c.Debug("update channel position success")
  518. } else {
  519. //如果未找到通道,则更新到设备上
  520. d.GpsTime = time.Now() //时间取系统收到的时间,避免设备时间和格式问题
  521. d.Longitude = lng
  522. d.Latitude = lat
  523. d.Debug("update device position success", zap.String("channelId", channelId))
  524. }
  525. }
  526. // UpdateChannelStatus 目录订阅消息处理:新增/移除/更新通道或者更改通道状态
  527. func (d *Device) UpdateChannelStatus(deviceList []*notifyMessage) {
  528. for _, v := range deviceList {
  529. switch v.Event {
  530. case "ON":
  531. d.Debug("receive channel online notify")
  532. d.channelOnline(v.DeviceID)
  533. case "OFF":
  534. d.Debug("receive channel offline notify")
  535. d.channelOffline(v.DeviceID)
  536. case "VLOST":
  537. d.Debug("receive channel video lost notify")
  538. d.channelOffline(v.DeviceID)
  539. case "DEFECT":
  540. d.Debug("receive channel video defect notify")
  541. d.channelOffline(v.DeviceID)
  542. case "ADD":
  543. d.Debug("receive channel add notify")
  544. channel := ChannelInfo{
  545. DeviceID: v.DeviceID,
  546. ParentID: v.ParentID,
  547. Name: v.Name,
  548. Manufacturer: v.Manufacturer,
  549. Model: v.Model,
  550. Owner: v.Owner,
  551. CivilCode: v.CivilCode,
  552. Address: v.Address,
  553. Port: v.Port,
  554. Parental: v.Parental,
  555. SafetyWay: v.SafetyWay,
  556. RegisterWay: v.RegisterWay,
  557. Secrecy: v.Secrecy,
  558. Status: ChannelStatus(v.Status),
  559. }
  560. d.addOrUpdateChannel(channel)
  561. case "DEL":
  562. //删除
  563. d.Debug("receive channel delete notify")
  564. d.deleteChannel(v.DeviceID)
  565. case "UPDATE":
  566. d.Debug("receive channel update notify")
  567. // 更新通道
  568. channel := ChannelInfo{
  569. DeviceID: v.DeviceID,
  570. ParentID: v.ParentID,
  571. Name: v.Name,
  572. Manufacturer: v.Manufacturer,
  573. Model: v.Model,
  574. Owner: v.Owner,
  575. CivilCode: v.CivilCode,
  576. Address: v.Address,
  577. Port: v.Port,
  578. Parental: v.Parental,
  579. SafetyWay: v.SafetyWay,
  580. RegisterWay: v.RegisterWay,
  581. Secrecy: v.Secrecy,
  582. Status: ChannelStatus(v.Status),
  583. }
  584. //fmt.Println("UpdateChannelStatus UPDATE 111111111111111")
  585. d.UpdateChannels(channel)
  586. }
  587. }
  588. }
  589. func (d *Device) channelOnline(DeviceID string) {
  590. if v, ok := d.channelMap.Load(DeviceID); ok {
  591. c := v.(*Channel)
  592. preStatus := c.Status
  593. c.Status = ChannelOnStatus
  594. if preStatus !=ChannelOnStatus{
  595. c.UpdateChanelStatus()
  596. }
  597. c.Debug("channel online", zap.String("channelId", DeviceID))
  598. } else {
  599. d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
  600. }
  601. }
  602. func (d *Device) channelOffline(DeviceID string) {
  603. if v, ok := d.channelMap.Load(DeviceID); ok {
  604. c := v.(*Channel)
  605. preStatus := c.Status
  606. c.Status = ChannelOffStatus
  607. if preStatus != ChannelOffStatus{
  608. c.UpdateChanelStatus()
  609. }
  610. c.Debug("channel offline", zap.String("channelId", DeviceID))
  611. } else {
  612. d.Debug("update channel status failed, not found", zap.String("channelId", DeviceID))
  613. }
  614. }