123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- package task_management
- import (
- "context"
- "encoding/json"
- "fmt"
- "adm-management/errors"
- "adm-management/model"
- v1 "adm-management/pb/v1"
- "git.getensh.com/common/gopkgsv2/database"
- "git.getensh.com/common/gopkgsv2/logger"
- "go.uber.org/zap"
- "google.golang.org/grpc/status"
- "gorm.io/gorm"
- )
var (
	// a is a package-level traversal flag. DataSourceRelation resets it to 1
	// at the start of every request, and the downstream walkers (fromTaskId,
	// fromSourceCode) advance it past 1 once the root node has been handled.
	//
	// NOTE(review): the flag is shared, unsynchronized state; concurrent
	// DataSourceRelation calls would read and write it at the same time.
	a = 1
)
// inSlice reports whether s occurs in arr, comparing elements by exact string
// equality. A nil or empty arr never contains s.
func inSlice(arr []string, s string) bool {
	found := false
	for i := 0; i < len(arr) && !found; i++ {
		found = arr[i] == s
	}
	return found
}
- // 任务信息 向上的关系
- func toTaskId(
- ctx context.Context,
- taskId int64,
- tables []string,
- ) (reply *v1.DataSourceRelationReply_Node, err error, t []string) {
- // 获取task信息
- task, err := model.NewTaskList().Get(database.DB().Where("task_id=?", taskId))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError, t
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil, t
- }
- if inSlice(tables, task.TaskName) {
- return reply, nil, t
- }
- tables = append(tables, task.TaskName)
- t = append(t, task.TaskName)
- reply = &v1.DataSourceRelationReply_Node{
- NodeName: task.TaskName,
- Desc: task.Desc,
- IsTask: true,
- }
- // 获取依赖信息
- dependency, err := model.NewDependency().GetSourceByTaskId(database.DB().Where("t1.task_id = ?", taskId))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError, t
- }
- if err == nil {
- if !inSlice(tables, dependency.TableName) {
- reply.Rely = append(reply.Rely, &v1.Rely{
- TableName: dependency.TableName,
- Desc: dependency.Desc,
- })
- tables = append(tables, dependency.TableName)
- t = append(t, dependency.TableName)
- }
- }
- // 向上
- pagination := model.NewPagination(0, 100, 0).GetLimitOffset()
- relationList, err := model.NewRelation().List(database.DB().Where("task_id = ? and is_on = 1", taskId), pagination)
- for _, v := range relationList {
- source1, err := model.NewDataList().Get(database.DB().Where("source_code = ?", v.SourceCode))
- if err != nil && err != gorm.ErrRecordNotFound {
- return nil, err, t
- }
- entry, err := toSourceCode(ctx, &v1.DataSourceRelationRequest{
- SourceCode: source1.SourceCode,
- }, tables)
- if err != nil {
- return nil, err, t
- }
- if entry != nil {
- reply.Entry = append(reply.Entry, entry)
- }
- }
- return reply, nil, t
- }
- // 任务信息 只需要向下的关系
- func fromTaskId(
- ctx context.Context,
- taskId int64,
- tables []string,
- in *v1.DataSourceRelationReply_Node,
- ) (reply *v1.DataSourceRelationReply_Node, err error) {
- // 获取task信息
- task, err := model.NewTaskList().Get(database.DB().Where("task_id=?", taskId))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil
- }
- if inSlice(tables, task.TaskName) {
- return reply, nil
- }
- tables = append(tables, task.TaskName)
- if a == 1 {
- reply = in
- a++
- } else {
- reply = &v1.DataSourceRelationReply_Node{
- NodeName: task.TaskName,
- Desc: task.Desc,
- IsTask: true,
- }
- }
- // 获取依赖信息
- dependency, err := model.NewDependency().GetSourceByTaskId(database.DB().Where("t1.task_id = ?", taskId))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError
- }
- if err == nil {
- if !inSlice(tables, dependency.TableName) {
- reply.Rely = append(reply.Rely, &v1.Rely{
- TableName: dependency.TableName,
- Desc: dependency.Desc,
- })
- tables = append(tables, dependency.TableName)
- }
- }
- // use TargetTable find TargetSourceCode
- source, err := model.NewDataList().Get(database.DB().Where("table_name = ?", task.TargetTable))
- if err != nil && err != gorm.ErrRecordNotFound {
- return nil, err
- }
- if err == gorm.ErrRecordNotFound {
- reply.Output = []*v1.DataSourceRelationReply_Node{{
- NodeName: task.TargetTable,
- Desc: source.Desc,
- }}
- return reply, nil
- }
- output, err := fromSourceCode(ctx, &v1.DataSourceRelationRequest{
- SourceCode: source.SourceCode,
- }, tables, nil)
- if err != nil {
- return nil, err
- }
- if output != nil && output.Output != nil {
- reply.Output = []*v1.DataSourceRelationReply_Node{{
- NodeName: task.TargetTable,
- Desc: source.Desc,
- Output: output.Output,
- }}
- } else {
- reply.Output = []*v1.DataSourceRelationReply_Node{{
- NodeName: task.TargetTable,
- Desc: source.Desc,
- }}
- }
- return reply, nil
- }
- // toSourceCode 向上
- func toSourceCode(
- ctx context.Context,
- req *v1.DataSourceRelationRequest,
- tables []string,
- ) (reply *v1.DataSourceRelationReply_Node, err error) {
- // 获取数据源信息
- source, err := model.NewDataList().Get(database.DB().Where("source_code = ?", req.SourceCode))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, err
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil
- }
- if inSlice(tables, source.TableName) {
- return reply, nil
- }
- tables = append(tables, source.TableName)
- reply = &v1.DataSourceRelationReply_Node{
- NodeName: source.TableName,
- Desc: source.Desc,
- }
- // 向上获取TaskId
- // 构造分页类
- pagination1 := model.NewPagination(0, 100, 0).GetLimitOffset()
- task, err := model.NewTaskList().
- List(database.DB().Where("target_table = ? ", source.TableName), pagination1)
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil
- }
- // 获取输入关系
- for _, v := range task {
- node, err, _ := toTaskId(ctx, v.TaskId, tables)
- if err != nil {
- return reply, err
- }
- if node != nil && node.NodeName != "" {
- reply.Entry = append(reply.Entry, node)
- }
- }
- return reply, nil
- }
- // fromeSourceCode 任务列表需要上下的全部关系
- func fromSourceCode(
- ctx context.Context,
- req *v1.DataSourceRelationRequest,
- tables []string,
- in *v1.DataSourceRelationReply_Node,
- ) (reply *v1.DataSourceRelationReply_Node, err error) {
- // 获取数据源信息
- source, err := model.NewDataList().Get(database.DB().Where("source_code = ?", req.SourceCode))
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, err
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil
- }
- if inSlice(tables, source.TableName) {
- return reply, nil
- }
- tables = append(tables, source.TableName)
- if a == 1 {
- reply = in
- a++
- } else {
- reply = &v1.DataSourceRelationReply_Node{
- NodeName: source.TableName,
- Desc: source.Desc,
- }
- }
- // 向下获取TaskId
- // 构造分页类
- pagination := model.NewPagination(0, 100, 0).GetLimitOffset()
- relationList, err := model.NewRelation().
- List(database.DB().Where("source_code = ? AND is_on = 1", req.SourceCode), pagination)
- if err != nil && err != gorm.ErrRecordNotFound {
- return reply, errors.SystemError
- }
- if err == gorm.ErrRecordNotFound {
- return reply, nil
- }
- // 获取输出关系
- for _, v := range relationList {
- node, err := fromTaskId(ctx, v.TaskId, tables, nil)
- if err != nil {
- return reply, err
- }
- if node != nil && node.NodeName != "" {
- reply.Output = append(reply.Output, node)
- }
- }
- return reply, nil
- }
- // DataSourceRelation 血缘关系
- func DataSourceRelation(
- ctx context.Context,
- req *v1.DataSourceRelationRequest,
- ) (reply *v1.DataSourceRelationReply, err error) {
- reply = &v1.DataSourceRelationReply{}
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- e := &status.Status{}
- if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }
- }()
- if req.SourceCode == "" && req.TaskId == 0 {
- return reply, errors.ParamsError
- }
- tables := make([]string, 0, 20)
- a = 1
- // source 需要上下的全部关系
- // task 只需要向下的关系
- switch {
- case req.SourceCode != "":
- reply.Node, err = toSourceCode(ctx, req, tables)
- reply.Node, err = fromSourceCode(ctx, req, tables, reply.Node)
- case req.TaskId != 0:
- reply.Node, err, tables = toTaskId(ctx, req.TaskId, tables)
- tables = tables[1:]
- reply.Node, err = fromTaskId(ctx, req.TaskId, tables, reply.Node)
- }
- if err != nil {
- return reply, err
- }
- return reply, nil
- }
|