relation.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package task_management
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "adm-management/errors"
  7. "adm-management/model"
  8. v1 "adm-management/pb/v1"
  9. "git.getensh.com/common/gopkgsv2/database"
  10. "git.getensh.com/common/gopkgsv2/logger"
  11. "go.uber.org/zap"
  12. "google.golang.org/grpc/status"
  13. "gorm.io/gorm"
  14. )
  15. var a = 1
  16. func inSlice(arr []string, s string) bool {
  17. for _, v := range arr {
  18. if v == s {
  19. return true
  20. }
  21. }
  22. return false
  23. }
  24. // 任务信息 向上的关系
  25. func toTaskId(
  26. ctx context.Context,
  27. taskId int64,
  28. tables []string,
  29. ) (reply *v1.DataSourceRelationReply_Node, err error, t []string) {
  30. // 获取task信息
  31. task, err := model.NewTaskList().Get(database.DB().Where("task_id=?", taskId))
  32. if err != nil && err != gorm.ErrRecordNotFound {
  33. return reply, errors.SystemError, t
  34. }
  35. if err == gorm.ErrRecordNotFound {
  36. return reply, nil, t
  37. }
  38. if inSlice(tables, task.TaskName) {
  39. return reply, nil, t
  40. }
  41. tables = append(tables, task.TaskName)
  42. t = append(t, task.TaskName)
  43. reply = &v1.DataSourceRelationReply_Node{
  44. NodeName: task.TaskName,
  45. Desc: task.Desc,
  46. IsTask: true,
  47. }
  48. // 获取依赖信息
  49. dependency, err := model.NewDependency().GetSourceByTaskId(database.DB().Where("t1.task_id = ?", taskId))
  50. if err != nil && err != gorm.ErrRecordNotFound {
  51. return reply, errors.SystemError, t
  52. }
  53. if err == nil {
  54. if !inSlice(tables, dependency.TableName) {
  55. reply.Rely = append(reply.Rely, &v1.Rely{
  56. TableName: dependency.TableName,
  57. Desc: dependency.Desc,
  58. })
  59. tables = append(tables, dependency.TableName)
  60. t = append(t, dependency.TableName)
  61. }
  62. }
  63. // 向上
  64. pagination := model.NewPagination(0, 100, 0).GetLimitOffset()
  65. relationList, err := model.NewRelation().List(database.DB().Where("task_id = ? and is_on = 1", taskId), pagination)
  66. for _, v := range relationList {
  67. source1, err := model.NewDataList().Get(database.DB().Where("source_code = ?", v.SourceCode))
  68. if err != nil && err != gorm.ErrRecordNotFound {
  69. return nil, err, t
  70. }
  71. entry, err := toSourceCode(ctx, &v1.DataSourceRelationRequest{
  72. SourceCode: source1.SourceCode,
  73. }, tables)
  74. if err != nil {
  75. return nil, err, t
  76. }
  77. if entry != nil {
  78. reply.Entry = append(reply.Entry, entry)
  79. }
  80. }
  81. return reply, nil, t
  82. }
  83. // 任务信息 只需要向下的关系
  84. func fromTaskId(
  85. ctx context.Context,
  86. taskId int64,
  87. tables []string,
  88. in *v1.DataSourceRelationReply_Node,
  89. ) (reply *v1.DataSourceRelationReply_Node, err error) {
  90. // 获取task信息
  91. task, err := model.NewTaskList().Get(database.DB().Where("task_id=?", taskId))
  92. if err != nil && err != gorm.ErrRecordNotFound {
  93. return reply, errors.SystemError
  94. }
  95. if err == gorm.ErrRecordNotFound {
  96. return reply, nil
  97. }
  98. if inSlice(tables, task.TaskName) {
  99. return reply, nil
  100. }
  101. tables = append(tables, task.TaskName)
  102. if a == 1 {
  103. reply = in
  104. a++
  105. } else {
  106. reply = &v1.DataSourceRelationReply_Node{
  107. NodeName: task.TaskName,
  108. Desc: task.Desc,
  109. IsTask: true,
  110. }
  111. }
  112. // 获取依赖信息
  113. dependency, err := model.NewDependency().GetSourceByTaskId(database.DB().Where("t1.task_id = ?", taskId))
  114. if err != nil && err != gorm.ErrRecordNotFound {
  115. return reply, errors.SystemError
  116. }
  117. if err == nil {
  118. if !inSlice(tables, dependency.TableName) {
  119. reply.Rely = append(reply.Rely, &v1.Rely{
  120. TableName: dependency.TableName,
  121. Desc: dependency.Desc,
  122. })
  123. tables = append(tables, dependency.TableName)
  124. }
  125. }
  126. // use TargetTable find TargetSourceCode
  127. source, err := model.NewDataList().Get(database.DB().Where("table_name = ?", task.TargetTable))
  128. if err != nil && err != gorm.ErrRecordNotFound {
  129. return nil, err
  130. }
  131. if err == gorm.ErrRecordNotFound {
  132. reply.Output = []*v1.DataSourceRelationReply_Node{{
  133. NodeName: task.TargetTable,
  134. Desc: source.Desc,
  135. }}
  136. return reply, nil
  137. }
  138. output, err := fromSourceCode(ctx, &v1.DataSourceRelationRequest{
  139. SourceCode: source.SourceCode,
  140. }, tables, nil)
  141. if err != nil {
  142. return nil, err
  143. }
  144. if output != nil && output.Output != nil {
  145. reply.Output = []*v1.DataSourceRelationReply_Node{{
  146. NodeName: task.TargetTable,
  147. Desc: source.Desc,
  148. Output: output.Output,
  149. }}
  150. } else {
  151. reply.Output = []*v1.DataSourceRelationReply_Node{{
  152. NodeName: task.TargetTable,
  153. Desc: source.Desc,
  154. }}
  155. }
  156. return reply, nil
  157. }
  158. // toSourceCode 向上
  159. func toSourceCode(
  160. ctx context.Context,
  161. req *v1.DataSourceRelationRequest,
  162. tables []string,
  163. ) (reply *v1.DataSourceRelationReply_Node, err error) {
  164. // 获取数据源信息
  165. source, err := model.NewDataList().Get(database.DB().Where("source_code = ?", req.SourceCode))
  166. if err != nil && err != gorm.ErrRecordNotFound {
  167. return reply, err
  168. }
  169. if err == gorm.ErrRecordNotFound {
  170. return reply, nil
  171. }
  172. if inSlice(tables, source.TableName) {
  173. return reply, nil
  174. }
  175. tables = append(tables, source.TableName)
  176. reply = &v1.DataSourceRelationReply_Node{
  177. NodeName: source.TableName,
  178. Desc: source.Desc,
  179. }
  180. // 向上获取TaskId
  181. // 构造分页类
  182. pagination1 := model.NewPagination(0, 100, 0).GetLimitOffset()
  183. task, err := model.NewTaskList().
  184. List(database.DB().Where("target_table = ? ", source.TableName), pagination1)
  185. if err != nil && err != gorm.ErrRecordNotFound {
  186. return reply, errors.SystemError
  187. }
  188. if err == gorm.ErrRecordNotFound {
  189. return reply, nil
  190. }
  191. // 获取输入关系
  192. for _, v := range task {
  193. node, err, _ := toTaskId(ctx, v.TaskId, tables)
  194. if err != nil {
  195. return reply, err
  196. }
  197. if node != nil && node.NodeName != "" {
  198. reply.Entry = append(reply.Entry, node)
  199. }
  200. }
  201. return reply, nil
  202. }
  203. // fromeSourceCode 任务列表需要上下的全部关系
  204. func fromSourceCode(
  205. ctx context.Context,
  206. req *v1.DataSourceRelationRequest,
  207. tables []string,
  208. in *v1.DataSourceRelationReply_Node,
  209. ) (reply *v1.DataSourceRelationReply_Node, err error) {
  210. // 获取数据源信息
  211. source, err := model.NewDataList().Get(database.DB().Where("source_code = ?", req.SourceCode))
  212. if err != nil && err != gorm.ErrRecordNotFound {
  213. return reply, err
  214. }
  215. if err == gorm.ErrRecordNotFound {
  216. return reply, nil
  217. }
  218. if inSlice(tables, source.TableName) {
  219. return reply, nil
  220. }
  221. tables = append(tables, source.TableName)
  222. if a == 1 {
  223. reply = in
  224. a++
  225. } else {
  226. reply = &v1.DataSourceRelationReply_Node{
  227. NodeName: source.TableName,
  228. Desc: source.Desc,
  229. }
  230. }
  231. // 向下获取TaskId
  232. // 构造分页类
  233. pagination := model.NewPagination(0, 100, 0).GetLimitOffset()
  234. relationList, err := model.NewRelation().
  235. List(database.DB().Where("source_code = ? AND is_on = 1", req.SourceCode), pagination)
  236. if err != nil && err != gorm.ErrRecordNotFound {
  237. return reply, errors.SystemError
  238. }
  239. if err == gorm.ErrRecordNotFound {
  240. return reply, nil
  241. }
  242. // 获取输出关系
  243. for _, v := range relationList {
  244. node, err := fromTaskId(ctx, v.TaskId, tables, nil)
  245. if err != nil {
  246. return reply, err
  247. }
  248. if node != nil && node.NodeName != "" {
  249. reply.Output = append(reply.Output, node)
  250. }
  251. }
  252. return reply, nil
  253. }
  254. // DataSourceRelation 血缘关系
  255. func DataSourceRelation(
  256. ctx context.Context,
  257. req *v1.DataSourceRelationRequest,
  258. ) (reply *v1.DataSourceRelationReply, err error) {
  259. reply = &v1.DataSourceRelationReply{}
  260. defer func() {
  261. if r := recover(); r != nil {
  262. err = fmt.Errorf("%+v", r)
  263. e := &status.Status{}
  264. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  265. logger.Error("err",
  266. zap.String("system_err", err.Error()),
  267. zap.Stack("stacktrace"))
  268. }
  269. }
  270. }()
  271. if req.SourceCode == "" && req.TaskId == 0 {
  272. return reply, errors.ParamsError
  273. }
  274. tables := make([]string, 0, 20)
  275. a = 1
  276. // source 需要上下的全部关系
  277. // task 只需要向下的关系
  278. switch {
  279. case req.SourceCode != "":
  280. reply.Node, err = toSourceCode(ctx, req, tables)
  281. reply.Node, err = fromSourceCode(ctx, req, tables, reply.Node)
  282. case req.TaskId != 0:
  283. reply.Node, err, tables = toTaskId(ctx, req.TaskId, tables)
  284. tables = tables[1:]
  285. reply.Node, err = fromTaskId(ctx, req.TaskId, tables, reply.Node)
  286. }
  287. if err != nil {
  288. return reply, err
  289. }
  290. return reply, nil
  291. }