main.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package main
  2. import (
  3. "gd_crontab/impl"
  4. "gd_crontab/impl/crontab"
  5. "gd_crontab/impl/warning"
  6. "gd_crontab/rpc_apis"
  7. "flag"
  8. "fmt"
  9. "log"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "gd_crontab/common.in/clinit"
  16. "gd_crontab/common.in/config"
  17. "gd_crontab/common.in/logger"
  18. "gd_crontab/common.in/utils"
  19. "github.com/astaxie/beego/orm"
  20. _ "github.com/go-sql-driver/mysql"
  21. //"github.com/gomodule/redigo/redis"
  22. "github.com/smallnest/rpcx/server"
  23. "github.com/smallnest/rpcx/serverplugin"
  24. gconfig "gd_crontab/common.in/config"
  25. "github.com/rcrowley/go-metrics"
  26. "gopkg.in/ini.v1"
  27. )
  28. var (
  29. // 这里可以改默认值
  30. configFile = flag.String("config", "/etc/gd_crontab/app.conf", "config file location")
  31. version = flag.Bool("version", false, "config file location")
  32. GitCommit = "library-import"
  33. Version = "library-import"
  34. localReportAddr = flag.String("rh", "", "")
  35. localReportDebug = flag.Bool("rdebug", false, "")
  36. localReportUser = flag.String("ru", "", "")
  37. localReportPass = flag.String("rp", "", "")
  38. localReportDates = flag.String("rd", "", "")
  39. localReportMerchantId = flag.Int64("rmid", 0, "")
  40. localReportApiId = flag.Int64("raid", 0, "")
  41. localReportProviderApiId = flag.Int64("rpid", 0, "")
  42. localReportApi = flag.Bool("rapi", false, "")
  43. localReportProvider = flag.Bool("rprovider", false, "")
  44. localReportStart = flag.Int64("rstart", 0, "")
  45. localReportEnd = flag.Int64("rend", 0, "")
  46. tec = flag.Bool("tec", false, "")
  47. tes = flag.Bool("tes", false, "")
  48. tem = flag.Bool("tem", false, "")
  49. )
  50. func showVersion() {
  51. fmt.Println("Version: ", Version)
  52. fmt.Println("GitCommit:", GitCommit)
  53. }
  54. func prepare(projectName, runmode, key, logDir string, etcdAddrs []string, discoveryType string) {
  55. // 先行于读配置
  56. var conf *config.Configure
  57. if discoveryType == "k8s" {
  58. conf = config.GetConfigForK8s()
  59. if conf == nil {
  60. fmt.Printf("get conf failed\n\n")
  61. os.Exit(1)
  62. }
  63. rpc_apis.InitForK8s(conf)
  64. } else {
  65. // 先行于读配置
  66. clinit.InitEtcd(etcdAddrs)
  67. conf = config.GetConfig(projectName+"/"+runmode, key, clinit.GetEtcdClient())
  68. if conf == nil {
  69. fmt.Printf("get conf failed\n\n")
  70. os.Exit(1)
  71. }
  72. rpc_apis.Init(etcdAddrs, conf)
  73. }
  74. appname := conf.Rpc.Crontab.Name
  75. // 指定mysql数据库,若无则使用默认数据库
  76. mysqldb := conf.Rpc.Crontab.MysqlDB
  77. if mysqldb == "" {
  78. mysqldb = conf.LogMysql.DefaultDB
  79. }
  80. // 连接数据库服务器
  81. clinit.InitMySQL(&clinit.MysqlConfig{
  82. User: conf.LogMysql.User,
  83. Password: conf.LogMysql.Password,
  84. Addr: conf.LogMysql.Addr,
  85. DB: mysqldb,
  86. Charset: conf.LogMysql.Charset,
  87. MaxIdle: conf.LogMysql.MaxIdle,
  88. MaxConn: conf.LogMysql.MaxConn,
  89. })
  90. // 初始化logger
  91. /*ms, _ := conf.Rpc.AccessLog.Log.MaxSize.Int64()
  92. mb, _ := conf.Rpc.AccessLog.Log.MaxBackups.Int64()
  93. ma, _ := conf.Rpc.AccessLog.Log.MaxAge.Int64()
  94. maxSize := int(ms)
  95. maxBackups := int(mb)
  96. maxAge := int(ma)
  97. disableStacktrace := (conf.Rpc.AccessLog.Log.DisableStacktrace == "true")
  98. commonLogger := logger.InitLogger(runmode, fmt.Sprintf("%s/%s.log", logDir, appname),
  99. maxSize, maxBackups, maxAge, disableStacktrace)
  100. accessLogger := logger.NewInfoLogger(runmode, fmt.Sprintf("%s/%s-access.log", logDir, appname),
  101. maxSize, maxBackups, maxAge)
  102. thirdpartyLogger := logger.NewInfoLogger(runmode, fmt.Sprintf("%s/%s-thirdparty.log", logDir, appname),
  103. maxSize, maxBackups, maxAge)
  104. */
  105. /*if err := mongo.MgoInit(conf.Mongo.Addr, conf.Mongo.User, conf.Mongo.Password); err != nil {
  106. fmt.Printf("init mongo failed:%v, conf.Mongo.Uuser:%s\n", err, conf.Mongo.User)
  107. os.Exit(-1)
  108. }*/
  109. // 设置需要使用logger的地方
  110. ms, _ := conf.Rpc.Crontab.Log.MaxSize.Int64()
  111. mb, _ := conf.Rpc.Crontab.Log.MaxBackups.Int64()
  112. ma, _ := conf.Rpc.Crontab.Log.MaxAge.Int64()
  113. maxSize := int(ms)
  114. maxBackups := int(mb)
  115. maxAge := int(ma)
  116. disableStacktrace := (conf.Rpc.Crontab.Log.DisableStacktrace == "true")
  117. // 通用logger
  118. commonLogger := logger.InitLogger(runmode, fmt.Sprintf("%s/%s.log", logDir, appname), appname, conf.Rpc.Crontab.Log.Level,
  119. maxSize, maxBackups, maxAge, disableStacktrace)
  120. crontab.SetLogger(commonLogger)
  121. impl.RegisterOrmModel()
  122. // common日志
  123. if runmode != "prod" {
  124. orm.Debug = true
  125. //redis.Debug = false
  126. }
  127. }
  128. func start(conf *config.Configure, serveAddr string, etcdAddrs []string) {
  129. s := server.NewServer()
  130. r := &serverplugin.EtcdRegisterPlugin{
  131. ServiceAddress: fmt.Sprintf("tcp@%s", serveAddr),
  132. EtcdServers: etcdAddrs,
  133. BasePath: conf.Rpc.BasePath,
  134. Metrics: metrics.NewRegistry(),
  135. UpdateInterval: time.Minute,
  136. }
  137. if err := r.Start(); err != nil {
  138. fmt.Printf("start failed. error:%s\n\n", err)
  139. os.Exit(1)
  140. }
  141. s.Plugins.Add(r)
  142. s.RegisterName(conf.Rpc.Crontab.Name, new(impl.Rcvr), "")
  143. go func() {
  144. if err := s.Serve("tcp", serveAddr); err != nil {
  145. log.Fatalf("HTTP server listen failed. err: %s\n", err.Error())
  146. }
  147. }()
  148. // 捕获信号
  149. sigChan := make(chan os.Signal, 1)
  150. signal.Notify(sigChan, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
  151. sigValue := <-sigChan
  152. log.Printf("Got a signal:%v", sigValue)
  153. r.Stop()
  154. time.Sleep(5 * time.Second)
  155. log.Println("Shutdown server finished.")
  156. }
  157. func startPeerToPeer(conf *config.Configure, serveAddr string) {
  158. s := server.NewServer()
  159. s.RegisterName(conf.Rpc.Crontab.Name, new(impl.Rcvr), "")
  160. s.Serve("tcp", serveAddr)
  161. }
  162. func main() {
  163. flag.Parse()
  164. if *version {
  165. showVersion()
  166. }
  167. cfg, err := ini.Load(*configFile)
  168. if err != nil {
  169. fmt.Printf("Fail to read file: %v\n\n", err)
  170. os.Exit(1)
  171. }
  172. crontab.TestExceptionCompute = *tec
  173. crontab.TestExceptionSql = *tes
  174. crontab.TestExceptionMail = *tem
  175. //appname := cfg.Section("").Key("appname").String()
  176. logDir := cfg.Section("").Key("log_dir").String()
  177. runmode := cfg.Section("").Key("runmode").String()
  178. //serveAddr := cfg.Section("").Key("serve_addr").String()
  179. etcdAddrs := strings.Split(cfg.Section("").Key("etcd_addrs").String(), ",")
  180. encryptKey := cfg.Section("").Key("encrypt_key").String()
  181. discoveryType := cfg.Section("").Key("discovery_type").String()
  182. projectName := cfg.Section("").Key("project_name").String()
  183. prepare( projectName, runmode, encryptKey, logDir, etcdAddrs, discoveryType)
  184. if *localReportAddr != "" {
  185. db := crontab.GetLocalDb(*localReportUser, *localReportPass, *localReportAddr)
  186. //crontab.GernerateHourReport(*localReportStart, *localReportEnd, *localReportApi, *localReportProvider, db)
  187. crontab.AutoExportTest(db)
  188. os.Exit(0)
  189. }
  190. go utils.Free()
  191. warning.ParseThreshold()
  192. crontab.DeleteLogDayTask()
  193. crontab.AutoExport()
  194. crontab.LogHourTask()
  195. //warning.LoopCheck()
  196. impl.TaskInit()
  197. serveAddr := fmt.Sprintf("%s:%s",gconfig.Conf.Rpc.Crontab.ServiceName,gconfig.Conf.Rpc.Crontab.ServicePort.String())
  198. if discoveryType == "etcd" {
  199. fmt.Println(gconfig.Conf, serveAddr, etcdAddrs)
  200. start(gconfig.Conf, serveAddr, etcdAddrs)
  201. } else {
  202. startPeerToPeer(gconfig.Conf, serveAddr)
  203. }
  204. return
  205. }