package utils import ( "context" "encoding/json" "fmt" "gd_gateway/apis" "gd_gateway/common.in/jsonrpc2" dutils "gd_gateway/common.in/utils" "gd_gateway/consts" "gd_gateway/errors" "gd_gateway/rpc_apis_v1" bcontext "github.com/astaxie/beego/context" "net/http" "os" "strings" "sync" "time" "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) var discoveryType string func SetDiscoveryType(mode string) { discoveryType = mode } func getLevelDbDir() string { dir := "" if discoveryType == "k8s" { hostname, _ := os.Hostname() dir = fmt.Sprintf("/mnt/%s_leveldb_log", hostname) } else { dir = "/var/log/gd_access_local_leveldb.log" } fmt.Println("leveldb dir:", dir) return dir } func AddProviderCount(providerApiId int64, merchantChildApiId int64, state bool, rawCode string) { req := apis.ManagementThirdpartCountIncreaseReq{ ProviderApiId: providerApiId, MerchantChildApiId: merchantChildApiId, State: state, RawCode: rawCode, } rpc_apis_v1.AuthCheck.ManagementThirdpartCountIncrease(context.Background(), &req) } var Ldb *leveldb.DB var Lmu sync.Mutex var LdbBak *leveldb.DB var LmuBak sync.Mutex func getLevelDbKey(db *leveldb.DB, timestamp int64, apiid int64) string { for { key := fmt.Sprintf("%d-%d-%s", time.Now().UnixNano(), timestamp, apiid) bytes, _ := db.Get([]byte(key), nil) if len(bytes) > 0 { time.Sleep(200 * time.Millisecond) continue } return key } return "" } func DeleteAccessLogFromLevelDbBak(key string) { if LdbBak == nil { return } if key == "" { return } LdbBak.Delete([]byte(key), nil) } func WriteAccessLogToLevelDbBak(accesslog *apis.LogAddAccessLogReq) string { LmuBak.Lock() defer LmuBak.Unlock() bytes, _ := json.Marshal(accesslog) if LdbBak == nil { var err error LdbBak, err = leveldb.OpenFile("/var/log/gd_access_local_leveldb_bak.log", nil) if err != nil { l.Error("open leveldb", zap.String("call", "OpenFile"), zap.String("args", ""), zap.String("error", err.Error())) return "" } } key := getLevelDbKey(LdbBak, accesslog.TimeStamp, accesslog.ApiId) LdbBak.Put([]byte(key), bytes, nil) return key } func WriteAccessLogToLevelDb(accesslog *apis.LogAddAccessLogReq) { Lmu.Lock() defer Lmu.Unlock() bytes, _ := json.Marshal(accesslog) if Ldb == nil { var err error Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil) if err != nil { l.Error("open leveldb", zap.String("call", "OpenFile"), zap.String("args", ""), zap.String("error", err.Error())) return } } key := getLevelDbKey(Ldb, accesslog.TimeStamp, accesslog.ApiId) Ldb.Put([]byte(key), bytes, nil) } func readLevelDb(db *leveldb.DB) { iter := db.NewIterator(nil, nil) for iter.Next() { bytes := iter.Value() accessLog := apis.LogAddAccessLogReq{} json.Unmarshal(bytes, &accessLog) _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(context.Background(), &accessLog) if err == nil { db.Delete(iter.Key(), nil) } } } func LoopLevelDb() { if Ldb == nil { var err error Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil) if err != nil { l.Error("open leveldb", zap.String("call", "OpenFile"), zap.String("args", ""), zap.String("error", err.Error())) fmt.Printf("open leveldb failed\n") os.Exit(-1) } } t := time.NewTicker(5 * 60 * time.Second) for { select { case <-t.C: readLevelDb(Ldb) default: time.Sleep(2 * time.Second) } } } func WriteAccessLog(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantChildApiId int64, Err error) { go func() { endTime := uint64(time.Now().UnixNano()) accessLog.AccessLogWrite.Elapsed = float64(endTime-startTime) / 1000000 accessLog.TimeStamp = int64(startTime / 1e9) accessLog.AccessLogWrite.Msg = "成功" accessLog.MerchantChildApiId = merchantChildApiId if accessLog.ReplaceInfo.ReplaceMerchantDataApiId > 0 { accessLog.MerchantId = accessLog.ReplaceInfo.ReplaceMerchantId accessLog.ApiId = accessLog.ReplaceInfo.ReplaceBaseApiId accessLog.MerchantDataApiId = accessLog.ReplaceInfo.ReplaceMerchantDataApiId accessLog.MerchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId merchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId } if Err != nil { accessLog.AccessLogWrite.State = false var e jsonrpc2.Error err := json.Unmarshal([]byte(Err.Error()), &e) if err != nil { accessLog.IsReuse = true accessLog.AccessLogWrite.Code = 1100 accessLog.AccessLogWrite.Msg = Err.Error() } else { accessLog.AccessLogWrite.Code = e.Code if e.Code != 0 { accessLog.AccessLogWrite.Msg = e.Message } } if accessLog.RespCode == 0 && accessLog.AccessLogWrite.Code != 0 { accessLog.RespCode = accessLog.AccessLogWrite.Code } } else { accessLog.AccessLogWrite.State = true } //key := WriteAccessLogToLevelDbBak(&accessLog) _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(ctx, &accessLog) if err != nil { l.Error("rpc", zap.String("call", "LogAddAccessLog"), zap.String("args", dutils.MarshalJsonString(accessLog)), zap.String("error", err.Error())) WriteAccessLogToLevelDb(&accessLog) } else { //DeleteAccessLogFromLevelDbBak(key) } if merchantChildApiId != 0 { for i, _ := range accessLog.ThirdpartLogWrites { rawCode := accessLog.ThirdpartLogWrites[i].RawCode if rawCode == "" { rawCode = fmt.Sprintf("%d", accessLog.ThirdpartLogWrites[i].Code) } AddProviderCount(accessLog.ThirdpartLogWrites[i].ProviderApiId, merchantChildApiId, accessLog.ThirdpartLogWrites[i].State, rawCode) } } }() } func WriteInvalidAccessLog(ctx context.Context, remoteAddr, requestParams string, merchantId, apiId, startTime int64, err error) { go func() { accessLog := apis.LogAddInvalidAccessLogReq{} accessLog.RequestParams = requestParams accessLog.RemoteAddr = remoteAddr accessLog.ApiId = apiId accessLog.MerchantId = merchantId endTime := time.Now().UnixNano() accessLog.Elapsed = float64(endTime-startTime) / 1000000 accessLog.TimeStamp = time.Now().Unix() var e jsonrpc2.Error err := json.Unmarshal([]byte(err.Error()), &e) if err != nil { accessLog.Code = 10002 accessLog.Msg = "内部服务错误" } else { accessLog.Code = e.Code accessLog.Msg = e.Message } _, err = rpc_apis_v1.AccessLog.LogAddInvalidAccessLog(ctx, &accessLog) if err != nil { l.Error("rpc", zap.String("call", "LogAddInvalidAccessLog"), zap.String("args", dutils.MarshalJsonString(accessLog)), zap.String("error", err.Error())) } }() } func FillMerchantApiInfo(merchantApiInfo *apis.MerchantApiInfo, merchantApiResult *apis.CheckMerchantApiResult) { merchantApiInfo.BaseApiId = merchantApiResult.BaseApiId merchantApiInfo.MerchantId = merchantApiResult.MerchantId merchantApiInfo.IsForceUpdate = merchantApiResult.IsForceUpdate merchantApiInfo.MerchantChildApiId = merchantApiResult.MerchantChildApiId merchantApiInfo.ReuseTime = merchantApiResult.ReuseTime merchantApiInfo.RandomPercentage = merchantApiResult.RandomPercentage merchantApiInfo.ApiTimeout = merchantApiResult.Timeout merchantApiInfo.ResponseParamConf = merchantApiResult.ResponseParamConf } func NewAccessLogNew(rawRequestParams string, isReuse bool, LReq []apis.ThirdpartLogWrite, RemoteAddr string, merchantApiResult *apis.CheckMerchantApiResult) apis.LogAddAccessLogReq { accessLog := apis.LogAddAccessLogReq{ MerchantId: merchantApiResult.MerchantId, ApiId: merchantApiResult.BaseApiId, MerchantDataApiId: merchantApiResult.MerchantDataApiId, RemoteAddr: RemoteAddr, ReplaceInfo: merchantApiResult.ReplaceInfo, AccessLogWrite: apis.AccessLogWrite{ IsReuse: isReuse, RequestParams: string(merchantApiResult.DecryptParam), RawRequestParams: rawRequestParams, //ResponseParams: resp.Data, 默认为空数据 }, } if LReq != nil { accessLog.ThirdpartLogWrites = LReq } return accessLog } func WriteAccessLogAndCountCode(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult, err, respErr error) { if respErr != nil { var e jsonrpc2.Error err := json.Unmarshal([]byte(respErr.Error()), &e) if err != nil { accessLog.AccessLogWrite.RespCode = 1100 } else { accessLog.AccessLogWrite.RespCode = e.Code } } else { accessLog.AccessLogWrite.RespCode = 0 } // 写日志,记录原始日志 WriteAccessLog( ctx, accessLog, startTime, merchantApiResult.MerchantChildApiId, err) // 错误码计数,采用响应错误码计数 ManagementCheckCountCode(merchantApiResult, accessLog.AccessLogWrite.RespCode) // 释放token if merchantApiResult.Token.RouterToken != "" || merchantApiResult.Token.MerchantToken != "" { _, _ = rpc_apis_v1.AuthCheck.ReleaseToken(ctx, &merchantApiResult.Token) } } func IsApiTimeOut(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult) error { // 检查接口是否超时,超时返回 if merchantApiResult.Timeout > 0 { endTime := uint64(time.Now().UnixNano()) if ((endTime - startTime) / 1e9) > merchantApiResult.Timeout { // 写日志并计数 WriteAccessLogAndCountCode(ctx, accessLog, startTime, merchantApiResult, errors.ApiTimeOut, errors.ApiTimeOut) return errors.ApiTimeOut } } if merchantApiResult.MinimalTimeConsuming > 0 { reqTime := ctx.Value(consts.RequestStar).(int64) useTime := int(time.Now().Unix() - reqTime) if useTime < merchantApiResult.MinimalTimeConsuming { time.Sleep(time.Duration(merchantApiResult.MinimalTimeConsuming-useTime) * time.Second) } } return nil } func HttpStateCodeReturn(ctx *bcontext.Context, merchantApiResult apis.CheckMerchantApiResult, respErr error) { fmt.Println(merchantApiResult.IsHttpCode, merchantApiResult.CountType, respErr, merchantApiResult.CountCode) if merchantApiResult.IsHttpCode == 1 && merchantApiResult.CountType == 1 { respCode := 0 if respErr != nil { var e jsonrpc2.Error err := json.Unmarshal([]byte(respErr.Error()), &e) if err != nil { respCode = 10002 } else { respCode = e.Code } } //fmt.Println("respcode 111111111111111:", respCode) code := fmt.Sprintf("%d", respCode) array := strings.Split(merchantApiResult.CountCode, ",") for _, v := range array { if v == code { return } } //fmt.Println("write 400 22222222222222222") ctx.ResponseWriter.WriteHeader(http.StatusBadRequest) //ctx.Output.Context.ResponseWriter.WriteHeader(http.StatusBadRequest) //ctx.Output.Status = http.StatusBadRequest //ctx.Input.Context.Request.Header.Set("gd-state", "400") } }