link.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package gb28181
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "go.uber.org/zap"
  7. )
  8. // 对于录像查询,通过 queryKey (即 deviceId + channelId + sn) 唯一区分一次请求和响应
  9. // 并将其关联起来,以实现异步响应的目的
  10. // 提供单例实例供调用
  11. var RecordQueryLink = NewRecordQueryLink(time.Second * 60)
  12. type recordQueryLink struct {
  13. pendingResult map[string]recordQueryResult // queryKey 查询结果缓存
  14. pendingResp map[string]recordQueryResp // queryKey 待回复的查询请求
  15. timeout time.Duration // 查询结果的过期时间
  16. sync.RWMutex
  17. }
  18. type recordQueryResult struct {
  19. time time.Time
  20. err error
  21. sum int
  22. finished bool
  23. list []*Record
  24. }
  25. type recordQueryResp struct {
  26. respChan chan<- recordQueryResult
  27. timeout time.Duration
  28. startTime time.Time
  29. }
  30. func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
  31. c := &recordQueryLink{
  32. timeout: resultTimeout,
  33. pendingResult: make(map[string]recordQueryResult),
  34. pendingResp: make(map[string]recordQueryResp),
  35. }
  36. return c
  37. }
  // recordQueryKey builds the key that identifies a single record query. The
  // key has the form "<deviceId>-<channelId>-<sn>".
  func recordQueryKey(deviceId, channelId string, sn int) string {
  	return fmt.Sprintf("%s-%s-%d", deviceId, channelId, sn)
  }
  42. // 定期清理过期的查询结果和请求
  43. func (c *recordQueryLink) cleanTimeout() {
  44. for k, s := range c.pendingResp {
  45. if time.Since(s.startTime) > s.timeout {
  46. if r, ok := c.pendingResult[k]; ok {
  47. c.notify(k, r)
  48. } else {
  49. c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
  50. }
  51. }
  52. }
  53. for k, r := range c.pendingResult {
  54. if time.Since(r.time) > c.timeout {
  55. delete(c.pendingResult, k)
  56. }
  57. }
  58. }
  59. func (c *recordQueryLink) Put(deviceId, channelId string, sn int, sum int, record []*Record) {
  60. key, r := c.doPut(deviceId, channelId, sn, sum, record)
  61. if r.finished {
  62. c.notify(key, r)
  63. }
  64. }
  65. func (c *recordQueryLink) doPut(deviceId, channelId string, sn, sum int, record []*Record) (key string, r recordQueryResult) {
  66. c.Lock()
  67. defer c.Unlock()
  68. key = recordQueryKey(deviceId, channelId, sn)
  69. if v, ok := c.pendingResult[key]; ok {
  70. r = v
  71. } else {
  72. r = recordQueryResult{time: time.Now(), sum: sum, list: make([]*Record, 0)}
  73. }
  74. r.list = append(r.list, record...)
  75. if len(r.list) == sum {
  76. r.finished = true
  77. }
  78. c.pendingResult[key] = r
  79. GB28181Plugin.Logger.Debug("put record",
  80. zap.String("key", key),
  81. zap.Int("sum", sum),
  82. zap.Int("count", len(r.list)))
  83. return
  84. }
  85. func (c *recordQueryLink) WaitResult(
  86. deviceId, channelId string, sn int,
  87. timeout time.Duration) (resultCh <-chan recordQueryResult) {
  88. key := recordQueryKey(deviceId, channelId, sn)
  89. c.Lock()
  90. defer c.Unlock()
  91. respCh := make(chan recordQueryResult, 1)
  92. resultCh = respCh
  93. c.pendingResp[key] = recordQueryResp{startTime: time.Now(), timeout: timeout, respChan: respCh}
  94. return
  95. }
  96. func (c *recordQueryLink) notify(key string, r recordQueryResult) {
  97. if s, ok := c.pendingResp[key]; ok {
  98. s.respChan <- r
  99. }
  100. c.Lock()
  101. defer c.Unlock()
  102. delete(c.pendingResp, key)
  103. delete(c.pendingResult, key)
  104. GB28181Plugin.Logger.Debug("record notify", zap.String("key", key))
  105. }