member.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcpb
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "net/url"
  20. "os"
  21. "time"
  22. "github.com/coreos/etcd/clientv3"
  23. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/etcd/pkg/transport"
  25. "github.com/coreos/etcd/snapshot"
  26. "github.com/dustin/go-humanize"
  27. "go.uber.org/zap"
  28. grpc "google.golang.org/grpc"
  29. "google.golang.org/grpc/credentials"
  30. )
  31. // ElectionTimeout returns an election timeout duration.
  32. func (m *Member) ElectionTimeout() time.Duration {
  33. return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond
  34. }
  35. // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
  36. func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  37. dialOpts := []grpc.DialOption{
  38. grpc.WithTimeout(5 * time.Second),
  39. grpc.WithBlock(),
  40. }
  41. secure := false
  42. for _, cu := range m.Etcd.AdvertiseClientURLs {
  43. u, err := url.Parse(cu)
  44. if err != nil {
  45. return nil, err
  46. }
  47. if u.Scheme == "https" { // TODO: handle unix
  48. secure = true
  49. }
  50. }
  51. if secure {
  52. // assume save TLS assets are already stord on disk
  53. tlsInfo := transport.TLSInfo{
  54. CertFile: m.ClientCertPath,
  55. KeyFile: m.ClientKeyPath,
  56. TrustedCAFile: m.ClientTrustedCAPath,
  57. // TODO: remove this with generated certs
  58. // only need it for auto TLS
  59. InsecureSkipVerify: true,
  60. }
  61. tlsConfig, err := tlsInfo.ClientConfig()
  62. if err != nil {
  63. return nil, err
  64. }
  65. creds := credentials.NewTLS(tlsConfig)
  66. dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
  67. } else {
  68. dialOpts = append(dialOpts, grpc.WithInsecure())
  69. }
  70. dialOpts = append(dialOpts, opts...)
  71. return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
  72. }
  73. // CreateEtcdClientConfig creates a client configuration from member.
  74. func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
  75. secure := false
  76. for _, cu := range m.Etcd.AdvertiseClientURLs {
  77. var u *url.URL
  78. u, err = url.Parse(cu)
  79. if err != nil {
  80. return nil, err
  81. }
  82. if u.Scheme == "https" { // TODO: handle unix
  83. secure = true
  84. }
  85. }
  86. cfg = &clientv3.Config{
  87. Endpoints: []string{m.EtcdClientEndpoint},
  88. DialTimeout: 10 * time.Second,
  89. DialOptions: opts,
  90. }
  91. if secure {
  92. // assume save TLS assets are already stord on disk
  93. tlsInfo := transport.TLSInfo{
  94. CertFile: m.ClientCertPath,
  95. KeyFile: m.ClientKeyPath,
  96. TrustedCAFile: m.ClientTrustedCAPath,
  97. // TODO: remove this with generated certs
  98. // only need it for auto TLS
  99. InsecureSkipVerify: true,
  100. }
  101. var tlsConfig *tls.Config
  102. tlsConfig, err = tlsInfo.ClientConfig()
  103. if err != nil {
  104. return nil, err
  105. }
  106. cfg.TLS = tlsConfig
  107. }
  108. return cfg, err
  109. }
  110. // CreateEtcdClient creates a client from member.
  111. func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
  112. cfg, err := m.CreateEtcdClientConfig(opts...)
  113. if err != nil {
  114. return nil, err
  115. }
  116. return clientv3.New(*cfg)
  117. }
  118. // CheckCompact ensures that historical data before given revision has been compacted.
  119. func (m *Member) CheckCompact(rev int64) error {
  120. cli, err := m.CreateEtcdClient()
  121. if err != nil {
  122. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  123. }
  124. defer cli.Close()
  125. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  126. wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
  127. wr, ok := <-wch
  128. cancel()
  129. if !ok {
  130. return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
  131. }
  132. if wr.CompactRevision != rev {
  133. return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
  134. }
  135. return nil
  136. }
  137. // Defrag runs defragmentation on this member.
  138. func (m *Member) Defrag() error {
  139. cli, err := m.CreateEtcdClient()
  140. if err != nil {
  141. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  142. }
  143. defer cli.Close()
  144. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  145. _, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
  146. cancel()
  147. return err
  148. }
  149. // RevHash fetches current revision and hash on this member.
  150. func (m *Member) RevHash() (int64, int64, error) {
  151. conn, err := m.DialEtcdGRPCServer()
  152. if err != nil {
  153. return 0, 0, err
  154. }
  155. defer conn.Close()
  156. mt := pb.NewMaintenanceClient(conn)
  157. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  158. resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
  159. cancel()
  160. if err != nil {
  161. return 0, 0, err
  162. }
  163. return resp.Header.Revision, int64(resp.Hash), nil
  164. }
  165. // Rev fetches current revision on this member.
  166. func (m *Member) Rev(ctx context.Context) (int64, error) {
  167. cli, err := m.CreateEtcdClient()
  168. if err != nil {
  169. return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  170. }
  171. defer cli.Close()
  172. resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
  173. if err != nil {
  174. return 0, err
  175. }
  176. return resp.Header.Revision, nil
  177. }
  178. // Compact compacts member storage with given revision.
  179. // It blocks until it's physically done.
  180. func (m *Member) Compact(rev int64, timeout time.Duration) error {
  181. cli, err := m.CreateEtcdClient()
  182. if err != nil {
  183. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  184. }
  185. defer cli.Close()
  186. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  187. _, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
  188. cancel()
  189. return err
  190. }
  191. // IsLeader returns true if this member is the current cluster leader.
  192. func (m *Member) IsLeader() (bool, error) {
  193. cli, err := m.CreateEtcdClient()
  194. if err != nil {
  195. return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  196. }
  197. defer cli.Close()
  198. resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
  199. if err != nil {
  200. return false, err
  201. }
  202. return resp.Header.MemberId == resp.Leader, nil
  203. }
  204. // WriteHealthKey writes a health key to this member.
  205. func (m *Member) WriteHealthKey() error {
  206. cli, err := m.CreateEtcdClient()
  207. if err != nil {
  208. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  209. }
  210. defer cli.Close()
  211. // give enough time-out in case expensive requests (range/delete) are pending
  212. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  213. _, err = cli.Put(ctx, "health", "good")
  214. cancel()
  215. if err != nil {
  216. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  217. }
  218. return nil
  219. }
  220. // SaveSnapshot downloads a snapshot file from this member, locally.
  221. // It's meant to requested remotely, so that local member can store
  222. // snapshot file on local disk.
  223. func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
  224. // remove existing snapshot first
  225. if err = os.RemoveAll(m.SnapshotPath); err != nil {
  226. return err
  227. }
  228. var ccfg *clientv3.Config
  229. ccfg, err = m.CreateEtcdClientConfig()
  230. if err != nil {
  231. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  232. }
  233. lg.Info(
  234. "snapshot save START",
  235. zap.String("member-name", m.Etcd.Name),
  236. zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
  237. zap.String("snapshot-path", m.SnapshotPath),
  238. )
  239. now := time.Now()
  240. mgr := snapshot.NewV3(lg)
  241. if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
  242. return err
  243. }
  244. took := time.Since(now)
  245. var fi os.FileInfo
  246. fi, err = os.Stat(m.SnapshotPath)
  247. if err != nil {
  248. return err
  249. }
  250. var st snapshot.Status
  251. st, err = mgr.Status(m.SnapshotPath)
  252. if err != nil {
  253. return err
  254. }
  255. m.SnapshotInfo = &SnapshotInfo{
  256. MemberName: m.Etcd.Name,
  257. MemberClientURLs: m.Etcd.AdvertiseClientURLs,
  258. SnapshotPath: m.SnapshotPath,
  259. SnapshotFileSize: humanize.Bytes(uint64(fi.Size())),
  260. SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)),
  261. SnapshotTotalKey: int64(st.TotalKey),
  262. SnapshotHash: int64(st.Hash),
  263. SnapshotRevision: st.Revision,
  264. Took: fmt.Sprintf("%v", took),
  265. }
  266. lg.Info(
  267. "snapshot save END",
  268. zap.String("member-name", m.SnapshotInfo.MemberName),
  269. zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
  270. zap.String("snapshot-path", m.SnapshotPath),
  271. zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
  272. zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
  273. zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
  274. zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
  275. zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
  276. zap.String("took", m.SnapshotInfo.Took),
  277. )
  278. return nil
  279. }
  280. // RestoreSnapshot restores a cluster from a given snapshot file on disk.
  281. // It's meant to requested remotely, so that local member can load the
  282. // snapshot file from local disk.
  283. func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
  284. if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil {
  285. return err
  286. }
  287. if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil {
  288. return err
  289. }
  290. lg.Info(
  291. "snapshot restore START",
  292. zap.String("member-name", m.Etcd.Name),
  293. zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
  294. zap.String("snapshot-path", m.SnapshotPath),
  295. )
  296. now := time.Now()
  297. mgr := snapshot.NewV3(lg)
  298. err = mgr.Restore(snapshot.RestoreConfig{
  299. SnapshotPath: m.SnapshotInfo.SnapshotPath,
  300. Name: m.EtcdOnSnapshotRestore.Name,
  301. OutputDataDir: m.EtcdOnSnapshotRestore.DataDir,
  302. OutputWALDir: m.EtcdOnSnapshotRestore.WALDir,
  303. PeerURLs: m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
  304. InitialCluster: m.EtcdOnSnapshotRestore.InitialCluster,
  305. InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
  306. SkipHashCheck: false,
  307. // TODO: set SkipHashCheck it true, to recover from existing db file
  308. })
  309. took := time.Since(now)
  310. lg.Info(
  311. "snapshot restore END",
  312. zap.String("member-name", m.SnapshotInfo.MemberName),
  313. zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
  314. zap.String("snapshot-path", m.SnapshotPath),
  315. zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
  316. zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
  317. zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
  318. zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
  319. zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
  320. zap.String("took", took.String()),
  321. zap.Error(err),
  322. )
  323. return err
  324. }