reorder.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package util
  2. var RTPReorderBufferLen uint16 = 50
  3. // RTPReorder RTP包乱序重排
  4. type RTPReorder[T comparable] struct {
  5. lastSeq uint16 //最新收到的rtp包序号
  6. queue []T // 缓存队列,0号元素位置代表lastReq+1,永远保持为空
  7. Total uint32 // 总共收到的包数量
  8. Drop uint32 // 丢弃的包数量
  9. }
  10. func (p *RTPReorder[T]) Push(seq uint16, v T) (result T) {
  11. p.Total++
  12. // 初始化
  13. if len(p.queue) == 0 {
  14. p.lastSeq = seq
  15. p.queue = make([]T, RTPReorderBufferLen)
  16. return v
  17. }
  18. if seq < p.lastSeq && p.lastSeq-seq < 0x8000 {
  19. // 旧的包直接丢弃
  20. p.Drop++
  21. return
  22. }
  23. delta := seq - p.lastSeq
  24. if delta == 0 {
  25. // 重复包
  26. p.Drop++
  27. return
  28. }
  29. if delta == 1 {
  30. // 正常顺序,无需缓存
  31. p.lastSeq = seq
  32. p.pop()
  33. return v
  34. }
  35. if RTPReorderBufferLen < delta {
  36. //超过缓存最大范围,无法挽回,只能造成丢包(序号断裂)
  37. for {
  38. p.lastSeq++
  39. delta--
  40. head := p.pop()
  41. // 可以放得进去了
  42. if delta == RTPReorderBufferLen {
  43. p.queue[RTPReorderBufferLen-1] = v
  44. p.queue[0] = result
  45. return head
  46. } else if head != result {
  47. p.Drop++
  48. }
  49. }
  50. } else {
  51. // 出现后面的包先到达,缓存起来
  52. p.queue[delta-1] = v
  53. return
  54. }
  55. }
  56. func (p *RTPReorder[T]) pop() (result T) {
  57. copy(p.queue, p.queue[1:]) //整体数据向前移动一位,保持0号元素代表lastSeq+1
  58. p.queue[RTPReorderBufferLen-1] = result
  59. return p.queue[0]
  60. }
  61. // Pop 从缓存中取出一个包,需要连续调用直到返回nil
  62. func (p *RTPReorder[T]) Pop() (result T) {
  63. if len(p.queue) == 0 {
  64. return
  65. }
  66. if next := p.queue[0]; next != result {
  67. result = next
  68. p.lastSeq++
  69. p.pop()
  70. }
  71. return
  72. }