common.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package utils
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "gd_gateway/apis"
  7. "gd_gateway/common.in/jsonrpc2"
  8. dutils "gd_gateway/common.in/utils"
  9. "gd_gateway/consts"
  10. "gd_gateway/errors"
  11. "gd_gateway/rpc_apis_v1"
  12. bcontext "github.com/astaxie/beego/context"
  13. "net/http"
  14. "os"
  15. "strings"
  16. "sync"
  17. "time"
  18. "github.com/syndtr/goleveldb/leveldb"
  19. "go.uber.org/zap"
  20. )
  // discoveryType is the service-discovery mode recorded by SetDiscoveryType.
  // getLevelDbDir compares it against "k8s" to pick the LevelDB location.
  var discoveryType string

  // SetDiscoveryType stores mode as the package-wide service-discovery mode.
  func SetDiscoveryType(mode string) {
  	discoveryType = mode
  }
  25. func getLevelDbDir() string {
  26. dir := ""
  27. if discoveryType == "k8s" {
  28. hostname, _ := os.Hostname()
  29. dir = fmt.Sprintf("/mnt/%s_leveldb_log", hostname)
  30. } else {
  31. dir = "/var/log/gd_access_local_leveldb.log"
  32. }
  33. fmt.Println("leveldb dir:", dir)
  34. return dir
  35. }
  36. func AddProviderCount(providerApiId int64, merchantChildApiId int64, state bool, rawCode string) {
  37. req := apis.ManagementThirdpartCountIncreaseReq{
  38. ProviderApiId: providerApiId,
  39. MerchantChildApiId: merchantChildApiId,
  40. State: state,
  41. RawCode: rawCode,
  42. }
  43. rpc_apis_v1.AuthCheck.ManagementThirdpartCountIncrease(context.Background(), &req)
  44. }
  45. var Ldb *leveldb.DB
  46. var Lmu sync.Mutex
  47. var LdbBak *leveldb.DB
  48. var LmuBak sync.Mutex
  49. func getLevelDbKey(db *leveldb.DB, timestamp int64, apiid int64) string {
  50. for {
  51. key := fmt.Sprintf("%d-%d-%s", time.Now().UnixNano(), timestamp, apiid)
  52. bytes, _ := db.Get([]byte(key), nil)
  53. if len(bytes) > 0 {
  54. time.Sleep(200 * time.Millisecond)
  55. continue
  56. }
  57. return key
  58. }
  59. return ""
  60. }
  61. func DeleteAccessLogFromLevelDbBak(key string) {
  62. if LdbBak == nil {
  63. return
  64. }
  65. if key == "" {
  66. return
  67. }
  68. LdbBak.Delete([]byte(key), nil)
  69. }
  70. func WriteAccessLogToLevelDbBak(accesslog *apis.LogAddAccessLogReq) string {
  71. LmuBak.Lock()
  72. defer LmuBak.Unlock()
  73. bytes, _ := json.Marshal(accesslog)
  74. if LdbBak == nil {
  75. var err error
  76. LdbBak, err = leveldb.OpenFile("/var/log/gd_access_local_leveldb_bak.log", nil)
  77. if err != nil {
  78. l.Error("open leveldb",
  79. zap.String("call", "OpenFile"),
  80. zap.String("args", ""),
  81. zap.String("error", err.Error()))
  82. return ""
  83. }
  84. }
  85. key := getLevelDbKey(LdbBak, accesslog.TimeStamp, accesslog.ApiId)
  86. LdbBak.Put([]byte(key), bytes, nil)
  87. return key
  88. }
  89. func WriteAccessLogToLevelDb(accesslog *apis.LogAddAccessLogReq) {
  90. Lmu.Lock()
  91. defer Lmu.Unlock()
  92. bytes, _ := json.Marshal(accesslog)
  93. if Ldb == nil {
  94. var err error
  95. Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil)
  96. if err != nil {
  97. l.Error("open leveldb",
  98. zap.String("call", "OpenFile"),
  99. zap.String("args", ""),
  100. zap.String("error", err.Error()))
  101. return
  102. }
  103. }
  104. key := getLevelDbKey(Ldb, accesslog.TimeStamp, accesslog.ApiId)
  105. Ldb.Put([]byte(key), bytes, nil)
  106. }
  107. func readLevelDb(db *leveldb.DB) {
  108. iter := db.NewIterator(nil, nil)
  109. for iter.Next() {
  110. bytes := iter.Value()
  111. accessLog := apis.LogAddAccessLogReq{}
  112. json.Unmarshal(bytes, &accessLog)
  113. _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(context.Background(), &accessLog)
  114. if err == nil {
  115. db.Delete(iter.Key(), nil)
  116. }
  117. }
  118. }
  119. func LoopLevelDb() {
  120. if Ldb == nil {
  121. var err error
  122. Ldb, err = leveldb.OpenFile(getLevelDbDir(), nil)
  123. if err != nil {
  124. l.Error("open leveldb",
  125. zap.String("call", "OpenFile"),
  126. zap.String("args", ""),
  127. zap.String("error", err.Error()))
  128. fmt.Printf("open leveldb failed\n")
  129. os.Exit(-1)
  130. }
  131. }
  132. t := time.NewTicker(5 * 60 * time.Second)
  133. for {
  134. select {
  135. case <-t.C:
  136. readLevelDb(Ldb)
  137. default:
  138. time.Sleep(2 * time.Second)
  139. }
  140. }
  141. }
  142. func WriteAccessLog(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantChildApiId int64, Err error) {
  143. go func() {
  144. endTime := uint64(time.Now().UnixNano())
  145. accessLog.AccessLogWrite.Elapsed = float64(endTime-startTime) / 1000000
  146. accessLog.TimeStamp = int64(startTime / 1e9)
  147. accessLog.AccessLogWrite.Msg = "成功"
  148. accessLog.MerchantChildApiId = merchantChildApiId
  149. if accessLog.ReplaceInfo.ReplaceMerchantDataApiId > 0 {
  150. accessLog.MerchantId = accessLog.ReplaceInfo.ReplaceMerchantId
  151. accessLog.ApiId = accessLog.ReplaceInfo.ReplaceBaseApiId
  152. accessLog.MerchantDataApiId = accessLog.ReplaceInfo.ReplaceMerchantDataApiId
  153. accessLog.MerchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId
  154. merchantChildApiId = accessLog.ReplaceInfo.ReplaceMerchantChildApiId
  155. }
  156. if Err != nil {
  157. accessLog.AccessLogWrite.State = false
  158. var e jsonrpc2.Error
  159. err := json.Unmarshal([]byte(Err.Error()), &e)
  160. if err != nil {
  161. accessLog.IsReuse = true
  162. accessLog.AccessLogWrite.Code = 1100
  163. accessLog.AccessLogWrite.Msg = Err.Error()
  164. } else {
  165. accessLog.AccessLogWrite.Code = e.Code
  166. if e.Code != 0 {
  167. accessLog.AccessLogWrite.Msg = e.Message
  168. }
  169. }
  170. if accessLog.RespCode == 0 && accessLog.AccessLogWrite.Code != 0 {
  171. accessLog.RespCode = accessLog.AccessLogWrite.Code
  172. }
  173. } else {
  174. accessLog.AccessLogWrite.State = true
  175. }
  176. //key := WriteAccessLogToLevelDbBak(&accessLog)
  177. _, err := rpc_apis_v1.AccessLog.LogAddAccessLog(ctx, &accessLog)
  178. if err != nil {
  179. l.Error("rpc",
  180. zap.String("call", "LogAddAccessLog"),
  181. zap.String("args", dutils.MarshalJsonString(accessLog)),
  182. zap.String("error", err.Error()))
  183. WriteAccessLogToLevelDb(&accessLog)
  184. } else {
  185. //DeleteAccessLogFromLevelDbBak(key)
  186. }
  187. if merchantChildApiId != 0 {
  188. for i, _ := range accessLog.ThirdpartLogWrites {
  189. rawCode := accessLog.ThirdpartLogWrites[i].RawCode
  190. if rawCode == "" {
  191. rawCode = fmt.Sprintf("%d", accessLog.ThirdpartLogWrites[i].Code)
  192. }
  193. AddProviderCount(accessLog.ThirdpartLogWrites[i].ProviderApiId, merchantChildApiId, accessLog.ThirdpartLogWrites[i].State, rawCode)
  194. }
  195. }
  196. }()
  197. }
  198. func WriteInvalidAccessLog(ctx context.Context, remoteAddr, requestParams string, merchantId, apiId, startTime int64, err error) {
  199. go func() {
  200. accessLog := apis.LogAddInvalidAccessLogReq{}
  201. accessLog.RequestParams = requestParams
  202. accessLog.RemoteAddr = remoteAddr
  203. accessLog.ApiId = apiId
  204. accessLog.MerchantId = merchantId
  205. endTime := time.Now().UnixNano()
  206. accessLog.Elapsed = float64(endTime-startTime) / 1000000
  207. accessLog.TimeStamp = time.Now().Unix()
  208. var e jsonrpc2.Error
  209. err := json.Unmarshal([]byte(err.Error()), &e)
  210. if err != nil {
  211. accessLog.Code = 10002
  212. accessLog.Msg = "内部服务错误"
  213. } else {
  214. accessLog.Code = e.Code
  215. accessLog.Msg = e.Message
  216. }
  217. _, err = rpc_apis_v1.AccessLog.LogAddInvalidAccessLog(ctx, &accessLog)
  218. if err != nil {
  219. l.Error("rpc",
  220. zap.String("call", "LogAddInvalidAccessLog"),
  221. zap.String("args", dutils.MarshalJsonString(accessLog)),
  222. zap.String("error", err.Error()))
  223. }
  224. }()
  225. }
  226. func FillMerchantApiInfo(merchantApiInfo *apis.MerchantApiInfo, merchantApiResult *apis.CheckMerchantApiResult) {
  227. merchantApiInfo.BaseApiId = merchantApiResult.BaseApiId
  228. merchantApiInfo.MerchantId = merchantApiResult.MerchantId
  229. merchantApiInfo.IsForceUpdate = merchantApiResult.IsForceUpdate
  230. merchantApiInfo.MerchantChildApiId = merchantApiResult.MerchantChildApiId
  231. merchantApiInfo.ReuseTime = merchantApiResult.ReuseTime
  232. merchantApiInfo.RandomPercentage = merchantApiResult.RandomPercentage
  233. merchantApiInfo.ApiTimeout = merchantApiResult.Timeout
  234. merchantApiInfo.ResponseParamConf = merchantApiResult.ResponseParamConf
  235. }
  236. func NewAccessLogNew(rawRequestParams string, isReuse bool, LReq []apis.ThirdpartLogWrite, RemoteAddr string, merchantApiResult *apis.CheckMerchantApiResult) apis.LogAddAccessLogReq {
  237. accessLog := apis.LogAddAccessLogReq{
  238. MerchantId: merchantApiResult.MerchantId,
  239. ApiId: merchantApiResult.BaseApiId,
  240. MerchantDataApiId: merchantApiResult.MerchantDataApiId,
  241. RemoteAddr: RemoteAddr,
  242. ReplaceInfo: merchantApiResult.ReplaceInfo,
  243. AccessLogWrite: apis.AccessLogWrite{
  244. IsReuse: isReuse,
  245. RequestParams: string(merchantApiResult.DecryptParam),
  246. RawRequestParams: rawRequestParams,
  247. //ResponseParams: resp.Data, 默认为空数据
  248. },
  249. }
  250. if LReq != nil {
  251. accessLog.ThirdpartLogWrites = LReq
  252. }
  253. return accessLog
  254. }
  255. func WriteAccessLogAndCountCode(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult, err, respErr error) {
  256. if respErr != nil {
  257. var e jsonrpc2.Error
  258. err := json.Unmarshal([]byte(respErr.Error()), &e)
  259. if err != nil {
  260. accessLog.AccessLogWrite.RespCode = 1100
  261. } else {
  262. accessLog.AccessLogWrite.RespCode = e.Code
  263. }
  264. } else {
  265. accessLog.AccessLogWrite.RespCode = 0
  266. }
  267. // 写日志,记录原始日志
  268. WriteAccessLog(
  269. ctx,
  270. accessLog,
  271. startTime,
  272. merchantApiResult.MerchantChildApiId,
  273. err)
  274. // 错误码计数,采用响应错误码计数
  275. ManagementCheckCountCode(merchantApiResult, accessLog.AccessLogWrite.RespCode)
  276. // 释放token
  277. if merchantApiResult.Token.RouterToken != "" || merchantApiResult.Token.MerchantToken != "" {
  278. _, _ = rpc_apis_v1.AuthCheck.ReleaseToken(ctx, &merchantApiResult.Token)
  279. }
  280. }
  281. func IsApiTimeOut(ctx context.Context, accessLog apis.LogAddAccessLogReq, startTime uint64, merchantApiResult apis.CheckMerchantApiResult) error {
  282. // 检查接口是否超时,超时返回
  283. if merchantApiResult.Timeout > 0 {
  284. endTime := uint64(time.Now().UnixNano())
  285. if ((endTime - startTime) / 1e9) > merchantApiResult.Timeout {
  286. // 写日志并计数
  287. WriteAccessLogAndCountCode(ctx,
  288. accessLog,
  289. startTime,
  290. merchantApiResult,
  291. errors.ApiTimeOut, errors.ApiTimeOut)
  292. return errors.ApiTimeOut
  293. }
  294. }
  295. if merchantApiResult.MinimalTimeConsuming > 0 {
  296. reqTime := ctx.Value(consts.RequestStar).(int64)
  297. useTime := int(time.Now().Unix() - reqTime)
  298. if useTime < merchantApiResult.MinimalTimeConsuming {
  299. time.Sleep(time.Duration(merchantApiResult.MinimalTimeConsuming-useTime) * time.Second)
  300. }
  301. }
  302. return nil
  303. }
  304. func HttpStateCodeReturn(ctx *bcontext.Context, merchantApiResult apis.CheckMerchantApiResult, respErr error) {
  305. fmt.Println(merchantApiResult.IsHttpCode, merchantApiResult.CountType, respErr, merchantApiResult.CountCode)
  306. if merchantApiResult.IsHttpCode == 1 && merchantApiResult.CountType == 1 {
  307. respCode := 0
  308. if respErr != nil {
  309. var e jsonrpc2.Error
  310. err := json.Unmarshal([]byte(respErr.Error()), &e)
  311. if err != nil {
  312. respCode = 10002
  313. } else {
  314. respCode = e.Code
  315. }
  316. }
  317. //fmt.Println("respcode 111111111111111:", respCode)
  318. code := fmt.Sprintf("%d", respCode)
  319. array := strings.Split(merchantApiResult.CountCode, ",")
  320. for _, v := range array {
  321. if v == code {
  322. return
  323. }
  324. }
  325. //fmt.Println("write 400 22222222222222222")
  326. ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
  327. //ctx.Output.Context.ResponseWriter.WriteHeader(http.StatusBadRequest)
  328. //ctx.Output.Status = http.StatusBadRequest
  329. //ctx.Input.Context.Request.Header.Set("gd-state", "400")
  330. }
  331. }