main.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package main
  4. import (
  5. "github.com/jaryhe/gopkgs/mqtt"
  6. "tower-monitor/parser"
  7. "tower-monitor/pb"
  8. "tower-monitor/udp_packet"
  9. "tower-monitor/timetask"
  10. "flag"
  11. "fmt"
  12. "net"
  13. "os"
  14. gmqtt "github.com/eclipse/paho.mqtt.golang"
  15. )
  16. var (
  17. configFile = flag.String("config", "conf/common.yaml", "config file location")
  18. version = flag.Bool("version", false, "print the version")
  19. GitCommit = "library-import"
  20. Version = "library-import"
  21. )
  22. func showVersion() {
  23. fmt.Println("Version: ", Version)
  24. fmt.Println("GitCommit:", GitCommit)
  25. }
  26. var LimitChan = make(chan bool, 100)
  27. var HeartLimitChan = make(chan bool, 100)
  28. func prepare(filename string) {
  29. // 加载配置
  30. err := parser.LoadConfig(filename)
  31. if err != nil {
  32. fmt.Printf("get conf failed, err: %+v\n\n", err)
  33. os.Exit(1)
  34. }
  35. // 注册处理函数
  36. // parser.Register(parser.MysqlHandler, parser.RedisHandler, parser.LoggerHandler)
  37. //parser.Register(parser.LoggerHandler, parser.MysqlHandler, parser.RedisHandler)
  38. parser.Register(parser.LoggerHandler, parser.InfluxdbHandler, parser.MqttHandler)
  39. // 执行注册的处理函数
  40. parser.Handle()
  41. // 建立rpc客户端
  42. conns := pb.SetupClients()
  43. for _, conn := range conns {
  44. defer conn.Close()
  45. }
  46. timetask.Init()
  47. }
  48. func runDeviceUdp(ip string, port int, ch chan bool) {
  49. service := fmt.Sprintf("%s:%d", ip, port)
  50. udpAddr, err := net.ResolveUDPAddr("udp", service)
  51. if err != nil {
  52. fmt.Println("resolve udp addr err:", err)
  53. os.Exit(1)
  54. }
  55. listener, err := net.ListenUDP("udp", udpAddr)
  56. if err != nil {
  57. fmt.Println("listen tcp err:", err)
  58. os.Exit(2)
  59. }
  60. defer listener.Close()
  61. fmt.Printf("listen:%v\n", listener.LocalAddr().String())
  62. for {
  63. ch <-true
  64. go udp_packet.HandleSocketClient(listener, ch)
  65. }
  66. }
  67. func subCallbackFunc(client gmqtt.Client, msg gmqtt.Message) {
  68. fmt.Printf("Subscribe: Topic is [%s]; msg is [%v]\n", msg.Topic(), msg.Payload())
  69. if msg.Topic() != parser.Conf.Mqtt.Topic {
  70. return
  71. }
  72. go udp_packet.MqttHandle(msg.Payload())
  73. }
  74. func mqttSubscribe() error {
  75. fmt.Printf("xxxtopic:%s,%v\n", parser.Conf.Mqtt.Topic, mqtt.MqttCli)
  76. return mqtt.Subscribe(mqtt.MqttCli, parser.Conf.Mqtt.Topic, subCallbackFunc)
  77. }
  78. func main() {
  79. flag.Parse()
  80. if *version {
  81. showVersion()
  82. os.Exit(1)
  83. }
  84. prepare(*configFile)
  85. mqttSubscribe()
  86. go runDeviceUdp(parser.Conf.TowerMonitor.ServiceIp, parser.Conf.TowerMonitor.ServicePort, LimitChan)
  87. runDeviceUdp(parser.Conf.TowerMonitor.ServiceIp, parser.Conf.TowerMonitor.HeartPort, HeartLimitChan)
  88. return
  89. }