// File: rcvr.go (5.8 KB)
// Copyright 2019 getensh.com. All rights reserved.
// Use of this source code is governed by getensh.com.

// Package impl implements all interfaces of the vehicle microservice.
  4. package impl
  5. import (
  6. "context"
  7. "encoding/json"
  8. "gd_vehicle/impl/thirdparty_impl/cdbd"
  9. "gd_vehicle/impl/thirdparty_impl/dy"
  10. "gd_vehicle/impl/thirdparty_impl/dybd"
  11. "gd_vehicle/impl/thirdparty_impl/zr"
  12. "runtime"
  13. "sync"
  14. "time"
  15. "github.com/astaxie/beego/orm"
  16. "gd_vehicle/apis"
  17. "gd_vehicle/impl/thirdparty_impl/adm"
  18. "gd_vehicle/impl/vehicle"
  19. "gd_vehicle/common.in/cache"
  20. "gd_vehicle/common.in/jsonrpc2"
  21. "gd_vehicle/common.in/task"
  22. "gd_vehicle/common.in/utils"
  23. "go.uber.org/zap"
  24. )
  25. // 具体实现
  26. type Rcvr struct {
  27. }
  28. // 操作pm的公共锁
  29. var mutex sync.Mutex
  30. var pm = map[string]*ParamReuse{}
  31. //短时复用说明:
  32. //1.相同接口相同参数,如果一次调用未完成,又发起另一次调用,则复用第一次调用的结果
  33. //2.第一次是指全局的第一次(服务器集群内该接口的该类参数的第一次调用)
  34. //3.如果本次为第一次调用,需设置本地参数缓存(ParamReuse)和redis参数缓存(ParamReuse)
  35. //4.如果已有本地参数缓存,则等待结果
  36. //5.如果没有本地参数缓存,且没有redis缓存,则本次为第一次,作步骤3操作
  37. //6.如果没有本地参数缓存, 但已有redis缓存,则等待redis结果
  38. //7.本地参数引用计数为0时,清除本地参数缓存
  39. //8.redis参数引用计数为0时,清除redis缓存
  40. type ParamReuse struct {
  41. //参数引用计数
  42. Used int
  43. //响应结果
  44. Res string
  45. //是否使用了redis
  46. Redis bool `json:"-"`
  47. //参数锁引用计数
  48. Locked int `json:"-"`
  49. //参数锁
  50. Mx sync.Mutex `json:"-"`
  51. }
  52. const (
  53. ParamExist = 1
  54. ParamExistInRedis = 2
  55. ParamNotExist = 3
  56. )
  57. func getFuncName() string {
  58. funcPtr, _, _, _ := runtime.Caller(1)
  59. return runtime.FuncForPC(funcPtr).Name()
  60. }
  61. func paramLock(req string) {
  62. mutex.Lock()
  63. p, ok := pm[req]
  64. if ok == false {
  65. pm[req] = &ParamReuse{}
  66. p = pm[req]
  67. }
  68. p.Locked += 1
  69. mutex.Unlock()
  70. p.Mx.Lock()
  71. }
  72. func paramUnlock(req string, clean bool) {
  73. mutex.Lock()
  74. defer mutex.Unlock()
  75. if p, ok := pm[req]; ok {
  76. defer p.Mx.Unlock()
  77. if p.Locked > 0 {
  78. p.Locked -= 1
  79. }
  80. if clean == false {
  81. return
  82. }
  83. if p.Used > 0 {
  84. p.Used -= 1
  85. }
  86. if p.Locked == 0 && p.Used == 0 {
  87. if p.Redis {
  88. cleanParamReqFromRedis(req)
  89. }
  90. delete(pm, req)
  91. if len(pm) == 0 {
  92. pm = map[string]*ParamReuse{}
  93. }
  94. }
  95. }
  96. }
  97. func setParamReq(req string) int {
  98. paramLock(req)
  99. defer paramUnlock(req, false)
  100. p, _ := pm[req]
  101. if p.Used == 0 {
  102. utils.Lock(req)
  103. defer utils.UnLock(req)
  104. s, _ := cache.Redis.Get(req)
  105. p.Used = 1
  106. p.Redis = true
  107. if s == "" {
  108. pr := ParamReuse{
  109. Used: 1,
  110. }
  111. bytes, _ := json.Marshal(pr)
  112. cache.Redis.SetEx(req, 60, string(bytes))
  113. return ParamNotExist
  114. }
  115. pr := ParamReuse{}
  116. json.Unmarshal([]byte(s), &pr)
  117. pr.Used++
  118. bytes, _ := json.Marshal(pr)
  119. cache.Redis.SetEx(req, 60, string(bytes))
  120. return ParamExistInRedis
  121. }
  122. p.Used += 1
  123. return ParamExist
  124. }
  125. func cleanParamReqFromRedis(req string) {
  126. utils.Lock(req)
  127. defer utils.UnLock(req)
  128. s, _ := cache.Redis.Get(req)
  129. if s == "" {
  130. return
  131. }
  132. pr := ParamReuse{}
  133. json.Unmarshal([]byte(s), &pr)
  134. pr.Used -= 1
  135. if pr.Used == 0 {
  136. cache.Redis.Del(req)
  137. return
  138. }
  139. bytes, _ := json.Marshal(pr)
  140. cache.Redis.SetEx(req, 60, string(bytes))
  141. }
  142. func cleanParamReq(req string) {
  143. paramLock(req)
  144. paramUnlock(req, true)
  145. }
  146. func setParamRes(req string, res string) {
  147. paramLock(req)
  148. defer paramUnlock(req, false)
  149. p, _ := pm[req]
  150. p.Res = res
  151. if p.Redis == false {
  152. return
  153. }
  154. setParamResToRedis(req, res)
  155. }
  156. func setParamResLocal(req string, res string) {
  157. paramLock(req)
  158. defer paramUnlock(req, false)
  159. p, _ := pm[req]
  160. p.Res = res
  161. }
  162. func setParamResToRedis(req string, res string) {
  163. utils.Lock(req)
  164. defer utils.UnLock(req)
  165. s, _ := cache.Redis.Get(req)
  166. if s == "" {
  167. return
  168. }
  169. pr := ParamReuse{}
  170. json.Unmarshal([]byte(s), &pr)
  171. pr.Res = res
  172. bytes, _ := json.Marshal(pr)
  173. cache.Redis.SetEx(req, 60, string(bytes))
  174. }
  175. func getParamRes(req string) string {
  176. paramLock(req)
  177. defer paramUnlock(req, false)
  178. p, _ := pm[req]
  179. return p.Res
  180. }
  181. func getParamResFromRedis(req string) string {
  182. s, _ := cache.Redis.Get(req)
  183. if s == "" {
  184. return ""
  185. }
  186. pr := ParamReuse{}
  187. json.Unmarshal([]byte(s), &pr)
  188. return pr.Res
  189. }
  190. func setOrWaitParam(req string) string {
  191. exist := setParamReq(req)
  192. if exist == ParamNotExist {
  193. return ""
  194. }
  195. count := 0
  196. if exist == ParamExist {
  197. for {
  198. res := getParamRes(req)
  199. if res != "" {
  200. return res
  201. }
  202. time.Sleep(50 * time.Millisecond)
  203. count++
  204. if count >= 1200 {
  205. break
  206. }
  207. }
  208. }
  209. if exist == ParamExistInRedis {
  210. for {
  211. res := getParamResFromRedis(req)
  212. if res != "" {
  213. setParamResLocal(req, res)
  214. return res
  215. }
  216. time.Sleep(50 * time.Millisecond)
  217. count++
  218. if count >= 1200 {
  219. break
  220. }
  221. }
  222. }
  223. return ""
  224. }
  225. func (c *Rcvr) Query(ctx context.Context, req *apis.CommonReq, reply *apis.CommonReply) error {
  226. if false {
  227. funcName := getFuncName()
  228. newReq := *req
  229. newReq.MerchantApiInfo = apis.MerchantApiInfo{}
  230. reqBytes, _ := json.Marshal(newReq)
  231. reqString := string(reqBytes) + funcName
  232. setOrWaitParam(reqString)
  233. defer cleanParamReq(reqString)
  234. defer func() {
  235. setParamRes(reqString, "-")
  236. }()
  237. }
  238. t1 := func() error {
  239. return vehicle.Query(ctx, req, reply)
  240. }
  241. err := task.Do(ctx, t1)
  242. if err != nil {
  243. var e jsonrpc2.Error
  244. merr := json.Unmarshal([]byte(err.Error()), &e)
  245. if merr != nil {
  246. reply.ErrCode = int(1001)
  247. reply.ErrMsg = "服务错误"
  248. }
  249. reply.ErrCode = e.Code
  250. reply.ErrMsg = e.Message
  251. }
  252. return nil
  253. }
  254. func SetLogger(logger *zap.Logger) {
  255. vehicle.SetLogger(logger)
  256. adm.SetLogger(logger)
  257. cdbd.SetLogger(logger)
  258. dy.SetLogger(logger)
  259. zr.SetLogger(logger)
  260. dybd.SetLogger(logger)
  261. }
  262. func RegisterOrmModel() {
  263. orm.RegisterModel(
  264. new(apis.CdbdData))
  265. }