monitor.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Copyright 2019 autocareai.com. All rights reserved.
  2. // Use of this source code is governed by autocareai.com.
  3. package udp_packet
  4. import (
  5. "fmt"
  6. "github.com/jaryhe/gopkgs/mqtt"
  7. "net"
  8. "sync"
  9. "time"
  10. tower "tower-monitor/handler/v1"
  11. "tower-monitor/parser"
  12. )
  13. func monitorDataHandel(src []byte) (res []byte, sn string, err error) {
  14. // 解析数据帧
  15. res, sn, err = tower.TowerFrameParse(src)
  16. if err != nil {
  17. fmt.Printf("TowerFrameParse:%v\n", err)
  18. }
  19. return res, sn, err
  20. }
  // uncompleteTTL is how long a stored fragment may wait, counted from the
  // moment its key is first inserted, before it is discarded.
  const uncompleteTTL = 60 * time.Second

  var (
  	// mux guards uncompleteMap and uncompleteTimers.
  	mux sync.Mutex

  	// uncompleteMap holds, per key, the bytes that are still waiting for more
  	// data. Callers in this file use the sender's address string as the key.
  	uncompleteMap = map[string][]byte{}

  	// uncompleteTimers holds the expiry timer that belongs to each key
  	// currently tracked in uncompleteMap.
  	uncompleteTimers = map[string]*time.Timer{}
  )

  // deleteUncompleteMap removes the fragment stored under key, if any, and
  // cancels its expiry timer so that the timer cannot later remove a newer
  // entry stored under the same key. Deleting an unknown key is a no-op.
  func deleteUncompleteMap(key string) {
  	mux.Lock()
  	defer mux.Unlock()

  	if t, ok := uncompleteTimers[key]; ok {
  		t.Stop()
  		delete(uncompleteTimers, key)
  	}
  	delete(uncompleteMap, key)
  }

  // addUncompleteMap stores data under key, replacing any previous value.
  //
  // The first insertion of a key arms a timer that drops the entry after
  // uncompleteTTL. Overwriting an existing entry does not extend that deadline.
  // data is stored as passed in, without copying.
  func addUncompleteMap(key string, data []byte) {
  	mux.Lock()
  	defer mux.Unlock()

  	if _, exists := uncompleteMap[key]; !exists {
  		// A timer left behind for this key belongs to an older entry; make
  		// sure it can no longer fire against the new one.
  		if stale := uncompleteTimers[key]; stale != nil {
  			stale.Stop()
  		}

  		var t *time.Timer
  		t = time.AfterFunc(uncompleteTTL, func() {
  			mux.Lock()
  			defer mux.Unlock()
  			// Only the timer currently registered for the key may expire it;
  			// a timer that lost a race with delete/re-add does nothing.
  			if uncompleteTimers[key] != t {
  				return
  			}
  			delete(uncompleteTimers, key)
  			delete(uncompleteMap, key)
  		})
  		uncompleteTimers[key] = t
  	}
  	uncompleteMap[key] = data
  }

  // getUncompleteMap returns the fragment stored under key and whether one was
  // present.
  func getUncompleteMap(key string) ([]byte, bool) {
  	mux.Lock()
  	defer mux.Unlock()

  	data, ok := uncompleteMap[key]
  	return data, ok
  }
  /* Debug use only.
  func CheckUncomplete() {
  	for {
  		time.Sleep(1 * time.Second)
  		fmt.Printf("%d\n", len(uncompleteMap))
  	}
  }
  */
  // hexPrintf writes a debug dump of data to standard output: first a line with
  // the current local time, then every byte in lowercase hexadecimal followed by
  // a comma, then a newline. The "%2x" verb pads to width two with a space, so
  // values below 0x10 appear as " a" rather than "0a".
  func hexPrintf(data []byte) {
  	stamp := time.Now().Format("2006-01-02 15:04:05")
  	fmt.Printf("read time:%s\n", stamp)
  	for i := range data {
  		fmt.Printf("%2x,", data[i])
  	}
  	fmt.Printf("\n")
  }
  60. func HandleSocketClient(conn *net.UDPConn, ch chan bool) {
  61. // 处理完后关闭连接
  62. defer func () {
  63. <-ch
  64. }()
  65. request := make([]byte, 2048)
  66. var left, result []byte
  67. var isComplete bool
  68. read_len, clientAddr, err := conn.ReadFromUDP(request)
  69. fmt.Printf("read read:%v,%v,%v\n", read_len, err, request[:read_len])
  70. hexPrintf(request[:read_len])
  71. if err != nil {
  72. return
  73. }
  74. if gleft, ok := getUncompleteMap(clientAddr.String()); ok {
  75. src := append(gleft, request[:read_len]...)
  76. result, left, isComplete, err = tower.TowerParseUdp(src)
  77. } else {
  78. result, left, isComplete, err = tower.TowerParseUdp(request[:read_len])
  79. }
  80. fmt.Printf("tower parse:%v,%v,%d\n", isComplete, err, len(left))
  81. if err != nil {
  82. deleteUncompleteMap(clientAddr.String())
  83. return
  84. }
  85. if len(left) > 0 {
  86. addUncompleteMap(clientAddr.String(), left)
  87. }
  88. // 包不完整
  89. if isComplete == false {
  90. return
  91. }
  92. fmt.Printf("complete:%v\n", result)
  93. // 处理数据
  94. res, _, err := monitorDataHandel(result)
  95. fmt.Printf("response:%v\n", err)
  96. hexPrintf(res)
  97. fmt.Printf("response over:%v\n", err)
  98. if err != nil {
  99. return
  100. }
  101. conn.WriteToUDP(res, clientAddr)
  102. }
  103. func MqttHandle(data []byte) {
  104. result,isComplete:= tower.TowerParseMqtt(data[:])
  105. // 包不完整
  106. if isComplete == false {
  107. return
  108. }
  109. fmt.Printf("complete:%s\n", result)
  110. // 处理数据
  111. res, sn, err := monitorDataHandel(result)
  112. if err != nil {
  113. return
  114. }
  115. mqtt.Publish(mqtt.MqttCli, parser.Conf.Mqtt.ResTopicPrefix+"/"+sn, res)
  116. }