config.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package config
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/fsnotify/fsnotify"
  11. "go.etcd.io/etcd/client"
  12. )
  13. var Conf *Configure
  14. const ConfigPath = "conf/common.json"
  15. // mysql 配置
  16. type MysqlConfig struct {
  17. User string `json:"user"`
  18. Password string `json:"password"`
  19. Addr string `json:"addr"`
  20. DefaultDB string `json:"default_db"`
  21. Charset string `json:"charset"`
  22. MaxIdle json.Number `json:"max_idle"`
  23. MaxConn json.Number `json:"max_conn"`
  24. ServiceName string `json:"service_name"`
  25. ServicePort int `json:"service_port"`
  26. }
  27. // redis 配置
  28. type RedisConfig struct {
  29. Addrs string `json:"addrs"`
  30. Password string `json:"password"`
  31. DefaultDB json.Number `json:"default_db"`
  32. PoolSize json.Number `json:"pool_size"`
  33. MinIdleConns json.Number `json:"min_idle_conns"`
  34. MaxRetries json.Number `json:"max_retries"`
  35. IsCluster string `json:"is_cluster"`
  36. }
  37. type ElasticConfig struct {
  38. Addr string `json:"addr"`
  39. Sniff string `json:"sniff"`
  40. }
  41. type WarningConfig struct {
  42. ApiTotalCountThresholds string `json:"api_total_count_thresholds"`
  43. ApiDayCountThresholds string `json:"api_day_count_thresholds"`
  44. ApiDayThresholds string `json:"api_day_thresholds"`
  45. ProviderDayCountThresholds string `json:"provider_day_count_thresholds"`
  46. DefaultMails string `json:"default_mails"`
  47. MailHost string `json:"mail_host"`
  48. MailPassword string `json:"mail_password"`
  49. MailUser string `json:"mail_user"`
  50. ApiLogAvgElapsed string `json:"api_log_avg_elapsed"`
  51. ApiLogFailThreshold string `json:"api_log_fail_threshold"`
  52. ApiLogNorecordThreshold string `json:"api_log_norecord_threshold"`
  53. ProviderLogAvgElapsed string `json:"provider_log_avg_elapsed"`
  54. ProviderLogFailThreshold string `json:"provider_log_fail_threshold"`
  55. ProviderLogNorecordThreshold string `json:"provider_log_norecord_threshold"`
  56. LogTimeInterval string `json:"log_time_interval"`
  57. }
  58. type LogConfig struct {
  59. MaxSize json.Number `json:"max_size"`
  60. MaxBackups json.Number `json:"max_backups"`
  61. MaxAge json.Number `json:"max_age"`
  62. Level string `json:"level"`
  63. DisableStacktrace string `json:"disable_stacktrace"`
  64. }
  65. type RPCNode struct {
  66. Scheme string `json:"scheme"`
  67. Name string `json:"name"`
  68. UpdateInterval json.Number `json:"update_interval"`
  69. MysqlDB string `json:"mysql_db"`
  70. RedisDB json.Number `json:"redis_db"`
  71. Log LogConfig `json:"log"`
  72. ServiceName string `json:"service_name"`
  73. ServicePort json.Number `json:"service_port"`
  74. }
  75. type RPCConfig struct {
  76. BasePath string `json:"base_path"`
  77. Statistics RPCNode `json:"gd_statistics"`
  78. Management RPCNode `json:"gd_management"`
  79. Vehicle RPCNode `json:"gd_vehicle"`
  80. }
  81. type MongoConfig struct {
  82. User string `json:"user"`
  83. Password string `json:"password"`
  84. Addr string `json:"addr"`
  85. }
  86. type Configure struct {
  87. RunMode string `json:"run_mode"`
  88. AppKey string `json:"app_key"`
  89. AppSecret string `json:"app_secret"`
  90. AutoExportMail string `json:"auto_export_mail"`
  91. AutoExportProviderMail string `json:"auto_export_provider_mail"`
  92. LogMysql MysqlConfig `json:"log_mysql"`
  93. Redis RedisConfig `json:"redis"`
  94. Elastic ElasticConfig `json:"elastic"`
  95. //Mongo MongoConfig `json:"mongo"`
  96. Rpc RPCConfig `json:"rpc"`
  97. Warning WarningConfig `json:warning`
  98. }
  99. func watchEtcd(keysAPI client.KeysAPI, index uint64, runmode, key string, cli client.Client) {
  100. basePath := fmt.Sprintf("/%s/config", runmode)
  101. watcherOptions := &client.WatcherOptions{
  102. AfterIndex: index,
  103. Recursive: true,
  104. }
  105. watcher := keysAPI.Watcher(basePath, watcherOptions)
  106. for {
  107. r, err := watcher.Next(context.Background())
  108. if err != nil || r == nil {
  109. break
  110. }
  111. if r.Node != nil && r.PrevNode != nil {
  112. if r.Node.Key == r.PrevNode.Key && (r.Node.Value != r.PrevNode.Value) {
  113. reloadEtcd(runmode, key, cli)
  114. }
  115. }
  116. if r.Node != nil && r.PrevNode == nil {
  117. reloadEtcd(runmode, key, cli)
  118. }
  119. }
  120. }
  121. func reloadEtcd(runmode, key string, cli client.Client) {
  122. keysAPI := client.NewKeysAPI(cli)
  123. basePath := fmt.Sprintf("/%s/config", runmode)
  124. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  125. Recursive: true,
  126. }); err == nil && resp != nil && resp.Node != nil {
  127. conf := &Configure{}
  128. value := getNodeData(key, resp.Node)
  129. if jsonStr, err := json.Marshal(value); err == nil {
  130. if err := json.Unmarshal(jsonStr, &conf); err == nil {
  131. *Conf = *conf
  132. return
  133. } else {
  134. fmt.Printf("json Unmarshal failed. error:%s", err)
  135. }
  136. } else {
  137. fmt.Printf("json Marshal failed. error:%s", err)
  138. }
  139. } else {
  140. fmt.Printf("get %s failed. error:%s", basePath, err)
  141. }
  142. }
  143. // key 为加密密钥
  144. func GetConfig(runmode, key string, cli client.Client) *Configure {
  145. keysAPI := client.NewKeysAPI(cli)
  146. basePath := fmt.Sprintf("/%s/config", runmode)
  147. if resp, err := keysAPI.Get(context.Background(), basePath, &client.GetOptions{
  148. Recursive: true,
  149. }); err == nil && resp != nil && resp.Node != nil {
  150. Conf = &Configure{}
  151. value := getNodeData(key, resp.Node)
  152. if jsonStr, err := json.Marshal(value); err == nil {
  153. if err := json.Unmarshal(jsonStr, &Conf); err == nil {
  154. go watchEtcd(keysAPI, resp.Index, runmode, key, cli)
  155. return Conf
  156. } else {
  157. fmt.Printf("json Unmarshal failed. error:%s", err)
  158. }
  159. } else {
  160. fmt.Printf("json Marshal failed. error:%s", err)
  161. }
  162. } else {
  163. fmt.Printf("get %s failed. error:%s", basePath, err)
  164. os.Exit(1)
  165. }
  166. return nil
  167. }
  168. // 递归取出node的叶子节点值
  169. func getNodeData(key string, head *client.Node) (value interface{}) {
  170. s0 := strings.Split(head.Key, "/")
  171. len0 := len(s0)
  172. if len0 == 0 {
  173. return
  174. }
  175. if head.Dir {
  176. mapData := map[string]interface{}{}
  177. for _, node := range head.Nodes {
  178. s1 := strings.Split(node.Key, "/")
  179. len1 := len(s1)
  180. if len1 == 0 {
  181. break
  182. }
  183. mapData[s1[len1-1]] = getNodeData(key, node)
  184. }
  185. value = mapData
  186. } else {
  187. if key != "" && head.Value != "" {
  188. if bytesData, err := Base64URLDecode(head.Value); err != nil {
  189. fmt.Printf("Base64URLDecode(%s) failed. error:%s", head.Value, err)
  190. os.Exit(1)
  191. } else {
  192. if data, err := AesDecrypt(bytesData, []byte(key)); err != nil {
  193. fmt.Printf("AesDecrypt failed. error:%s", err)
  194. os.Exit(1)
  195. } else {
  196. value = string(data)
  197. }
  198. }
  199. } else {
  200. // 无加密,直接取值
  201. value = head.Value
  202. }
  203. }
  204. return
  205. }
  206. // 适配k8s 方式
  207. // 公共配置会以configmap的方式映射到容器的conf/common.json中
  208. func GetConfigForK8s() *Configure {
  209. buffer, err := ioutil.ReadFile(ConfigPath)
  210. if err != nil {
  211. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  212. return nil
  213. }
  214. Conf = &Configure{}
  215. if err := json.Unmarshal(buffer, Conf); err != nil {
  216. fmt.Printf("json Unmarshal failed. error:%s", err)
  217. return nil
  218. }
  219. go watchConfigFileForK8s()
  220. return Conf
  221. }
  222. func ReloadConfigForK8s() {
  223. buffer, err := ioutil.ReadFile(ConfigPath)
  224. if err != nil {
  225. fmt.Printf("get %s failed. error:%s", ConfigPath, err)
  226. }
  227. confTmp := &Configure{}
  228. if err := json.Unmarshal(buffer, confTmp); err != nil {
  229. fmt.Printf("json Unmarshal failed. error:%s", err)
  230. }
  231. Conf = confTmp
  232. }
  233. // 判断路径文件/文件夹是否存在
  234. func Exists(path string) bool {
  235. _, err := os.Stat(path)
  236. if err != nil {
  237. if os.IsExist(err) {
  238. return true
  239. }
  240. return false
  241. }
  242. return true
  243. }
  244. func watchConfigFileForK8s() {
  245. fileExist := true
  246. watch, err := fsnotify.NewWatcher()
  247. if err != nil {
  248. fmt.Printf("new file watcher failed\n\n")
  249. os.Exit(1)
  250. }
  251. defer watch.Close()
  252. /*err = watch.Add(ConfigPath)
  253. if err != nil {
  254. fmt.Printf("add file watcher failed\n\n")
  255. os.Exit(1)
  256. }*/
  257. for {
  258. // 判断文件是否存在
  259. if !Exists(ConfigPath) {
  260. time.Sleep(10 * time.Second)
  261. fileExist = false
  262. continue
  263. } else {
  264. watch.Remove(ConfigPath)
  265. watch.Add(ConfigPath)
  266. if !fileExist { // 文件重新创建
  267. ReloadConfigForK8s()
  268. }
  269. fileExist = true
  270. }
  271. select {
  272. case ev := <-watch.Events:
  273. {
  274. fmt.Println("op : ", ev.Op)
  275. ReloadConfigForK8s()
  276. }
  277. case err := <-watch.Errors:
  278. {
  279. fmt.Println("error : ", err)
  280. continue
  281. }
  282. }
  283. }
  284. }