setup.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package pb
  4. import (
  5. fmt "fmt"
  6. "property-applete-gateway/parser"
  7. "time"
  8. grpc "google.golang.org/grpc"
  9. "google.golang.org/grpc/keepalive"
  10. "go.etcd.io/etcd/client/v3/naming/resolver"
  11. )
  12. // 客户端集合
  13. var System SystemClient
  14. var Common CommonClient
  15. var Garden GardenClient
  16. var Thirdparty PropertyThirdpartyClient
  17. var Household HouseholdClient
  18. var PropertyLog PropertyLogClient
  19. var Device DeviceClient
  20. func setupDeviceClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  21. // 根据是否为k8s来组装targets
  22. var serviceName string
  23. serviceName = parser.Conf.Rpc.Device.ServiceName
  24. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  25. if err != nil {
  26. panic(err)
  27. }
  28. // 发起一个连接并记录连接conn,后期释放
  29. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  30. grpc.WithResolvers(builder),
  31. grpc.WithBalancerName("round_robin"),
  32. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  33. Device = NewDeviceClient(conn)
  34. conns = append(conns, conn)
  35. } else {
  36. fmt.Println("[rpc] dial Device conn err", err)
  37. }
  38. return
  39. }
  40. func setupDeviceClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  41. // 根据是否为k8s来组装targets
  42. var serviceName string
  43. if parser.Conf.K8s {
  44. serviceName = parser.Conf.Rpc.Device.ServiceName
  45. } else {
  46. serviceName = parser.Conf.Rpc.Device.ServiceIp
  47. }
  48. // 发起一个连接并记录连接conn,后期释放
  49. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  50. parser.Conf.Rpc.Device.ServicePort),
  51. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  52. Device = NewDeviceClient(conn)
  53. conns = append(conns, conn)
  54. } else {
  55. fmt.Println("[rpc] dial Device conn err", err)
  56. }
  57. return
  58. }
  59. func setupSystemClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  60. // 根据是否为k8s来组装targets
  61. var serviceName string
  62. if parser.Conf.K8s {
  63. serviceName = parser.Conf.Rpc.System.ServiceName
  64. } else {
  65. serviceName = parser.Conf.Rpc.System.ServiceIp
  66. }
  67. // 发起一个连接并记录连接conn,后期释放
  68. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  69. parser.Conf.Rpc.System.ServicePort),
  70. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  71. System = NewSystemClient(conn)
  72. conns = append(conns, conn)
  73. } else {
  74. fmt.Println("[rpc] dial system conn err", err)
  75. }
  76. return
  77. }
  78. func setupSystemClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  79. // 根据是否为k8s来组装targets
  80. var serviceName string
  81. serviceName = parser.Conf.Rpc.System.ServiceName
  82. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  83. if err != nil {
  84. panic(err)
  85. }
  86. // 发起一个连接并记录连接conn,后期释放
  87. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  88. grpc.WithResolvers(builder),
  89. grpc.WithBalancerName("round_robin"),
  90. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  91. System = NewSystemClient(conn)
  92. conns = append(conns, conn)
  93. } else {
  94. fmt.Println("[rpc] dial system conn err", err)
  95. }
  96. return
  97. }
  98. func setupPropertyLogClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  99. // 根据是否为k8s来组装targets
  100. var serviceName string
  101. if parser.Conf.K8s {
  102. serviceName = parser.Conf.Rpc.PropertyLog.ServiceName
  103. } else {
  104. serviceName = parser.Conf.Rpc.PropertyLog.ServiceIp
  105. }
  106. // 发起一个连接并记录连接conn,后期释放
  107. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  108. parser.Conf.Rpc.PropertyLog.ServicePort),
  109. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  110. PropertyLog = NewPropertyLogClient(conn)
  111. conns = append(conns, conn)
  112. } else {
  113. fmt.Println("[rpc] dial log conn err", err)
  114. }
  115. return
  116. }
  117. func setupPropertyLogClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  118. // 根据是否为k8s来组装targets
  119. var serviceName string
  120. serviceName = parser.Conf.Rpc.PropertyLog.ServiceName
  121. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  122. if err != nil {
  123. panic(err)
  124. }
  125. // 发起一个连接并记录连接conn,后期释放
  126. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  127. grpc.WithResolvers(builder),
  128. grpc.WithBalancerName("round_robin"),
  129. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  130. PropertyLog = NewPropertyLogClient(conn)
  131. conns = append(conns, conn)
  132. } else {
  133. fmt.Println("[rpc] dial PropertyLog conn err", err)
  134. }
  135. return
  136. }
  137. func setupHouseholdClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  138. // 根据是否为k8s来组装targets
  139. var serviceName string
  140. if parser.Conf.K8s {
  141. serviceName = parser.Conf.Rpc.Household.ServiceName
  142. } else {
  143. serviceName = parser.Conf.Rpc.Household.ServiceIp
  144. }
  145. // 发起一个连接并记录连接conn,后期释放
  146. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  147. parser.Conf.Rpc.Household.ServicePort),
  148. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  149. Household = NewHouseholdClient(conn)
  150. conns = append(conns, conn)
  151. } else {
  152. fmt.Println("[rpc] dial household conn err", err)
  153. }
  154. return
  155. }
  156. func setupHouseholdClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  157. // 根据是否为k8s来组装targets
  158. var serviceName string
  159. serviceName = parser.Conf.Rpc.Household.ServiceName
  160. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  161. if err != nil {
  162. panic(err)
  163. }
  164. // 发起一个连接并记录连接conn,后期释放
  165. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  166. grpc.WithResolvers(builder),
  167. grpc.WithBalancerName("round_robin"),
  168. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  169. Household = NewHouseholdClient(conn)
  170. conns = append(conns, conn)
  171. } else {
  172. fmt.Println("[rpc] dial Household conn err", err)
  173. }
  174. return
  175. }
  176. func setupGardenClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  177. // 根据是否为k8s来组装targets
  178. var serviceName string
  179. serviceName = parser.Conf.Rpc.Garden.ServiceName
  180. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  181. if err != nil {
  182. panic(err)
  183. }
  184. // 发起一个连接并记录连接conn,后期释放
  185. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  186. grpc.WithResolvers(builder),
  187. grpc.WithBalancerName("round_robin"),
  188. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  189. Garden = NewGardenClient(conn)
  190. conns = append(conns, conn)
  191. } else {
  192. fmt.Println("[rpc] dial Garden conn err", err)
  193. }
  194. return
  195. }
  196. func setupGardenClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  197. // 根据是否为k8s来组装targets
  198. var serviceName string
  199. if parser.Conf.K8s {
  200. serviceName = parser.Conf.Rpc.Garden.ServiceName
  201. } else {
  202. serviceName = parser.Conf.Rpc.Garden.ServiceIp
  203. }
  204. // 发起一个连接并记录连接conn,后期释放
  205. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  206. parser.Conf.Rpc.Garden.ServicePort),
  207. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  208. Garden = NewGardenClient(conn)
  209. conns = append(conns, conn)
  210. } else {
  211. fmt.Println("[rpc] dial Garden conn err", err)
  212. }
  213. return
  214. }
  215. func setupCommonClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  216. // 根据是否为k8s来组装targets
  217. var serviceName string
  218. serviceName = parser.Conf.Rpc.Common.ServiceName
  219. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  220. if err != nil {
  221. panic(err)
  222. }
  223. // 发起一个连接并记录连接conn,后期释放
  224. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  225. grpc.WithResolvers(builder),
  226. grpc.WithBalancerName("round_robin"),
  227. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  228. Common = NewCommonClient(conn)
  229. conns = append(conns, conn)
  230. } else {
  231. fmt.Println("[rpc] dial Common conn err", err)
  232. }
  233. return
  234. }
  235. func setupCommonClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  236. // 根据是否为k8s来组装targets
  237. var serviceName string
  238. if parser.Conf.K8s {
  239. serviceName = parser.Conf.Rpc.Common.ServiceName
  240. } else {
  241. serviceName = parser.Conf.Rpc.Common.ServiceIp
  242. }
  243. // 发起一个连接并记录连接conn,后期释放
  244. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  245. parser.Conf.Rpc.Common.ServicePort),
  246. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  247. Common = NewCommonClient(conn)
  248. conns = append(conns, conn)
  249. } else {
  250. fmt.Println("[rpc] dial common conn err", err)
  251. }
  252. return
  253. }
  254. func setupThirdpartyClient2(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  255. // 根据是否为k8s来组装targets
  256. var serviceName string
  257. serviceName = parser.Conf.Rpc.Thirdparty.ServiceName
  258. builder, err := resolver.NewBuilder(parser.GetEtcdClient())
  259. if err != nil {
  260. panic(err)
  261. }
  262. // 发起一个连接并记录连接conn,后期释放
  263. if conn, err := grpc.Dial("etcd:///"+parser.Conf.Rpc.Prefix+"/"+serviceName,
  264. grpc.WithResolvers(builder),
  265. grpc.WithBalancerName("round_robin"),
  266. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  267. Thirdparty = NewPropertyThirdpartyClient(conn)
  268. conns = append(conns, conn)
  269. } else {
  270. fmt.Println("[rpc] dial Thirdparty conn err", err)
  271. }
  272. return
  273. }
  274. func setupThirdpartyClient(kacp keepalive.ClientParameters, conns []*grpc.ClientConn) {
  275. // 根据是否为k8s来组装targets
  276. var serviceName string
  277. if parser.Conf.K8s {
  278. serviceName = parser.Conf.Rpc.Thirdparty.ServiceName
  279. } else {
  280. serviceName = parser.Conf.Rpc.Thirdparty.ServiceIp
  281. }
  282. // 发起一个连接并记录连接conn,后期释放
  283. if conn, err := grpc.Dial(fmt.Sprintf("%s:%d", serviceName,
  284. parser.Conf.Rpc.Thirdparty.ServicePort),
  285. grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)); err == nil {
  286. Thirdparty = NewPropertyThirdpartyClient(conn)
  287. conns = append(conns, conn)
  288. } else {
  289. fmt.Println("[rpc] dial thirdparty conn err", err)
  290. }
  291. return
  292. }
  293. // SetupClients 创建客户端
  294. func SetupClients() (conns []*grpc.ClientConn) {
  295. // 客户端配置参数
  296. var kacp = keepalive.ClientParameters{
  297. // send pings every n seconds if there is no activity
  298. Time: time.Duration(parser.Conf.Rpc.Keepalive.ClientTime) * time.Second,
  299. // wait n second for ping ack before considering the connection dead
  300. Timeout: time.Duration(parser.Conf.Rpc.Keepalive.ClientTimeout) * time.Second,
  301. // send pings even without active streams
  302. PermitWithoutStream: true,
  303. }
  304. setupSystemClient2(kacp, conns)
  305. setupCommonClient2(kacp, conns)
  306. setupThirdpartyClient2(kacp, conns)
  307. setupGardenClient2(kacp, conns)
  308. setupHouseholdClient2(kacp, conns)
  309. setupPropertyLogClient2(kacp, conns)
  310. setupDeviceClient2(kacp, conns)
  311. return
  312. }