client.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package gd_management
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "runtime"
  7. "strings"
  8. "sync"
  9. "time"
  10. "gd_access_log/common.in/cache"
  11. "github.com/smallnest/rpcx/client"
  12. "github.com/smallnest/rpcx/protocol"
  13. "github.com/smallnest/rpcx/share"
  14. )
  15. type GdManagementXClient struct {
  16. xcli client.XClient
  17. }
  18. var nameMutex sync.RWMutex
  19. var nameMap = map[string]*ManagementGetNameReply{}
  20. func getNameFromMap(req *ManagementGetNameReq) *ManagementGetNameReply {
  21. bytes, _ := json.Marshal(*req)
  22. nameMutex.RLock()
  23. defer nameMutex.RUnlock()
  24. ret, ok := nameMap[string(bytes)]
  25. if ok == false {
  26. return nil
  27. }
  28. return ret
  29. }
  30. func setNameMap(req *ManagementGetNameReq, reply *ManagementGetNameReply) {
  31. bytes, _ := json.Marshal(*req)
  32. nameMutex.Lock()
  33. defer nameMutex.Unlock()
  34. nameMap[string(bytes)] = reply
  35. }
  36. func clearNameMap(key string) {
  37. nameMutex.Lock()
  38. defer nameMutex.Unlock()
  39. newmap := map[string]*ManagementGetNameReply{}
  40. for k, v := range nameMap {
  41. if strings.Contains(k, key) == false {
  42. newmap[k] = v
  43. }
  44. }
  45. nameMap = newmap
  46. }
  47. func CleanNameMap() {
  48. nameMutex.Lock()
  49. defer nameMutex.Unlock()
  50. i := 0
  51. array := make([]string, len(nameMap))
  52. for k, _ := range nameMap {
  53. array[i] = k
  54. i++
  55. }
  56. for _, v := range array {
  57. delete(nameMap, v)
  58. }
  59. }
  60. var NameNotifyChannel = "name_notify"
  61. func Watch() {
  62. go func() {
  63. cache.Redis.RegisterFunc(NameNotifyChannel, clearNameMap)
  64. cache.Redis.SubscribeAndHandle(NameNotifyChannel)
  65. }()
  66. }
  67. func (a *GdManagementXClient) Init(etcdAddrs []string, basePath, servicePath string) {
  68. opt := client.Option{
  69. Retries: 3,
  70. RPCPath: share.DefaultRPCPath,
  71. ConnectTimeout: 10 * time.Second,
  72. SerializeType: protocol.JSON,
  73. CompressType: protocol.None,
  74. BackupLatency: 10 * time.Millisecond,
  75. }
  76. if runtime.GOOS == "windows" {
  77. d := client.NewEtcdDiscovery(fmt.Sprintf("/%s", basePath), servicePath, etcdAddrs, nil)
  78. a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, d, opt)
  79. } else {
  80. d := client.NewEtcdDiscovery(basePath, servicePath, etcdAddrs, nil)
  81. a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, d, opt)
  82. }
  83. }
  84. // k8s模式下,服务发现方式需要改为点对点,使用k8s自身的Service进行服务发现,Service名称格式必须是abc-xyz的方式,不能使用"_"
  85. func (a *GdManagementXClient) InitForK8s(servicePath string, k8sServiceName string, k8sServicePort string) {
  86. opt := client.Option{
  87. Retries: 1,
  88. RPCPath: share.DefaultRPCPath,
  89. ConnectTimeout: 10 * time.Second,
  90. SerializeType: protocol.JSON,
  91. CompressType: protocol.None,
  92. BackupLatency: 10 * time.Millisecond,
  93. }
  94. sd := client.NewPeer2PeerDiscovery(fmt.Sprintf("tcp@%s:%s", k8sServiceName, k8sServicePort), "")
  95. a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, sd, opt)
  96. }
  97. func (a *GdManagementXClient) ManagementGetName(ctx context.Context, req *ManagementGetNameReq) (reply ManagementGetNameReply, err error) {
  98. if r := getNameFromMap(req); r != nil {
  99. reply = *r
  100. return
  101. }
  102. err = a.xcli.Call(ctx, "ManagementGetName", req, &reply)
  103. if err == nil {
  104. data := reply
  105. setNameMap(req, &data)
  106. }
  107. return
  108. }
  109. func (a *GdManagementXClient) ManagementGetProviderApiLimitCount(ctx context.Context, req *ManagementGetProviderApiLimitCountReq) (reply ManagementGetProviderApiLimitCountReply, err error) {
  110. err = a.xcli.Call(ctx, "ManagementGetProviderApiLimitCount", req, &reply)
  111. return
  112. }
  113. func (a *GdManagementXClient) MangementGetUserMerchantCountCode(ctx context.Context, req *MangementGetUserMerchantCountCodeReq) (reply MangementGetUserMerchantCountCodeReply, err error) {
  114. err = a.xcli.Call(ctx, "MangementGetUserMerchantCountCode", req, &reply)
  115. return
  116. }
  117. func (a *GdManagementXClient) GetApiThreshold(ctx context.Context, req *GetApiThresholdReq) (reply GetApiThresholdReply, err error) {
  118. err = a.xcli.Call(ctx, "GetApiThreshold", req, &reply)
  119. return
  120. }
  121. func (a *GdManagementXClient) ManagementGetMerchantIds(ctx context.Context, req *ManagementGetMerchantIdsReq) (reply ManagementGetMerchantIdsReply, err error) {
  122. err = a.xcli.Call(ctx, "ManagementGetMerchantIds", req, &reply)
  123. return
  124. }
  125. func (a *GdManagementXClient) ManagementGetMerchantApiId(ctx context.Context, req *ManagementGetMerchantApiIdReq) (reply ManagementGetMerchantApiIdReply, err error) {
  126. err = a.xcli.Call(ctx, "ManagementGetMerchantApiId", req, &reply)
  127. return
  128. }
  129. func (a *GdManagementXClient) ManagementGetProviderCountInfo(ctx context.Context, req *ManagementGetProviderCountInfoReq) (reply ManagementGetProviderCountInfoReply, err error) {
  130. err = a.xcli.Call(ctx, "ManagementGetProviderCountInfo", req, &reply)
  131. return
  132. }
  133. func (a *GdManagementXClient) ManagementGetProviderIds(ctx context.Context, req *ManagementGetProviderIdsReq) (reply ManagementGetProviderIdsReply, err error) {
  134. err = a.xcli.Call(ctx, "ManagementGetProviderIds", req, &reply)
  135. return
  136. }
  137. func (a *GdManagementXClient) ManagementGetApiShowInfo(ctx context.Context, req *ManagementGetApiShowInfoReq) (reply ManagementGetApiShowInfoReply, err error) {
  138. err = a.xcli.Call(ctx, "ManagementGetApiShowInfo", req, &reply)
  139. return
  140. }
  141. func (a *GdManagementXClient) ManagementGetMerchantApiTimeout(ctx context.Context, req *ManagementGetMerchantApiTimeoutReq) (reply ManagementGetMerchantApiTimeoutReply, err error) {
  142. err = a.xcli.Call(ctx, "ManagementGetMerchantApiTimeout", req, &reply)
  143. return
  144. }