dust.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package model
  4. import (
  5. "dust-monitor/consts"
  6. "dust-monitor/errors"
  7. "encoding/json"
  8. "fmt"
  9. "strings"
  10. "time"
  11. "github.com/jaryhe/gopkgs/logger"
  12. "go.uber.org/zap"
  13. iclient "github.com/influxdata/influxdb/client/v2"
  14. "github.com/jaryhe/gopkgs/influxdb"
  15. )
  16. func WriteDustData(sn string, tags map[string]string, fields map[string]interface{}, t time.Time, cmdType string) error {
  17. // 主动获取数据转换
  18. switch cmdType {
  19. case "2111":
  20. cmdType = "2011"
  21. case "2131":
  22. cmdType = "2031"
  23. case "2161":
  24. cmdType = "2061"
  25. }
  26. dbName := "dust_monitor_" + cmdType
  27. fmt.Printf("xxxxxxxx:%v,%v\n", dbName,sn)
  28. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  29. Database: dbName,
  30. Precision: "ns",
  31. })
  32. if err != nil {
  33. logger.Error("func",
  34. zap.String("call", "NewBatchPoints"),
  35. zap.String("args", dbName),
  36. zap.String("error", err.Error()))
  37. return errors.DataBaseError
  38. }
  39. pt, err := iclient.NewPoint(
  40. sn,
  41. tags,
  42. fields,
  43. t,
  44. )
  45. if err != nil {
  46. logger.Error("func",
  47. zap.String("call", "NewPoint"),
  48. zap.String("args", sn),
  49. zap.String("error", err.Error()))
  50. return errors.DataBaseError
  51. }
  52. bp.AddPoint(pt)
  53. err = influxdb.InfluxCli.Write(bp)
  54. if err != nil {
  55. logger.Error("func",
  56. zap.String("call", "InfluxCli.Write"),
  57. zap.String("args", sn),
  58. zap.String("error", err.Error()))
  59. if strings.Contains(err.Error(), consts.DatabaseNotFound) {
  60. Create(influxdb.InfluxCli, dbName)
  61. err = influxdb.InfluxCli.Write(bp)
  62. if err != nil {
  63. return errors.DataBaseError
  64. }
  65. return nil
  66. }
  67. return errors.DataBaseError
  68. }
  69. return nil
  70. }
  71. func Query(sql string, db string, result interface{}) ([]map[string]interface{}, error) {
  72. qt := iclient.Query{
  73. Database: db,
  74. Command: sql,
  75. }
  76. r, err := influxdb.InfluxCli.Query(qt)
  77. if err != nil {
  78. logger.Error("func",
  79. zap.String("call", "Query "+ db),
  80. zap.String("args", sql),
  81. zap.String("error", err.Error()))
  82. return nil, err
  83. }
  84. if r == nil {
  85. return nil, nil
  86. }
  87. if len(r.Results) == 0 {
  88. return nil, nil
  89. }
  90. if len(r.Results[0].Series) == 0 {
  91. return nil, nil
  92. }
  93. colNames := r.Results[0].Series[0].Columns
  94. var marray = make([]map[string]interface{}, len(r.Results[0].Series[0].Values))
  95. for i, row := range r.Results[0].Series[0].Values {
  96. item := map[string]interface{}{}
  97. for j, v := range row {
  98. if colNames[j] == "time" {
  99. t, _ := time.Parse(time.RFC3339, v.(string))
  100. v = t.Format("2006-01-02 15:04:05")
  101. }
  102. item[colNames[j]] = v
  103. }
  104. marray[i] = item
  105. }
  106. if result != nil {
  107. bytes, _ := json.Marshal(marray)
  108. err = json.Unmarshal(bytes, result)
  109. if err != nil {
  110. return nil, err
  111. }
  112. }
  113. return marray, nil
  114. }
  115. func Create(conn iclient.Client, db string) error {
  116. qt := iclient.Query{
  117. Command: "create database " + db,
  118. }
  119. _, err := conn.Query(qt)
  120. if err != nil {
  121. return err
  122. }
  123. return nil
  124. }
  125. func WriteDustDatas(sn string, tags map[string]string, fields []map[string]interface{}, t []time.Time, cmdType string) error {
  126. // 主动获取数据转换
  127. switch cmdType {
  128. case "2111":
  129. cmdType = "2011"
  130. case "2131":
  131. cmdType = "2031"
  132. case "2161":
  133. cmdType = "2061"
  134. }
  135. if len(fields) != len(t) {
  136. return errors.ParamsError
  137. }
  138. dbName := "dust_monitor_" + cmdType
  139. bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
  140. Database: dbName,
  141. Precision: "ns",
  142. })
  143. if err != nil {
  144. logger.Error("func",
  145. zap.String("call", "NewBatchPoints"),
  146. zap.String("args", dbName),
  147. zap.String("error", err.Error()))
  148. return errors.DataBaseError
  149. }
  150. pts := make([]*iclient.Point, len(fields))
  151. for i, _ := range fields {
  152. pt, err := iclient.NewPoint(
  153. sn,
  154. tags,
  155. fields[i],
  156. t[i],
  157. )
  158. if err != nil {
  159. logger.Error("func",
  160. zap.String("call", "NewPoint"),
  161. zap.String("args", sn),
  162. zap.String("error", err.Error()))
  163. return errors.DataBaseError
  164. }
  165. pts[i] = pt
  166. }
  167. bp.AddPoints(pts)
  168. err = influxdb.InfluxCli.Write(bp)
  169. if err != nil {
  170. logger.Error("func",
  171. zap.String("call", "InfluxCli.Write"),
  172. zap.String("args", sn),
  173. zap.String("error", err.Error()))
  174. if strings.Contains(err.Error(), consts.DatabaseNotFound) {
  175. Create(influxdb.InfluxCli, dbName)
  176. err = influxdb.InfluxCli.Write(bp)
  177. if err != nil {
  178. return errors.DataBaseError
  179. }
  180. return nil
  181. }
  182. return errors.DataBaseError
  183. }
  184. return nil
  185. }