package crontab import ( "context" "gd_crontab/apis" "gd_crontab/errors" "gd_crontab/rpc_apis" "gd_crontab/rpc_apis/gd_management" "fmt" "strconv" "strings" "time" "github.com/astaxie/beego/orm" ) func getMonthFromtimestamp(timestamp int64) int { return int(time.Unix(timestamp, 0).Month()) } func getUserCountWhereExport(req *apis.LogQueryUserAccessCountExportReq) (w string, s []interface{}) { if req.EndTimestamp != 0 && req.StartTimestamp != 0 { w += " and " + "timestamp >=" + " ? and timestamp < ?" s = append(s, req.StartTimestamp, req.EndTimestamp) } else { nowTime := time.Now() zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) startTimeStamp := zeroTime.Unix() endTimeStamp := nowTime.Unix() w += " and " + "timestamp >= " + " ? and timestamp < ?" s = append(s, startTimeStamp, endTimeStamp) } if len(req.ApiIdList) != 0 { w += " and api_id in (" + strings.Replace(strings.Trim(fmt.Sprint(req.ApiIdList), "[]"), " ", ",", -1) + ")" } if req.MerchantId != 0 { //strings.Replace(strings.Trim(fmt.Sprint(array_or_slice), "[]"), " ", ",", -1) w += " and " + "merchant_id" + " = ?" s = append(s, req.MerchantId) } if len(s) > 0 { //有条件才截取最前面的and w = string([]byte(w)[4:]) w = " where" + w + " and merchant_data_api_id > 0" return w, s } else { return " where merchant_data_api_id > 0", nil } } func getExportSql(req *apis.LogQueryUserAccessCountExportReq, where string) string { sqlStr := "select create_time as date, group_concat(distinct merchant_data_api_id) as merchant_data_api_id, api_name,count(state) as total ," + "sum(code=0 or code=1100 or code=1101 or code=1104 or code=1107) as valid" + ",sum(code=0 or code=1100 ) as success ,sum(code=0) as query," + "count(distinct case when code = 0 then search end) as nonredundant_query," + "count(distinct case when code = 0 or code=1100 or code=1101 or code=1104 or code=1107 then search end) as nonredundant," + "sum(is_reuse and code=0) as reuse, " + "avg(elapsed) as avg_elapsed, " + "sum(elapsed) as sum_elapsed, " + "sum(code=1000 or code=1001) as platform_error, " + "sum(code=1101) as provider_error,api_id" + "%s" + " from " + req.TabName + " as a %s group by create_time, api_id order by timestamp" if req.GroupByMerchant { sqlStr = "select create_time as date, merchant_name, api_id, merchant_id, group_concat(distinct merchant_data_api_id) as merchant_data_api_id, api_name,count(state) as total ," + "sum(code=0 or code=1100 or code=1101 or code=1104 or code=1107) as valid" + ",sum(code=0 or code=1100 ) as success ,sum(code=0) as query," + "count(distinct case when code = 0 then search end) as nonredundant_query," + "count(distinct case when code = 0 or code=1100 or code=1101 or code=1104 or code=1107 then search end) as nonredundant," + "sum(is_reuse and code=0) as reuse, " + "avg(elapsed) as avg_elapsed, " + "sum(elapsed) as sum_elapsed, " + "sum(code=1000 or code=1001) as platform_error, " + "sum(code=1101) as provider_error,api_id" + "%s" + " from " + req.TabName + " as a %s group by create_time, merchant_id, api_id order by timestamp" } concat := "" subSql := "" timeList := strings.Split(req.TimeList, ",") for _, v := range timeList { if v == "" { break } if subSql == "" { subSql = fmt.Sprintf("sum(elapsed > %s)", v) continue } subSql = fmt.Sprintf("%s,sum(elapsed > %s)", subSql, v) } if subSql != "" { concat = fmt.Sprintf(",concat_ws(',', %s) as big_elapseds", subSql) } sqlStr = fmt.Sprintf(sqlStr, concat, where) return sqlStr } func getUsedCount(o orm.Ormer, info *gd_management.UserMerchantCountCode, timeoutInfo map[string]int, start, end int64, tabname string) int { used := 0 if info.ComboType == 1 { start = 0 } else { start = end - 24*60*60 } timeout := 3600 * 24 key := fmt.Sprintf("%d%d", info.MerchantDataApiId, info.ApiId) if t, ok := timeoutInfo[key]; ok && t > 0 { timeout = t } sql := fmt.Sprintf("select count(1) from "+tabname+" where merchant_data_api_id = %d and api_id = %d and elapsed < %d and timestamp >= %d and timestamp < %d ", info.MerchantDataApiId, info.ApiId, timeout, start, end) if info.CountType == 1 { if info.CountCode == "" { return 0 } sql = fmt.Sprintf("select count(1) from "+tabname+" where merchant_data_api_id = %d and api_id = %d and elapsed < %d and timestamp >= %d and timestamp < %d and raw_code in(%s)", info.MerchantDataApiId, info.ApiId, timeout, start, end, info.CountCode) } o.Raw(sql).QueryRow(&used) return used } var lastTimeInfo string var countInfoCache = map[string]*apis.MerchantApiCountInfo{} func getApiCountInfo(countCode []gd_management.UserMerchantCountCode, timeoutInfo map[string]int, start, end int64, tabname string) map[string]*apis.MerchantApiCountInfo { if fmt.Sprintf("%d-%d", start, end) == lastTimeInfo && len(countInfoCache) > 0 { return countInfoCache } m := map[string]*apis.MerchantApiCountInfo{} for _, v := range countCode { key := fmt.Sprintf("%d", v.MerchantDataApiId) if _, ok := m[key]; ok == false { m[key] = &apis.MerchantApiCountInfo{ MerchantDataApiId: v.MerchantDataApiId, ComboType: v.ComboType, Count: v.Count, Used: v.DayUsed, Remain: v.Remain, } } } countInfoCache = m lastTimeInfo = fmt.Sprintf("%d-%d", start, end) return m } func getNotShow() (map[int64]bool, error) { req := gd_management.ManagementGetApiShowInfoReq{} reply, err := rpc_apis.Management.ManagementGetApiShowInfo(context.Background(), &req) if err != nil { return nil, err } ret := map[int64]bool{} for _, v := range reply.Infos { if v.IsShow == false { ret[v.ApiId] = true } } return ret, nil } func getCountCodeInfoNew(req *apis.LogQueryUserAccessCountExportReq, timeoutInfo map[string]int) (map[string]gd_management.UserMerchantCountCode, []gd_management.UserMerchantCountCode, error) { mReq := gd_management.MangementGetUserMerchantCountCodeReq{} mreply, err := rpc_apis.Management.MangementGetUserMerchantCountCode(context.Background(), &mReq) if err != nil { return nil, nil, err } countMap := make(map[string]gd_management.UserMerchantCountCode, len(mreply.UserMerchantCountCode)) for _, v := range mreply.UserMerchantCountCode { countMap[strconv.Itoa(int(v.MerchantDataApiId))+strconv.Itoa(int(v.ApiId))] = v } return countMap, mreply.UserMerchantCountCode, nil } func getTimeoutInfo() (map[string]int, error) { req := gd_management.ManagementGetMerchantApiTimeoutReq{} reply, err := rpc_apis.Management.ManagementGetMerchantApiTimeout(context.Background(), &req) if err != nil { return nil, err } ret := map[string]int{} for _, v := range reply.Infos { ret[fmt.Sprintf("%d%d", v.MerchantDataApiId, v.ApiId)] = v.Timeout } return ret, nil } func getStartEndTimestamp(reqStart, reqEnd int64, date string) (int64, int64) { loc, _ := time.LoadLocation("Local") t, err := time.ParseInLocation("2006-01-02", date, loc) if err != nil { return reqStart, reqEnd } if date == "" { return reqStart, reqEnd } dayStart := t.Unix() dayEnd := t.AddDate(0, 0, 1).Unix() if reqStart < dayStart || reqStart >= dayEnd { reqStart = dayStart } if reqEnd > dayEnd || reqEnd <= dayStart { reqEnd = dayEnd } return reqStart, reqEnd } func computeCharge(req *apis.LogQueryUserAccessCountExportReq, timeoutInfo map[string]int, info *apis.LogQueryUserAcessCountExport, countInfo map[string]*apis.MerchantApiCountInfo, countMap map[string]gd_management.UserMerchantCountCode) { // 用于统计计费中超时的数量 subSql := "" concat := "" timeList := strings.Split(req.TimeList, ",") for _, v := range timeList { if v == "" { break } if subSql == "" { subSql = fmt.Sprintf("sum(elapsed > %s)", v) continue } subSql = fmt.Sprintf("%s,sum(elapsed > %s)", subSql, v) } if subSql != "" { concat = fmt.Sprintf(", concat_ws(',', %s) as big_elapseds", subSql) } sqlStr := "" mdids := strings.Split(info.MerchantDataApiId, ",") for _, mdid := range mdids { if apiCountInfo, ok := countInfo[mdid]; ok { info.CountInfos = append(info.CountInfos, *apiCountInfo) } } info.ChargeBigElapsedList = make([]int, len(timeList)) for _, mdid := range mdids { bigelapsed := "" v, ok := countMap[mdid+fmt.Sprintf("%d", info.ApiId)] if ok == false { continue } count := int64(0) chargeReuseCount := int64(0) timeout := 3600 * 24 if t, ok := timeoutInfo[mdid+fmt.Sprintf("%d", info.ApiId)]; ok && t > 0 { timeout = t } switch { case v.CountType == 0 && timeout == 3600*24 && len(mdids) == 1: chargeReuseCount = info.Reuse count = info.Total bigelapsed = info.BigElapseds case v.CountType == 0 && (timeout != 3600*24 || len(mdids) > 1): sqlStr = "select count(id), sum(is_reuse)" + concat + " from " + req.TabName + " where merchant_data_api_id = %s and api_id = %d and elapsed < %d and timestamp >= ? and timestamp < ?" sqlStr = fmt.Sprintf(sqlStr, mdid, info.ApiId, timeout) start, end := getStartEndTimestamp(req.StartTimestamp, req.EndTimestamp, info.Date) if concat == "" { orm.NewOrm().Raw(sqlStr, start, end).QueryRow(&count, &chargeReuseCount) } else { orm.NewOrm().Raw(sqlStr, start, end).QueryRow(&count, &chargeReuseCount, &bigelapsed) } default: if v.CountCode == "" { count = 0 chargeReuseCount = 0 for i := 0; i < len(timeList); i++ { if bigelapsed == "" { bigelapsed = "0" } else { bigelapsed = fmt.Sprintf("%s,0", bigelapsed) } } } else { if strings.Contains(v.CountCode, "1100") { v.CountCode = fmt.Sprintf("%s,1105,1106,1107,1108,1109,1110", v.CountCode) } start, end := getStartEndTimestamp(req.StartTimestamp, req.EndTimestamp, info.Date) sqlStr = "select count(id), sum(is_reuse)" + concat + " from " + req.TabName + " where merchant_data_api_id = %s and api_id = %d and raw_code in (%s) and timestamp >= ? and timestamp < ? and elapsed < %d" sqlStr = fmt.Sprintf(sqlStr, mdid, info.ApiId, v.CountCode, timeout) if concat == "" { orm.NewOrm().Raw(sqlStr, start, end).QueryRow(&count, &chargeReuseCount) } else { orm.NewOrm().Raw(sqlStr, start, end).QueryRow(&count, &chargeReuseCount, &bigelapsed) } } } info.Charge += count info.ChargeReuse += chargeReuseCount if bigelapsed != "" { array := strings.Split(bigelapsed, ",") for i, _ := range array { vint, _ := strconv.Atoi(array[i]) info.ChargeBigElapsedList[i] += vint } } } info.ChargeRate = strconv.FormatFloat(float64(100*info.Charge)/float64(info.Total), 'f', 2, 64) + "%" if info.Valid == 0 { info.ChargeReuseRate = "0%" } else { info.ChargeReuseRate = strconv.FormatFloat(float64(100*info.ChargeReuse)/float64(info.Valid), 'f', 2, 64) + "%" } } func computeTimeout(info *apis.LogQueryUserAcessCountExport, timeoutInfo map[string]int, tabname string) { mdids := strings.Split(info.MerchantDataApiId, ",") for _, mdid := range mdids { timeout := 3600 * 24 originTimeoutInfo := 0 if t, ok := timeoutInfo[mdid+fmt.Sprintf("%d", info.ApiId)]; ok { originTimeoutInfo = t if t > 0 { timeout = t } } info.TimeoutInfo = fmt.Sprintf("%s,%d", info.TimeoutInfo, originTimeoutInfo) if len(mdids) == 1 && timeout == 3600*24 { continue } count := 0 sqlStr := "select count(id) from " + tabname + " where merchant_data_api_id = %s and api_id = %d and elapsed >= %d and create_time = ?" sqlStr = fmt.Sprintf(sqlStr, mdid, info.ApiId, timeout) orm.NewOrm().Raw(sqlStr, info.Date).QueryRow(&count) info.TimeoutCount += count } info.TimeoutInfo = strings.TrimPrefix(info.TimeoutInfo, ",") } func setSearchTimestamp(req *apis.LogQueryUserAccessCountExportReq) { if req.EndTimestamp == 0 || req.StartTimestamp == 0 { nowTime := time.Now() zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) req.StartTimestamp = zeroTime.Unix() req.EndTimestamp = nowTime.Unix() } } func computeBigelapse(info *apis.LogQueryUserAcessCountExport) { if info.BigElapseds != "" { array := strings.Split(info.BigElapseds, ",") info.BigElapsedList = make([]int, len(array)) for i, _ := range array { vint, _ := strconv.Atoi(array[i]) info.BigElapsedList[i] = vint } } } func computePercent(info *apis.LogQueryUserAcessCountExport) { if info.Valid != 0 { info.QueryRate = strconv.FormatFloat(float64(100*info.Query)/float64(info.Valid), 'f', 2, 64) + "%" info.NonredundantQueryRate = strconv.FormatFloat(float64(100*info.NonredundantQuery)/float64(info.Nonredundant), 'f', 2, 64) + "%" info.SuccessRate = strconv.FormatFloat(float64(100*info.Success)/float64(info.Valid), 'f', 2, 64) + "%" info.ReuseRate = strconv.FormatFloat(float64(100*info.Reuse)/float64(info.Valid), 'f', 2, 64) + "%" info.PlatformErrorRate = strconv.FormatFloat(float64(100*info.PlatformError)/float64(info.Valid), 'f', 2, 64) + "%" info.ProviderErrorRate = strconv.FormatFloat(float64(100*info.ProviderError)/float64(info.Valid), 'f', 2, 64) + "%" } else { info.QueryRate = "0%" info.SuccessRate = "0%" info.ReuseRate = "0%" } if info.Nonredundant == 0 { info.NonredundantQueryRate = "0%" } else { info.NonredundantQueryRate = strconv.FormatFloat(float64(100*info.NonredundantQuery)/float64(info.Nonredundant), 'f', 2, 64) + "%" } if info.Total != 0 { info.ValidRate = strconv.FormatFloat(float64(100*info.Valid)/float64(info.Total), 'f', 2, 64) + "%" } else { info.ValidRate = "0%" } } func getMonthTab(prefix string, timestamp int64) string { month := getMonthFromtimestamp(timestamp) if month > 12 || month < 1 { return "" } return fmt.Sprintf("%s%d", prefix, month) } type exportTime struct { start int64 end int64 month int } func nextMonthDay(t time.Time) time.Time { month := t.Month() for { t = t.AddDate(0, 0, 1) if t.Month() != month { return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) } } return t } func getExportTimes(startTimestamp, endTimestamp int64) []exportTime { if time.Unix(startTimestamp, 0).Month() == time.Unix(endTimestamp, 0).Month() { return []exportTime{{ start: startTimestamp, end: endTimestamp, month: int(time.Unix(startTimestamp, 0).Month()), }} } ret := []exportTime{} start := startTimestamp for { end := nextMonthDay(time.Unix(start, 0)) if end.Unix() >= endTimestamp { item := exportTime{ start: start, end: endTimestamp, month: int(time.Unix(start, 0).Month()), } ret = append(ret, item) break } item := exportTime{ start: start, end: end.Unix(), month: int(time.Unix(start, 0).Month()), } ret = append(ret, item) start = end.Unix() } return ret } func logQueryUserAccessCountExport(req *apis.LogQueryUserAccessCountExportReq, reply *apis.LogQueryUserAccessCountExportReply, noShowMap map[int64]bool, timeoutInfo map[string]int, countMap map[string]gd_management.UserMerchantCountCode, countCodes []gd_management.UserMerchantCountCode) error { countInfo := map[string]*apis.MerchantApiCountInfo{} if req.NeedCount { countInfo = getApiCountInfo(countCodes, timeoutInfo, req.StartTimestamp, req.EndTimestamp, req.TabName) } // 查询 where, val := getUserCountWhereExport(req) sqlStr := getExportSql(req, where) o := orm.NewOrm() _, err := o.Raw(sqlStr, val).QueryRows(&reply.LogQueryUserAcessCount) if err != nil && err != orm.ErrNoRows { return errors.DataBaseError } // 计算指标 for index, info := range reply.LogQueryUserAcessCount { computeCharge(req, timeoutInfo, &reply.LogQueryUserAcessCount[index], countInfo, countMap) computeTimeout(&reply.LogQueryUserAcessCount[index], timeoutInfo, req.TabName) computeBigelapse(&reply.LogQueryUserAcessCount[index]) computePercent(&reply.LogQueryUserAcessCount[index]) if noShowMap != nil { if _, ok := noShowMap[info.ApiId]; ok == true { continue reply.LogQueryUserAcessCount[index].NotShow = true } } } // clear countMap = nil noShowMap = nil countInfo = nil timeoutInfo = nil return nil } func LogQueryUserAccessCountExportNew(req *apis.LogQueryUserAccessCountExportReq, reply *apis.LogQueryUserAccessCountExportReply) error { setSearchTimestamp(req) showInfo, err := getNotShow() if err != nil { return err } timeoutInfo, err := getTimeoutInfo() if err != nil { return err } countCodeInfo, countCodes, err := getCountCodeInfoNew(req, timeoutInfo) if err != nil { return err } if req.TabName == "t_gd_access_log_day" { return logQueryUserAccessCountExport(req, reply, showInfo, timeoutInfo, countCodeInfo, countCodes) } exportTimes := getExportTimes(req.StartTimestamp, req.EndTimestamp) for _, v := range exportTimes { req.StartTimestamp = v.start req.EndTimestamp = v.end req.TabName = getMonthTab("t_gd_access_log_month", req.StartTimestamp) subReply := &apis.LogQueryUserAccessCountExportReply{} err := logQueryUserAccessCountExport(req, subReply, showInfo, timeoutInfo, countCodeInfo, countCodes) if err != nil { return err } reply.LogQueryUserAcessCount = append(reply.LogQueryUserAcessCount, subReply.LogQueryUserAcessCount...) } return nil } func LogQueryUserAccessCountExport(ctx context.Context, req *apis.LogQueryUserAccessCountExportReq, reply *apis.LogQueryUserAccessCountExportReply) error { return LogQueryUserAccessCountExportNew(req, reply) }