main.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package main
  4. import (
  5. "context"
  6. "flag"
  7. "fmt"
  8. "gopkg.in/ini.v1"
  9. "log"
  10. "net"
  11. "os"
  12. "os/signal"
  13. "property-mqtt/mqtt_utils"
  14. "syscall"
  15. "time"
  16. "property-mqtt/impl"
  17. "property-mqtt/parser"
  18. "property-mqtt/pb"
  19. "git.getensh.com/common/gopkgs/database"
  20. "git.getensh.com/common/gopkgs/tasker"
  21. "google.golang.org/grpc"
  22. )
  // Command-line flags.
  var (
  	// configFile is the path of the main configuration file passed to prepare.
  	configFile = flag.String("config", "conf/common.yaml", "config file location")
  	// appConf is the path of the ini-style application config read by run.
  	appConf = flag.String("appconf", "conf/app.conf", "app conf file location")
  	// version makes main print the build information instead of starting the service.
  	version = flag.Bool("version", false, "print the version")
  )

  // Build metadata printed by showVersion.
  // NOTE(review): presumably overridden at build time (e.g. via -ldflags -X) — confirm.
  var (
  	GitCommit = "library-import"
  	Version   = "library-import"
  )
  30. func showVersion() {
  31. fmt.Println("Version: ", Version)
  32. fmt.Println("GitCommit:", GitCommit)
  33. }
  34. func prepare(filename string) {
  35. // 加载配置
  36. err := parser.LoadConfig(filename)
  37. if err != nil {
  38. fmt.Printf("get conf failed, err: %+v\n\n", err)
  39. os.Exit(1)
  40. }
  41. // 注册处理函数
  42. // parser.Register(parser.MysqlHandler, parser.RedisHandler, parser.LoggerHandler)
  43. parser.Register(
  44. //parser.MysqlHandler,
  45. parser.MqttHandler,
  46. parser.LoggerHandler,
  47. parser.RedisHandler,
  48. //parser.InfluxdbHandler,
  49. )
  50. // 执行注册的处理函数
  51. parser.Handle()
  52. }
  53. func run() {
  54. // 开始监听
  55. serveAddr := fmt.Sprintf("%s:%d", parser.Conf.Rpc.Mqtt.ServiceIp, parser.Conf.Rpc.Mqtt.ServicePort)
  56. log.Printf("Listening and serving TCP on %s\n", serveAddr)
  57. lis, err := net.Listen("tcp", serveAddr)
  58. if err != nil {
  59. os.Exit(1)
  60. }
  61. s := grpc.NewServer()
  62. impl.Register(s)
  63. cfg, err := ini.Load(*appConf)
  64. if err != nil {
  65. fmt.Printf("Fail to read file: %v\n\n", err)
  66. os.Exit(1)
  67. }
  68. serviceIp := cfg.Section("").Key("service_ip").String()
  69. if len(parser.Conf.EtcdAddrs) > 0 {
  70. parser.EctdRegister(parser.Conf.EtcdAddrs, parser.Conf.Rpc.Mqtt.ServiceName,
  71. fmt.Sprintf("%s:%v", serviceIp, parser.Conf.Rpc.Mqtt.ServicePort),
  72. parser.Conf.Rpc.Prefix)
  73. }
  74. // 建立rpc客户端
  75. conns := pb.SetupClients()
  76. for _, conn := range conns {
  77. defer conn.Close()
  78. }
  79. mqtt_utils.SubAll()
  80. go func() {
  81. if err := s.Serve(lis); err != nil {
  82. log.Fatalf("HTTP server listen failed. err: %s\n", err.Error())
  83. }
  84. }()
  85. // 优雅关闭服务器
  86. sigChan := make(chan os.Signal, 1)
  87. // 捕获信号
  88. signal.Notify(sigChan, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
  89. sigValue := <-sigChan
  90. log.Printf("Got a signal:%v", sigValue)
  91. // 让tasker安全退出
  92. tasker.SignalNotify(sigValue)
  93. // 不管什么行为,都等待5秒退出
  94. log.Println("Start to shutdown server...")
  95. _, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  96. defer cancel()
  97. s.Stop()
  98. log.Println("Shutdown server finished.")
  99. }
  100. func main() {
  101. defer func() {
  102. database.Close()
  103. }()
  104. flag.Parse()
  105. if *version {
  106. showVersion()
  107. os.Exit(1)
  108. }
  109. prepare(*configFile)
  110. run()
  111. return
  112. }