sub.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package mqtt_utils
  2. import (
  3. "context"
  4. "crypto/rc4"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "git.getensh.com/common/gopkgs/cache"
  9. "git.getensh.com/common/gopkgs/logger"
  10. "git.getensh.com/common/gopkgs/mqtt"
  11. gmqtt "github.com/eclipse/paho.mqtt.golang"
  12. "github.com/tidwall/gjson"
  13. "go.uber.org/zap"
  14. "property-mqtt/parser"
  15. "property-mqtt/pb"
  16. pb_v1 "property-mqtt/pb/v1"
  17. "property-mqtt/utils"
  18. "strconv"
  19. "strings"
  20. "time"
  21. )
  22. const (
  23. UniqRecKeyPrefix = "UniqFaceDeviceRec_"
  24. UniqAckKeyPrefix = "UniqFaceDeviceAck_"
  25. UniqHeartKeyPrefix = "UniqFaceDeviceHeart_"
  26. UniqQcodeKeyPrefix = "UniqFaceQcode_"
  27. UniqCardKeyPrefix = "UniqCardRec_"
  28. )
  29. func getAllDevice() ([]string, error) {
  30. mreply, err := pb.Device.MqttFaceGate(context.Background(), &pb_v1.MqttFaceGateRequest{})
  31. if err != nil {
  32. return nil, err
  33. }
  34. return mreply.Sns, nil
  35. }
  36. func getKey(prefix string, faceid string, messageId string) string {
  37. return fmt.Sprintf("%s%s_%s", prefix, faceid, messageId)
  38. }
  39. func GetTopic(faceId string) string {
  40. return fmt.Sprintf("mqtt/face/%v", faceId)
  41. }
  42. // 分布式部署时,同时收到订阅的消息,只能有一个服务对该消息处理
  43. func MessageCanHandle(prefix string, faceid string, uniqflag string, second int64) bool {
  44. key := getKey(prefix, faceid, uniqflag)
  45. ret, _ := cache.Redis().SetNxEx(key, "1", second)
  46. return ret
  47. }
  48. func whiteCheck(sn string, protocol int32, codeVal string, codeType int32) (*pb_v1.GateWhiteMatchReply, error) {
  49. mreq := pb_v1.GateWhiteMatchRequest{
  50. Sn: sn,
  51. Protocol: protocol,
  52. CodeVal: codeVal,
  53. CodeType: codeType,
  54. }
  55. return pb.Device.GateWhiteMatch(context.Background(), &mreq)
  56. }
  57. func Rc4Decrypt(encrypt string, key string) string {
  58. if len(encrypt) < 4 {
  59. return ""
  60. }
  61. if encrypt[:4] != "CB01" {
  62. return ""
  63. }
  64. miwen, _ := hex.DecodeString(encrypt[4:])
  65. dest := make([]byte, len(miwen))
  66. cipher2, _ := rc4.NewCipher([]byte(key))
  67. cipher2.XORKeyStream(dest, []byte(miwen))
  68. return string(dest)
  69. }
  70. func checkQCode(codeStr string, gateKey string) (string, int32, string, string, int64) {
  71. loc, _ := time.LoadLocation("Local")
  72. visitor := int32(2)
  73. vname, vphone := "", ""
  74. vid := int64(0)
  75. text := Rc4Decrypt(codeStr, gateKey)
  76. if text == "" {
  77. return "", 0, "", "", 0
  78. }
  79. length := len(text)
  80. array := strings.Split(text[1:length-1], ",")
  81. uid := array[0]
  82. if len(array) < 4 {
  83. return "", 0, "", "", 0
  84. }
  85. subArray := strings.Split(array[0], "-")
  86. if len(subArray) > 3 {
  87. uid = subArray[0]
  88. visitor = 1
  89. vphone = subArray[1]
  90. vname = subArray[2]
  91. vid, _ = strconv.ParseInt(subArray[3], 10, 64)
  92. }
  93. start, _ := time.ParseInLocation("20060102150405", array[2], loc)
  94. end, _ := time.ParseInLocation("20060102150405", array[3], loc)
  95. now := time.Now()
  96. if now.Unix() < start.Unix() || now.Unix() > end.Unix() {
  97. return "", 0, "", "", 0
  98. }
  99. return uid, visitor, vname, vphone, vid
  100. }
  101. // 人脸识别结果
  102. func RecCallback(client gmqtt.Client, msg gmqtt.Message) {
  103. info := gjson.GetBytes(msg.Payload(), "info")
  104. if !info.IsObject() {
  105. return
  106. }
  107. faceId := info.Get("facesluiceId").String()
  108. timestr := info.Get("time").String()
  109. customId := info.Get("customId").String()
  110. if faceId == "" || customId == "" {
  111. return
  112. }
  113. // 只处理开门情况
  114. if info.Get("VerifyStatus").Int() != 1 {
  115. return
  116. }
  117. if !MessageCanHandle(UniqRecKeyPrefix, faceId, customId+"-"+timestr, 10) {
  118. return
  119. }
  120. t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location())
  121. uid, _ := strconv.ParseInt(customId, 10, 64)
  122. whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, customId, 1)
  123. if err != nil || whiteInfo.DeviceId == 0 {
  124. return
  125. }
  126. mreq := pb_v1.GateRecordAddRequest{
  127. Sn: faceId,
  128. Protocol: GateProtocolSaiboMqttV1,
  129. OpenTime: t.Unix(),
  130. HouseholdUid: uid,
  131. Location: whiteInfo.Location,
  132. Direction: whiteInfo.Direction,
  133. HouseholdIdNumber: whiteInfo.IdNumber,
  134. HouseholdHousename: whiteInfo.HouseName,
  135. DeviceId: whiteInfo.DeviceId,
  136. HouseholdUser: whiteInfo.Name,
  137. IsVisitor: 2,
  138. GardenId: whiteInfo.GardenId,
  139. OpenType: 3,
  140. Online: 2,
  141. }
  142. _, err = pb.Device.GateRecordAdd(context.Background(), &mreq)
  143. if err != nil {
  144. logger.Error("func",
  145. zap.String("call", "pb.Device.GateRecordAdd"),
  146. zap.String("error", err.Error()))
  147. }
  148. }
  149. // 卡号识别结果
  150. func CardCallback(client gmqtt.Client, msg gmqtt.Message) {
  151. info := gjson.GetBytes(msg.Payload(), "info")
  152. if !info.IsObject() {
  153. return
  154. }
  155. faceId := info.Get("facesluiceId").String()
  156. timestr := info.Get("time").String()
  157. cardNumber := info.Get("CardInfo").Get("CardNum").String()
  158. if faceId == "" || cardNumber == "" {
  159. return
  160. }
  161. if !MessageCanHandle(UniqRecKeyPrefix, faceId, cardNumber+"-"+timestr, 10) {
  162. return
  163. }
  164. t, _ := time.ParseInLocation("2006-01-02 15:04:05", timestr, time.Now().Location())
  165. whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, cardNumber, 2)
  166. if err != nil || whiteInfo.DeviceId == 0 {
  167. return
  168. }
  169. if whiteInfo.Status == 1 {
  170. messageId := fmt.Sprintf("cardopen_%s_%d_%s", cardNumber, time.Now().Unix(), utils.GenerateRandomStr(6, "mix"))
  171. mreq := UnlockCmd{
  172. Operator: "Unlock",
  173. MessageId: messageId,
  174. Info: UnlockInfo{OpenDoor: 1},
  175. }
  176. bytes, _ := json.Marshal(&mreq)
  177. // 向mqtt发送命令
  178. err := mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes)
  179. if err != nil {
  180. logger.Error("func",
  181. zap.String("call", "mqtt.Publish"),
  182. zap.String("error", err.Error()))
  183. return
  184. }
  185. recordReq := pb_v1.GateRecordAddRequest{
  186. Sn: faceId,
  187. Protocol: GateProtocolSaiboMqttV1,
  188. OpenTime: t.Unix(),
  189. HouseholdUid: 0,
  190. Location: whiteInfo.Location,
  191. Direction: whiteInfo.Direction,
  192. HouseholdIdNumber: whiteInfo.IdNumber,
  193. HouseholdHousename: whiteInfo.HouseName,
  194. DeviceId: whiteInfo.DeviceId,
  195. HouseholdUser: whiteInfo.Name,
  196. IsVisitor: 2,
  197. GardenId: whiteInfo.GardenId,
  198. OpenType: 2,
  199. CardNumber: cardNumber,
  200. Online: 1,
  201. }
  202. recordBytes, _ := json.Marshal(recordReq)
  203. cache.Redis().SetEx(messageId, 10, string(recordBytes))
  204. return
  205. }
  206. }
  207. // 设备对服务器的响应命令
  208. func AckCallback(client gmqtt.Client, msg gmqtt.Message) {
  209. info := gjson.GetBytes(msg.Payload(), "info")
  210. if !info.IsObject() {
  211. return
  212. }
  213. faceId := info.Get("facesluiceId").String()
  214. messageId := gjson.GetBytes(msg.Payload(), "messageId").String()
  215. operator := gjson.GetBytes(msg.Payload(), "operator").String()
  216. if faceId == "" || messageId == "" {
  217. return
  218. }
  219. //commond := GetCommand(faceId, messageId)
  220. //if commond == "" {
  221. // return
  222. //}
  223. if !MessageCanHandle(UniqAckKeyPrefix, faceId, messageId, 10) {
  224. return
  225. }
  226. if f, ok := CommandHandleMap[operator]; ok {
  227. go f("", string(msg.Payload()))
  228. }
  229. }
  230. // 心跳
  231. func HeartbeatCallback(client gmqtt.Client, msg gmqtt.Message) {
  232. info := gjson.GetBytes(msg.Payload(), "info")
  233. if !info.IsObject() {
  234. return
  235. }
  236. faceId := info.Get("facesluiceId").String()
  237. if faceId == "" {
  238. return
  239. }
  240. if !MessageCanHandle(UniqHeartKeyPrefix, faceId, "", 30) {
  241. return
  242. }
  243. mreq := pb_v1.GateOnlineRequest{Sn: faceId, Protocol: GateProtocolSaiboMqttV1}
  244. pb.Device.GateOnline(context.Background(), &mreq)
  245. }
  246. func QcodeCallback(client gmqtt.Client, msg gmqtt.Message) {
  247. info := gjson.GetBytes(msg.Payload(), "info")
  248. if !info.IsObject() {
  249. return
  250. }
  251. faceId := info.Get("facesluiceId").String()
  252. if faceId == "" {
  253. return
  254. }
  255. qcode := info.Get("QRCodeInfo").String()
  256. if qcode == "" {
  257. return
  258. }
  259. timestr := info.Get("time").String()
  260. if timestr == "" {
  261. return
  262. }
  263. if !MessageCanHandle(UniqQcodeKeyPrefix, faceId, qcode+timestr, 30) {
  264. return
  265. }
  266. uidStr, visitor, vname, vphone, vid := checkQCode(qcode, parser.Conf.GateKey)
  267. if uidStr == "" {
  268. return
  269. }
  270. uid, _ := strconv.ParseInt(uidStr, 10, 64)
  271. whiteInfo, err := whiteCheck(faceId, GateProtocolSaiboMqttV1, uidStr, 1)
  272. if err != nil || whiteInfo.Status != 1 {
  273. return
  274. }
  275. now := time.Now()
  276. if visitor == 1 {
  277. vreq := pb_v1.GateVisitorCheckRequest{
  278. Id: vid,
  279. OpenTime: now.Unix(),
  280. DeviceId: whiteInfo.DeviceId,
  281. }
  282. _, err := pb.Device.GateVisitorCheck(context.Background(), &vreq)
  283. if err != nil {
  284. return
  285. }
  286. }
  287. recordReq := pb_v1.GateRecordAddRequest{
  288. Sn: faceId,
  289. Protocol: GateProtocolSaiboMqttV1,
  290. OpenTime: now.Unix(),
  291. HouseholdUid: uid,
  292. Location: whiteInfo.Location,
  293. Direction: whiteInfo.Direction,
  294. HouseholdIdNumber: whiteInfo.IdNumber,
  295. HouseholdHousename: whiteInfo.HouseName,
  296. DeviceId: whiteInfo.DeviceId,
  297. HouseholdUser: whiteInfo.Name,
  298. IsVisitor: visitor,
  299. GardenId: whiteInfo.GardenId,
  300. OpenType: 1,
  301. CardNumber: "",
  302. VisitorPhone: vphone,
  303. VisitorName: vname,
  304. Online: 1,
  305. }
  306. messageId := fmt.Sprintf("qcodeopen_%d_%d_%s", uid, time.Now().Unix(), utils.GenerateRandomStr(6, "mix"))
  307. mreq := UnlockCmd{
  308. Operator: "Unlock",
  309. MessageId: messageId,
  310. Info: UnlockInfo{OpenDoor: 1},
  311. }
  312. bytes, _ := json.Marshal(&mreq)
  313. // 向mqtt发送命令
  314. err = mqtt.Publish(mqtt.MqttCli, GetTopic(faceId), bytes)
  315. if err != nil {
  316. logger.Error("func",
  317. zap.String("call", "mqtt.Publish"),
  318. zap.String("error", err.Error()))
  319. return
  320. }
  321. recordBytes, _ := json.Marshal(recordReq)
  322. cache.Redis().SetEx(messageId, 10, string(recordBytes))
  323. }
  324. func SubOne(sn string) error {
  325. v := sn
  326. // 人脸识别结果
  327. err := mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback)
  328. if err != nil {
  329. return err
  330. }
  331. // 回复服务器下发指令
  332. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback)
  333. if err != nil {
  334. return err
  335. }
  336. // 二维码信息上报
  337. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback)
  338. if err != nil {
  339. return err
  340. }
  341. // 卡号信息上报
  342. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback)
  343. if err != nil {
  344. return err
  345. }
  346. return nil
  347. }
  348. func SubAll() {
  349. devices, err := getAllDevice()
  350. if err != nil {
  351. panic("mqtt 订阅前获取设备失败:" + err.Error())
  352. }
  353. for _, v := range devices {
  354. // 人脸识别结果
  355. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Rec", v), RecCallback)
  356. if err != nil {
  357. panic("mqtt 订阅失败:" + err.Error())
  358. }
  359. // 回复服务器下发指令
  360. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Ack", v), AckCallback)
  361. if err != nil {
  362. panic("mqtt 订阅失败:" + err.Error())
  363. }
  364. // 二维码信息上报
  365. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/QRCode", v), QcodeCallback)
  366. if err != nil {
  367. panic("mqtt 订阅失败:" + err.Error())
  368. }
  369. // 卡号信息上报
  370. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/%v/Card", v), CardCallback)
  371. if err != nil {
  372. panic("mqtt 订阅失败:" + err.Error())
  373. }
  374. }
  375. // 心跳
  376. err = mqtt.Subscribe(mqtt.MqttCli, fmt.Sprintf("mqtt/face/heartbeat"), HeartbeatCallback)
  377. if err != nil {
  378. panic("mqtt 订阅失败:" + err.Error())
  379. }
  380. }