config.go 7.8 KB


  1. package config
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fsnotify/fsnotify"
  11. "go.etcd.io/etcd/client"
  12. )
  13. var Conf *Configure
  14. const ConfigPath = "conf/common.json"
  // MysqlConfig holds the MySQL settings found under the "mysql" key of the
  // configuration document.
  type MysqlConfig struct {
  	// Account and endpoint.
  	User     string `json:"user"`
  	Password string `json:"password"`
  	Addr     string `json:"addr"`

  	// Per-connection defaults.
  	DefaultDB string `json:"default_db"`
  	Charset   string `json:"charset"`

  	// Presumably connection-pool limits (verify against the consumer). They are
  	// typed json.Number, so both JSON numbers and quoted numeric strings decode.
  	MaxIdle json.Number `json:"max_idle"`
  	MaxConn json.Number `json:"max_conn"`
  }
  // MongoConfig holds MongoDB connection settings. Within this file it is only
  // referenced by a commented-out field of Configure.
  type MongoConfig struct {
  	User     string `json:"user"`
  	Password string `json:"password"`
  	Addr     string `json:"addr"`
  }
  // RedisConfig holds the Redis settings found under the "redis" key of the
  // configuration document.
  type RedisConfig struct {
  	// Endpoint and authentication. Addrs is a single string; its exact format
  	// (one address or a separated list) is not visible here.
  	Addrs    string `json:"addrs"`
  	Password string `json:"password"`

  	// Numeric options, typed json.Number so both JSON numbers and quoted numeric
  	// strings decode.
  	DefaultDB    json.Number `json:"default_db"`
  	PoolSize     json.Number `json:"pool_size"`
  	MinIdleConns json.Number `json:"min_idle_conns"`
  	MaxRetries   json.Number `json:"max_retries"`

  	// IsCluster is carried as a string; the accepted values are defined by the
  	// consumer of this struct.
  	IsCluster string `json:"is_cluster"`
  }
  // ElasticConfig holds the settings found under the "elastic" key of the
  // configuration document. Both values are carried as plain strings.
  type ElasticConfig struct {
  	Addr  string `json:"addr"`
  	Sniff string `json:"sniff"`
  }
  // LogConfig holds the logging settings embedded in each RPCNode under its
  // "log" key.
  type LogConfig struct {
  	// Presumably log-rotation limits (units are defined by the consumer). They are
  	// typed json.Number, so both JSON numbers and quoted numeric strings decode.
  	MaxSize    json.Number `json:"max_size"`
  	MaxBackups json.Number `json:"max_backups"`
  	MaxAge     json.Number `json:"max_age"`

  	// Level and DisableStacktrace are carried as strings.
  	Level             string `json:"level"`
  	DisableStacktrace string `json:"disable_stacktrace"`
  }
  51. type RPCNode struct {
  52. Scheme string `json:"scheme"`
  53. Name string `json:"name"`
  54. UpdateInterval json.Number `json:"update_interval"`
  55. MysqlDB string `json:"mysql_db"`
  56. RedisDB json.Number `json:"redis_db"`
  57. Log LogConfig `json:"log"`
  58. ServiceName string `json:"service_name"`
  59. ServicePort json.Number `json:"service_port"`
  60. }
  61. type RPCConfig struct {
  62. BasePath string `json:"base_path"`
  63. Service RPCNode `json:"gd_service"`
  64. AuthCheck RPCNode `json:"gd_auth_check"`
  65. AdmData RPCNode `json:"gd_adm_data"`
  66. }
  // ThirdPartConfig holds the settings found under the "third_part" key of the
  // configuration document. All values are plain strings; the field names suggest
  // credentials and endpoints of external integrations, but their exact meaning
  // is defined by the code that consumes them.
  type ThirdPartConfig struct {
  	// Hystrix-related settings.
  	HystrixWebhook        string `json:"hystrix_webhook"`
  	HystrixPublishChannel string `json:"hystrix_publish_channel"`

  	// "cdbd" settings.
  	CdbdClientId string `json:"cdbd_client_id"`
  	CdbdSecret   string `json:"cdbd_secret"`

  	// "dy" settings.
  	DyKey      string `json:"dy_key"`
  	DyUsername string `json:"dy_username"`

  	// "zr" settings.
  	ZrUsername string `json:"zr_username"`
  	ZrPassword string `json:"zr_password"`

  	// "dybd" settings.
  	DybdTokenUrl    string `json:"dybd_token_url"`
  	DybdXlAppkey    string `json:"dybd_xl_appkey"`
  	DybdXwAppkey    string `json:"dybd_xw_appkey"`
  	DybdXlAllAppkey string `json:"dybd_xl_all_appkey"`
  }
  81. type Configure struct {
  82. RunMode string `json:"run_mode"`
  83. AppKey string `json:"app_key"`
  84. AppSecret string `json:"app_secret"`
  85. Mysql MysqlConfig `json:"mysql"`
  86. //Mongo MongoConfig `json:"mongo"`
  87. Redis RedisConfig `json:"redis"`
  88. Elastic ElasticConfig `json:"elastic"`
  89. ThirdPart ThirdPartConfig `json:"third_part"`
  90. Rpc RPCConfig `json:"rpc"`
  91. NoNeedVehicleInfoMerchant string `json:"no_need_vehicle_info_merchant"`
  92. }
  93. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  94. basePath := fmt.Sprintf("/%s/config", runmode)
  95. watcherOptions := &client.WatcherOptions{
  96. AfterIndex: index,
  97. Recursive: true,
  98. }
  99. watcher := keysAPI.Watcher(basePath, watcherOptions)
  100. for {
  101. r, err := watcher.Next(context.Background())
  102. if err != nil || r == nil {
  103. break
  104. }
  105. if r.Node != nil && r.PrevNode != nil {
  106. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  107. reloadEtcd(runmode, key, cli)
  108. }
  109. }
  110. if r.Node != nil && r.PrevNode == nil {
  111. reloadEtcd(runmode, key, cli)
  112. }
  113. }
  114. }
  115. func reloadEtcd(runmode, key string, cli client.Client) {
  116. keysAPI := client.NewKeysAPI(cli)
  117. basePath := fmt.Sprintf("/%s/config", runmode)
  118. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  119. Recursive: true,
  120. }); err == nil && resp != nil && resp.Node != nil {
  121. conf := &Configure{}
  122. value := getNodeData(key, resp.Node)
  123. if jsonStr, err := json.Marshal(value); err == nil {
  124. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  125. *Conf = *conf
  126. return
  127. } else {
  128. fmt.Printf("json Unmarshal failed. error:%s", err)
  129. }
  130. } else {
  131. fmt.Printf("json Marshal failed. error:%s", err)
  132. }
  133. } else {
  134. fmt.Printf("get %s failed. error:%s", basePath, err)
  135. }
  136. }
  137. // key 为加密密钥
  138. func GetConfig(runmode, key string, cli client.Client) *Configure {
  139. keysAPI := client.NewKeysAPI(cli)
  140. basePath := fmt.Sprintf("/%s/config", runmode)
  141. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  142. Recursive: true,
  143. }); err == nil && resp != nil && resp.Node != nil {
  144. Conf = &Configure{}
  145. value := getNodeData(key, resp.Node)
  146. if jsonStr, err := json.Marshal(value); err == nil {
  147. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  148. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  149. return Conf
  150. } else {
  151. fmt.Printf("json Unmarshal failed. error:%s", err)
  152. }
  153. } else {
  154. fmt.Printf("json Marshal failed. error:%s", err)
  155. }
  156. } else {
  157. fmt.Printf("get %s failed. error:%s", basePath, err)
  158. os.Exit(1)
  159. }
  160. return nil
  161. }
  162. // 递归取出node的叶子节点值
  163. func getNodeData(key string, head *client.Node) (value interface{}) {
  164. s0 := strings.Split(head.Key, "/")
  165. len0 := len(s0)
  166. if len0 == 0 {
  167. return
  168. }
  169. if head.Dir {
  170. mapData := map[string]interface{}{}
  171. for _, node := range head.Nodes {
  172. s1 := strings.Split(node.Key, "/")
  173. len1 := len(s1)
  174. if len1 == 0 {
  175. break
  176. }
  177. mapData[s1[len1-1]] = getNodeData(key, node)
  178. }
  179. value = mapData
  180. } else {
  181. if key != "" && head.Value != "" {
  182. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  183. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  184. os.Exit(1)
  185. } else {
  186. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  187. fmt.Printf("AesDecrypt failed. error:%s", err)
  188. os.Exit(1)
  189. } else {
  190. value = string(data)
  191. }
  192. }
  193. } else {
  194. // 无加密,直接取值
  195. value = head.Value
  196. }
  197. }
  198. return
  199. }
  200. // 适配k8s 方式
  201. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  202. func GetConfigForK8s() *Configure {
  203. buffer, err := ioutil.ReadFile(ConfigPath)
  204. if err != nil {
  205. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  206. return nil
  207. }
  208. Conf = &Configure{}
  209. if err := json.Unmarshal(buffer, Conf); err != nil {
  210. fmt.Printf("json Unmarshal failed. error:%s", err)
  211. return nil
  212. }
  213. go watchConfigFileForK8s()
  214. return Conf
  215. }
  216. func ReloadConfigForK8s() {
  217. buffer, err := ioutil.ReadFile(ConfigPath)
  218. if err != nil {
  219. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  220. }
  221. confTmp := &Configure{}
  222. if err := json.Unmarshal(buffer, confTmp); err != nil {
  223. fmt.Printf("json Unmarshal failed. error:%s", err)
  224. }
  225. Conf = confTmp
  226. }
  // Exists reports whether the file or directory at path can be stat'ed.
  //
  // Any os.Stat failure yields false. That includes "not found", but also errors
  // such as permission denied, where the path may exist but cannot be inspected.
  // The previous os.IsExist check on the Stat error could never be true, so it
  // has been dropped without changing the result.
  func Exists(path string) bool {
  	_, err := os.Stat(path)
  	return err == nil
  }
  238. func watchConfigFileForK8s() {
  239. fileExist := true
  240. watch, err := fsnotify.NewWatcher()
  241. if err != nil {
  242. fmt.Printf("new file watcher failed\n\n")
  243. os.Exit(1)
  244. }
  245. defer watch.Close()
  246. /*err = watch.Add(ConfigPath)
  247. if err != nil {
  248. fmt.Printf("add file watcher failed\n\n")
  249. os.Exit(1)
  250. }*/
  251. for {
  252. // 判断文件是否存在
  253. if !Exists(ConfigPath) {
  254. time.Sleep(10 * time.Second)
  255. fileExist = false
  256. continue
  257. } else {
  258. watch.Remove(ConfigPath)
  259. watch.Add(ConfigPath)
  260. if !fileExist { // 文件重新创建
  261. ReloadConfigForK8s()
  262. }
  263. fileExist = true
  264. }
  265. select {
  266. case ev := <-watch.Events:
  267. {
  268. fmt.Println("op : ", ev.Op)
  269. ReloadConfigForK8s()
  270. }
  271. case err := <-watch.Errors:
  272. {
  273. fmt.Println("error : ", err)
  274. continue
  275. }
  276. }
  277. }
  278. }