// main.go
  1. package main
  import (
  	"context"
  	"errors"
  	"fmt"
  	"log"
  	"net/http"
  	"os"
  	"os/signal"
  	"syscall"
  	"time"

  	"iot-base-station/internal/analytics"
  	"iot-base-station/internal/config"
  	"iot-base-station/internal/device"
  	"iot-base-station/internal/notification"
  	"iot-base-station/pkg/database"
  	"iot-base-station/pkg/security"

  	"github.com/gin-gonic/gin"
  	"go.uber.org/zap"
  )
  19. func main() {
  20. // 加载配置
  21. cfg, err := config.LoadConfig("config/config.yaml")
  22. if err != nil {
  23. log.Fatalf("Failed to load config: %v", err)
  24. }
  25. // 初始化日志
  26. logger, err := initLogger(cfg.Logging)
  27. if err != nil {
  28. log.Fatalf("Failed to initialize logger: %v", err)
  29. }
  30. defer logger.Sync()
  31. // 初始化数据库
  32. postgresDB, err := database.InitPostgres(cfg.Database.Postgres)
  33. if err != nil {
  34. logger.Fatal("Failed to initialize PostgreSQL", zap.Error(err))
  35. }
  36. defer postgresDB.Close()
  37. influxDB, err := database.InitInfluxDB(cfg.Database.InfluxDB)
  38. if err != nil {
  39. logger.Fatal("Failed to initialize InfluxDB", zap.Error(err))
  40. }
  41. defer influxDB.Close()
  42. // 初始化Redis
  43. redisClient, err := database.InitRedis(cfg.Redis)
  44. if err != nil {
  45. logger.Fatal("Failed to initialize Redis", zap.Error(err))
  46. }
  47. defer redisClient.Close()
  48. // 初始化JWT服务
  49. jwtService := security.NewJWTService(cfg.JWT.Secret, cfg.JWT.Expire)
  50. // 初始化设备管理器
  51. deviceManager := device.NewManager(postgresDB, redisClient, logger)
  52. // 初始化数据处理器
  53. dataProcessor := analytics.NewProcessor(influxDB, logger)
  54. // 初始化通知服务
  55. notifier := notification.NewNotifier(cfg.Notification, logger)
  56. // 初始化告警系统
  57. alertManager := notification.NewAlertManager(cfg.Alert, notifier, logger)
  58. // 设置Gin模式
  59. gin.SetMode(cfg.Server.Mode)
  60. // 创建Gin引擎
  61. r := gin.New()
  62. // 添加中间件
  63. r.Use(gin.Logger())
  64. r.Use(gin.Recovery())
  65. r.Use(corsMiddleware())
  66. // 设置路由
  67. setupRoutes(r, deviceManager, dataProcessor, alertManager, jwtService, logger)
  68. // 启动后台服务
  69. go startBackgroundServices(deviceManager, dataProcessor, alertManager, cfg, logger)
  70. // 创建HTTP服务器
  71. srv := &http.Server{
  72. Addr: ":" + cfg.Server.Port,
  73. Handler: r,
  74. ReadTimeout: time.Duration(cfg.Server.ReadTimeout) * time.Second,
  75. WriteTimeout: time.Duration(cfg.Server.WriteTimeout) * time.Second,
  76. }
  77. // 启动服务器
  78. go func() {
  79. logger.Info("Starting server", zap.String("port", cfg.Server.Port))
  80. if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  81. logger.Fatal("Failed to start server", zap.Error(err))
  82. }
  83. }()
  84. // 等待中断信号
  85. quit := make(chan os.Signal, 1)
  86. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  87. <-quit
  88. logger.Info("Shutting down server...")
  89. // 优雅关闭
  90. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  91. defer cancel()
  92. if err := srv.Shutdown(ctx); err != nil {
  93. logger.Fatal("Server forced to shutdown", zap.Error(err))
  94. }
  95. logger.Info("Server exited")
  96. }
  97. func setupRoutes(r *gin.Engine,
  98. deviceManager *device.Manager,
  99. dataProcessor *analytics.Processor,
  100. alertManager *notification.AlertManager,
  101. jwtService *security.JWTService,
  102. logger *zap.Logger) {
  103. // 健康检查
  104. r.GET("/health", func(c *gin.Context) {
  105. c.JSON(200, gin.H{"status": "ok"})
  106. })
  107. // API路由组
  108. api := r.Group("/api/v1")
  109. {
  110. // 设备管理路由
  111. deviceGroup := api.Group("/devices")
  112. {
  113. deviceGroup.POST("", deviceManager.CreateDevice)
  114. deviceGroup.GET("", deviceManager.ListDevices)
  115. deviceGroup.GET("/:id", deviceManager.GetDevice)
  116. deviceGroup.PUT("/:id", deviceManager.UpdateDevice)
  117. deviceGroup.DELETE("/:id", deviceManager.DeleteDevice)
  118. deviceGroup.PUT("/:id/config", deviceManager.UpdateDeviceConfig)
  119. deviceGroup.POST("/:id/firmware", deviceManager.UpdateFirmware)
  120. }
  121. // 数据查询路由
  122. dataGroup := api.Group("/data")
  123. {
  124. dataGroup.GET("/realtime", dataProcessor.GetRealtimeData)
  125. dataGroup.GET("/history", dataProcessor.GetHistoryData)
  126. dataGroup.GET("/statistics", dataProcessor.GetStatistics)
  127. }
  128. // 告警管理路由
  129. alertGroup := api.Group("/alerts")
  130. {
  131. alertGroup.GET("/rules", alertManager.ListRules)
  132. alertGroup.POST("/rules", alertManager.CreateRule)
  133. alertGroup.GET("/rules/:id", alertManager.GetRule)
  134. alertGroup.PUT("/rules/:id", alertManager.UpdateRule)
  135. alertGroup.DELETE("/rules/:id", alertManager.DeleteRule)
  136. alertGroup.GET("", alertManager.ListAlerts)
  137. }
  138. // 认证路由
  139. authGroup := api.Group("/auth")
  140. {
  141. authGroup.POST("/login", func(c *gin.Context) {
  142. // 实现登录逻辑
  143. })
  144. authGroup.POST("/refresh", func(c *gin.Context) {
  145. // 实现令牌刷新逻辑
  146. })
  147. }
  148. }
  149. // WebSocket路由
  150. r.GET("/ws", func(c *gin.Context) {
  151. // 实现WebSocket连接
  152. })
  153. }
  154. func startBackgroundServices(
  155. deviceManager *device.Manager,
  156. dataProcessor *analytics.Processor,
  157. alertManager *notification.AlertManager,
  158. cfg *config.Config,
  159. logger *zap.Logger) {
  160. // 启动设备状态检查
  161. go deviceManager.StartStatusCheck()
  162. // 启动数据处理器
  163. go dataProcessor.Start()
  164. // 启动告警检查
  165. go alertManager.Start()
  166. logger.Info("Background services started")
  167. }
  168. func corsMiddleware() gin.HandlerFunc {
  169. return func(c *gin.Context) {
  170. c.Header("Access-Control-Allow-Origin", "*")
  171. c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
  172. c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")
  173. if c.Request.Method == "OPTIONS" {
  174. c.AbortWithStatus(204)
  175. return
  176. }
  177. c.Next()
  178. }
  179. }
  180. func initLogger(logConfig config.LoggingConfig) (*zap.Logger, error) {
  181. var logger *zap.Logger
  182. var err error
  183. if logConfig.Level == "debug" {
  184. logger, err = zap.NewDevelopment()
  185. } else {
  186. logger, err = zap.NewProduction()
  187. }
  188. if err != nil {
  189. return nil, err
  190. }
  191. return logger, nil
  192. }