monitor.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright 2019 autocareai.com. All rights reserved.
  2. // Use of this source code is governed by autocareai.com.
  3. package udp_packet
  4. import (
  5. "fmt"
  6. "github.com/jaryhe/gopkgs/mqtt"
  7. "net"
  8. "sync"
  9. "time"
  10. lift "lift-monitor/handler/v1"
  11. "lift-monitor/parser"
  12. )
  13. func monitorDataHandel(src []byte) (res []byte, sn string, err error) {
  14. // 解析数据帧
  15. res, sn, err = lift.LiftFrameParse(src)
  16. if err != nil {
  17. fmt.Printf("LiftFrameParse:%v\n", err)
  18. }
  19. return res, sn, err
  20. }
  21. var uncompleteMap = map[string][]byte{}
  22. var mux sync.Mutex
  23. func deleteUncompleteMap(key string) {
  24. mux.Lock()
  25. defer mux.Unlock()
  26. delete(uncompleteMap, key)
  27. }
  28. func addUncompleteMap(key string, data []byte) {
  29. mux.Lock()
  30. defer mux.Unlock()
  31. if _, ok := uncompleteMap[key]; !ok {
  32. go func() {
  33. time.Sleep(60 *time.Second)
  34. deleteUncompleteMap(key)
  35. }()
  36. }
  37. uncompleteMap[key] = data
  38. }
  39. func getUncompleteMap(key string) ([]byte, bool) {
  40. mux.Lock()
  41. defer mux.Unlock()
  42. result, ok := uncompleteMap[key]
  43. return result, ok
  44. }
  45. /* 仅调试使用
  46. func CheckUncomplete() {
  47. for {
  48. time.Sleep(1 *time.Second)
  49. fmt.Printf("%d\n", len(uncompleteMap))
  50. }
  51. }
  52. */
  53. func hexPrintf(data []byte) {
  54. fmt.Printf("read time:%s\n", time.Now().Format("2006-01-02 15:04:05"))
  55. for _, v := range data {
  56. fmt.Printf("%2x,",v)
  57. }
  58. fmt.Printf("\n")
  59. }
  60. func HandleSocketClient(conn *net.UDPConn, ch chan bool) {
  61. // 处理完后关闭连接
  62. defer func () {
  63. <-ch
  64. }()
  65. request := make([]byte, 2048)
  66. var left, result []byte
  67. var isComplete bool
  68. fmt.Printf("xxxx\n")
  69. read_len, clientAddr, err := conn.ReadFromUDP(request)
  70. fmt.Printf("read read:%v,%v,%v\n", read_len, err, request[:read_len])
  71. hexPrintf(request[:read_len])
  72. if err != nil {
  73. return
  74. }
  75. if gleft, ok := getUncompleteMap(clientAddr.String()); ok {
  76. src := append(gleft, request[:read_len]...)
  77. result, left, isComplete, err = lift.LiftParseUdp(src)
  78. } else {
  79. result, left, isComplete, err = lift.LiftParseUdp(request[:read_len])
  80. }
  81. fmt.Printf("lift parse:%v,%v,%d\n", isComplete, err, len(left))
  82. if err != nil {
  83. deleteUncompleteMap(clientAddr.String())
  84. return
  85. }
  86. if len(left) > 0 {
  87. addUncompleteMap(clientAddr.String(), left)
  88. }
  89. // 包不完整
  90. if isComplete == false {
  91. return
  92. }
  93. fmt.Printf("complete:%v\n", result)
  94. // 处理数据
  95. res, _, err := monitorDataHandel(result)
  96. fmt.Printf("response:%v\n", err)
  97. hexPrintf(res)
  98. fmt.Printf("response over:%v\n", err)
  99. if err != nil {
  100. return
  101. }
  102. conn.WriteToUDP(res, clientAddr)
  103. }
  104. func MqttHandle(data []byte) {
  105. result,isComplete:= lift.LiftParseMqtt(data[:])
  106. // 包不完整
  107. if isComplete == false {
  108. return
  109. }
  110. fmt.Printf("complete:%s\n", result)
  111. // 处理数据
  112. res, sn, err := monitorDataHandel(result)
  113. if err != nil {
  114. return
  115. }
  116. mqtt.Publish(mqtt.MqttCli, parser.Conf.Mqtt.ResTopicPrefix+"/"+sn, res)
  117. }