package mqtt import ( "context" "encoding/json" "fmt" "strings" "sync" "time" "iot-base-station/internal/device" "iot-base-station/internal/analytics" "iot-base-station/pkg/mqtt" MQTT "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) // Adapter MQTT协议适配器 type Adapter struct { client mqtt.Client config Config logger *zap.Logger dataChan chan *device.DeviceData eventChan chan *device.DeviceEvent mu sync.RWMutex devices map[string]bool // 在线设备 } // Config MQTT配置 type Config struct { Broker string `yaml:"broker"` ClientID string `yaml:"client_id"` Username string `yaml:"username"` Password string `yaml:"password"` KeepAlive int `yaml:"keep_alive"` QoS int `yaml:"qos"` Topics Topics `yaml:"topics"` } // Topics MQTT主题配置 type Topics struct { Data string `yaml:"data"` Event string `yaml:"event"` Config string `yaml:"config"` Firmware string `yaml:"firmware"` Response string `yaml:"response"` Status string `yaml:"status"` } // Message MQTT消息 type Message struct { Topic string `json:"topic"` Payload map[string]interface{} `json:"payload"` QoS byte `json:"qos"` Retained bool `json:"retained"` } // NewAdapter 创建MQTT适配器 func NewAdapter(config Config, logger *zap.Logger) *Adapter { return &Adapter{ config: config, logger: logger, dataChan: make(chan *device.DeviceData, 1000), eventChan: make(chan *device.DeviceEvent, 100), devices: make(map[string]bool), } } // Start 启动MQTT适配器 func (a *Adapter) Start(ctx context.Context) error { // 创建MQTT客户端 opts := MQTT.NewClientOptions() opts.AddBroker(a.config.Broker) opts.SetClientID(a.config.ClientID) opts.SetUsername(a.config.Username) opts.SetPassword(a.config.Password) opts.SetKeepAlive(time.Duration(a.config.KeepAlive) * time.Second) opts.SetDefaultPublishHandler(a.messageHandler) opts.SetOnConnectHandler(a.connectHandler) opts.SetConnectionLostHandler(a.connectLostHandler) // 连接MQTT代理 client := MQTT.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error()) } a.client = mqtt.NewClient(client) // 订阅主题 if err := a.subscribeTopics(); err != nil { return fmt.Errorf("failed to subscribe topics: %w", err) } a.logger.Info("MQTT adapter started", zap.String("broker", a.config.Broker)) // 启动消息处理 go a.processMessages(ctx) return nil } // Stop 停止MQTT适配器 func (a *Adapter) Stop() error { if a.client != nil { a.client.Disconnect(250) } close(a.dataChan) close(a.eventChan) a.logger.Info("MQTT adapter stopped") return nil } // Name 返回适配器名称 func (a *Adapter) Name() string { return "mqtt" } // GetDataChannel 获取数据通道 func (a *Adapter) GetDataChannel() <-chan *device.DeviceData { return a.dataChan } // GetEventChannel 获取事件通道 func (a *Adapter) GetEventChannel() <-chan *device.DeviceEvent { return a.eventChan } // SendCommand 发送命令到设备 func (a *Adapter) SendCommand(deviceID, command string, payload map[string]interface{}) error { topic := fmt.Sprintf("%s/%s/%s", a.config.Topics.Config, deviceID, command) message, err := json.Marshal(payload) if err != nil { return fmt.Errorf("failed to marshal command: %w", err) } return a.client.Publish(topic, byte(a.config.QoS), false, message) } // SendFirmwareUpdate 发送固件更新命令 func (a *Adapter) SendFirmwareUpdate(deviceID, version, url string) error { topic := fmt.Sprintf("%s/%s", a.config.Topics.Firmware, deviceID) payload := map[string]interface{}{ "version": version, "url": url, "command": "update", } message, err := json.Marshal(payload) if err != nil { return fmt.Errorf("failed to marshal firmware update: %w", err) } return a.client.Publish(topic, byte(a.config.QoS), false, message) } // subscribeTopics 订阅MQTT主题 func (a *Adapter) subscribeTopics() error { topics := []string{ a.config.Topics.Data + "/+/+", a.config.Topics.Event + "/+", a.config.Topics.Status + "/+", } for _, topic := range topics { if token := a.client.Subscribe(topic, byte(a.config.QoS), nil); token.Wait() && token.Error() != nil { return fmt.Errorf("failed to subscribe to topic %s: %w", topic, token.Error()) } } return nil } // messageHandler 消息处理器 func (a *Adapter) messageHandler(client MQTT.Client, msg MQTT.Message) { topic := msg.Topic() payload := msg.Payload() a.logger.Debug("Received MQTT message", zap.String("topic", topic), zap.ByteString("payload", payload)) // 解析主题 parts := strings.Split(topic, "/") if len(parts) < 3 { a.logger.Warn("Invalid topic format", zap.String("topic", topic)) return } rootTopic := parts[0] deviceID := parts[1] // 根据主题类型处理消息 switch rootTopic { case a.config.Topics.Data: a.handleDataMessage(deviceID, parts[2], payload) case a.config.Topics.Event: a.handleEventMessage(deviceID, payload) case a.config.Topics.Status: a.handleStatusMessage(deviceID, payload) default: a.logger.Warn("Unknown topic", zap.String("topic", topic)) } } // handleDataMessage 处理数据消息 func (a *Adapter) handleDataMessage(deviceID, dataType string, payload []byte) { var data map[string]interface{} if err := json.Unmarshal(payload, &data); err != nil { a.logger.Error("Failed to unmarshal data message", zap.Error(err)) return } // 更新设备在线状态 a.updateDeviceStatus(deviceID, true) // 创建设备数据对象 deviceData := &device.DeviceData{ DeviceID: deviceID, Timestamp: time.Now(), Metrics: data, Quality: device.DataQuality{ Good: true, Source: "mqtt", }, } // 发送到数据通道 select { case a.dataChan <- deviceData: default: a.logger.Warn("Data channel full, dropping message", zap.String("device_id", deviceID)) } } // handleEventMessage 处理事件消息 func (a *Adapter) handleEventMessage(deviceID string, payload []byte) { var eventData map[string]interface{} if err := json.Unmarshal(payload, &eventData); err != nil { a.logger.Error("Failed to unmarshal event message", zap.Error(err)) return } // 解析事件类型和严重程度 eventType, _ := eventData["type"].(string) severity, _ := eventData["severity"].(string) message, _ := eventData["message"].(string) // 创建设备事件对象 deviceEvent := &device.DeviceEvent{ ID: generateEventID(), DeviceID: deviceID, Type: device.EventType(eventType), Severity: device.EventSeverity(severity), Message: message, Data: eventData, Timestamp: time.Now(), } // 发送到事件通道 select { case a.eventChan <- deviceEvent: default: a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID)) } } // handleStatusMessage 处理状态消息 func (a *Adapter) handleStatusMessage(deviceID string, payload []byte) { var statusData map[string]interface{} if err := json.Unmarshal(payload, &statusData); err != nil { a.logger.Error("Failed to unmarshal status message", zap.Error(err)) return } // 解析状态 status, _ := statusData["status"].(string) isOnline := status == "online" // 更新设备状态 a.updateDeviceStatus(deviceID, isOnline) // 创建状态事件 deviceEvent := &device.DeviceEvent{ ID: generateEventID(), DeviceID: deviceID, Type: device.EventConnect, Severity: device.SeverityInfo, Message: fmt.Sprintf("设备状态: %s", status), Data: statusData, Timestamp: time.Now(), } // 发送到事件通道 select { case a.eventChan <- deviceEvent: default: a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID)) } } // updateDeviceStatus 更新设备状态 func (a *Adapter) updateDeviceStatus(deviceID string, isOnline bool) { a.mu.Lock() defer a.mu.Unlock() a.devices[deviceID] = isOnline } // processMessages 处理消息 func (a *Adapter) processMessages(ctx context.Context) { for { select { case <-ctx.Done(): return default: // 这里可以添加消息批处理逻辑 time.Sleep(100 * time.Millisecond) } } } // connectHandler 连接处理器 func (a *Adapter) connectHandler(client MQTT.Client) { a.logger.Info("Connected to MQTT broker") } // connectLostHandler 连接丢失处理器 func (a *Adapter) connectLostHandler(client MQTT.Client, err error) { a.logger.Error("Connection to MQTT broker lost", zap.Error(err)) } // generateEventID 生成事件ID func generateEventID() string { return fmt.Sprintf("mqtt_event_%d", time.Now().UnixNano()) }