dust.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package model
  4. import (
  5. "dust-monitor/consts"
  6. "dust-monitor/errors"
  7. "encoding/json"
  8. "strings"
  9. "time"
  10. "github.com/jaryhe/gopkgs/logger"
  11. "go.uber.org/zap"
  12. iclient "github.com/influxdata/influxdb/client/v2"
  13. "github.com/jaryhe/gopkgs/influxdb"
  14. )
  15. func WriteDustData(sn string, tags map[string]string, fields map[string]interface{}, t time.Time, dbName string) error {
  16. // 主动获取数据转换
  17. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  18. Database: dbName,
  19. Precision: "ns",
  20. })
  21. if err != nil {
  22. logger.Error("func",
  23. zap.String("call", "NewBatchPoints"),
  24. zap.String("args", dbName),
  25. zap.String("error", err.Error()))
  26. return errors.DataBaseError
  27. }
  28. pt, err := iclient.NewPoint(
  29. sn,
  30. tags,
  31. fields,
  32. t,
  33. )
  34. if err != nil {
  35. logger.Error("func",
  36. zap.String("call", "NewPoint"),
  37. zap.String("args", sn),
  38. zap.String("error", err.Error()))
  39. return errors.DataBaseError
  40. }
  41. bp.AddPoint(pt)
  42. err = influxdb.InfluxCli.Write(bp)
  43. if err != nil {
  44. logger.Error("func",
  45. zap.String("call", "InfluxCli.Write"),
  46. zap.String("args", sn),
  47. zap.String("error", err.Error()))
  48. if strings.Contains(err.Error(), consts.DatabaseNotFound) {
  49. Create(influxdb.InfluxCli, dbName)
  50. err = influxdb.InfluxCli.Write(bp)
  51. if err != nil {
  52. return errors.DataBaseError
  53. }
  54. return nil
  55. }
  56. return errors.DataBaseError
  57. }
  58. return nil
  59. }
  60. func Query(sql string, db string, result interface{}) ([]map[string]interface{}, error) {
  61. qt := iclient.Query{
  62. Database: db,
  63. Command: sql,
  64. }
  65. r, err := influxdb.InfluxCli.Query(qt)
  66. if err != nil {
  67. logger.Error("func",
  68. zap.String("call", "Query "+ db),
  69. zap.String("args", sql),
  70. zap.String("error", err.Error()))
  71. return nil, err
  72. }
  73. if r == nil {
  74. return nil, nil
  75. }
  76. if len(r.Results) == 0 {
  77. return nil, nil
  78. }
  79. if len(r.Results[0].Series) == 0 {
  80. return nil, nil
  81. }
  82. colNames := r.Results[0].Series[0].Columns
  83. var marray = make([]map[string]interface{}, len(r.Results[0].Series[0].Values))
  84. for i, row := range r.Results[0].Series[0].Values {
  85. item := map[string]interface{}{}
  86. for j, v := range row {
  87. if colNames[j] == "time" {
  88. t, _ := time.Parse(time.RFC3339, v.(string))
  89. v = t.Format("2006-01-02 15:04:05")
  90. }
  91. item[colNames[j]] = v
  92. }
  93. marray[i] = item
  94. }
  95. if result != nil {
  96. bytes, _ := json.Marshal(marray)
  97. err = json.Unmarshal(bytes, result)
  98. if err != nil {
  99. return nil, err
  100. }
  101. }
  102. return marray, nil
  103. }
  104. func Create(conn iclient.Client, db string) error {
  105. qt := iclient.Query{
  106. Command: "create database " + db,
  107. }
  108. _, err := conn.Query(qt)
  109. if err != nil {
  110. return err
  111. }
  112. return nil
  113. }
  114. func WriteDustDatas(sn string, tags map[string]string, fields []map[string]interface{}, t []time.Time, cmdType string) error {
  115. // 主动获取数据转换
  116. switch cmdType {
  117. case "2111":
  118. cmdType = "2011"
  119. case "2131":
  120. cmdType = "2031"
  121. case "2161":
  122. cmdType = "2061"
  123. }
  124. if len(fields) != len(t) {
  125. return errors.ParamsError
  126. }
  127. dbName := "dust_monitor_" + cmdType
  128. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  129. Database: dbName,
  130. Precision: "ns",
  131. })
  132. if err != nil {
  133. logger.Error("func",
  134. zap.String("call", "NewBatchPoints"),
  135. zap.String("args", dbName),
  136. zap.String("error", err.Error()))
  137. return errors.DataBaseError
  138. }
  139. pts := make([]*iclient.Point, len(fields))
  140. for i, _ := range fields {
  141. pt, err := iclient.NewPoint(
  142. sn,
  143. tags,
  144. fields[i],
  145. t[i],
  146. )
  147. if err != nil {
  148. logger.Error("func",
  149. zap.String("call", "NewPoint"),
  150. zap.String("args", sn),
  151. zap.String("error", err.Error()))
  152. return errors.DataBaseError
  153. }
  154. pts[i] = pt
  155. }
  156. bp.AddPoints(pts)
  157. err = influxdb.InfluxCli.Write(bp)
  158. if err != nil {
  159. logger.Error("func",
  160. zap.String("call", "InfluxCli.Write"),
  161. zap.String("args", sn),
  162. zap.String("error", err.Error()))
  163. if strings.Contains(err.Error(), consts.DatabaseNotFound) {
  164. Create(influxdb.InfluxCli, dbName)
  165. err = influxdb.InfluxCli.Write(bp)
  166. if err != nil {
  167. return errors.DataBaseError
  168. }
  169. return nil
  170. }
  171. return errors.DataBaseError
  172. }
  173. return nil
  174. }