123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- // Copyright 2019 github.com. All rights reserved.
- // Use of this source code is governed by github.com.
- package model
- import (
- "dust-monitor/consts"
- "dust-monitor/errors"
- "encoding/json"
- "strings"
- "time"
- "github.com/jaryhe/gopkgs/logger"
- "go.uber.org/zap"
- iclient "github.com/influxdata/influxdb/client/v2"
- "github.com/jaryhe/gopkgs/influxdb"
- )
- func WriteDustData(sn string, tags map[string]string, fields map[string]interface{}, t time.Time, dbName string) error {
- // 主动获取数据转换
- bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
- Database: dbName,
- Precision: "ns",
- })
- if err != nil {
- logger.Error("func",
- zap.String("call", "NewBatchPoints"),
- zap.String("args", dbName),
- zap.String("error", err.Error()))
- return errors.DataBaseError
- }
- pt, err := iclient.NewPoint(
- sn,
- tags,
- fields,
- t,
- )
- if err != nil {
- logger.Error("func",
- zap.String("call", "NewPoint"),
- zap.String("args", sn),
- zap.String("error", err.Error()))
- return errors.DataBaseError
- }
- bp.AddPoint(pt)
- err = influxdb.InfluxCli.Write(bp)
- if err != nil {
- logger.Error("func",
- zap.String("call", "InfluxCli.Write"),
- zap.String("args", sn),
- zap.String("error", err.Error()))
- if strings.Contains(err.Error(), consts.DatabaseNotFound) {
- Create(influxdb.InfluxCli, dbName)
- err = influxdb.InfluxCli.Write(bp)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- return errors.DataBaseError
- }
- return nil
- }
- func Query(sql string, db string, result interface{}) ([]map[string]interface{}, error) {
- qt := iclient.Query{
- Database: db,
- Command: sql,
- }
- r, err := influxdb.InfluxCli.Query(qt)
- if err != nil {
- logger.Error("func",
- zap.String("call", "Query "+ db),
- zap.String("args", sql),
- zap.String("error", err.Error()))
- return nil, err
- }
- if r == nil {
- return nil, nil
- }
- if len(r.Results) == 0 {
- return nil, nil
- }
- if len(r.Results[0].Series) == 0 {
- return nil, nil
- }
- colNames := r.Results[0].Series[0].Columns
- var marray = make([]map[string]interface{}, len(r.Results[0].Series[0].Values))
- for i, row := range r.Results[0].Series[0].Values {
- item := map[string]interface{}{}
- for j, v := range row {
- if colNames[j] == "time" {
- t, _ := time.Parse(time.RFC3339, v.(string))
- v = t.Format("2006-01-02 15:04:05")
- }
- item[colNames[j]] = v
- }
- marray[i] = item
- }
- if result != nil {
- bytes, _ := json.Marshal(marray)
- err = json.Unmarshal(bytes, result)
- if err != nil {
- return nil, err
- }
- }
- return marray, nil
- }
- func Create(conn iclient.Client, db string) error {
- qt := iclient.Query{
- Command: "create database " + db,
- }
- _, err := conn.Query(qt)
- if err != nil {
- return err
- }
- return nil
- }
- func WriteDustDatas(sn string, tags map[string]string, fields []map[string]interface{}, t []time.Time, cmdType string) error {
- // 主动获取数据转换
- switch cmdType {
- case "2111":
- cmdType = "2011"
- case "2131":
- cmdType = "2031"
- case "2161":
- cmdType = "2061"
- }
- if len(fields) != len(t) {
- return errors.ParamsError
- }
- dbName := "dust_monitor_" + cmdType
- bp, err := iclient.NewBatchPoints(iclient.BatchPointsConfig{
- Database: dbName,
- Precision: "ns",
- })
- if err != nil {
- logger.Error("func",
- zap.String("call", "NewBatchPoints"),
- zap.String("args", dbName),
- zap.String("error", err.Error()))
- return errors.DataBaseError
- }
- pts := make([]*iclient.Point, len(fields))
- for i, _ := range fields {
- pt, err := iclient.NewPoint(
- sn,
- tags,
- fields[i],
- t[i],
- )
- if err != nil {
- logger.Error("func",
- zap.String("call", "NewPoint"),
- zap.String("args", sn),
- zap.String("error", err.Error()))
- return errors.DataBaseError
- }
- pts[i] = pt
- }
- bp.AddPoints(pts)
- err = influxdb.InfluxCli.Write(bp)
- if err != nil {
- logger.Error("func",
- zap.String("call", "InfluxCli.Write"),
- zap.String("args", sn),
- zap.String("error", err.Error()))
- if strings.Contains(err.Error(), consts.DatabaseNotFound) {
- Create(influxdb.InfluxCli, dbName)
- err = influxdb.InfluxCli.Write(bp)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- return errors.DataBaseError
- }
- return nil
- }
|