123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- package utils
- import (
- "context"
- "encoding/json"
- "fmt"
- "gd_gateway/apis"
- "gd_gateway/common.in/jsonrpc2"
- dutils "gd_gateway/common.in/utils"
- "gd_gateway/consts"
- "gd_gateway/errors"
- "gd_gateway/rpc_apis_v1"
- bcontext "github.com/astaxie/beego/context"
- "net/http"
- "os"
- "strings"
- "sync"
- "time"
- "github.com/syndtr/goleveldb/leveldb"
- "go.uber.org/zap"
- )
- var discoveryType string
- func SetDiscoveryType(mode string) {
- discoveryType = mode
- }
- func getLevelDbDir() string {
- dir := ""
- if discoveryType == "k8s" {
- hostname, _ := os.Hostname()
- dir = fmt.Sprintf("/mnt/%s_leveldb_log", hostname)
- } else {
- dir = "/var/log/gd_access_local_leveldb.log"
- }
- fmt.Println("leveldb dir:", dir)
- return dir
- }
- func AddProviderCount(providerApiId int64, merchantChildApiId int64, state bool, rawCode string) {
- req := apis.ManagementThirdpartCountIncreaseReq{
- ProviderApiId: providerApiId,
- MerchantChildApiId: merchantChildApiId,
- State: state,
- RawCode: rawCode,
- }
- rpc_apis_v1.AuthCheck.ManagementThirdpartCountIncrease(context.Background(), &req)
- }
- var Ldb *leveldb.DB
- var Lmu sync.Mutex
- var LdbBak *leveldb.DB
- var LmuBak sync.Mutex
- func getLevelDbKey(db *leveldb.DB, timestamp int64, apiid int64) string {
- for {
- key := fmt.Sprintf("%d-%d-%s", time.Now().UnixNano(), timestamp, apiid)
- bytes, _ := db.Get([]byte(key), nil)
- if len(bytes) > 0 {
- time.Sleep(200 * time.Millisecond)
- continue
- }
- return key
- }
- return ""
- }
- func DeleteAccessLogFromLevelDbBak(key string) {
- if LdbBak == nil {
- return
- }
- if key == "" {
- return
- }
- LdbBak.Delete([]byte(key), nil)
- }
- func WriteAccessLogToLevelDbBak(accesslog *apis.LogAddAccessLogReq) string {
- LmuBak.Lock()
- defer LmuBak.Unlock()
- bytes, _ := json.Marshal(accesslog)
- if LdbBak == nil {
- var err error
- LdbBak, err = leveldb.OpenFile("/var/log/gd_access_local_leveldb_bak.log", nil)
- if err != nil {
- l.Error("open leveldb",
- zap.String("call", "OpenFile"),
- zap.String("args", ""),
- zap.String("error", err.Error()))
- return ""
- }
- }
- key := getLevelDbKey(LdbBak, accesslog.TimeStamp, accesslog.ApiId)
- LdbBak.Put([]byte(key), bytes, nil)
- return key
- }
- func WriteAccessLogToLevelDb(accesslog *apis.LogAddAccessLogReq) {
- Lmu.Lock()
- defer Lmu.Unlock()
- bytes, _ := json.Marshal(accesslog)
- if Ldb == nil {
- var err error
- Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil)
- if err != nil {
- l.Error("open leveldb",
- zap.String("call", "OpenFile"),
- zap.String("args", ""),
- zap.String("error", err.Error()))
- return
- }
- }
- key := getLevelDbKey(Ldb, accesslog.TimeStamp, accesslog.ApiId)
- Ldb.Put([]byte(key), bytes, nil)
- }
- func readLevelDb(db *leveldb.DB) {
- iter := db.NewIterator(nil, nil)
- for iter.Next() {
- bytes := iter.Value()
- accessLog := apis.LogAddAccessLogReq{}
- json.Unmarshal(bytes, &accessLog)
- _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(context.Background(), &accessLog)
- if err == nil {
- db.Delete(iter.Key(), nil)
- }
- }
- }
- func LoopLevelDb() {
- if Ldb == nil {
- var err error
- Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil)
- if err != nil {
- l.Error("open leveldb",
- zap.String("call", "OpenFile"),
- zap.String("args", ""),
- zap.String("error", err.Error()))
- fmt.Printf("open leveldb failed\n")
- os.Exit(-1)
- }
- }
- t := time.NewTicker(5 * 60 * time.Second)
- for {
- select {
- case <-t.C:
- readLevelDb(Ldb)
- default:
- time.Sleep(2 * time.Second)
- }
- }
- }
- func WriteAccessLog(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantChildApiId int64, Err error) {
- go func() {
- endTime := uint64(time.Now().UnixNano())
- accessLog.AccessLogWrite.Elapsed = float64(endTime-startTime) / 1000000
- accessLog.TimeStamp = int64(startTime / 1e9)
- accessLog.AccessLogWrite.Msg = "成功"
- accessLog.MerchantChildApiId = merchantChildApiId
- if accessLog.ReplaceInfo.ReplaceMerchantDataApiId > 0 {
- accessLog.MerchantId = accessLog.ReplaceInfo.ReplaceMerchantId
- accessLog.ApiId = accessLog.ReplaceInfo.ReplaceBaseApiId
- accessLog.MerchantDataApiId = accessLog.ReplaceInfo.ReplaceMerchantDataApiId
- accessLog.MerchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId
- merchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId
- }
- if Err != nil {
- accessLog.AccessLogWrite.State = false
- var e jsonrpc2.Error
- err := json.Unmarshal([]byte(Err.Error()), &e)
- if err != nil {
- accessLog.IsReuse = true
- accessLog.AccessLogWrite.Code = 1100
- accessLog.AccessLogWrite.Msg = Err.Error()
- } else {
- accessLog.AccessLogWrite.Code = e.Code
- if e.Code != 0 {
- accessLog.AccessLogWrite.Msg = e.Message
- }
- }
- if accessLog.RespCode == 0 && accessLog.AccessLogWrite.Code != 0 {
- accessLog.RespCode = accessLog.AccessLogWrite.Code
- }
- } else {
- accessLog.AccessLogWrite.State = true
- }
- //key := WriteAccessLogToLevelDbBak(&accessLog)
- _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(ctx, &accessLog)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "LogAddAccessLog"),
- zap.String("args", dutils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- WriteAccessLogToLevelDb(&accessLog)
- } else {
- //DeleteAccessLogFromLevelDbBak(key)
- }
- if merchantChildApiId != 0 {
- for i, _ := range accessLog.ThirdpartLogWrites {
- rawCode := accessLog.ThirdpartLogWrites[i].RawCode
- if rawCode == "" {
- rawCode = fmt.Sprintf("%d", accessLog.ThirdpartLogWrites[i].Code)
- }
- AddProviderCount(accessLog.ThirdpartLogWrites[i].ProviderApiId, merchantChildApiId, accessLog.ThirdpartLogWrites[i].State, rawCode)
- }
- }
- }()
- }
- func WriteInvalidAccessLog(ctx context.Context, remoteAddr, requestParams string, merchantId, apiId, startTime int64, err error) {
- go func() {
- accessLog := apis.LogAddInvalidAccessLogReq{}
- accessLog.RequestParams = requestParams
- accessLog.RemoteAddr = remoteAddr
- accessLog.ApiId = apiId
- accessLog.MerchantId = merchantId
- endTime := time.Now().UnixNano()
- accessLog.Elapsed = float64(endTime-startTime) / 1000000
- accessLog.TimeStamp = time.Now().Unix()
- var e jsonrpc2.Error
- err := json.Unmarshal([]byte(err.Error()), &e)
- if err != nil {
- accessLog.Code = 10002
- accessLog.Msg = "内部服务错误"
- } else {
- accessLog.Code = e.Code
- accessLog.Msg = e.Message
- }
- _, err = rpc_apis_v1.AccessLog.LogAddInvalidAccessLog(ctx, &accessLog)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "LogAddInvalidAccessLog"),
- zap.String("args", dutils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- }
- }()
- }
- func FillMerchantApiInfo(merchantApiInfo *apis.MerchantApiInfo, merchantApiResult *apis.CheckMerchantApiResult) {
- merchantApiInfo.BaseApiId = merchantApiResult.BaseApiId
- merchantApiInfo.MerchantId = merchantApiResult.MerchantId
- merchantApiInfo.IsForceUpdate = merchantApiResult.IsForceUpdate
- merchantApiInfo.MerchantChildApiId = merchantApiResult.MerchantChildApiId
- merchantApiInfo.ReuseTime = merchantApiResult.ReuseTime
- merchantApiInfo.RandomPercentage = merchantApiResult.RandomPercentage
- merchantApiInfo.ApiTimeout = merchantApiResult.Timeout
- merchantApiInfo.ResponseParamConf = merchantApiResult.ResponseParamConf
- }
- func NewAccessLogNew(rawRequestParams string, isReuse bool, LReq []apis.ThirdpartLogWrite, RemoteAddr string, merchantApiResult *apis.CheckMerchantApiResult) apis.LogAddAccessLogReq {
- accessLog := apis.LogAddAccessLogReq{
- MerchantId: merchantApiResult.MerchantId,
- ApiId: merchantApiResult.BaseApiId,
- MerchantDataApiId: merchantApiResult.MerchantDataApiId,
- RemoteAddr: RemoteAddr,
- ReplaceInfo: merchantApiResult.ReplaceInfo,
- AccessLogWrite: apis.AccessLogWrite{
- IsReuse: isReuse,
- RequestParams: string(merchantApiResult.DecryptParam),
- RawRequestParams: rawRequestParams,
- //ResponseParams: resp.Data, 默认为空数据
- },
- }
- if LReq != nil {
- accessLog.ThirdpartLogWrites = LReq
- }
- return accessLog
- }
- func WriteAccessLogAndCountCode(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult, err, respErr error) {
- if respErr != nil {
- var e jsonrpc2.Error
- err := json.Unmarshal([]byte(respErr.Error()), &e)
- if err != nil {
- accessLog.AccessLogWrite.RespCode = 1100
- } else {
- accessLog.AccessLogWrite.RespCode = e.Code
- }
- } else {
- accessLog.AccessLogWrite.RespCode = 0
- }
- // 写日志,记录原始日志
- WriteAccessLog(
- ctx,
- accessLog,
- startTime,
- merchantApiResult.MerchantChildApiId,
- err)
- // 错误码计数,采用响应错误码计数
- ManagementCheckCountCode(merchantApiResult, accessLog.AccessLogWrite.RespCode)
- // 释放token
- if merchantApiResult.Token.RouterToken != "" || merchantApiResult.Token.MerchantToken != "" {
- _, _ = rpc_apis_v1.AuthCheck.ReleaseToken(ctx, &merchantApiResult.Token)
- }
- }
- func IsApiTimeOut(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult) error {
- // 检查接口是否超时,超时返回
- if merchantApiResult.Timeout > 0 {
- endTime := uint64(time.Now().UnixNano())
- if ((endTime - startTime) / 1e9) > merchantApiResult.Timeout {
- // 写日志并计数
- WriteAccessLogAndCountCode(ctx,
- accessLog,
- startTime,
- merchantApiResult,
- errors.ApiTimeOut, errors.ApiTimeOut)
- return errors.ApiTimeOut
- }
- }
- if merchantApiResult.MinimalTimeConsuming > 0 {
- reqTime := ctx.Value(consts.RequestStar).(int64)
- useTime := int(time.Now().Unix() - reqTime)
- if useTime < merchantApiResult.MinimalTimeConsuming {
- time.Sleep(time.Duration(merchantApiResult.MinimalTimeConsuming-useTime) * time.Second)
- }
- }
- return nil
- }
- func HttpStateCodeReturn(ctx *bcontext.Context, merchantApiResult apis.CheckMerchantApiResult, respErr error) {
- fmt.Println(merchantApiResult.IsHttpCode, merchantApiResult.CountType, respErr, merchantApiResult.CountCode)
- if merchantApiResult.IsHttpCode == 1 && merchantApiResult.CountType == 1 {
- respCode := 0
- if respErr != nil {
- var e jsonrpc2.Error
- err := json.Unmarshal([]byte(respErr.Error()), &e)
- if err != nil {
- respCode = 10002
- } else {
- respCode = e.Code
- }
- }
- //fmt.Println("respcode 111111111111111:", respCode)
- code := fmt.Sprintf("%d", respCode)
- array := strings.Split(merchantApiResult.CountCode, ",")
- for _, v := range array {
- if v == code {
- return
- }
- }
- //fmt.Println("write 400 22222222222222222")
- ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
- //ctx.Output.Context.ResponseWriter.WriteHeader(http.StatusBadRequest)
- //ctx.Output.Status = http.StatusBadRequest
- //ctx.Input.Context.Request.Header.Set("gd-state", "400")
- }
- }
|