123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package gd_management
- import (
- "context"
- "encoding/json"
- "fmt"
- "runtime"
- "strings"
- "sync"
- "time"
- "gd_access_log/common.in/cache"
- "github.com/smallnest/rpcx/client"
- "github.com/smallnest/rpcx/protocol"
- "github.com/smallnest/rpcx/share"
- )
- type GdManagementXClient struct {
- xcli client.XClient
- }
- var nameMutex sync.RWMutex
- var nameMap = map[string]*ManagementGetNameReply{}
- func getNameFromMap(req *ManagementGetNameReq) *ManagementGetNameReply {
- bytes, _ := json.Marshal(*req)
- nameMutex.RLock()
- defer nameMutex.RUnlock()
- ret, ok := nameMap[string(bytes)]
- if ok == false {
- return nil
- }
- return ret
- }
- func setNameMap(req *ManagementGetNameReq, reply *ManagementGetNameReply) {
- bytes, _ := json.Marshal(*req)
- nameMutex.Lock()
- defer nameMutex.Unlock()
- nameMap[string(bytes)] = reply
- }
- func clearNameMap(key string) {
- nameMutex.Lock()
- defer nameMutex.Unlock()
- newmap := map[string]*ManagementGetNameReply{}
- for k, v := range nameMap {
- if strings.Contains(k, key) == false {
- newmap[k] = v
- }
- }
- nameMap = newmap
- }
- func CleanNameMap() {
- nameMutex.Lock()
- defer nameMutex.Unlock()
- i := 0
- array := make([]string, len(nameMap))
- for k, _ := range nameMap {
- array[i] = k
- i++
- }
- for _, v := range array {
- delete(nameMap, v)
- }
- }
- var NameNotifyChannel = "name_notify"
- func Watch() {
- go func() {
- cache.Redis.RegisterFunc(NameNotifyChannel, clearNameMap)
- cache.Redis.SubscribeAndHandle(NameNotifyChannel)
- }()
- }
- func (a *GdManagementXClient) Init(etcdAddrs []string, basePath, servicePath string) {
- opt := client.Option{
- Retries: 3,
- RPCPath: share.DefaultRPCPath,
- ConnectTimeout: 10 * time.Second,
- SerializeType: protocol.JSON,
- CompressType: protocol.None,
- BackupLatency: 10 * time.Millisecond,
- }
- if runtime.GOOS == "windows" {
- d := client.NewEtcdDiscovery(fmt.Sprintf("/%s", basePath), servicePath, etcdAddrs, nil)
- a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, d, opt)
- } else {
- d := client.NewEtcdDiscovery(basePath, servicePath, etcdAddrs, nil)
- a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, d, opt)
- }
- }
- // k8s模式下,服务发现方式需要改为点对点,使用k8s自身的Service进行服务发现,Service名称格式必须是abc-xyz的方式,不能使用"_"
- func (a *GdManagementXClient) InitForK8s(servicePath string, k8sServiceName string, k8sServicePort string) {
- opt := client.Option{
- Retries: 1,
- RPCPath: share.DefaultRPCPath,
- ConnectTimeout: 10 * time.Second,
- SerializeType: protocol.JSON,
- CompressType: protocol.None,
- BackupLatency: 10 * time.Millisecond,
- }
- sd := client.NewPeer2PeerDiscovery(fmt.Sprintf("tcp@%s:%s", k8sServiceName, k8sServicePort), "")
- a.xcli = client.NewXClient(servicePath, client.Failtry, client.RandomSelect, sd, opt)
- }
- func (a *GdManagementXClient) ManagementGetName(ctx context.Context, req *ManagementGetNameReq) (reply ManagementGetNameReply, err error) {
- if r := getNameFromMap(req); r != nil {
- reply = *r
- return
- }
- err = a.xcli.Call(ctx, "ManagementGetName", req, &reply)
- if err == nil {
- data := reply
- setNameMap(req, &data)
- }
- return
- }
- func (a *GdManagementXClient) ManagementGetProviderApiLimitCount(ctx context.Context, req *ManagementGetProviderApiLimitCountReq) (reply ManagementGetProviderApiLimitCountReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetProviderApiLimitCount", req, &reply)
- return
- }
- func (a *GdManagementXClient) MangementGetUserMerchantCountCode(ctx context.Context, req *MangementGetUserMerchantCountCodeReq) (reply MangementGetUserMerchantCountCodeReply, err error) {
- err = a.xcli.Call(ctx, "MangementGetUserMerchantCountCode", req, &reply)
- return
- }
- func (a *GdManagementXClient) GetApiThreshold(ctx context.Context, req *GetApiThresholdReq) (reply GetApiThresholdReply, err error) {
- err = a.xcli.Call(ctx, "GetApiThreshold", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetMerchantIds(ctx context.Context, req *ManagementGetMerchantIdsReq) (reply ManagementGetMerchantIdsReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetMerchantIds", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetMerchantApiId(ctx context.Context, req *ManagementGetMerchantApiIdReq) (reply ManagementGetMerchantApiIdReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetMerchantApiId", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetProviderCountInfo(ctx context.Context, req *ManagementGetProviderCountInfoReq) (reply ManagementGetProviderCountInfoReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetProviderCountInfo", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetProviderIds(ctx context.Context, req *ManagementGetProviderIdsReq) (reply ManagementGetProviderIdsReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetProviderIds", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetApiShowInfo(ctx context.Context, req *ManagementGetApiShowInfoReq) (reply ManagementGetApiShowInfoReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetApiShowInfo", req, &reply)
- return
- }
- func (a *GdManagementXClient) ManagementGetMerchantApiTimeout(ctx context.Context, req *ManagementGetMerchantApiTimeoutReq) (reply ManagementGetMerchantApiTimeoutReply, err error) {
- err = a.xcli.Call(ctx, "ManagementGetMerchantApiTimeout", req, &reply)
- return
- }
|