123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package manual_task
- import (
- "adm-management/consts"
- "adm-management/errors"
- "adm-management/model"
- "adm-management/parser"
- v1 "adm-management/pb/v1"
- "context"
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "gorm.io/gorm"
- "git.getensh.com/common/gopkgsv2/database"
- "git.getensh.com/common/gopkgsv2/logger"
- "go.uber.org/zap"
- "google.golang.org/grpc/status"
- )
// OdsMessage is the JSON payload that createImpl marshals and publishes
// through parser.OdsMq.
type OdsMessage struct {
	MsgType       string `json:"msg_type"`
	SourceCode    string `json:"source_code"`     // Source code (identifier of the data source).
	OfflineTaskId int64  `json:"offline_task_id"` // ID of the offline message task.
	TaskList      []int  `json:"task_list"`       // Task list; encoded as null when no task is attached.
	From          int    `json:"from"`            // Message source type: 1 = database, 2 = excel.
	Content       string `json:"content"`
	Timestamp     int64  `json:"timestamp"`
}
- func Create(ctx context.Context, req *v1.CreateRequest) (reply *v1.CreateReply, err error) {
- reply = &v1.CreateReply{}
- // 捕获各个task中的异常并返回给调用者
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- e := &status.Status{}
- if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }
- }()
- db := database.DB().Begin()
- err = createImpl(db, req)
- if err != nil {
- db.Rollback()
- } else {
- db.Commit()
- }
- return reply, err
- }
- func createImpl(db *gorm.DB, req *v1.CreateRequest) (err error) {
- req.Sql = strings.TrimSpace(req.Sql)
- req.Sql = strings.ToLower(req.Sql)
- if req.Type != consts.FromDb && req.Type != consts.FromExcel {
- return errors.ParamsError
- }
- if req.Type == consts.FromExcel && len(req.TaskIds) <= 0 {
- return errors.ParamsError
- }
- // 遍历任务名改为任务id
- var taskIds string
- var ids []string
- for _, i := range req.TaskIds {
- tasks, err := model.NewTaskList().Get(db.Where("task_name = ?", i))
- if err != nil && err != gorm.ErrRecordNotFound {
- return err
- }
- if err == gorm.ErrRecordNotFound {
- fmt.Println("not found")
- return nil
- }
- ids = append(ids, strconv.Itoa(int(tasks.TaskId)))
- }
- taskIds = strings.Join(ids, ",")
- // 构建消息
- odsMsg := OdsMessage{MsgType: consts.ODSOFFLINEIMPORT, From: int(req.Type)}
- if len(ids) > 0 {
- for _, v := range ids {
- taskId, _ := strconv.Atoi(v)
- odsMsg.TaskList = append(odsMsg.TaskList, taskId)
- }
- }
- offlineTask := &model.OfflineTask{TaskName: req.TaskName, Type: req.Type, Source: req.Source, Sql: req.Sql, TaskIds: taskIds}
- // TODO 事务处理
- err = offlineTask.Insert(db)
- if err != nil {
- if err == gorm.ErrRecordNotFound {
- return nil
- }
- return errors.SystemError
- }
- odsMsg.OfflineTaskId = offlineTask.ID
- // 选择源表
- if req.Type == 1 {
- // 通过source 获取库表
- //dataSource, err := model.NewDataList().Get(database.DB().Where("source_code = ?", req.Source))
- dataSource, err := model.NewDataList().Get(database.DB().Where("table_name = ?", req.Source))
- if err != nil && err != gorm.ErrRecordNotFound {
- return err
- }
- odsMsg.SourceCode = dataSource.SourceCode
- if req.Sql == "" {
- odsMsg.Content = fmt.Sprintf("select * from %s.%s", dataSource.Db, dataSource.TableName)
- } else if strings.HasPrefix(req.Sql, "where") {
- odsMsg.Content = fmt.Sprintf("select * from %s.%s %s", dataSource.Db, dataSource.TableName, req.Sql)
- } else {
- odsMsg.Content = fmt.Sprintf("select * from %s.%s where %s", dataSource.Db, dataSource.TableName, req.Sql)
- }
- // 判断sql是否正确
- err = db.Exec(fmt.Sprintf("%s limit 1", odsMsg.Content)).Error
- if err != nil {
- return errors.SqlError
- }
- } else {
- contentList := strings.Split(req.Source, "/")
- odsMsg.Content = contentList[len(contentList)-1]
- }
- // 发送消息
- odsMsgByte, err := json.Marshal(odsMsg)
- if err != nil {
- return err
- }
- err = parser.OdsMq.PublishMsg(odsMsgByte)
- if err != nil {
- return err
- }
- return nil
- }
|