data_import.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package data_import
  2. import (
  3. "adm-management/consts"
  4. "adm-management/parser"
  5. v1 "adm-management/pb/v1"
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "git.getensh.com/common/gopkgsv2/database"
  13. "git.getensh.com/common/gopkgsv2/logger"
  14. "go.uber.org/zap"
  15. "google.golang.org/grpc/status"
  16. )
  // OdsMessage is the JSON message that DataImport fills in, serializes and
  // publishes through parser.OdsMq.
  type OdsMessage struct {
  	// MsgType is the message type; DataImport sets it to consts.ODSMANUALAMENDMENT.
  	MsgType string `json:"msg_type"`

  	// SourceCode is the code of the source.
  	SourceCode string `json:"source_code"`

  	// OfflineTaskId is the id of the offline message task.
  	OfflineTaskId int64 `json:"offline_task_id"`

  	// TaskList is the list of tasks.
  	TaskList []int `json:"task_list"`

  	// From is the message source type: 1 = database, 2 = excel.
  	From int `json:"from"`

  	// Content is the message body; DataImport stores a JSON document in it.
  	Content string `json:"content"`

  	// Timestamp is filled by DataImport with the Unix time in seconds.
  	Timestamp int64 `json:"timestamp"`
  }
  26. func DataImport(ctx context.Context, req *v1.DataImportRequest) (reply *v1.DataImportReply, err error) {
  27. reply = &v1.DataImportReply{}
  28. // 捕获各个task中的异常并返回给调用者
  29. defer func() {
  30. if r := recover(); r != nil {
  31. err = fmt.Errorf("%+v", r)
  32. e := &status.Status{}
  33. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  34. logger.Error("err",
  35. zap.String("system_err", err.Error()),
  36. zap.Stack("stacktrace"))
  37. }
  38. }
  39. }()
  40. db := database.DB()
  41. var taskIds string
  42. err = db.Raw("SELECT task_ids FROM t_adm_data_template where id = ? ", req.Id).Find(&taskIds).Error
  43. ids := strings.Split(taskIds, ",")
  44. // 构建消息
  45. odsMsg := OdsMessage{MsgType: consts.ODSMANUALAMENDMENT, From: 0}
  46. if len(ids) > 0 {
  47. for _, v := range ids {
  48. taskId, _ := strconv.Atoi(v)
  49. odsMsg.TaskList = append(odsMsg.TaskList, taskId)
  50. }
  51. }
  52. odsMsg.Timestamp = time.Now().Unix()
  53. requestList := make(map[string]string)
  54. for i := 0; i < len(req.RequestList); i++ {
  55. requestList[req.RequestList[i].Request] = req.RequestList[i].Value
  56. }
  57. nContent := make(map[string]interface{})
  58. nContent["new_content"] = requestList
  59. c, _ := json.Marshal(nContent)
  60. content := string(c)
  61. odsMsg.Content = content
  62. // 发送消息
  63. odsMsgByte, err := json.Marshal(odsMsg)
  64. if err != nil {
  65. return reply, err
  66. }
  67. err = parser.OdsMq.PublishMsg(odsMsgByte)
  68. if err != nil {
  69. return reply, err
  70. }
  71. return reply, err
  72. }