// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package utils import ( "encoding/json" "fmt" "gd_service/consts" "gd_service/errors" "gd_service/thirdparty" "sync" "time" "github.com/tidwall/gjson" "go.uber.org/zap" "gd_service/common.in/cache" "gd_service/common.in/config" "github.com/astaxie/beego/orm" "github.com/afex/hystrix-go/hystrix" ) const ( HystrixClose = 0 HystrixOpen = 1 ) type HystrixConfig struct { ProviderApiName string `json:"provider_api_name"` ProviderApiCode string `json:"provider_api_code"` MaxConcurrentRequests int `json:"max_concurrent_requests"` RequestVolumeThreshold int `json:"request_volume_threshold"` SleepWindow int `json:"sleep_window"` ErrorPercentThreshold int `json:"error_percent_threshold"` Period int `json:"period"` // 熔断统计周期 HystrixStatus int `json:"hystrix_status"` IsOn int `json:"is_on"` LastSendTime int64 `json:"last_send_time"` Mutex sync.Mutex } var settingsMutex *sync.RWMutex var HystrixConfigMap map[string]*HystrixConfig var HystrixConfigStatusMap map[string]int // 构建熔断map func constructHystrixMap() map[string]*HystrixConfig { hystrixConfigMap := make(map[string]*HystrixConfig) hystrixConfigs := []HystrixConfig{} orm.NewOrm().Raw("select * from db_gd_management.t_gd_provider_api_hystrix where is_on=1").QueryRows(&hystrixConfigs) for index, _ := range hystrixConfigs { hystrixConfigMap[hystrixConfigs[index].ProviderApiCode] = &hystrixConfigs[index] /*if _, ok := HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode]; !ok { HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode] = 0 }*/ } return hystrixConfigMap } // 加载数据源熔断 func ProviderApiHystrixLoad() error { //HystrixConfigStatusMap = make(map[string]int) HystrixConfigMap = constructHystrixMap() settingsMutex = &sync.RWMutex{} // 注册熔断 for k, v := range HystrixConfigMap { fmt.Println("HystrixConfigMap:", v.ProviderApiCode, v.SleepWindow, v.RequestVolumeThreshold, v.ErrorPercentThreshold, v.Period) hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败 MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数 RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断 SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间 ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比 Period: v.Period, }) } go SubscribeHystrix() //go HystrixTimer() return nil } func CheckProviderApiHystrix(cmd string) (*HystrixConfig, bool) { settingsMutex.RLock() defer settingsMutex.RUnlock() v, ok := HystrixConfigMap[cmd] return v, ok } func CheckHystrixStatus(hystrixConfig *HystrixConfig, err error, response string) { now := time.Now().UnixNano() / 1e6 if hystrixConfig.HystrixStatus == HystrixOpen { // 熔断状态开启时,判断是否超过了窗口时间,没超过不发送任何通知 if now < (hystrixConfig.LastSendTime + int64(hystrixConfig.SleepWindow)) { return } } if err == nil { // 如果已开启熔断,错误为空表示恢复 if hystrixConfig.HystrixStatus == HystrixOpen { hystrixConfig.HystrixStatus = HystrixClose content := fmt.Sprintf("数据源%s(%s)已恢复(%s)", hystrixConfig.ProviderApiName, hystrixConfig.ProviderApiCode, time.Now().Format(consts.DaySecLayout)) RobotMsg(content) //updateHystrixMap(hystrixConfig) } } else if err == errors.VendorError { // 如果是三方错误,并且熔断开启,表示尝试调用数据源,任然不通 if hystrixConfig.HystrixStatus == HystrixOpen { content := fmt.Sprintf("数据源%s(%s)未恢复(%s),数据源响应:%s", hystrixConfig.ProviderApiName, hystrixConfig.ProviderApiCode, time.Now().Format(consts.DaySecLayout), response) RobotMsg(content) } } else { // 出现熔断错误,判断本地熔断状态,如果是关闭,将熔断状态设置为开启,并发送钉钉 hystrixConfig.Mutex.Lock() defer hystrixConfig.Mutex.Unlock() if hystrixConfig.HystrixStatus == HystrixClose { hystrixConfig.HystrixStatus = HystrixOpen // xxx数据源请求失败超过xx%,触发熔断(时间段),数据源返回值 content := fmt.Sprintf("数据源%s(%s)请求失败超过百分之%d,触发熔断(%s)", hystrixConfig.ProviderApiName, hystrixConfig.ProviderApiCode, hystrixConfig.ErrorPercentThreshold, time.Now().Format(consts.DaySecLayout)) RobotMsg(content) hystrixConfig.LastSendTime = now } } } type Text struct { Content string `json:"content"` } type At struct { AtMobiles []string `json:"atMobiles"` IsAtAll bool `json:"isAtAll"` } type TextMsg struct { MsgType string `json:"msgtype"` Text Text `json:"text"` At At `json:"at"` } func RobotMsg(content string) (err error) { body := map[string]interface{}{ "msgtype": "text", "text": map[string]interface{}{"content": content}, } bytes, _ := json.Marshal(body) h := thirdparty.HttpRequestWithHeadCommon{ Method: "POST", Url: config.Conf.ThirdPart.HystrixWebhook, Body: bytes, TimeOut: 10 * time.Second, } bytes, err = h.Request() if err != nil { l.Error("func", zap.String("call", "RobotMsg"), zap.String("params", content), zap.String("error", err.Error())) return errors.VendorError } errcode := gjson.GetBytes(bytes, "errcode").Int() errmsg := gjson.GetBytes(bytes, "errmsg").String() if errcode != 0 { l.Error("func", zap.String("call", "RobotMsg"), zap.String("params", content), zap.String("error", errmsg)) return errors.VendorError } return nil } func ReloadHystrix() { hystrixConfigMapNew := constructHystrixMap() for k, v := range HystrixConfigMap { // 新的和老的都存在判断是否更新 if value, ok := hystrixConfigMapNew[k]; ok { // 存在,判断是否更新 if v.ErrorPercentThreshold != value.ErrorPercentThreshold || v.MaxConcurrentRequests != value.MaxConcurrentRequests || v.RequestVolumeThreshold != value.RequestVolumeThreshold || v.SleepWindow != value.SleepWindow || v.Period != value.Period { fmt.Println("update:", k, value.RequestVolumeThreshold, value.RequestVolumeThreshold, value.SleepWindow, value.Period) hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败 MaxConcurrentRequests: value.MaxConcurrentRequests, //最大并发请求数 RequestVolumeThreshold: value.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断 SleepWindow: value.SleepWindow, //熔断发生后的等待恢复时间 ErrorPercentThreshold: value.ErrorPercentThreshold, //失败占比 Period: value.Period, }) v.ErrorPercentThreshold = value.ErrorPercentThreshold v.MaxConcurrentRequests = value.MaxConcurrentRequests v.RequestVolumeThreshold = value.RequestVolumeThreshold v.SleepWindow = value.SleepWindow v.Period = value.Period } } else { // 新的不存在 settingsMutex.Lock() delete(HystrixConfigMap, k) settingsMutex.Unlock() } } for k, v := range hystrixConfigMapNew { if _, ok := HystrixConfigMap[k]; !ok { hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败 MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数 RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断 SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间 ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比 Period: v.Period, }) settingsMutex.Lock() HystrixConfigMap[k] = v settingsMutex.Unlock() } } } func SubscribeHystrix() { pubsub := cache.Redis.Subscribe(config.Conf.ThirdPart.HystrixPublishChannel) defer pubsub.Close() for msg := range pubsub.Channel() { if msg.Payload == "hystrix-update" { ReloadHystrix() } } }