// Copyright 2019 github.com. All rights reserved. // Use of this source code is governed by github.com. package main import ( "github.com/jaryhe/gopkgs/mqtt" "tower-monitor/parser" "tower-monitor/pb" "tower-monitor/udp_packet" "tower-monitor/timetask" "flag" "fmt" "net" "os" gmqtt "github.com/eclipse/paho.mqtt.golang" ) var ( configFile = flag.String("config", "conf/common.yaml", "config file location") version = flag.Bool("version", false, "print the version") GitCommit = "library-import" Version = "library-import" ) func showVersion() { fmt.Println("Version: ", Version) fmt.Println("GitCommit:", GitCommit) } var LimitChan = make(chan bool, 100) var HeartLimitChan = make(chan bool, 100) func prepare(filename string) { // 加载配置 err := parser.LoadConfig(filename) if err != nil { fmt.Printf("get conf failed, err: %+v\n\n", err) os.Exit(1) } // 注册处理函数 // parser.Register(parser.MysqlHandler, parser.RedisHandler, parser.LoggerHandler) //parser.Register(parser.LoggerHandler, parser.MysqlHandler, parser.RedisHandler) parser.Register(parser.LoggerHandler, parser.InfluxdbHandler, parser.MqttHandler) // 执行注册的处理函数 parser.Handle() // 建立rpc客户端 conns := pb.SetupClients() for _, conn := range conns { defer conn.Close() } timetask.Init() } func runDeviceUdp(ip string, port int, ch chan bool) { service := fmt.Sprintf("%s:%d", ip, port) udpAddr, err := net.ResolveUDPAddr("udp", service) if err != nil { fmt.Println("resolve udp addr err:", err) os.Exit(1) } listener, err := net.ListenUDP("udp", udpAddr) if err != nil { fmt.Println("listen tcp err:", err) os.Exit(2) } defer listener.Close() fmt.Printf("listen:%v\n", listener.LocalAddr().String()) for { ch <-true go udp_packet.HandleSocketClient(listener, ch) } } func subCallbackFunc(client gmqtt.Client, msg gmqtt.Message) { fmt.Printf("Subscribe: Topic is [%s]; msg is [%v]\n", msg.Topic(), msg.Payload()) if msg.Topic() != parser.Conf.Mqtt.Topic { return } go udp_packet.MqttHandle(msg.Payload()) } func mqttSubscribe() error { fmt.Printf("xxxtopic:%s,%v\n", parser.Conf.Mqtt.Topic, mqtt.MqttCli) return mqtt.Subscribe(mqtt.MqttCli, parser.Conf.Mqtt.Topic, subCallbackFunc) } func main() { flag.Parse() if *version { showVersion() os.Exit(1) } prepare(*configFile) mqttSubscribe() go runDeviceUdp(parser.Conf.TowerMonitor.ServiceIp, parser.Conf.TowerMonitor.ServicePort, LimitChan) runDeviceUdp(parser.Conf.TowerMonitor.ServiceIp, parser.Conf.TowerMonitor.HeartPort, HeartLimitChan) return }