config.go 7.8 KB


  1. package config
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fsnotify/fsnotify"
  11. "go.etcd.io/etcd/client"
  12. )
  13. var Conf *Configure
  14. const ConfigPath = "conf/common.json"
  15. // mysql 配置
  16. type MysqlConfig struct {
  17. User string `json:"user"`
  18. Password string `json:"password"`
  19. Addr string `json:"addr"`
  20. DefaultDB string `json:"default_db"`
  21. Charset string `json:"charset"`
  22. MaxIdle json.Number `json:"max_idle"`
  23. MaxConn json.Number `json:"max_conn"`
  24. }
  25. type MongoConfig struct {
  26. User string `json:"user"`
  27. Password string `json:"password"`
  28. Addr string `json:"addr"`
  29. }
  30. // redis 配置
  31. type RedisConfig struct {
  32. Addrs string `json:"addrs"`
  33. Password string `json:"password"`
  34. DefaultDB json.Number `json:"default_db"`
  35. PoolSize json.Number `json:"pool_size"`
  36. MinIdleConns json.Number `json:"min_idle_conns"`
  37. MaxRetries json.Number `json:"max_retries"`
  38. IsCluster string `json:"is_cluster"`
  39. }
  40. type ElasticConfig struct {
  41. Addr string `json:"addr"`
  42. Sniff string `json:"sniff"`
  43. }
  44. type LogConfig struct {
  45. MaxSize json.Number `json:"max_size"`
  46. MaxBackups json.Number `json:"max_backups"`
  47. MaxAge json.Number `json:"max_age"`
  48. Level string `json:"level"`
  49. DisableStacktrace string `json:"disable_stacktrace"`
  50. }
  51. type RPCNode struct {
  52. Scheme string `json:"scheme"`
  53. Name string `json:"name"`
  54. UpdateInterval json.Number `json:"update_interval"`
  55. MysqlDB string `json:"mysql_db"`
  56. RedisDB json.Number `json:"redis_db"`
  57. Log LogConfig `json:"log"`
  58. ServiceName string `json:"service_name"`
  59. ServicePort json.Number `json:"service_port"`
  60. }
  61. type RPCConfig struct {
  62. BasePath string `json:"base_path"`
  63. Vehicle RPCNode `json:"gd_vehicle"`
  64. CarParts RPCNode `json:"gd_car_parts"`
  65. AuthCheck RPCNode `json:"gd_auth_check"`
  66. AdmData RPCNode `json:"gd_adm_data"`
  67. Service RPCNode `json:"gd_service"`
  68. VehicleAccessories RPCNode `json:"gd_vehicle_accessories"`
  69. }
  70. type ThirdPartConfig struct {
  71. HystrixWebhook string `json:"hystrix_webhook"`
  72. HystrixPublishChannel string `json:"hystrix_publish_channel"`
  73. CdbdClientId string `json:"cdbd_client_id"`
  74. CdbdSecret string `json:"cdbd_secret"`
  75. DyKey string `json:"dy_key"`
  76. DyUsername string `json:"dy_username"`
  77. ZrUsername string `json:"zr_username"`
  78. ZrPassword string `json:"zr_password"`
  79. }
  80. type Configure struct {
  81. RunMode string `json:"run_mode"`
  82. AppKey string `json:"app_key"`
  83. AppSecret string `json:"app_secret"`
  84. Mysql MysqlConfig `json:"mysql"`
  85. //Mongo MongoConfig `json:"mongo"`
  86. Redis RedisConfig `json:"redis"`
  87. Elastic ElasticConfig `json:"elastic"`
  88. ThirdPart ThirdPartConfig `json:"third_part"`
  89. Rpc RPCConfig `json:"rpc"`
  90. NoNeedVehicleInfoMerchant string `json:"no_need_vehicle_info_merchant"`
  91. }
  92. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  93. basePath := fmt.Sprintf("/%s/config", runmode)
  94. watcherOptions := &client.WatcherOptions{
  95. AfterIndex: index,
  96. Recursive: true,
  97. }
  98. watcher := keysAPI.Watcher(basePath, watcherOptions)
  99. for {
  100. r, err := watcher.Next(context.Background())
  101. if err != nil || r == nil {
  102. break
  103. }
  104. if r.Node != nil && r.PrevNode != nil {
  105. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  106. reloadEtcd(runmode, key, cli)
  107. }
  108. }
  109. if r.Node != nil && r.PrevNode == nil {
  110. reloadEtcd(runmode, key, cli)
  111. }
  112. }
  113. }
  114. func reloadEtcd(runmode, key string, cli client.Client) {
  115. keysAPI := client.NewKeysAPI(cli)
  116. basePath := fmt.Sprintf("/%s/config", runmode)
  117. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  118. Recursive: true,
  119. }); err == nil && resp != nil && resp.Node != nil {
  120. conf := &Configure{}
  121. value := getNodeData(key, resp.Node)
  122. if jsonStr, err := json.Marshal(value); err == nil {
  123. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  124. *Conf = *conf
  125. return
  126. } else {
  127. fmt.Printf("json Unmarshal failed. error:%s", err)
  128. }
  129. } else {
  130. fmt.Printf("json Marshal failed. error:%s", err)
  131. }
  132. } else {
  133. fmt.Printf("get %s failed. error:%s", basePath, err)
  134. }
  135. }
  136. // key 为加密密钥
  137. func GetConfig(runmode, key string, cli client.Client) *Configure {
  138. keysAPI := client.NewKeysAPI(cli)
  139. basePath := fmt.Sprintf("/%s/config", runmode)
  140. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  141. Recursive: true,
  142. }); err == nil && resp != nil && resp.Node != nil {
  143. Conf = &Configure{}
  144. value := getNodeData(key, resp.Node)
  145. if jsonStr, err := json.Marshal(value); err == nil {
  146. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  147. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  148. return Conf
  149. } else {
  150. fmt.Printf("json Unmarshal failed. error:%s", err)
  151. }
  152. } else {
  153. fmt.Printf("json Marshal failed. error:%s", err)
  154. }
  155. } else {
  156. fmt.Printf("get %s failed. error:%s", basePath, err)
  157. os.Exit(1)
  158. }
  159. return nil
  160. }
  161. // 递归取出node的叶子节点值
  162. func getNodeData(key string, head *client.Node) (value interface{}) {
  163. s0 := strings.Split(head.Key, "/")
  164. len0 := len(s0)
  165. if len0 == 0 {
  166. return
  167. }
  168. if head.Dir {
  169. mapData := map[string]interface{}{}
  170. for _, node := range head.Nodes {
  171. s1 := strings.Split(node.Key, "/")
  172. len1 := len(s1)
  173. if len1 == 0 {
  174. break
  175. }
  176. mapData[s1[len1-1]] = getNodeData(key, node)
  177. }
  178. value = mapData
  179. } else {
  180. if key != "" && head.Value != "" {
  181. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  182. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  183. os.Exit(1)
  184. } else {
  185. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  186. fmt.Printf("AesDecrypt failed. error:%s", err)
  187. os.Exit(1)
  188. } else {
  189. value = string(data)
  190. }
  191. }
  192. } else {
  193. // 无加密,直接取值
  194. value = head.Value
  195. }
  196. }
  197. return
  198. }
  199. // 适配k8s 方式
  200. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  201. func GetConfigForK8s() *Configure {
  202. buffer, err := ioutil.ReadFile(ConfigPath)
  203. if err != nil {
  204. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  205. return nil
  206. }
  207. Conf = &Configure{}
  208. if err := json.Unmarshal(buffer, Conf); err != nil {
  209. fmt.Printf("json Unmarshal failed. error:%s", err)
  210. return nil
  211. }
  212. go watchConfigFileForK8s()
  213. return Conf
  214. }
  215. func ReloadConfigForK8s() {
  216. buffer, err := ioutil.ReadFile(ConfigPath)
  217. if err != nil {
  218. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  219. }
  220. confTmp := &Configure{}
  221. if err := json.Unmarshal(buffer, confTmp); err != nil {
  222. fmt.Printf("json Unmarshal failed. error:%s", err)
  223. }
  224. Conf = confTmp
  225. }
  226. // 判断路径文件/文件夹是否存在
  227. func Exists(path string) bool {
  228. _, err := os.Stat(path)
  229. if err != nil {
  230. if os.IsExist(err) {
  231. return true
  232. }
  233. return false
  234. }
  235. return true
  236. }
  237. func watchConfigFileForK8s() {
  238. fileExist := true
  239. watch, err := fsnotify.NewWatcher()
  240. if err != nil {
  241. fmt.Printf("new file watcher failed\n\n")
  242. os.Exit(1)
  243. }
  244. defer watch.Close()
  245. /*err = watch.Add(ConfigPath)
  246. if err != nil {
  247. fmt.Printf("add file watcher failed\n\n")
  248. os.Exit(1)
  249. }*/
  250. for {
  251. // 判断文件是否存在
  252. if !Exists(ConfigPath) {
  253. time.Sleep(10 * time.Second)
  254. fileExist = false
  255. continue
  256. } else {
  257. watch.Remove(ConfigPath)
  258. watch.Add(ConfigPath)
  259. if !fileExist { // 文件重新创建
  260. ReloadConfigForK8s()
  261. }
  262. fileExist = true
  263. }
  264. select {
  265. case ev := <-watch.Events:
  266. {
  267. fmt.Println("op : ", ev.Op)
  268. ReloadConfigForK8s()
  269. }
  270. case err := <-watch.Errors:
  271. {
  272. fmt.Println("error : ", err)
  273. continue
  274. }
  275. }
  276. }
  277. }