// Command main starts the IoT base station HTTP API server together with its
// background services and shuts it down gracefully on SIGINT/SIGTERM.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gin-gonic/gin"
	"go.uber.org/zap"

	"iot-base-station/internal/analytics"
	"iot-base-station/internal/config"
	"iot-base-station/internal/device"
	"iot-base-station/internal/notification"
	"iot-base-station/pkg/database"
	"iot-base-station/pkg/security"
)

// shutdownTimeout bounds how long in-flight requests may take to finish once a
// termination signal has been received.
const shutdownTimeout = 30 * time.Second

// main loads the configuration, builds the logger and delegates everything
// else to run. It is the only place that terminates the process, so deferred
// cleanup inside run always executes before exit.
func main() {
	// Load configuration.
	cfg, err := config.LoadConfig("config/config.yaml")
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Initialize logging.
	logger, err := initLogger(cfg.Logging)
	if err != nil {
		log.Fatalf("Failed to initialize logger: %v", err)
	}

	runErr := run(cfg, logger)
	if runErr != nil {
		logger.Error("Service terminated with error", zap.Error(runErr))
	}

	// Best-effort flush of buffered log entries; a failure here is not
	// actionable while exiting.
	_ = logger.Sync()

	if runErr != nil {
		os.Exit(1)
	}
}

// run wires up all dependencies, serves HTTP until a termination signal
// arrives or the listener fails, and then shuts the server down gracefully.
//
// Every failure is returned (wrapped with context) instead of exiting the
// process, so the deferred Close calls on the database and Redis clients run
// on every exit path.
func run(cfg *config.Config, logger *zap.Logger) error {
	// Initialize databases.
	postgresDB, err := database.InitPostgres(cfg.Database.Postgres)
	if err != nil {
		return fmt.Errorf("failed to initialize PostgreSQL: %w", err)
	}
	defer postgresDB.Close()

	influxDB, err := database.InitInfluxDB(cfg.Database.InfluxDB)
	if err != nil {
		return fmt.Errorf("failed to initialize InfluxDB: %w", err)
	}
	defer influxDB.Close()

	// Initialize Redis.
	redisClient, err := database.InitRedis(cfg.Redis)
	if err != nil {
		return fmt.Errorf("failed to initialize Redis: %w", err)
	}
	defer redisClient.Close()

	// Initialize the JWT service.
	jwtService := security.NewJWTService(cfg.JWT.Secret, cfg.JWT.Expire)

	// Initialize the device manager.
	deviceManager := device.NewManager(postgresDB, redisClient, logger)

	// Initialize the data processor.
	dataProcessor := analytics.NewProcessor(influxDB, logger)

	// Initialize the notification service and the alerting system built on it.
	notifier := notification.NewNotifier(cfg.Notification, logger)
	alertManager := notification.NewAlertManager(cfg.Alert, notifier, logger)

	// Configure Gin and build the engine with its middleware chain.
	gin.SetMode(cfg.Server.Mode)
	r := gin.New()
	r.Use(gin.Logger())
	r.Use(gin.Recovery())
	r.Use(corsMiddleware())

	// Register routes.
	setupRoutes(r, deviceManager, dataProcessor, alertManager, jwtService, logger)

	// Start background services. The function only spawns goroutines and
	// logs, so it is called directly rather than from yet another goroutine.
	startBackgroundServices(deviceManager, dataProcessor, alertManager, cfg, logger)

	// Create the HTTP server.
	srv := &http.Server{
		Addr:         ":" + cfg.Server.Port,
		Handler:      r,
		ReadTimeout:  time.Duration(cfg.Server.ReadTimeout) * time.Second,
		WriteTimeout: time.Duration(cfg.Server.WriteTimeout) * time.Second,
	}

	// Serve in the background. A listen failure is reported through
	// serverErr (buffered so the goroutine never blocks) instead of exiting
	// the process from inside the goroutine.
	serverErr := make(chan error, 1)
	go func() {
		logger.Info("Starting server", zap.String("port", cfg.Server.Port))
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErr <- err
		}
	}()

	// Wait for either a termination signal or a server failure.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)

	select {
	case err := <-serverErr:
		return fmt.Errorf("failed to start server: %w", err)
	case <-quit:
		logger.Info("Shutting down server...")
	}

	// Graceful shutdown.
	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err := srv.Shutdown(ctx); err != nil {
		return fmt.Errorf("server forced to shutdown: %w", err)
	}

	logger.Info("Server exited")
	return nil
}

// notImplemented answers 501 for endpoints that are registered but whose
// handler has not been written yet, so clients do not mistake an empty 200
// response for success.
func notImplemented(c *gin.Context) {
	c.JSON(http.StatusNotImplemented, gin.H{"error": "not implemented"})
}

// setupRoutes registers the health check, the /api/v1 REST routes and the
// WebSocket endpoint on r.
//
// NOTE(review): jwtService and logger are accepted but not used here, and no
// authentication middleware is attached to any route group — confirm whether
// the device, data and alert routes are meant to be protected.
func setupRoutes(r *gin.Engine, deviceManager *device.Manager, dataProcessor *analytics.Processor, alertManager *notification.AlertManager, jwtService *security.JWTService, logger *zap.Logger) {
	// Health check.
	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	// Versioned API route group.
	api := r.Group("/api/v1")
	{
		// Device management routes.
		deviceGroup := api.Group("/devices")
		{
			deviceGroup.POST("", deviceManager.CreateDevice)
			deviceGroup.GET("", deviceManager.ListDevices)
			deviceGroup.GET("/:id", deviceManager.GetDevice)
			deviceGroup.PUT("/:id", deviceManager.UpdateDevice)
			deviceGroup.DELETE("/:id", deviceManager.DeleteDevice)
			deviceGroup.PUT("/:id/config", deviceManager.UpdateDeviceConfig)
			deviceGroup.POST("/:id/firmware", deviceManager.UpdateFirmware)
		}

		// Data query routes.
		dataGroup := api.Group("/data")
		{
			dataGroup.GET("/realtime", dataProcessor.GetRealtimeData)
			dataGroup.GET("/history", dataProcessor.GetHistoryData)
			dataGroup.GET("/statistics", dataProcessor.GetStatistics)
		}

		// Alert management routes.
		alertGroup := api.Group("/alerts")
		{
			alertGroup.GET("/rules", alertManager.ListRules)
			alertGroup.POST("/rules", alertManager.CreateRule)
			alertGroup.GET("/rules/:id", alertManager.GetRule)
			alertGroup.PUT("/rules/:id", alertManager.UpdateRule)
			alertGroup.DELETE("/rules/:id", alertManager.DeleteRule)
			alertGroup.GET("", alertManager.ListAlerts)
		}

		// Authentication routes.
		authGroup := api.Group("/auth")
		{
			// TODO: implement login.
			authGroup.POST("/login", notImplemented)
			// TODO: implement token refresh.
			authGroup.POST("/refresh", notImplemented)
		}
	}

	// WebSocket route.
	// TODO: implement the WebSocket connection upgrade.
	r.GET("/ws", notImplemented)
}

// startBackgroundServices launches the device status check, the data
// processor and the alert manager, each in its own goroutine, and returns
// immediately after logging that they were started.
//
// cfg is currently unused. The goroutines are given no stop signal from here.
func startBackgroundServices(
	deviceManager *device.Manager,
	dataProcessor *analytics.Processor,
	alertManager *notification.AlertManager,
	cfg *config.Config,
	logger *zap.Logger,
) {
	// Start the device status check.
	go deviceManager.StartStatusCheck()

	// Start the data processor.
	go dataProcessor.Start()

	// Start the alert check.
	go alertManager.Start()

	logger.Info("Background services started")
}

// corsMiddleware returns a Gin handler that sets CORS response headers
// allowing any origin, and answers OPTIONS (preflight) requests with 204
// without invoking the remaining handlers.
func corsMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		c.Header("Access-Control-Allow-Origin", "*")
		c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")

		if c.Request.Method == http.MethodOptions {
			c.AbortWithStatus(http.StatusNoContent)
			return
		}

		c.Next()
	}
}

// initLogger builds a zap logger: the development preset when the configured
// level is "debug", the production preset otherwise.
func initLogger(logConfig config.LoggingConfig) (*zap.Logger, error) {
	if logConfig.Level == "debug" {
		return zap.NewDevelopment()
	}
	return zap.NewProduction()
}