||
- package mqtt
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- "time"
- "iot-base-station/internal/device"
- "iot-base-station/internal/analytics"
- "iot-base-station/pkg/mqtt"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- )
- // Adapter MQTT协议适配器
- type Adapter struct {
- client mqtt.Client
- config Config
- logger *zap.Logger
- dataChan chan *device.DeviceData
- eventChan chan *device.DeviceEvent
- mu sync.RWMutex
- devices map[string]bool // 在线设备
- }
- // Config MQTT配置
- type Config struct {
- Broker string `yaml:"broker"`
- ClientID string `yaml:"client_id"`
- Username string `yaml:"username"`
- Password string `yaml:"password"`
- KeepAlive int `yaml:"keep_alive"`
- QoS int `yaml:"qos"`
- Topics Topics `yaml:"topics"`
- }
- // Topics MQTT主题配置
- type Topics struct {
- Data string `yaml:"data"`
- Event string `yaml:"event"`
- Config string `yaml:"config"`
- Firmware string `yaml:"firmware"`
- Response string `yaml:"response"`
- Status string `yaml:"status"`
- }
- // Message MQTT消息
- type Message struct {
- Topic string `json:"topic"`
- Payload map[string]interface{} `json:"payload"`
- QoS byte `json:"qos"`
- Retained bool `json:"retained"`
- }
- // NewAdapter 创建MQTT适配器
- func NewAdapter(config Config, logger *zap.Logger) *Adapter {
- return &Adapter{
- config: config,
- logger: logger,
- dataChan: make(chan *device.DeviceData, 1000),
- eventChan: make(chan *device.DeviceEvent, 100),
- devices: make(map[string]bool),
- }
- }
- // Start 启动MQTT适配器
- func (a *Adapter) Start(ctx context.Context) error {
- // 创建MQTT客户端
- opts := MQTT.NewClientOptions()
- opts.AddBroker(a.config.Broker)
- opts.SetClientID(a.config.ClientID)
- opts.SetUsername(a.config.Username)
- opts.SetPassword(a.config.Password)
- opts.SetKeepAlive(time.Duration(a.config.KeepAlive) * time.Second)
- opts.SetDefaultPublishHandler(a.messageHandler)
- opts.SetOnConnectHandler(a.connectHandler)
- opts.SetConnectionLostHandler(a.connectLostHandler)
- // 连接MQTT代理
- client := MQTT.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
- }
- a.client = mqtt.NewClient(client)
- // 订阅主题
- if err := a.subscribeTopics(); err != nil {
- return fmt.Errorf("failed to subscribe topics: %w", err)
- }
- a.logger.Info("MQTT adapter started", zap.String("broker", a.config.Broker))
- // 启动消息处理
- go a.processMessages(ctx)
- return nil
- }
- // Stop 停止MQTT适配器
- func (a *Adapter) Stop() error {
- if a.client != nil {
- a.client.Disconnect(250)
- }
- close(a.dataChan)
- close(a.eventChan)
- a.logger.Info("MQTT adapter stopped")
- return nil
- }
- // Name 返回适配器名称
- func (a *Adapter) Name() string {
- return "mqtt"
- }
- // GetDataChannel 获取数据通道
- func (a *Adapter) GetDataChannel() <-chan *device.DeviceData {
- return a.dataChan
- }
- // GetEventChannel 获取事件通道
- func (a *Adapter) GetEventChannel() <-chan *device.DeviceEvent {
- return a.eventChan
- }
- // SendCommand 发送命令到设备
- func (a *Adapter) SendCommand(deviceID, command string, payload map[string]interface{}) error {
- topic := fmt.Sprintf("%s/%s/%s", a.config.Topics.Config, deviceID, command)
-
- message, err := json.Marshal(payload)
- if err != nil {
- return fmt.Errorf("failed to marshal command: %w", err)
- }
- return a.client.Publish(topic, byte(a.config.QoS), false, message)
- }
- // SendFirmwareUpdate 发送固件更新命令
- func (a *Adapter) SendFirmwareUpdate(deviceID, version, url string) error {
- topic := fmt.Sprintf("%s/%s", a.config.Topics.Firmware, deviceID)
-
- payload := map[string]interface{}{
- "version": version,
- "url": url,
- "command": "update",
- }
- message, err := json.Marshal(payload)
- if err != nil {
- return fmt.Errorf("failed to marshal firmware update: %w", err)
- }
- return a.client.Publish(topic, byte(a.config.QoS), false, message)
- }
- // subscribeTopics 订阅MQTT主题
- func (a *Adapter) subscribeTopics() error {
- topics := []string{
- a.config.Topics.Data + "/+/+",
- a.config.Topics.Event + "/+",
- a.config.Topics.Status + "/+",
- }
- for _, topic := range topics {
- if token := a.client.Subscribe(topic, byte(a.config.QoS), nil); token.Wait() && token.Error() != nil {
- return fmt.Errorf("failed to subscribe to topic %s: %w", topic, token.Error())
- }
- }
- return nil
- }
- // messageHandler 消息处理器
- func (a *Adapter) messageHandler(client MQTT.Client, msg MQTT.Message) {
- topic := msg.Topic()
- payload := msg.Payload()
- a.logger.Debug("Received MQTT message",
- zap.String("topic", topic),
- zap.ByteString("payload", payload))
- // 解析主题
- parts := strings.Split(topic, "/")
- if len(parts) < 3 {
- a.logger.Warn("Invalid topic format", zap.String("topic", topic))
- return
- }
- rootTopic := parts[0]
- deviceID := parts[1]
- // 根据主题类型处理消息
- switch rootTopic {
- case a.config.Topics.Data:
- a.handleDataMessage(deviceID, parts[2], payload)
- case a.config.Topics.Event:
- a.handleEventMessage(deviceID, payload)
- case a.config.Topics.Status:
- a.handleStatusMessage(deviceID, payload)
- default:
- a.logger.Warn("Unknown topic", zap.String("topic", topic))
- }
- }
- // handleDataMessage 处理数据消息
- func (a *Adapter) handleDataMessage(deviceID, dataType string, payload []byte) {
- var data map[string]interface{}
- if err := json.Unmarshal(payload, &data); err != nil {
- a.logger.Error("Failed to unmarshal data message", zap.Error(err))
- return
- }
- // 更新设备在线状态
- a.updateDeviceStatus(deviceID, true)
- // 创建设备数据对象
- deviceData := &device.DeviceData{
- DeviceID: deviceID,
- Timestamp: time.Now(),
- Metrics: data,
- Quality: device.DataQuality{
- Good: true,
- Source: "mqtt",
- },
- }
- // 发送到数据通道
- select {
- case a.dataChan <- deviceData:
- default:
- a.logger.Warn("Data channel full, dropping message", zap.String("device_id", deviceID))
- }
- }
- // handleEventMessage 处理事件消息
- func (a *Adapter) handleEventMessage(deviceID string, payload []byte) {
- var eventData map[string]interface{}
- if err := json.Unmarshal(payload, &eventData); err != nil {
- a.logger.Error("Failed to unmarshal event message", zap.Error(err))
- return
- }
- // 解析事件类型和严重程度
- eventType, _ := eventData["type"].(string)
- severity, _ := eventData["severity"].(string)
- message, _ := eventData["message"].(string)
- // 创建设备事件对象
- deviceEvent := &device.DeviceEvent{
- ID: generateEventID(),
- DeviceID: deviceID,
- Type: device.EventType(eventType),
- Severity: device.EventSeverity(severity),
- Message: message,
- Data: eventData,
- Timestamp: time.Now(),
- }
- // 发送到事件通道
- select {
- case a.eventChan <- deviceEvent:
- default:
- a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID))
- }
- }
- // handleStatusMessage 处理状态消息
- func (a *Adapter) handleStatusMessage(deviceID string, payload []byte) {
- var statusData map[string]interface{}
- if err := json.Unmarshal(payload, &statusData); err != nil {
- a.logger.Error("Failed to unmarshal status message", zap.Error(err))
- return
- }
- // 解析状态
- status, _ := statusData["status"].(string)
- isOnline := status == "online"
- // 更新设备状态
- a.updateDeviceStatus(deviceID, isOnline)
- // 创建状态事件
- deviceEvent := &device.DeviceEvent{
- ID: generateEventID(),
- DeviceID: deviceID,
- Type: device.EventConnect,
- Severity: device.SeverityInfo,
- Message: fmt.Sprintf("设备状态: %s", status),
- Data: statusData,
- Timestamp: time.Now(),
- }
- // 发送到事件通道
- select {
- case a.eventChan <- deviceEvent:
- default:
- a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID))
- }
- }
- // updateDeviceStatus 更新设备状态
- func (a *Adapter) updateDeviceStatus(deviceID string, isOnline bool) {
- a.mu.Lock()
- defer a.mu.Unlock()
- a.devices[deviceID] = isOnline
- }
- // processMessages 处理消息
- func (a *Adapter) processMessages(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- default:
- // 这里可以添加消息批处理逻辑
- time.Sleep(100 * time.Millisecond)
- }
- }
- }
- // connectHandler 连接处理器
- func (a *Adapter) connectHandler(client MQTT.Client) {
- a.logger.Info("Connected to MQTT broker")
- }
- // connectLostHandler 连接丢失处理器
- func (a *Adapter) connectLostHandler(client MQTT.Client, err error) {
- a.logger.Error("Connection to MQTT broker lost", zap.Error(err))
- }
- // generateEventID 生成事件ID
- func generateEventID() string {
- return fmt.Sprintf("mqtt_event_%d", time.Now().UnixNano())
- }
|