// Copyright 2019 autocareai.com. All rights reserved. // Use of this source code is governed by autocareai.com. package udp_packet import ( "fmt" "github.com/jaryhe/gopkgs/mqtt" "net" "sync" "time" tower "tower-monitor/handler/v1" "tower-monitor/parser" ) func monitorDataHandel(src []byte) (res []byte, sn string, err error) { // 解析数据帧 res, sn, err = tower.TowerFrameParse(src) if err != nil { fmt.Printf("TowerFrameParse:%v\n", err) } return res, sn, err } var uncompleteMap = map[string][]byte{} var mux sync.Mutex func deleteUncompleteMap(key string) { mux.Lock() defer mux.Unlock() delete(uncompleteMap, key) } func addUncompleteMap(key string, data []byte) { mux.Lock() defer mux.Unlock() if _, ok := uncompleteMap[key]; !ok { go func() { time.Sleep(60 *time.Second) deleteUncompleteMap(key) }() } uncompleteMap[key] = data } func getUncompleteMap(key string) ([]byte, bool) { mux.Lock() defer mux.Unlock() result, ok := uncompleteMap[key] return result, ok } /* 仅调试使用 func CheckUncomplete() { for { time.Sleep(1 *time.Second) fmt.Printf("%d\n", len(uncompleteMap)) } } */ func hexPrintf(data []byte) { fmt.Printf("read time:%s\n", time.Now().Format("2006-01-02 15:04:05")) for _, v := range data { fmt.Printf("%2x,",v) } fmt.Printf("\n") } func HandleSocketClient(conn *net.UDPConn, ch chan bool) { // 处理完后关闭连接 defer func () { <-ch }() request := make([]byte, 2048) var left, result []byte var isComplete bool read_len, clientAddr, err := conn.ReadFromUDP(request) fmt.Printf("read read:%v,%v,%v\n", read_len, err, request[:read_len]) hexPrintf(request[:read_len]) if err != nil { return } if gleft, ok := getUncompleteMap(clientAddr.String()); ok { src := append(gleft, request[:read_len]...) result, left, isComplete, err = tower.TowerParseUdp(src) } else { result, left, isComplete, err = tower.TowerParseUdp(request[:read_len]) } fmt.Printf("tower parse:%v,%v,%d\n", isComplete, err, len(left)) if err != nil { deleteUncompleteMap(clientAddr.String()) return } if len(left) > 0 { addUncompleteMap(clientAddr.String(), left) } // 包不完整 if isComplete == false { return } fmt.Printf("complete:%v\n", result) // 处理数据 res, _, err := monitorDataHandel(result) fmt.Printf("response:%v\n", err) hexPrintf(res) fmt.Printf("response over:%v\n", err) if err != nil { return } conn.WriteToUDP(res, clientAddr) } func MqttHandle(data []byte) { result,isComplete:= tower.TowerParseMqtt(data[:]) // 包不完整 if isComplete == false { return } fmt.Printf("complete:%s\n", result) // 处理数据 res, sn, err := monitorDataHandel(result) if err != nil { return } mqtt.Publish(mqtt.MqttCli, parser.Conf.Mqtt.ResTopicPrefix+"/"+sn, res) }