123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- // Copyright 2019 autocareai.com. All rights reserved.
- // Use of this source code is governed by autocareai.com.
- package udp_packet
- import (
- "fmt"
- "github.com/jaryhe/gopkgs/mqtt"
- "net"
- "sync"
- "time"
- tower "tower-monitor/handler/v1"
- "tower-monitor/parser"
- )
- func monitorDataHandel(src []byte) (res []byte, sn string, err error) {
- // 解析数据帧
- res, sn, err = tower.TowerFrameParse(src)
- if err != nil {
- fmt.Printf("TowerFrameParse:%v\n", err)
- }
- return res, sn, err
- }
- var uncompleteMap = map[string][]byte{}
- var mux sync.Mutex
- func deleteUncompleteMap(key string) {
- mux.Lock()
- defer mux.Unlock()
- delete(uncompleteMap, key)
- }
- func addUncompleteMap(key string, data []byte) {
- mux.Lock()
- defer mux.Unlock()
- if _, ok := uncompleteMap[key]; !ok {
- go func() {
- time.Sleep(60 *time.Second)
- deleteUncompleteMap(key)
- }()
- }
- uncompleteMap[key] = data
- }
- func getUncompleteMap(key string) ([]byte, bool) {
- mux.Lock()
- defer mux.Unlock()
- result, ok := uncompleteMap[key]
- return result, ok
- }
- /* 仅调试使用
- func CheckUncomplete() {
- for {
- time.Sleep(1 *time.Second)
- fmt.Printf("%d\n", len(uncompleteMap))
- }
- }
- */
- func hexPrintf(data []byte) {
- fmt.Printf("read time:%s\n", time.Now().Format("2006-01-02 15:04:05"))
- for _, v := range data {
- fmt.Printf("%2x,",v)
- }
- fmt.Printf("\n")
- }
- func HandleSocketClient(conn *net.UDPConn, ch chan bool) {
- // 处理完后关闭连接
- defer func () {
- <-ch
- }()
- request := make([]byte, 2048)
- var left, result []byte
- var isComplete bool
- read_len, clientAddr, err := conn.ReadFromUDP(request)
- fmt.Printf("read read:%v,%v,%v\n", read_len, err, request[:read_len])
- hexPrintf(request[:read_len])
- if err != nil {
- return
- }
- if gleft, ok := getUncompleteMap(clientAddr.String()); ok {
- src := append(gleft, request[:read_len]...)
- result, left, isComplete, err = tower.TowerParseUdp(src)
- } else {
- result, left, isComplete, err = tower.TowerParseUdp(request[:read_len])
- }
- fmt.Printf("tower parse:%v,%v,%d\n", isComplete, err, len(left))
- if err != nil {
- deleteUncompleteMap(clientAddr.String())
- return
- }
- if len(left) > 0 {
- addUncompleteMap(clientAddr.String(), left)
- }
- // 包不完整
- if isComplete == false {
- return
- }
- fmt.Printf("complete:%v\n", result)
- // 处理数据
- res, _, err := monitorDataHandel(result)
- fmt.Printf("response:%v\n", err)
- hexPrintf(res)
- fmt.Printf("response over:%v\n", err)
- if err != nil {
- return
- }
- conn.WriteToUDP(res, clientAddr)
- }
- func MqttHandle(data []byte) {
- result,isComplete:= tower.TowerParseMqtt(data[:])
- // 包不完整
- if isComplete == false {
- return
- }
- fmt.Printf("complete:%s\n", result)
- // 处理数据
- res, sn, err := monitorDataHandel(result)
- if err != nil {
- return
- }
- mqtt.Publish(mqtt.MqttCli, parser.Conf.Mqtt.ResTopicPrefix+"/"+sn, res)
- }
|