rtp_sort.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package utils
  2. import (
  3. "container/heap"
  4. "errors"
  5. "github.com/pion/rtp"
  6. )
  7. const MaxRtpDiff = 65000 //相邻两个包之间的最大差值
  8. type PriorityQueueRtp struct {
  9. itemHeap *packets
  10. current *rtp.Packet
  11. priorityMap map[uint16]bool
  12. lastPacket *rtp.Packet
  13. }
  14. func NewPqRtp() *PriorityQueueRtp {
  15. return &PriorityQueueRtp{
  16. itemHeap: &packets{},
  17. priorityMap: make(map[uint16]bool),
  18. }
  19. }
  20. func (p *PriorityQueueRtp) Len() int {
  21. return p.itemHeap.Len()
  22. }
  23. func (p *PriorityQueueRtp) Push(v rtp.Packet) {
  24. if p.priorityMap[v.SequenceNumber] {
  25. return
  26. }
  27. newItem := &packet{
  28. value: v,
  29. priority: v.SequenceNumber,
  30. }
  31. heap.Push(p.itemHeap, newItem)
  32. }
  33. func (p *PriorityQueueRtp) Pop() (rtp.Packet, error) {
  34. if len(*p.itemHeap) == 0 {
  35. return rtp.Packet{}, errors.New("empty queue")
  36. }
  37. item := heap.Pop(p.itemHeap).(*packet)
  38. return item.value, nil
  39. }
  40. func (p *PriorityQueueRtp) Empty() {
  41. old := *p.itemHeap
  42. *p.itemHeap = old[:0]
  43. }
  44. type packets []*packet
  45. type packet struct {
  46. value rtp.Packet
  47. priority uint16
  48. index int
  49. }
  50. func (p *packets) Len() int {
  51. return len(*p)
  52. }
  53. func (p *packets) Less(i, j int) bool {
  54. a, b := (*p)[i].priority, (*p)[j].priority
  55. if int(a)-int(b) > MaxRtpDiff || int(b)-int(a) > MaxRtpDiff {
  56. if a < b {
  57. return false
  58. }
  59. return true
  60. }
  61. return a < b
  62. }
  63. func (p *packets) Swap(i, j int) {
  64. (*p)[i], (*p)[j] = (*p)[j], (*p)[i]
  65. (*p)[i].index = i
  66. (*p)[j].index = j
  67. }
  68. func (p *packets) Push(x interface{}) {
  69. it := x.(*packet)
  70. it.index = len(*p)
  71. *p = append(*p, it)
  72. }
  73. func (p *packets) Pop() interface{} {
  74. old := *p
  75. n := len(old)
  76. item := old[n-1]
  77. old[n-1] = nil // avoid memory leak
  78. item.index = -1 // for safety
  79. *p = old[0 : n-1]
  80. return item
  81. }