123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- package mqtt_utils
- import (
- "context"
- "crypto/rc4"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "git.getensh.com/common/gopkgs/cache"
- "git.getensh.com/common/gopkgs/logger"
- "git.getensh.com/common/gopkgs/mqtt"
- gmqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/tidwall/gjson"
- "go.uber.org/zap"
- "property-mqtt/parser"
- "property-mqtt/pb"
- pb_v1 "property-mqtt/pb/v1"
- "property-mqtt/utils"
- "strconv"
- "strings"
- "time"
- )
- const (
- UniqRecKeyPrefix = "UniqFaceDeviceRec_"
- UniqAckKeyPrefix = "UniqFaceDeviceAck_"
- UniqHeartKeyPrefix = "UniqFaceDeviceHeart_"
- UniqQcodeKeyPrefix = "UniqFaceQcode_"
- UniqCardKeyPrefix = "UniqCardRec_"
- )
- func getAllDevice() ([]string, error) {
- mreply, err := pb.Device.MqttFaceGate(context.Background(), &pb_v1.MqttFaceGateRequest{})
- if err != nil {
- return nil, err
- }
- return mreply.Sns, nil
- }
- func getKey(prefix string, faceid string, messageId string) string {
- return fmt.Sprintf("%s%s_%s", prefix, faceid, messageId)
- }
- func GetTopic(faceId string) string {
- return fmt.Sprintf("mqtt/face/%v", faceId)
- }
- // 分布式部署时,同时收到订阅的消息,只能有一个服务对该消息处理
- func MessageCanHandle(prefix string, faceid string, uniqflag string, second int64) bool {
- key := getKey(prefix, faceid, uniqflag)
- ret, _ := cache.Redis().SetNxEx(key, "1", second)
- return ret
- }
- func whiteCheck(sn string, protocol int32, codeVal string, codeType int32) (*pb_v1.GateWhiteMatchReply, error) {
- mreq := pb_v1.GateWhiteMatchRequest{
- Sn: sn,
- Protocol: protocol,
- CodeVal: codeVal,
- CodeType: codeType,
- }
- return pb.Device.GateWhiteMatch(context.Background(), &mreq)
- }
- func Rc4Decrypt(encrypt string, key string) string {
- if len(encrypt) < 4 {
- return ""
- }
- if encrypt[:4] != "CB01" {
- return ""
- }
- miwen, _ := hex.DecodeString(encrypt[4:])
- dest := make([]byte, len(miwen))
- cipher2, _ := rc4.NewCipher([]byte(key))
- cipher2.XORKeyStream(dest, []byte(miwen))
- return string(dest)
- }
- func checkQCode(codeStr string, gateKey string) (string, int32, string, string, int64) {
- loc, _ := time.LoadLocation("Local")
- visitor := int32(2)
- vname, vphone := "", ""
- vid := int64(0)
- text := Rc4Decrypt(codeStr, gateKey)
- if text == "" {
- return "", 0, "", "", 0
- }
- length := len(text)
- array := strings.Split(text[1:length-1], ",")
- uid := array[0]
- if len(array) < 4 {
- return "", 0, "", "", 0
- }
- subArray := strings.Split(array[0], "-")
- if len(subArray) > 3 {
- uid = subArray[0]
- visitor = 1
- vphone = subArray[1]
- vname = subArray[2]
- vid, _ = strconv.ParseInt(subArray[3], 10, 64)
- }
- start, _ := time.ParseInLocation("20060102150405", array[2], loc)
- end, _ := time.ParseInLocation("20060102150405", array[3], loc)
- now := time.Now()
- if now.Unix() < start.Unix() || now.Unix() > end.Unix() {
- return "", 0, "", "", 0
- }
- return uid, visitor, vname, vphone, vid
- }
- // 人脸识别结果
- func RecCallback(client gmqtt.Client, msg gmqtt.Message) {
- info := gjson.GetBytes(msg.Payload(), "info")
- if !info.IsObject() {
- return
- }
- faceId := info.Get("facesluiceId").String()
- timestr := info.Get("time").String()
- customId := info.Get("customId").String()
- if faceId == "" || customId == "" {
- return
- }
- // 只处理开门情况
- if info.Get("VerifyStatus").Int() != 1 {
- return
- }
- if !MessageCanHandle(UniqRecKeyPrefix, faceId, customId+"-"+timestr, 10) {
- return
- }
- t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location())
- uid, _ := strconv.ParseInt(customId, 10, 64)
- whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, customId, 1)
- if err != nil || whiteInfo.DeviceId == 0 {
- return
- }
- mreq := pb_v1.GateRecordAddRequest{
- Sn: faceId,
- Protocol: GateProtocolSaiboMqttV1,
- OpenTime: t.Unix(),
- HouseholdUid: uid,
- Location: whiteInfo.Location,
- Direction: whiteInfo.Direction,
- HouseholdIdNumber: whiteInfo.IdNumber,
- HouseholdHousename: whiteInfo.HouseName,
- DeviceId: whiteInfo.DeviceId,
- HouseholdUser: whiteInfo.Name,
- IsVisitor: 2,
- GardenId: whiteInfo.GardenId,
- OpenType: 3,
- Online: 2,
- }
- _, err = pb.Device.GateRecordAdd(context.Background(), &mreq)
- if err != nil {
- logger.Error("func",
- zap.String("call", "pb.Device.GateRecordAdd"),
- zap.String("error", err.Error()))
- }
- }
- // 卡号识别结果
- func CardCallback(client gmqtt.Client, msg gmqtt.Message) {
- info := gjson.GetBytes(msg.Payload(), "info")
- if !info.IsObject() {
- return
- }
- faceId := info.Get("facesluiceId").String()
- timestr := info.Get("time").String()
- cardNumber := info.Get("CardInfo").Get("CardNum").String()
- if faceId == "" || cardNumber == "" {
- return
- }
- if !MessageCanHandle(UniqRecKeyPrefix, faceId, cardNumber+"-"+timestr, 10) {
- return
- }
- t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location())
- whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, cardNumber, 2)
- if err != nil || whiteInfo.DeviceId == 0 {
- return
- }
- if whiteInfo.Status == 1 {
- messageId := fmt.Sprintf("cardopen_%s_%d_%s", cardNumber, time.Now().Unix(), utils.GenerateRandomStr(6, "mix"))
- mreq := UnlockCmd{
- Operator: "Unlock",
- MessageId: messageId,
- Info: UnlockInfo{OpenDoor: 1},
- }
- bytes, _ := json.Marshal(&mreq)
- // 向mqtt发送命令
- err := mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes)
- if err != nil {
- logger.Error("func",
- zap.String("call", "mqtt.Publish"),
- zap.String("error", err.Error()))
- return
- }
- recordReq := pb_v1.GateRecordAddRequest{
- Sn: faceId,
- Protocol: GateProtocolSaiboMqttV1,
- OpenTime: t.Unix(),
- HouseholdUid: 0,
- Location: whiteInfo.Location,
- Direction: whiteInfo.Direction,
- HouseholdIdNumber: whiteInfo.IdNumber,
- HouseholdHousename: whiteInfo.HouseName,
- DeviceId: whiteInfo.DeviceId,
- HouseholdUser: whiteInfo.Name,
- IsVisitor: 2,
- GardenId: whiteInfo.GardenId,
- OpenType: 2,
- CardNumber: cardNumber,
- Online: 1,
- }
- recordBytes, _ := json.Marshal(recordReq)
- cache.Redis().SetEx(messageId, 10, string(recordBytes))
- return
- }
- }
- // 设备对服务器的响应命令
- func AckCallback(client gmqtt.Client, msg gmqtt.Message) {
- info := gjson.GetBytes(msg.Payload(), "info")
- if !info.IsObject() {
- return
- }
- faceId := info.Get("facesluiceId").String()
- messageId := gjson.GetBytes(msg.Payload(), "messageId").String()
- operator := gjson.GetBytes(msg.Payload(), "operator").String()
- if faceId == "" || messageId == "" {
- return
- }
- //commond := GetCommand(faceId, messageId)
- //if commond == "" {
- // return
- //}
- if !MessageCanHandle(UniqAckKeyPrefix, faceId, messageId, 10) {
- return
- }
- if f, ok := CommandHandleMap[operator]; ok {
- go f("", string(msg.Payload()))
- }
- }
- // 心跳
- func HeartbeatCallback(client gmqtt.Client, msg gmqtt.Message) {
- info := gjson.GetBytes(msg.Payload(), "info")
- if !info.IsObject() {
- return
- }
- faceId := info.Get("facesluiceId").String()
- if faceId == "" {
- return
- }
- if !MessageCanHandle(UniqHeartKeyPrefix, faceId, "", 30) {
- return
- }
- mreq := pb_v1.GateOnlineRequest{Sn: faceId, Protocol: GateProtocolSaiboMqttV1}
- pb.Device.GateOnline(context.Background(), &mreq)
- }
- func QcodeCallback(client gmqtt.Client, msg gmqtt.Message) {
- info := gjson.GetBytes(msg.Payload(), "info")
- if !info.IsObject() {
- return
- }
- faceId := info.Get("facesluiceId").String()
- if faceId == "" {
- return
- }
- qcode := info.Get("QRCodeInfo").String()
- if qcode == "" {
- return
- }
- timestr := info.Get("time").String()
- if timestr == "" {
- return
- }
- if !MessageCanHandle(UniqQcodeKeyPrefix, faceId, qcode+timestr, 30) {
- return
- }
- uidStr, visitor, vname, vphone, vid := checkQCode(qcode, parser.Conf.GateKey)
- if uidStr == "" {
- return
- }
- uid, _ := strconv.ParseInt(uidStr, 10, 64)
- whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, uidStr, 1)
- if err != nil || whiteInfo.Status != 1 {
- return
- }
- now := time.Now()
- if visitor == 1 {
- vreq := pb_v1.GateVisitorCheckRequest{
- Id: vid,
- OpenTime: now.Unix(),
- DeviceId: whiteInfo.DeviceId,
- }
- _, err := pb.Device.GateVisitorCheck(context.Background(), &vreq)
- if err != nil {
- return
- }
- }
- recordReq := pb_v1.GateRecordAddRequest{
- Sn: faceId,
- Protocol: GateProtocolSaiboMqttV1,
- OpenTime: now.Unix(),
- HouseholdUid: uid,
- Location: whiteInfo.Location,
- Direction: whiteInfo.Direction,
- HouseholdIdNumber: whiteInfo.IdNumber,
- HouseholdHousename: whiteInfo.HouseName,
- DeviceId: whiteInfo.DeviceId,
- HouseholdUser: whiteInfo.Name,
- IsVisitor: visitor,
- GardenId: whiteInfo.GardenId,
- OpenType: 1,
- CardNumber: "",
- VisitorPhone: vphone,
- VisitorName: vname,
- Online: 1,
- }
- messageId := fmt.Sprintf("qcodeopen_%d_%d_%s", uid, time.Now().Unix(), utils.GenerateRandomStr(6, "mix"))
- mreq := UnlockCmd{
- Operator: "Unlock",
- MessageId: messageId,
- Info: UnlockInfo{OpenDoor: 1},
- }
- bytes, _ := json.Marshal(&mreq)
- // 向mqtt发送命令
- err = mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes)
- if err != nil {
- logger.Error("func",
- zap.String("call", "mqtt.Publish"),
- zap.String("error", err.Error()))
- return
- }
- recordBytes, _ := json.Marshal(recordReq)
- cache.Redis().SetEx(messageId, 10, string(recordBytes))
- }
- func SubOne(sn string) error {
- v := sn
- // 人脸识别结果
- err := mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback)
- if err != nil {
- return err
- }
- // 回复服务器下发指令
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback)
- if err != nil {
- return err
- }
- // 二维码信息上报
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback)
- if err != nil {
- return err
- }
- // 卡号信息上报
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback)
- if err != nil {
- return err
- }
- return nil
- }
- func SubAll() {
- devices, err := getAllDevice()
- if err != nil {
- panic("mqtt 订阅前获取设备失败:" + err.Error())
- }
- for _, v := range devices {
- // 人脸识别结果
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback)
- if err != nil {
- panic("mqtt 订阅失败:" + err.Error())
- }
- // 回复服务器下发指令
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback)
- if err != nil {
- panic("mqtt 订阅失败:" + err.Error())
- }
- // 二维码信息上报
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback)
- if err != nil {
- panic("mqtt 订阅失败:" + err.Error())
- }
- // 卡号信息上报
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback)
- if err != nil {
- panic("mqtt 订阅失败:" + err.Error())
- }
- }
- // 心跳
- err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/heartbeat"), HeartbeatCallback)
- if err != nil {
- panic("mqtt 订阅失败:" + err.Error())
- }
- }
|