log_hour_task.go 12 KB


  1. package crontab
  2. import (
  3. "context"
  4. "gd_crontab/apis"
  5. "fmt"
  6. "runtime"
  7. "strings"
  8. "time"
  9. "gd_crontab/common.in/utils"
  10. "github.com/astaxie/beego/orm"
  11. "go.uber.org/zap"
  12. )
  13. type TGdApiCodeHour struct {
  14. Id int64 `json:"id"`
  15. MerchantId int64 `json:"merchant_id"`
  16. ApiId int64 `json:"api_id"`
  17. Code int `json:"code"`
  18. Msg string `json:"msg"`
  19. Count int64 `json:"count"`
  20. Hour int64 `json:"hour"`
  21. State bool `json:"state"`
  22. MerchantName string `json:"merchant_name"`
  23. ApiName string `json:"api_name"`
  24. }
  25. type TGdProviderCodeHour struct {
  26. Id int64 `json:"id"`
  27. ProviderApiId int64 `json:"api_id"`
  28. Code string `json:"code"`
  29. Msg string `json:"msg"`
  30. Count int64 `json:"count"`
  31. Hour int64 `json:"hour"`
  32. State bool `json:"state"`
  33. ProviderName string `json:"provider_name"`
  34. ProviderApiName string `json:"provider_api_name"`
  35. }
  36. type TGdReportHour struct {
  37. Id int64
  38. MerchantId int64
  39. ApiId int64
  40. MerchantName string
  41. Date string `json:"date"`
  42. MerchantDataApiId int64
  43. ApiName string `json:"api_name"`
  44. Total int64 `json:"total"`
  45. Valid int64 `json:"valid"`
  46. Success int64 `json:"success"`
  47. Query int64 `json:"query"`
  48. Reuse int64 `json:"reuse"`
  49. SumElapsed float64 `json:"sum_elapsed"`
  50. PlatformError int64 `json:"platform_error"`
  51. ProviderError int64 `json:"provider_error"`
  52. Charge int64 `json:"charge"`
  53. Hour int64 `json:"hour"`
  54. NonredundantQuery int `json:"nonredundant_query"`
  55. Nonredundant int `json:"nonredundant"`
  56. ChargeReuse int64 `json:"charge_reuse"`
  57. }
  58. type TGdProviderReportHour struct {
  59. Id int64
  60. Date string `json:"date"`
  61. ProviderName string `json:"provider_name"`
  62. MerchantId int64 `json:"merchant_id"`
  63. ApiId int64 `json:"api_id"`
  64. ProviderApiName string `json:"provider_api_name"`
  65. Total int64 `json:"total"`
  66. Success int64 `json:"success"`
  67. Failed int64 `json:"failed"`
  68. Query int64 `json:"query"`
  69. SumElapsed float64 `json:"sum_elapsed"`
  70. Charge int64 `json:"charge"`
  71. ProviderApiId int64 `json:"provider_api_id"`
  72. Hour int64 `json:"hour"`
  73. }
  74. func computeApiReportNormal(start, end int64) (ret []TGdReportHour) {
  75. mreq := apis.LogQueryUserAccessCountExportReq{}
  76. mreply := apis.LogQueryUserAccessCountExportReply{}
  77. mreq.StartTimestamp = start
  78. mreq.EndTimestamp = end
  79. mreq.GroupByMerchant = true
  80. mreq.TabName = "t_gd_access_log_day"
  81. err := LogQueryUserAccessCountExport(context.Background(), &mreq, &mreply)
  82. if err != nil {
  83. l.Error("func",
  84. zap.String("call", "LogQueryUserAccessCountExport"),
  85. zap.String("args", utils.MarshalJsonString(mreq)),
  86. zap.String("error", err.Error()))
  87. return nil
  88. }
  89. ret = make([]TGdReportHour, len(mreply.LogQueryUserAcessCount))
  90. for i, v := range mreply.LogQueryUserAcessCount {
  91. item := &ret[i]
  92. item.Hour = start
  93. item.Date = time.Unix(start, 0).Format("2006-01-02")
  94. item.ApiId = v.ApiId
  95. item.Charge = v.Charge
  96. item.ApiName = v.ApiName
  97. item.MerchantName = v.MerchantName
  98. item.Valid = v.Valid
  99. item.Reuse = v.Reuse
  100. item.Total = v.Total
  101. item.Success = v.Success
  102. item.SumElapsed = v.SumElapsed
  103. item.ProviderError = v.ProviderError
  104. item.PlatformError = v.PlatformError
  105. item.Query = v.Query
  106. item.MerchantId = v.MerchantId
  107. item.NonredundantQuery = v.NonredundantQuery
  108. item.Nonredundant = v.Nonredundant
  109. item.ChargeReuse = v.ChargeReuse
  110. }
  111. return ret
  112. }
  113. func computeApiReportCodeDistribution(apiReport []TGdReportHour, start, end int64) (ret []TGdApiCodeHour) {
  114. for _, v := range apiReport {
  115. mreq := apis.LogQueryInterfaceCountReq{}
  116. mreq.MerchantId = v.MerchantId
  117. mreq.ApiId = v.ApiId
  118. mreq.StartTimestamp = start
  119. mreq.EndTimestamp = end
  120. mreq.TabName = "t_gd_access_log_day"
  121. mreq.GroupByState = true
  122. mreply := apis.LogQueryInterfaceAnalyzeErrorReply{}
  123. err := LogQueryInterfaceAnalyzeError(context.Background(), &mreq, &mreply)
  124. if err != nil {
  125. l.Error("func",
  126. zap.String("call", "LogQueryInterfaceAnalyzeError"),
  127. zap.String("args", utils.MarshalJsonString(mreq)),
  128. zap.String("error", err.Error()))
  129. return nil
  130. }
  131. codeInfos := make([]TGdApiCodeHour, len(mreply.List))
  132. for i, c := range mreply.List {
  133. codeInfos[i].ApiId = v.ApiId
  134. codeInfos[i].MerchantId = v.MerchantId
  135. codeInfos[i].MerchantName = v.MerchantName
  136. codeInfos[i].ApiName = v.ApiName
  137. codeInfos[i].Code = c.Code
  138. codeInfos[i].Msg = c.Msg
  139. codeInfos[i].Count = int64(c.Count)
  140. codeInfos[i].Hour = v.Hour
  141. codeInfos[i].State = c.State
  142. }
  143. ret = append(ret, codeInfos...)
  144. }
  145. return ret
  146. }
  147. func computeProviderReportNormal(start, end int64) (ret []TGdProviderReportHour) {
  148. mreq := apis.LogQueryProviderCountExportReq{}
  149. mreply := apis.LogQueryProviderCountExportReply{}
  150. mreq.StartTimestamp = start
  151. mreq.EndTimestamp = end
  152. mreq.GroupByMerchantApi = true
  153. mreq.TabName = "t_gd_thirdpart_log_day"
  154. err := LogQueryProviderCountExport(context.Background(), &mreq, &mreply)
  155. if err != nil {
  156. l.Error("func",
  157. zap.String("call", "LogQueryProviderCountExport"),
  158. zap.String("args", utils.MarshalJsonString(mreq)),
  159. zap.String("error", err.Error()))
  160. return nil
  161. }
  162. ret = make([]TGdProviderReportHour, len(mreply.LogQueryProviderCount))
  163. for i, v := range mreply.LogQueryProviderCount {
  164. item := &ret[i]
  165. item.Hour = start
  166. item.Date = time.Unix(start, 0).Format("2006-01-02")
  167. item.ProviderApiId = v.ProviderApiId
  168. item.Charge = v.Charge
  169. item.ProviderApiName = v.ProviderApiName
  170. item.ProviderName = v.ProviderName
  171. item.Total = v.Total
  172. item.Query = v.Query
  173. item.Success = v.Success
  174. item.SumElapsed = v.SumElapsed
  175. item.Failed = v.Failed
  176. item.MerchantId = v.MerchantId
  177. item.ApiId = v.ApiId
  178. }
  179. return ret
  180. }
  181. func computeProviderReportCodeDistribution(providerReport []TGdProviderReportHour, start, end int64) (ret []TGdProviderCodeHour) {
  182. m := map[int64]string{}
  183. for _, v := range providerReport {
  184. if _, ok := m[v.ProviderApiId]; ok == false {
  185. m[v.ProviderApiId] = "-"
  186. } else {
  187. continue
  188. }
  189. mreq := apis.ThirdPartyInterfaceErrorAnalyzeReq{}
  190. mreq.TabName = "t_gd_thirdpart_log_day"
  191. mreq.ApiId = v.ProviderApiId
  192. mreq.StartTimestamp = start
  193. mreq.EndTimestamp = end
  194. mreq.GroupByState = true
  195. mreply := apis.ThirdPartyInterfaceErrorAnalyzeReply{}
  196. err := ThirdPartyInterfaceErrorAnalyze(context.Background(), &mreq, &mreply)
  197. if err != nil {
  198. l.Error("func",
  199. zap.String("call", "ThirdPartyInterfaceErrorAnalyze"),
  200. zap.String("args", utils.MarshalJsonString(mreq)),
  201. zap.String("error", err.Error()))
  202. return nil
  203. }
  204. codeInfos := make([]TGdProviderCodeHour, len(mreply.List))
  205. for i, c := range mreply.List {
  206. codeInfos[i].ProviderApiId = v.ProviderApiId
  207. codeInfos[i].ProviderApiName = v.ProviderApiName
  208. codeInfos[i].ProviderName = v.ProviderName
  209. codeInfos[i].Code = c.Code
  210. codeInfos[i].Msg = c.Msg
  211. codeInfos[i].Count = int64(c.Count)
  212. codeInfos[i].Hour = v.Hour
  213. codeInfos[i].State = c.State
  214. }
  215. ret = append(ret, codeInfos...)
  216. }
  217. return ret
  218. }
  219. func insertApiReport(apiReport []TGdReportHour, codeInfos []TGdApiCodeHour) {
  220. if len(apiReport) > 0 {
  221. hourDb.Raw("delete from t_gd_report_hour where hour=?", apiReport[0].Hour).Exec()
  222. hourDb.InsertMulti(len(apiReport), &apiReport)
  223. }
  224. if len(codeInfos) > 0 {
  225. hourDb.Raw("delete from t_gd_api_code_hour where hour=?", codeInfos[0].Hour).Exec()
  226. hourDb.InsertMulti(len(codeInfos), &codeInfos)
  227. }
  228. }
  229. func insertProviderReport(providerReport []TGdProviderReportHour, codeInfos []TGdProviderCodeHour) {
  230. if len(providerReport) > 0 {
  231. newReport := []TGdProviderReportHour{}
  232. for i, _ := range providerReport {
  233. newReport = append(newReport, providerReport[i])
  234. }
  235. if len(newReport) > 0 {
  236. hourDb.Raw("delete from t_gd_provider_report_hour where hour=?", newReport[0].Hour).Exec()
  237. hourDb.InsertMulti(len(newReport), &newReport)
  238. }
  239. }
  240. if len(codeInfos) > 0 {
  241. hourDb.Raw("delete from t_gd_provider_code_hour where hour=?", codeInfos[0].Hour).Exec()
  242. hourDb.InsertMulti(len(codeInfos), &codeInfos)
  243. }
  244. }
  245. func insertApiReportLocal(apiReport []TGdReportHour, codeInfos []TGdApiCodeHour, db orm.Ormer) {
  246. if len(apiReport) > 0 {
  247. db.InsertMulti(len(apiReport), &apiReport)
  248. }
  249. if len(codeInfos) > 0 {
  250. db.Raw("delete from t_gd_api_code_hour where hour=?", codeInfos[0].Hour).Exec()
  251. db.InsertMulti(len(codeInfos), &codeInfos)
  252. }
  253. }
  254. func insertProviderReportLocal(providerReport []TGdProviderReportHour, codeInfos []TGdProviderCodeHour, db orm.Ormer) {
  255. if len(providerReport) > 0 {
  256. newReport := []TGdProviderReportHour{}
  257. for i, v := range providerReport {
  258. if (strings.Contains(v.ProviderName, "觅实") || strings.Contains(v.ProviderName, "A02-MS")) && strings.Contains(v.ProviderApiName, "MSN0") == false && strings.Contains(v.ProviderApiName, "MSM00001") == false && strings.Contains(v.ProviderApiName, "觅实N0") == false {
  259. continue
  260. }
  261. newReport = append(newReport, providerReport[i])
  262. }
  263. if len(newReport) > 0 {
  264. db.InsertMulti(len(newReport), &newReport)
  265. }
  266. }
  267. if len(codeInfos) > 0 {
  268. db.Raw("delete from t_gd_provider_code_hour where hour=?", codeInfos[0].Hour).Exec()
  269. db.InsertMulti(len(codeInfos), &codeInfos)
  270. }
  271. }
  272. func computeApiReportHour(start, end int64) {
  273. apiReport := computeApiReportNormal(start, end)
  274. codeInfos := computeApiReportCodeDistribution(apiReport, start, end)
  275. insertApiReport(apiReport, codeInfos)
  276. }
  277. func computeProviderReportHour(start, end int64) {
  278. providerReport := computeProviderReportNormal(start, end)
  279. codeInfos := computeProviderReportCodeDistribution(providerReport, start, end)
  280. insertProviderReport(providerReport, codeInfos)
  281. }
  282. func apiReportHour(hour int64) {
  283. end := hour
  284. for {
  285. if end-hour > 12*3600 {
  286. break
  287. }
  288. if hourDb.QueryTable("t_gd_report_hour").Filter("hour", hour).Exist() == false || (end-hour <= 2*3600) {
  289. computeApiReportHour(hour, hour+3600)
  290. }
  291. hour = hour - 3600
  292. }
  293. }
  294. func GetLocalDb(user, pass, addr string) orm.Ormer {
  295. if addr != "" {
  296. dataSource := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=%s", user, pass, addr, "db_gd_access_log", "utf8")
  297. if err := orm.RegisterDataBase("local", "mysql", dataSource, 2, 2); err != nil {
  298. fmt.Printf("%v\n", err.Error())
  299. return nil
  300. }
  301. }
  302. db = orm.NewOrm()
  303. db.Using("local")
  304. return db
  305. }
  306. func GernerateHourReport(start, end int64, isApi, isProvider bool, db orm.Ormer) {
  307. for i := start; i < end; i += 3600 {
  308. hourDb = orm.NewOrm()
  309. if isApi {
  310. apiReport := computeApiReportNormal(start, end)
  311. codeInfos := computeApiReportCodeDistribution(apiReport, start, end)
  312. insertApiReportLocal(apiReport, codeInfos, db)
  313. }
  314. if isProvider {
  315. providerReport := computeProviderReportNormal(start, end)
  316. codeInfos := computeProviderReportCodeDistribution(providerReport, start, end)
  317. insertProviderReportLocal(providerReport, codeInfos, db)
  318. }
  319. }
  320. }
  321. func providerReportHour(hour int64) {
  322. end := hour
  323. for {
  324. if end-hour > 12*3600 {
  325. break
  326. }
  327. if hourDb.QueryTable("t_gd_provider_report_hour").Filter("hour", hour).Exist() == false || (end-hour <= 2*3600) {
  328. computeProviderReportHour(hour, hour+3600)
  329. }
  330. hour = hour - 3600
  331. }
  332. }
// hourDb is the package-level Ormer used by the hourly report queries and
// inserts. It is assigned in LogHourTask and reassigned on every iteration of
// GernerateHourReport.
var hourDb orm.Ormer
  334. func LogHourTask() {
  335. go func() {
  336. hourDb = orm.NewOrm()
  337. now := time.Now()
  338. hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Unix() - 3600
  339. hourDb.Begin()
  340. apiReportHour(hour)
  341. providerReportHour(hour)
  342. hourDb.Commit()
  343. for {
  344. now := time.Now()
  345. next := now.Add(time.Hour)
  346. next = time.Date(next.Year(), next.Month(), next.Day(), next.Hour(), 2, 0, 0, next.Location())
  347. t := time.NewTimer(next.Sub(now))
  348. <-t.C
  349. t.Stop()
  350. hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Unix()
  351. hourDb.Begin()
  352. apiReportHour(hour)
  353. providerReportHour(hour)
  354. hourDb.Commit()
  355. runtime.GC()
  356. }
  357. }()
  358. }