etcd.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "time"
  8. "adm-management/parser"
  9. uuid "github.com/satori/go.uuid"
  10. clientv3 "go.etcd.io/etcd/client/v3"
  11. "go.etcd.io/etcd/client/v3/naming/endpoints"
  12. )
  13. var cli *clientv3.Client
  14. func Get() *clientv3.Client {
  15. return cli
  16. }
  17. func Init(addr, serviceIp, servicePort string) {
  18. var err error
  19. cli, err = clientv3.New(clientv3.Config{
  20. Endpoints: strings.Split(addr, ","),
  21. DialTimeout: 5 * time.Second,
  22. })
  23. if err != nil {
  24. panic(err)
  25. }
  26. // 注册续租
  27. err = relet(parser.Conf.Rpc.ADMManagement.ServiceName,
  28. serviceIp+":"+servicePort,
  29. parser.Conf.Rpc.Prefix,
  30. )
  31. if err != nil {
  32. panic(err)
  33. }
  34. }
  // getValue builds the string {"Addr:":"http://<addr>"} for the given address.
  //
  // addr is inserted verbatim, without any escaping.
  // NOTE(review): the key is literally "Addr:" with the colon inside the
  // quotes. That looks like a typo for "Addr"; confirm against whatever
  // consumes this value before changing it.
  func getValue(addr string) string {
  	const (
  		head = `{"Addr:":"http://`
  		tail = `"}`
  	)
  	return head + addr + tail
  }
  38. // 续租
  39. func relet(serviceName, serviceAddr, prefix string) error {
  40. ctx := context.Background()
  41. // 创建一个租约
  42. lease := clientv3.NewLease(cli)
  43. cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3)
  44. defer cancel()
  45. leaseResp, err := lease.Grant(cancelCtx, 3)
  46. if err != nil {
  47. return err
  48. }
  49. // 长链接
  50. leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID)
  51. if err != nil {
  52. return err
  53. }
  54. em, err := endpoints.NewManager(cli, prefix)
  55. if err != nil {
  56. return err
  57. }
  58. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  59. defer cancel()
  60. if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{
  61. Addr: serviceAddr,
  62. }, clientv3.WithLease(leaseResp.ID)); err != nil {
  63. return err
  64. }
  65. log.Println("Register etcd success")
  66. del := func() {
  67. log.Println("Register close")
  68. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  69. defer cancel()
  70. em.DeleteEndpoint(cancelCtx, serviceName)
  71. lease.Close()
  72. }
  73. // 保持注册状态(连接断开重连)
  74. keepRegister(ctx, leaseChannel, del, serviceName, serviceAddr, prefix)
  75. return nil
  76. }
  77. func keepRegister(
  78. ctx context.Context,
  79. leaseChannel <-chan *clientv3.LeaseKeepAliveResponse,
  80. cleanFunc func(),
  81. serviceName, serviceAddr, prefix string,
  82. ) {
  83. go func() {
  84. failedCount := 0
  85. for {
  86. select {
  87. case resp := <-leaseChannel:
  88. if resp != nil {
  89. // log.Println("keep alive success.")
  90. } else {
  91. log.Println("keep alive failed.")
  92. failedCount++
  93. for failedCount > 3 {
  94. cleanFunc()
  95. if err := relet(serviceName, serviceAddr, prefix); err != nil {
  96. time.Sleep(time.Second)
  97. continue
  98. }
  99. return
  100. }
  101. continue
  102. }
  103. case <-ctx.Done():
  104. cleanFunc()
  105. cli.Close()
  106. return
  107. }
  108. }
  109. }()
  110. }
  // UnRegisterEtcd is currently a no-op: it ignores serviceName and serviceAddr
  // and returns immediately without contacting etcd.
  func UnRegisterEtcd(serviceName, serviceAddr string) {
  	// Intentionally empty.
  }