config.go 7.6 KB


  1. package config
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fsnotify/fsnotify"
  11. "go.etcd.io/etcd/client"
  12. )
  13. var Conf *Configure
  14. const ConfigPath = "conf/common.json"
  // MysqlConfig holds the MySQL settings decoded from the "mysql" key of
  // Configure.
  //
  // MaxIdle and MaxConn are json.Number, so encoding/json accepts either a bare
  // JSON number or a quoted numeric string for them.
  type MysqlConfig struct {
  	User      string `json:"user"`
  	Password  string `json:"password"`
  	Addr      string `json:"addr"`
  	DefaultDB string `json:"default_db"`
  	Charset   string `json:"charset"`

  	// Numeric limits; presumably connection-pool sizes — confirm with the consumer.
  	MaxIdle json.Number `json:"max_idle"`
  	MaxConn json.Number `json:"max_conn"`
  }
  // MongoConfig describes a MongoDB user, password and address.
  //
  // The matching Mongo field on Configure is commented out, so nothing in this
  // file currently decodes into this type.
  type MongoConfig struct {
  	User     string `json:"user"`
  	Password string `json:"password"`
  	Addr     string `json:"addr"`
  }
  // RedisConfig holds the Redis settings decoded from the "redis" key of
  // Configure.
  //
  // All numeric settings are json.Number, so they decode from bare JSON numbers
  // as well as quoted numeric strings. IsCluster is kept as a plain string; how
  // it is interpreted is up to the consumer.
  type RedisConfig struct {
  	// Addrs is a single string; NOTE(review): may carry several addresses in
  	// some delimiter-separated form — confirm with the consumer.
  	Addrs    string `json:"addrs"`
  	Password string `json:"password"`

  	DefaultDB    json.Number `json:"default_db"`
  	PoolSize     json.Number `json:"pool_size"`
  	MinIdleConns json.Number `json:"min_idle_conns"`
  	MaxRetries   json.Number `json:"max_retries"`

  	IsCluster string `json:"is_cluster"`
  }
  // ElasticConfig holds the settings decoded from the "elastic" key of
  // Configure. Both values are plain strings; Sniff is not parsed here.
  type ElasticConfig struct {
  	Addr  string `json:"addr"`
  	Sniff string `json:"sniff"`
  }
  // LogConfig holds the logging settings found under the "log" key of an
  // RPCNode.
  //
  // The three size/retention values are json.Number (bare number or quoted
  // numeric string); their units are not defined in this file.
  type LogConfig struct {
  	MaxSize    json.Number `json:"max_size"`
  	MaxBackups json.Number `json:"max_backups"`
  	MaxAge     json.Number `json:"max_age"`

  	Level             string `json:"level"`
  	DisableStacktrace string `json:"disable_stacktrace"`
  }
  51. type RPCNode struct {
  52. Scheme string `json:"scheme"`
  53. Name string `json:"name"`
  54. UpdateInterval json.Number `json:"update_interval"`
  55. MysqlDB string `json:"mysql_db"`
  56. RedisDB json.Number `json:"redis_db"`
  57. Log LogConfig `json:"log"`
  58. ServiceName string `json:"service_name"`
  59. ServicePort int `json:"service_port"`
  60. }
  61. type RPCConfig struct {
  62. BasePath string `json:"base_path"`
  63. Vehicle RPCNode `json:"gd_vehicle"`
  64. CarParts RPCNode `json:"gd_car_parts"`
  65. AuthCheck RPCNode `json:"gd_auth_check"`
  66. AdmData RPCNode `json:"gd_adm_data"`
  67. Service RPCNode `json:"gd_service"`
  68. VehicleAccessories RPCNode `json:"gd_vehicle_accessories"`
  69. }
  // ThirdPartConfig holds the string settings decoded from the "third_part" key
  // of Configure.
  type ThirdPartConfig struct {
  	DingTalkWebhook       string `json:"ding_talk_webhook"`
  	AdmAppkey             string `json:"adm_appkey"`
  	HystrixPublishChannel string `json:"hystrix_publish_channel"`
  }
  75. type Configure struct {
  76. RunMode string `json:"run_mode"`
  77. AppKey string `json:"app_key"`
  78. AppSecret string `json:"app_secret"`
  79. Mysql MysqlConfig `json:"mysql"`
  80. //Mongo MongoConfig `json:"mongo"`
  81. Redis RedisConfig `json:"redis"`
  82. Elastic ElasticConfig `json:"elastic"`
  83. ThirdPart ThirdPartConfig `json:"third_part"`
  84. Rpc RPCConfig `json:"rpc"`
  85. NoNeedVehicleInfoMerchant string `json:"no_need_vehicle_info_merchant"`
  86. }
  87. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  88. basePath := fmt.Sprintf("/%s/config", runmode)
  89. watcherOptions := &client.WatcherOptions{
  90. AfterIndex: index,
  91. Recursive: true,
  92. }
  93. watcher := keysAPI.Watcher(basePath, watcherOptions)
  94. for {
  95. r, err := watcher.Next(context.Background())
  96. if err != nil || r == nil {
  97. break
  98. }
  99. if r.Node != nil && r.PrevNode != nil {
  100. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  101. reloadEtcd(runmode, key, cli)
  102. }
  103. }
  104. if r.Node != nil && r.PrevNode == nil {
  105. reloadEtcd(runmode, key, cli)
  106. }
  107. }
  108. }
  109. func reloadEtcd(runmode, key string, cli client.Client) {
  110. keysAPI := client.NewKeysAPI(cli)
  111. basePath := fmt.Sprintf("/%s/config", runmode)
  112. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  113. Recursive: true,
  114. }); err == nil && resp != nil && resp.Node != nil {
  115. conf := &Configure{}
  116. value := getNodeData(key, resp.Node)
  117. if jsonStr, err := json.Marshal(value); err == nil {
  118. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  119. *Conf = *conf
  120. return
  121. } else {
  122. fmt.Printf("json Unmarshal failed. error:%s", err)
  123. }
  124. } else {
  125. fmt.Printf("json Marshal failed. error:%s", err)
  126. }
  127. } else {
  128. fmt.Printf("get %s failed. error:%s", basePath, err)
  129. }
  130. }
  131. // key 为加密密钥
  132. func GetConfig(runmode, key string, cli client.Client) *Configure {
  133. keysAPI := client.NewKeysAPI(cli)
  134. basePath := fmt.Sprintf("/%s/config", runmode)
  135. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  136. Recursive: true,
  137. }); err == nil && resp != nil && resp.Node != nil {
  138. Conf = &Configure{}
  139. value := getNodeData(key, resp.Node)
  140. if jsonStr, err := json.Marshal(value); err == nil {
  141. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  142. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  143. return Conf
  144. } else {
  145. fmt.Printf("json Unmarshal failed. error:%s", err)
  146. }
  147. } else {
  148. fmt.Printf("json Marshal failed. error:%s", err)
  149. }
  150. } else {
  151. fmt.Printf("get %s failed. error:%s", basePath, err)
  152. os.Exit(1)
  153. }
  154. return nil
  155. }
  156. // 递归取出node的叶子节点值
  157. func getNodeData(key string, head *client.Node) (value interface{}) {
  158. s0 := strings.Split(head.Key, "/")
  159. len0 := len(s0)
  160. if len0 == 0 {
  161. return
  162. }
  163. if head.Dir {
  164. mapData := map[string]interface{}{}
  165. for _, node := range head.Nodes {
  166. s1 := strings.Split(node.Key, "/")
  167. len1 := len(s1)
  168. if len1 == 0 {
  169. break
  170. }
  171. mapData[s1[len1-1]] = getNodeData(key, node)
  172. }
  173. value = mapData
  174. } else {
  175. if key != "" && head.Value != "" {
  176. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  177. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  178. os.Exit(1)
  179. } else {
  180. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  181. fmt.Printf("AesDecrypt failed. error:%s", err)
  182. os.Exit(1)
  183. } else {
  184. value = string(data)
  185. }
  186. }
  187. } else {
  188. // 无加密,直接取值
  189. value = head.Value
  190. }
  191. }
  192. return
  193. }
  194. // 适配k8s 方式
  195. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  196. func GetConfigForK8s() *Configure {
  197. buffer, err := ioutil.ReadFile(ConfigPath)
  198. if err != nil {
  199. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  200. return nil
  201. }
  202. Conf = &Configure{}
  203. if err := json.Unmarshal(buffer, Conf); err != nil {
  204. fmt.Printf("json Unmarshal failed. error:%s", err)
  205. return nil
  206. }
  207. go watchConfigFileForK8s()
  208. return Conf
  209. }
  210. func ReloadConfigForK8s() {
  211. buffer, err := ioutil.ReadFile(ConfigPath)
  212. if err != nil {
  213. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  214. }
  215. confTmp := &Configure{}
  216. if err := json.Unmarshal(buffer, confTmp); err != nil {
  217. fmt.Printf("json Unmarshal failed. error:%s", err)
  218. }
  219. Conf = confTmp
  220. }
  // Exists reports whether path (a file or a directory) can be stat'ed.
  //
  // It returns true only when os.Stat succeeds. Any Stat error, not just
  // "not found", yields false.
  func Exists(path string) bool {
  	_, err := os.Stat(path)
  	return err == nil
  }
  232. func watchConfigFileForK8s() {
  233. fileExist := true
  234. watch, err := fsnotify.NewWatcher()
  235. if err != nil {
  236. fmt.Printf("new file watcher failed\n\n")
  237. os.Exit(1)
  238. }
  239. defer watch.Close()
  240. /*err = watch.Add(ConfigPath)
  241. if err != nil {
  242. fmt.Printf("add file watcher failed\n\n")
  243. os.Exit(1)
  244. }*/
  245. for {
  246. // 判断文件是否存在
  247. if !Exists(ConfigPath) {
  248. time.Sleep(10 * time.Second)
  249. fileExist = false
  250. continue
  251. } else {
  252. watch.Remove(ConfigPath)
  253. watch.Add(ConfigPath)
  254. if !fileExist { // 文件重新创建
  255. ReloadConfigForK8s()
  256. }
  257. fileExist = true
  258. }
  259. select {
  260. case ev := <-watch.Events:
  261. {
  262. fmt.Println("op : ", ev.Op)
  263. ReloadConfigForK8s()
  264. }
  265. case err := <-watch.Errors:
  266. {
  267. fmt.Println("error : ", err)
  268. continue
  269. }
  270. }
  271. }
  272. }