main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Copyright 2019 github.com. All rights reserved.
  2. // Use of this source code is governed by github.com.
  3. package main
  4. import (
  5. "flag"
  6. "fmt"
  7. gmqtt "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/jaryhe/gopkgs/mqtt"
  9. "net"
  10. "os"
  11. "lift-monitor/parser"
  12. "lift-monitor/pb"
  13. "lift-monitor/tcp_packet"
  14. "lift-monitor/timetask"
  15. "lift-monitor/udp_packet"
  16. )
  // Command-line flags, registered on the default flag set.
  var (
  	// configFile is the path of the YAML configuration file handed to prepare.
  	configFile = flag.String("config", "conf/common.yaml", "config file location")
  	// version asks the program to print its version information.
  	version = flag.Bool("version", false, "print the version")
  )

  // Build identification printed by showVersion.
  // NOTE(review): "library-import" looks like a placeholder that the build is
  // expected to override (e.g. via -ldflags -X) — confirm against the build scripts.
  var (
  	GitCommit = "library-import"
  	Version   = "library-import"
  )
  23. func showVersion() {
  24. fmt.Println("Version: ", Version)
  25. fmt.Println("GitCommit:", GitCommit)
  26. }
  // Buffered token channels, each with room for 100 values. The (currently
  // commented-out) UDP start-up calls in main pass them to runDeviceUdp, which
  // sends one value on the channel before starting each handler goroutine.
  var (
  	// LimitChan is the channel used for the service port.
  	LimitChan = make(chan bool, 100)
  	// HeartLimitChan is the channel used for the heartbeat port.
  	HeartLimitChan = make(chan bool, 100)
  )
  29. func prepare(filename string) {
  30. // 加载配置
  31. err := parser.LoadConfig(filename)
  32. if err != nil {
  33. fmt.Printf("get conf failed, err: %+v\n\n", err)
  34. os.Exit(1)
  35. }
  36. // 注册处理函数
  37. // parser.Register(parser.MysqlHandler, parser.RedisHandler, parser.LoggerHandler)
  38. //parser.Register(parser.LoggerHandler, parser.MysqlHandler, parser.RedisHandler)
  39. parser.Register(parser.LoggerHandler, parser.RedisHandler, parser.MysqlHandler, parser.InfluxdbHandler, parser.MqttHandler)
  40. // 执行注册的处理函数
  41. parser.Handle()
  42. // 建立rpc客户端
  43. conns := pb.SetupClients()
  44. for _, conn := range conns {
  45. defer conn.Close()
  46. }
  47. timetask.Init()
  48. }
  49. func runDeviceUdp(ip string, port int, ch chan bool) {
  50. service := fmt.Sprintf("%s:%d", ip, port)
  51. udpAddr, err := net.ResolveUDPAddr("udp", service)
  52. if err != nil {
  53. fmt.Println("resolve udp addr err:", err)
  54. os.Exit(1)
  55. }
  56. listener, err := net.ListenUDP("udp", udpAddr)
  57. if err != nil {
  58. fmt.Println("listen tcp err:", err)
  59. os.Exit(2)
  60. }
  61. defer listener.Close()
  62. fmt.Printf("listen:%v\n", listener.LocalAddr().String())
  63. for {
  64. ch <-true
  65. go udp_packet.HandleSocketClient(listener, ch)
  66. }
  67. }
  68. func runDeviceTcp(ip string, port int) {
  69. service := fmt.Sprintf("%s:%d", ip, port)
  70. tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
  71. if err != nil {
  72. fmt.Println("resolve tcp addr err:", err)
  73. os.Exit(1)
  74. }
  75. listener, err := net.ListenTCP("tcp", tcpAddr)
  76. if err != nil {
  77. fmt.Println("listen tcp err:", err)
  78. os.Exit(2)
  79. }
  80. for {
  81. fmt.Printf("listen:%v\n", listener.Addr().String())
  82. conn, err := listener.Accept()
  83. if err != nil {
  84. continue
  85. }
  86. go tcp_packet.HandleSocketClient(conn)
  87. }
  88. }
  89. func subCallbackFunc(client gmqtt.Client, msg gmqtt.Message) {
  90. fmt.Printf("Subscribe: Topic is [%s]; msg is [%v]\n", msg.Topic(), msg.Payload())
  91. if msg.Topic() != parser.Conf.Mqtt.Topic {
  92. return
  93. }
  94. go udp_packet.MqttHandle(msg.Payload())
  95. }
  96. func mqttSubscribe() error {
  97. fmt.Printf("xxxtopic:%s,%v\n", parser.Conf.Mqtt.Topic, mqtt.MqttCli)
  98. return mqtt.Subscribe(mqtt.MqttCli, parser.Conf.Mqtt.Topic, subCallbackFunc)
  99. }
  100. func main() {
  101. flag.Parse()
  102. if *version {
  103. showVersion()
  104. os.Exit(1)
  105. }
  106. prepare(*configFile)
  107. //mqttSubscribe()
  108. //runDeviceUdp(parser.Conf.LiftMonitor.ServiceIp, parser.Conf.LiftMonitor.ServicePort, LimitChan)
  109. //runDeviceUdp(parser.Conf.LiftMonitor.ServiceIp, parser.Conf.LiftMonitor.HeartPort, HeartLimitChan)
  110. runDeviceTcp(parser.Conf.LiftMonitor.ServiceIp, parser.Conf.LiftMonitor.ServicePort)
  111. return
  112. }