123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- package gb28181
- import (
- "fmt"
- "sync"
- "time"
- "go.uber.org/zap"
- )
- // 对于录像查询,通过 queryKey (即 deviceId + channelId + sn) 唯一区分一次请求和响应
- // 并将其关联起来,以实现异步响应的目的
- // 提供单例实例供调用
- var RecordQueryLink = NewRecordQueryLink(time.Second * 60)
- type recordQueryLink struct {
- pendingResult map[string]recordQueryResult // queryKey 查询结果缓存
- pendingResp map[string]recordQueryResp // queryKey 待回复的查询请求
- timeout time.Duration // 查询结果的过期时间
- sync.RWMutex
- }
- type recordQueryResult struct {
- time time.Time
- err error
- sum int
- finished bool
- list []*Record
- }
- type recordQueryResp struct {
- respChan chan<- recordQueryResult
- timeout time.Duration
- startTime time.Time
- }
- func NewRecordQueryLink(resultTimeout time.Duration) *recordQueryLink {
- c := &recordQueryLink{
- timeout: resultTimeout,
- pendingResult: make(map[string]recordQueryResult),
- pendingResp: make(map[string]recordQueryResp),
- }
- return c
- }
- // 唯一区分一次录像查询
- func recordQueryKey(deviceId, channelId string, sn int) string {
- return fmt.Sprintf("%s-%s-%d", deviceId, channelId, sn)
- }
- // 定期清理过期的查询结果和请求
- func (c *recordQueryLink) cleanTimeout() {
- for k, s := range c.pendingResp {
- if time.Since(s.startTime) > s.timeout {
- if r, ok := c.pendingResult[k]; ok {
- c.notify(k, r)
- } else {
- c.notify(k, recordQueryResult{err: fmt.Errorf("query time out")})
- }
- }
- }
- for k, r := range c.pendingResult {
- if time.Since(r.time) > c.timeout {
- delete(c.pendingResult, k)
- }
- }
- }
- func (c *recordQueryLink) Put(deviceId, channelId string, sn int, sum int, record []*Record) {
- key, r := c.doPut(deviceId, channelId, sn, sum, record)
- if r.finished {
- c.notify(key, r)
- }
- }
- func (c *recordQueryLink) doPut(deviceId, channelId string, sn, sum int, record []*Record) (key string, r recordQueryResult) {
- c.Lock()
- defer c.Unlock()
- key = recordQueryKey(deviceId, channelId, sn)
- if v, ok := c.pendingResult[key]; ok {
- r = v
- } else {
- r = recordQueryResult{time: time.Now(), sum: sum, list: make([]*Record, 0)}
- }
- r.list = append(r.list, record...)
- if len(r.list) == sum {
- r.finished = true
- }
- c.pendingResult[key] = r
- GB28181Plugin.Logger.Debug("put record",
- zap.String("key", key),
- zap.Int("sum", sum),
- zap.Int("count", len(r.list)))
- return
- }
- func (c *recordQueryLink) WaitResult(
- deviceId, channelId string, sn int,
- timeout time.Duration) (resultCh <-chan recordQueryResult) {
- key := recordQueryKey(deviceId, channelId, sn)
- c.Lock()
- defer c.Unlock()
- respCh := make(chan recordQueryResult, 1)
- resultCh = respCh
- c.pendingResp[key] = recordQueryResp{startTime: time.Now(), timeout: timeout, respChan: respCh}
- return
- }
- func (c *recordQueryLink) notify(key string, r recordQueryResult) {
- if s, ok := c.pendingResp[key]; ok {
- s.respChan <- r
- }
- c.Lock()
- defer c.Unlock()
- delete(c.pendingResp, key)
- delete(c.pendingResult, key)
- GB28181Plugin.Logger.Debug("record notify", zap.String("key", key))
- }
|