hystrix_config.go 8.4 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package utils
  4. import (
  5. "encoding/json"
  6. "fmt"
  7. "gd_vehicle/consts"
  8. "gd_vehicle/errors"
  9. "gd_vehicle/thirdparty"
  10. "sync"
  11. "time"
  12. "github.com/tidwall/gjson"
  13. "go.uber.org/zap"
  14. "gd_vehicle/common.in/cache"
  15. "gd_vehicle/common.in/config"
  16. "github.com/astaxie/beego/orm"
  17. "github.com/afex/hystrix-go/hystrix"
  18. )
  19. const (
  20. HystrixClose = 0
  21. HystrixOpen = 1
  22. )
  23. type HystrixConfig struct {
  24. ProviderApiName string `json:"provider_api_name"`
  25. ProviderApiCode string `json:"provider_api_code"`
  26. MaxConcurrentRequests int `json:"max_concurrent_requests"`
  27. RequestVolumeThreshold int `json:"request_volume_threshold"`
  28. SleepWindow int `json:"sleep_window"`
  29. ErrorPercentThreshold int `json:"error_percent_threshold"`
  30. Period int `json:"period"` // 熔断统计周期
  31. HystrixStatus int `json:"hystrix_status"`
  32. IsOn int `json:"is_on"`
  33. LastSendTime int64 `json:"last_send_time"`
  34. Mutex sync.Mutex
  35. }
  36. var settingsMutex *sync.RWMutex
  37. var HystrixConfigMap map[string]*HystrixConfig
  38. var HystrixConfigStatusMap map[string]int
  39. // 构建熔断map
  40. func constructHystrixMap() map[string]*HystrixConfig {
  41. hystrixConfigMap := make(map[string]*HystrixConfig)
  42. hystrixConfigs := []HystrixConfig{}
  43. orm.NewOrm().Raw("select * from db_gd_management.t_gd_provider_api_hystrix where is_on=1").QueryRows(&hystrixConfigs)
  44. for index, _ := range hystrixConfigs {
  45. hystrixConfigMap[hystrixConfigs[index].ProviderApiCode] = &hystrixConfigs[index]
  46. /*if _, ok := HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode]; !ok {
  47. HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode] = 0
  48. }*/
  49. }
  50. return hystrixConfigMap
  51. }
  52. // 加载数据源熔断
  53. func ProviderApiHystrixLoad() error {
  54. //HystrixConfigStatusMap = make(map[string]int)
  55. HystrixConfigMap = constructHystrixMap()
  56. settingsMutex = &sync.RWMutex{}
  57. // 注册熔断
  58. for k, v := range HystrixConfigMap {
  59. fmt.Println("HystrixConfigMap:", v.ProviderApiCode, v.SleepWindow, v.RequestVolumeThreshold, v.ErrorPercentThreshold, v.Period)
  60. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  61. MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
  62. RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  63. SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
  64. ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
  65. Period: v.Period,
  66. })
  67. }
  68. go SubscribeHystrix()
  69. //go HystrixTimer()
  70. return nil
  71. }
  72. func CheckProviderApiHystrix(cmd string) (*HystrixConfig, bool) {
  73. settingsMutex.RLock()
  74. defer settingsMutex.RUnlock()
  75. v, ok := HystrixConfigMap[cmd]
  76. return v, ok
  77. }
  78. func CheckHystrixStatus(hystrixConfig *HystrixConfig, err error, response string) {
  79. now := time.Now().UnixNano() / 1e6
  80. if hystrixConfig.HystrixStatus == HystrixOpen {
  81. // 熔断状态开启时,判断是否超过了窗口时间,没超过不发送任何通知
  82. if now < (hystrixConfig.LastSendTime + int64(hystrixConfig.SleepWindow)) {
  83. return
  84. }
  85. }
  86. if err == nil {
  87. // 如果已开启熔断,错误为空表示恢复
  88. if hystrixConfig.HystrixStatus == HystrixOpen {
  89. hystrixConfig.HystrixStatus = HystrixClose
  90. content := fmt.Sprintf("数据源%s(%s)已恢复(%s)",
  91. hystrixConfig.ProviderApiName,
  92. hystrixConfig.ProviderApiCode,
  93. time.Now().Format(consts.DaySecLayout))
  94. RobotMsg(content)
  95. //updateHystrixMap(hystrixConfig)
  96. }
  97. } else if err == errors.VendorError {
  98. // 如果是三方错误,并且熔断开启,表示尝试调用数据源,任然不通
  99. if hystrixConfig.HystrixStatus == HystrixOpen {
  100. content := fmt.Sprintf("数据源%s(%s)未恢复(%s),数据源响应:%s",
  101. hystrixConfig.ProviderApiName,
  102. hystrixConfig.ProviderApiCode,
  103. time.Now().Format(consts.DaySecLayout),
  104. response)
  105. RobotMsg(content)
  106. }
  107. } else {
  108. // 出现熔断错误,判断本地熔断状态,如果是关闭,将熔断状态设置为开启,并发送钉钉
  109. hystrixConfig.Mutex.Lock()
  110. defer hystrixConfig.Mutex.Unlock()
  111. if hystrixConfig.HystrixStatus == HystrixClose {
  112. hystrixConfig.HystrixStatus = HystrixOpen
  113. // xxx数据源请求失败超过xx%,触发熔断(时间段),数据源返回值
  114. content := fmt.Sprintf("数据源%s(%s)请求失败超过百分之%d,触发熔断(%s)",
  115. hystrixConfig.ProviderApiName,
  116. hystrixConfig.ProviderApiCode,
  117. hystrixConfig.ErrorPercentThreshold,
  118. time.Now().Format(consts.DaySecLayout))
  119. RobotMsg(content)
  120. hystrixConfig.LastSendTime = now
  121. }
  122. }
  123. }
  124. type Text struct {
  125. Content string `json:"content"`
  126. }
  127. type At struct {
  128. AtMobiles []string `json:"atMobiles"`
  129. IsAtAll bool `json:"isAtAll"`
  130. }
  131. type TextMsg struct {
  132. MsgType string `json:"msgtype"`
  133. Text Text `json:"text"`
  134. At At `json:"at"`
  135. }
  136. func RobotMsg(content string) (err error) {
  137. body := map[string]interface{}{
  138. "msgtype": "text",
  139. "text": map[string]interface{}{"content": content},
  140. }
  141. bytes, _ := json.Marshal(body)
  142. h := thirdparty.HttpRequestWithHeadCommon{
  143. Method: "POST",
  144. Url: config.Conf.ThirdPart.HystrixWebhook,
  145. Body: bytes,
  146. TimeOut: 10 * time.Second,
  147. }
  148. bytes, err = h.Request()
  149. if err != nil {
  150. l.Error("func",
  151. zap.String("call", "RobotMsg"),
  152. zap.String("params", content),
  153. zap.String("error", err.Error()))
  154. return errors.VendorError
  155. }
  156. errcode := gjson.GetBytes(bytes, "errcode").Int()
  157. errmsg := gjson.GetBytes(bytes, "errmsg").String()
  158. if errcode != 0 {
  159. l.Error("func",
  160. zap.String("call", "RobotMsg"),
  161. zap.String("params", content),
  162. zap.String("error", errmsg))
  163. return errors.VendorError
  164. }
  165. return nil
  166. }
  167. func ReloadHystrix() {
  168. hystrixConfigMapNew := constructHystrixMap()
  169. for k, v := range HystrixConfigMap {
  170. // 新的和老的都存在判断是否更新
  171. if value, ok := hystrixConfigMapNew[k]; ok {
  172. // 存在,判断是否更新
  173. if v.ErrorPercentThreshold != value.ErrorPercentThreshold ||
  174. v.MaxConcurrentRequests != value.MaxConcurrentRequests ||
  175. v.RequestVolumeThreshold != value.RequestVolumeThreshold ||
  176. v.SleepWindow != value.SleepWindow ||
  177. v.Period != value.Period {
  178. fmt.Println("update:", k, value.RequestVolumeThreshold, value.RequestVolumeThreshold, value.SleepWindow, value.Period)
  179. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  180. MaxConcurrentRequests: value.MaxConcurrentRequests, //最大并发请求数
  181. RequestVolumeThreshold: value.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  182. SleepWindow: value.SleepWindow, //熔断发生后的等待恢复时间
  183. ErrorPercentThreshold: value.ErrorPercentThreshold, //失败占比
  184. Period: value.Period,
  185. })
  186. v.ErrorPercentThreshold = value.ErrorPercentThreshold
  187. v.MaxConcurrentRequests = value.MaxConcurrentRequests
  188. v.RequestVolumeThreshold = value.RequestVolumeThreshold
  189. v.SleepWindow = value.SleepWindow
  190. v.Period = value.Period
  191. }
  192. } else {
  193. // 新的不存在
  194. settingsMutex.Lock()
  195. delete(HystrixConfigMap, k)
  196. settingsMutex.Unlock()
  197. }
  198. }
  199. for k, v := range hystrixConfigMapNew {
  200. if _, ok := HystrixConfigMap[k]; !ok {
  201. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  202. MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
  203. RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  204. SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
  205. ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
  206. Period: v.Period,
  207. })
  208. settingsMutex.Lock()
  209. HystrixConfigMap[k] = v
  210. settingsMutex.Unlock()
  211. }
  212. }
  213. }
  214. func SubscribeHystrix() {
  215. pubsub := cache.Redis.Subscribe(config.Conf.ThirdPart.HystrixPublishChannel)
  216. defer pubsub.Close()
  217. for msg := range pubsub.Channel() {
  218. if msg.Payload == "hystrix-update" {
  219. ReloadHystrix()
  220. }
  221. }
  222. }