gate_record.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package model
  4. import (
  5. "encoding/json"
  6. "property-mqtt/errors"
  7. "strings"
  8. "time"
  9. "git.getensh.com/common/gopkgs/logger"
  10. "go.uber.org/zap"
  11. "git.getensh.com/common/gopkgs/influxdb"
  12. iclient "github.com/influxdata/influxdb/client/v2"
  13. )
const (
	// DatabaseNotFound is the substring searched for in a failed write's
	// error text to detect that the target database does not exist yet.
	DatabaseNotFound = "database not found"
	// GateDbname is the name of the InfluxDB database that gate points are
	// written to and that Create creates.
	GateDbname = "property_device_gate"
)
  18. func WriteGateData(deviceId string, tags map[string]string, fields map[string]interface{}, t time.Time) error {
  19. // 主动获取数据转换
  20. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  21. Database: GateDbname,
  22. Precision: "ns",
  23. })
  24. if err != nil {
  25. logger.Error("func",
  26. zap.String("call", "NewBatchPoints"),
  27. zap.String("args", GateDbname),
  28. zap.String("error", err.Error()))
  29. return errors.DataBaseError
  30. }
  31. pt, err := iclient.NewPoint(
  32. deviceId,
  33. tags,
  34. fields,
  35. t,
  36. )
  37. if err != nil {
  38. logger.Error("func",
  39. zap.String("call", "NewPoint"),
  40. zap.String("args", deviceId),
  41. zap.String("error", err.Error()))
  42. return errors.DataBaseError
  43. }
  44. bp.AddPoint(pt)
  45. err = influxdb.InfluxCli.Write(bp)
  46. if err != nil {
  47. logger.Error("func",
  48. zap.String("call", "InfluxCli.Write"),
  49. zap.String("args", deviceId),
  50. zap.String("error", err.Error()))
  51. if strings.Contains(err.Error(), DatabaseNotFound) {
  52. Create(influxdb.InfluxCli)
  53. err = influxdb.InfluxCli.Write(bp)
  54. if err != nil {
  55. return errors.DataBaseError
  56. }
  57. return nil
  58. }
  59. return errors.DataBaseError
  60. }
  61. return nil
  62. }
  63. func Query(sql string, dbname string, result interface{},) ([]map[string]interface{}, error) {
  64. qt := iclient.Query{
  65. Database: dbname,
  66. Command: sql,
  67. }
  68. r, err := influxdb.InfluxCli.Query(qt)
  69. if err != nil {
  70. logger.Error("func",
  71. zap.String("call", "Query "+ dbname),
  72. zap.String("args", sql),
  73. zap.String("error", err.Error()))
  74. return nil, err
  75. }
  76. if r == nil {
  77. return nil, nil
  78. }
  79. if len(r.Results) == 0 {
  80. return nil, nil
  81. }
  82. if len(r.Results[0].Series) == 0 {
  83. return nil, nil
  84. }
  85. colNames := r.Results[0].Series[0].Columns
  86. var marray = make([]map[string]interface{}, len(r.Results[0].Series[0].Values))
  87. for i, row := range r.Results[0].Series[0].Values {
  88. item := map[string]interface{}{}
  89. for j, v := range row {
  90. if colNames[j] == "time" {
  91. t, _ := time.Parse(time.RFC3339, v.(string))
  92. v = t.Format("2006-01-02 15:04:05")
  93. }
  94. item[colNames[j]] = v
  95. }
  96. marray[i] = item
  97. }
  98. if result != nil {
  99. bytes, _ := json.Marshal(marray)
  100. err = json.Unmarshal(bytes, result)
  101. if err != nil {
  102. return nil, err
  103. }
  104. }
  105. return marray, nil
  106. }
  107. func Create(conn iclient.Client) error {
  108. qt := iclient.Query{
  109. Command: "create database " + GateDbname,
  110. }
  111. _, err := conn.Query(qt)
  112. if err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. func WriteGateDatas(deviceId string, tags map[string]string, fields []map[string]interface{}, t []time.Time) error {
  118. // 主动获取数据转换
  119. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  120. Database: GateDbname,
  121. Precision: "ns",
  122. })
  123. if err != nil {
  124. logger.Error("func",
  125. zap.String("call", "NewBatchPoints"),
  126. zap.String("args", GateDbname),
  127. zap.String("error", err.Error()))
  128. return errors.DataBaseError
  129. }
  130. pts := make([]*iclient.Point, len(fields))
  131. for i, _ := range fields {
  132. pt, err := iclient.NewPoint(
  133. deviceId,
  134. tags,
  135. fields[i],
  136. t[i],
  137. )
  138. if err != nil {
  139. logger.Error("func",
  140. zap.String("call", "NewPoint"),
  141. zap.String("args", deviceId),
  142. zap.String("error", err.Error()))
  143. return errors.DataBaseError
  144. }
  145. pts[i] = pt
  146. }
  147. bp.AddPoints(pts)
  148. err = influxdb.InfluxCli.Write(bp)
  149. if err != nil {
  150. logger.Error("func",
  151. zap.String("call", "InfluxCli.Write"),
  152. zap.String("args", deviceId),
  153. zap.String("error", err.Error()))
  154. if strings.Contains(err.Error(), DatabaseNotFound) {
  155. Create(influxdb.InfluxCli)
  156. err = influxdb.InfluxCli.Write(bp)
  157. if err != nil {
  158. return errors.DataBaseError
  159. }
  160. return nil
  161. }
  162. return errors.DataBaseError
  163. }
  164. return nil
  165. }