package mqtt_utils import ( "context" "crypto/rc4" "encoding/hex" "encoding/json" "fmt" "git.getensh.com/common/gopkgs/cache" "git.getensh.com/common/gopkgs/logger" "git.getensh.com/common/gopkgs/mqtt" gmqtt "github.com/eclipse/paho.mqtt.golang" "github.com/tidwall/gjson" "go.uber.org/zap" "property-mqtt/parser" "property-mqtt/pb" pb_v1 "property-mqtt/pb/v1" "property-mqtt/utils" "strconv" "strings" "time" ) const ( UniqRecKeyPrefix = "UniqFaceDeviceRec_" UniqAckKeyPrefix = "UniqFaceDeviceAck_" UniqHeartKeyPrefix = "UniqFaceDeviceHeart_" UniqQcodeKeyPrefix = "UniqFaceQcode_" UniqCardKeyPrefix = "UniqCardRec_" ) func getAllDevice() ([]string, error) { mreply, err := pb.Device.MqttFaceGate(context.Background(), &pb_v1.MqttFaceGateRequest{}) if err != nil { return nil, err } return mreply.Sns, nil } func getKey(prefix string, faceid string, messageId string) string { return fmt.Sprintf("%s%s_%s", prefix, faceid, messageId) } func GetTopic(faceId string) string { return fmt.Sprintf("mqtt/face/%v", faceId) } // 分布式部署时,同时收到订阅的消息,只能有一个服务对该消息处理 func MessageCanHandle(prefix string, faceid string, uniqflag string, second int64) bool { key := getKey(prefix, faceid, uniqflag) ret, _ := cache.Redis().SetNxEx(key, "1", second) return ret } func whiteCheck(sn string, protocol int32, codeVal string, codeType int32) (*pb_v1.GateWhiteMatchReply, error) { mreq := pb_v1.GateWhiteMatchRequest{ Sn: sn, Protocol: protocol, CodeVal: codeVal, CodeType: codeType, } return pb.Device.GateWhiteMatch(context.Background(), &mreq) } func Rc4Decrypt(encrypt string, key string) string { if len(encrypt) < 4 { return "" } if encrypt[:4] != "CB01" { return "" } miwen, _ := hex.DecodeString(encrypt[4:]) dest := make([]byte, len(miwen)) cipher2, _ := rc4.NewCipher([]byte(key)) cipher2.XORKeyStream(dest, []byte(miwen)) return string(dest) } func checkQCode(codeStr string, gateKey string) (string, int32, string, string, int64) { loc, _ := time.LoadLocation("Local") visitor := int32(2) vname, vphone := "", "" vid := int64(0) text := Rc4Decrypt(codeStr, gateKey) if text == "" { return "", 0, "", "", 0 } length := len(text) array := strings.Split(text[1:length-1], ",") uid := array[0] if len(array) < 4 { return "", 0, "", "", 0 } subArray := strings.Split(array[0], "-") if len(subArray) > 3 { uid = subArray[0] visitor = 1 vphone = subArray[1] vname = subArray[2] vid, _ = strconv.ParseInt(subArray[3], 10, 64) } start, _ := time.ParseInLocation("20060102150405", array[2], loc) end, _ := time.ParseInLocation("20060102150405", array[3], loc) now := time.Now() if now.Unix() < start.Unix() || now.Unix() > end.Unix() { return "", 0, "", "", 0 } return uid, visitor, vname, vphone, vid } // 人脸识别结果 func RecCallback(client gmqtt.Client, msg gmqtt.Message) { info := gjson.GetBytes(msg.Payload(), "info") if !info.IsObject() { return } faceId := info.Get("facesluiceId").String() timestr := info.Get("time").String() customId := info.Get("customId").String() if faceId == "" || customId == "" { return } // 只处理开门情况 if info.Get("VerifyStatus").Int() != 1 { return } if !MessageCanHandle(UniqRecKeyPrefix, faceId, customId+"-"+timestr, 10) { return } t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location()) uid, _ := strconv.ParseInt(customId, 10, 64) whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, customId, 1) if err != nil || whiteInfo.DeviceId == 0 { return } mreq := pb_v1.GateRecordAddRequest{ Sn: faceId, Protocol: GateProtocolSaiboMqttV1, OpenTime: t.Unix(), HouseholdUid: uid, Location: whiteInfo.Location, Direction: whiteInfo.Direction, HouseholdIdNumber: whiteInfo.IdNumber, HouseholdHousename: whiteInfo.HouseName, DeviceId: whiteInfo.DeviceId, HouseholdUser: whiteInfo.Name, IsVisitor: 2, GardenId: whiteInfo.GardenId, OpenType: 3, Online: 2, } _, err = pb.Device.GateRecordAdd(context.Background(), &mreq) if err != nil { logger.Error("func", zap.String("call", "pb.Device.GateRecordAdd"), zap.String("error", err.Error())) } } // 卡号识别结果 func CardCallback(client gmqtt.Client, msg gmqtt.Message) { info := gjson.GetBytes(msg.Payload(), "info") if !info.IsObject() { return } faceId := info.Get("facesluiceId").String() timestr := info.Get("time").String() cardNumber := info.Get("CardInfo").Get("CardNum").String() if faceId == "" || cardNumber == "" { return } if !MessageCanHandle(UniqRecKeyPrefix, faceId, cardNumber+"-"+timestr, 10) { return } t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location()) whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, cardNumber, 2) if err != nil || whiteInfo.DeviceId == 0 { return } if whiteInfo.Status == 1 { messageId := fmt.Sprintf("cardopen_%s_%d_%s", cardNumber, time.Now().Unix(), utils.GenerateRandomStr(6, "mix")) mreq := UnlockCmd{ Operator: "Unlock", MessageId: messageId, Info: UnlockInfo{OpenDoor: 1}, } bytes, _ := json.Marshal(&mreq) // 向mqtt发送命令 err := mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes) if err != nil { logger.Error("func", zap.String("call", "mqtt.Publish"), zap.String("error", err.Error())) return } recordReq := pb_v1.GateRecordAddRequest{ Sn: faceId, Protocol: GateProtocolSaiboMqttV1, OpenTime: t.Unix(), HouseholdUid: 0, Location: whiteInfo.Location, Direction: whiteInfo.Direction, HouseholdIdNumber: whiteInfo.IdNumber, HouseholdHousename: whiteInfo.HouseName, DeviceId: whiteInfo.DeviceId, HouseholdUser: whiteInfo.Name, IsVisitor: 2, GardenId: whiteInfo.GardenId, OpenType: 2, CardNumber: cardNumber, Online: 1, } recordBytes, _ := json.Marshal(recordReq) cache.Redis().SetEx(messageId, 10, string(recordBytes)) return } } // 设备对服务器的响应命令 func AckCallback(client gmqtt.Client, msg gmqtt.Message) { info := gjson.GetBytes(msg.Payload(), "info") if !info.IsObject() { return } faceId := info.Get("facesluiceId").String() messageId := gjson.GetBytes(msg.Payload(), "messageId").String() operator := gjson.GetBytes(msg.Payload(), "operator").String() if faceId == "" || messageId == "" { return } //commond := GetCommand(faceId, messageId) //if commond == "" { // return //} if !MessageCanHandle(UniqAckKeyPrefix, faceId, messageId, 10) { return } if f, ok := CommandHandleMap[operator]; ok { go f("", string(msg.Payload())) } } // 心跳 func HeartbeatCallback(client gmqtt.Client, msg gmqtt.Message) { info := gjson.GetBytes(msg.Payload(), "info") if !info.IsObject() { return } faceId := info.Get("facesluiceId").String() if faceId == "" { return } if !MessageCanHandle(UniqHeartKeyPrefix, faceId, "", 30) { return } mreq := pb_v1.GateOnlineRequest{Sn: faceId, Protocol: GateProtocolSaiboMqttV1} pb.Device.GateOnline(context.Background(), &mreq) } func QcodeCallback(client gmqtt.Client, msg gmqtt.Message) { info := gjson.GetBytes(msg.Payload(), "info") if !info.IsObject() { return } faceId := info.Get("facesluiceId").String() if faceId == "" { return } qcode := info.Get("QRCodeInfo").String() if qcode == "" { return } timestr := info.Get("time").String() if timestr == "" { return } if !MessageCanHandle(UniqQcodeKeyPrefix, faceId, qcode+timestr, 30) { return } uidStr, visitor, vname, vphone, vid := checkQCode(qcode, parser.Conf.GateKey) if uidStr == "" { return } uid, _ := strconv.ParseInt(uidStr, 10, 64) whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, uidStr, 1) if err != nil || whiteInfo.Status != 1 { return } now := time.Now() if visitor == 1 { vreq := pb_v1.GateVisitorCheckRequest{ Id: vid, OpenTime: now.Unix(), DeviceId: whiteInfo.DeviceId, } _, err := pb.Device.GateVisitorCheck(context.Background(), &vreq) if err != nil { return } } recordReq := pb_v1.GateRecordAddRequest{ Sn: faceId, Protocol: GateProtocolSaiboMqttV1, OpenTime: now.Unix(), HouseholdUid: uid, Location: whiteInfo.Location, Direction: whiteInfo.Direction, HouseholdIdNumber: whiteInfo.IdNumber, HouseholdHousename: whiteInfo.HouseName, DeviceId: whiteInfo.DeviceId, HouseholdUser: whiteInfo.Name, IsVisitor: visitor, GardenId: whiteInfo.GardenId, OpenType: 1, CardNumber: "", VisitorPhone: vphone, VisitorName: vname, Online: 1, } messageId := fmt.Sprintf("qcodeopen_%d_%d_%s", uid, time.Now().Unix(), utils.GenerateRandomStr(6, "mix")) mreq := UnlockCmd{ Operator: "Unlock", MessageId: messageId, Info: UnlockInfo{OpenDoor: 1}, } bytes, _ := json.Marshal(&mreq) // 向mqtt发送命令 err = mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes) if err != nil { logger.Error("func", zap.String("call", "mqtt.Publish"), zap.String("error", err.Error())) return } recordBytes, _ := json.Marshal(recordReq) cache.Redis().SetEx(messageId, 10, string(recordBytes)) } func SubOne(sn string) error { v := sn // 人脸识别结果 err := mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback) if err != nil { return err } // 回复服务器下发指令 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback) if err != nil { return err } // 二维码信息上报 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback) if err != nil { return err } // 卡号信息上报 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback) if err != nil { return err } return nil } func SubAll() { devices, err := getAllDevice() if err != nil { panic("mqtt 订阅前获取设备失败:" + err.Error()) } for _, v := range devices { // 人脸识别结果 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback) if err != nil { panic("mqtt 订阅失败:" + err.Error()) } // 回复服务器下发指令 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback) if err != nil { panic("mqtt 订阅失败:" + err.Error()) } // 二维码信息上报 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback) if err != nil { panic("mqtt 订阅失败:" + err.Error()) } // 卡号信息上报 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback) if err != nil { panic("mqtt 订阅失败:" + err.Error()) } } // 心跳 err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/heartbeat"), HeartbeatCallback) if err != nil { panic("mqtt 订阅失败:" + err.Error()) } }