broker.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright 2016 Circonus, Inc. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package checkmgr
  5. import (
  6. "fmt"
  7. "math/rand"
  8. "net"
  9. "net/url"
  10. "reflect"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/circonus-labs/circonus-gometrics/api"
  15. )
  16. func init() {
  17. rand.Seed(time.Now().UnixNano())
  18. }
  19. // Get Broker to use when creating a check
  20. func (cm *CheckManager) getBroker() (*api.Broker, error) {
  21. if cm.brokerID != 0 {
  22. cid := fmt.Sprintf("/broker/%d", cm.brokerID)
  23. broker, err := cm.apih.FetchBroker(api.CIDType(&cid))
  24. if err != nil {
  25. return nil, err
  26. }
  27. if !cm.isValidBroker(broker) {
  28. return nil, fmt.Errorf(
  29. "[ERROR] designated broker %d [%s] is invalid (not active, does not support required check type, or connectivity issue)",
  30. cm.brokerID,
  31. broker.Name)
  32. }
  33. return broker, nil
  34. }
  35. broker, err := cm.selectBroker()
  36. if err != nil {
  37. return nil, fmt.Errorf("[ERROR] Unable to fetch suitable broker %s", err)
  38. }
  39. return broker, nil
  40. }
  41. // Get CN of Broker associated with submission_url to satisfy no IP SANS in certs
  42. func (cm *CheckManager) getBrokerCN(broker *api.Broker, submissionURL api.URLType) (string, error) {
  43. u, err := url.Parse(string(submissionURL))
  44. if err != nil {
  45. return "", err
  46. }
  47. hostParts := strings.Split(u.Host, ":")
  48. host := hostParts[0]
  49. if net.ParseIP(host) == nil { // it's a non-ip string
  50. return u.Host, nil
  51. }
  52. cn := ""
  53. for _, detail := range broker.Details {
  54. if *detail.IP == host {
  55. cn = detail.CN
  56. break
  57. }
  58. }
  59. if cn == "" {
  60. return "", fmt.Errorf("[ERROR] Unable to match URL host (%s) to Broker", u.Host)
  61. }
  62. return cn, nil
  63. }
  64. // Select a broker for use when creating a check, if a specific broker
  65. // was not specified.
  66. func (cm *CheckManager) selectBroker() (*api.Broker, error) {
  67. var brokerList *[]api.Broker
  68. var err error
  69. enterpriseType := "enterprise"
  70. if len(cm.brokerSelectTag) > 0 {
  71. filter := api.SearchFilterType{
  72. "f__tags_has": cm.brokerSelectTag,
  73. }
  74. brokerList, err = cm.apih.SearchBrokers(nil, &filter)
  75. if err != nil {
  76. return nil, err
  77. }
  78. } else {
  79. brokerList, err = cm.apih.FetchBrokers()
  80. if err != nil {
  81. return nil, err
  82. }
  83. }
  84. if len(*brokerList) == 0 {
  85. return nil, fmt.Errorf("zero brokers found")
  86. }
  87. validBrokers := make(map[string]api.Broker)
  88. haveEnterprise := false
  89. for _, broker := range *brokerList {
  90. broker := broker
  91. if cm.isValidBroker(&broker) {
  92. validBrokers[broker.CID] = broker
  93. if broker.Type == enterpriseType {
  94. haveEnterprise = true
  95. }
  96. }
  97. }
  98. if haveEnterprise { // eliminate non-enterprise brokers from valid brokers
  99. for k, v := range validBrokers {
  100. if v.Type != enterpriseType {
  101. delete(validBrokers, k)
  102. }
  103. }
  104. }
  105. if len(validBrokers) == 0 {
  106. return nil, fmt.Errorf("found %d broker(s), zero are valid", len(*brokerList))
  107. }
  108. validBrokerKeys := reflect.ValueOf(validBrokers).MapKeys()
  109. selectedBroker := validBrokers[validBrokerKeys[rand.Intn(len(validBrokerKeys))].String()]
  110. if cm.Debug {
  111. cm.Log.Printf("[DEBUG] Selected broker '%s'\n", selectedBroker.Name)
  112. }
  113. return &selectedBroker, nil
  114. }
  115. // Verify broker supports the check type to be used
  116. func (cm *CheckManager) brokerSupportsCheckType(checkType CheckTypeType, details *api.BrokerDetail) bool {
  117. baseType := string(checkType)
  118. for _, module := range details.Modules {
  119. if module == baseType {
  120. return true
  121. }
  122. }
  123. if idx := strings.Index(baseType, ":"); idx > 0 {
  124. baseType = baseType[0:idx]
  125. }
  126. for _, module := range details.Modules {
  127. if module == baseType {
  128. return true
  129. }
  130. }
  131. return false
  132. }
  133. // Is the broker valid (active, supports check type, and reachable)
  134. func (cm *CheckManager) isValidBroker(broker *api.Broker) bool {
  135. var brokerHost string
  136. var brokerPort string
  137. if broker.Type != "circonus" && broker.Type != "enterprise" {
  138. return false
  139. }
  140. valid := false
  141. for _, detail := range broker.Details {
  142. detail := detail
  143. // broker must be active
  144. if detail.Status != statusActive {
  145. if cm.Debug {
  146. cm.Log.Printf("[DEBUG] Broker '%s' is not active.\n", broker.Name)
  147. }
  148. continue
  149. }
  150. // broker must have module loaded for the check type to be used
  151. if !cm.brokerSupportsCheckType(cm.checkType, &detail) {
  152. if cm.Debug {
  153. cm.Log.Printf("[DEBUG] Broker '%s' does not support '%s' checks.\n", broker.Name, cm.checkType)
  154. }
  155. continue
  156. }
  157. if detail.ExternalPort != 0 {
  158. brokerPort = strconv.Itoa(int(detail.ExternalPort))
  159. } else {
  160. if detail.Port != nil && *detail.Port != 0 {
  161. brokerPort = strconv.Itoa(int(*detail.Port))
  162. } else {
  163. brokerPort = "43191"
  164. }
  165. }
  166. if detail.ExternalHost != nil && *detail.ExternalHost != "" {
  167. brokerHost = *detail.ExternalHost
  168. } else if detail.IP != nil && *detail.IP != "" {
  169. brokerHost = *detail.IP
  170. }
  171. if brokerHost == "" {
  172. cm.Log.Printf("[WARN] Broker '%s' instance %s has no IP or external host set", broker.Name, detail.CN)
  173. continue
  174. }
  175. if brokerHost == "trap.noit.circonus.net" && brokerPort != "443" {
  176. brokerPort = "443"
  177. }
  178. retries := 5
  179. for attempt := 1; attempt <= retries; attempt++ {
  180. // broker must be reachable and respond within designated time
  181. conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", brokerHost, brokerPort), cm.brokerMaxResponseTime)
  182. if err == nil {
  183. conn.Close()
  184. valid = true
  185. break
  186. }
  187. cm.Log.Printf("[WARN] Broker '%s' unable to connect, %v. Retrying in 2 seconds, attempt %d of %d.", broker.Name, err, attempt, retries)
  188. time.Sleep(2 * time.Second)
  189. }
  190. if valid {
  191. if cm.Debug {
  192. cm.Log.Printf("[DEBUG] Broker '%s' is valid\n", broker.Name)
  193. }
  194. break
  195. }
  196. }
  197. return valid
  198. }