adapter.go 8.6 KB


  1. package mqtt
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "time"
  9. "iot-base-station/internal/device"
  10. "iot-base-station/internal/analytics"
  11. "iot-base-station/pkg/mqtt"
  12. MQTT "github.com/eclipse/paho.mqtt.golang"
  13. "go.uber.org/zap"
  14. )
  15. // Adapter MQTT协议适配器
  16. type Adapter struct {
  17. client mqtt.Client
  18. config Config
  19. logger *zap.Logger
  20. dataChan chan *device.DeviceData
  21. eventChan chan *device.DeviceEvent
  22. mu sync.RWMutex
  23. devices map[string]bool // 在线设备
  24. }
  25. // Config MQTT配置
  26. type Config struct {
  27. Broker string `yaml:"broker"`
  28. ClientID string `yaml:"client_id"`
  29. Username string `yaml:"username"`
  30. Password string `yaml:"password"`
  31. KeepAlive int `yaml:"keep_alive"`
  32. QoS int `yaml:"qos"`
  33. Topics Topics `yaml:"topics"`
  34. }
  35. // Topics MQTT主题配置
  36. type Topics struct {
  37. Data string `yaml:"data"`
  38. Event string `yaml:"event"`
  39. Config string `yaml:"config"`
  40. Firmware string `yaml:"firmware"`
  41. Response string `yaml:"response"`
  42. Status string `yaml:"status"`
  43. }
  44. // Message MQTT消息
  45. type Message struct {
  46. Topic string `json:"topic"`
  47. Payload map[string]interface{} `json:"payload"`
  48. QoS byte `json:"qos"`
  49. Retained bool `json:"retained"`
  50. }
  51. // NewAdapter 创建MQTT适配器
  52. func NewAdapter(config Config, logger *zap.Logger) *Adapter {
  53. return &Adapter{
  54. config: config,
  55. logger: logger,
  56. dataChan: make(chan *device.DeviceData, 1000),
  57. eventChan: make(chan *device.DeviceEvent, 100),
  58. devices: make(map[string]bool),
  59. }
  60. }
  61. // Start 启动MQTT适配器
  62. func (a *Adapter) Start(ctx context.Context) error {
  63. // 创建MQTT客户端
  64. opts := MQTT.NewClientOptions()
  65. opts.AddBroker(a.config.Broker)
  66. opts.SetClientID(a.config.ClientID)
  67. opts.SetUsername(a.config.Username)
  68. opts.SetPassword(a.config.Password)
  69. opts.SetKeepAlive(time.Duration(a.config.KeepAlive) * time.Second)
  70. opts.SetDefaultPublishHandler(a.messageHandler)
  71. opts.SetOnConnectHandler(a.connectHandler)
  72. opts.SetConnectionLostHandler(a.connectLostHandler)
  73. // 连接MQTT代理
  74. client := MQTT.NewClient(opts)
  75. if token := client.Connect(); token.Wait() && token.Error() != nil {
  76. return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
  77. }
  78. a.client = mqtt.NewClient(client)
  79. // 订阅主题
  80. if err := a.subscribeTopics(); err != nil {
  81. return fmt.Errorf("failed to subscribe topics: %w", err)
  82. }
  83. a.logger.Info("MQTT adapter started", zap.String("broker", a.config.Broker))
  84. // 启动消息处理
  85. go a.processMessages(ctx)
  86. return nil
  87. }
  88. // Stop 停止MQTT适配器
  89. func (a *Adapter) Stop() error {
  90. if a.client != nil {
  91. a.client.Disconnect(250)
  92. }
  93. close(a.dataChan)
  94. close(a.eventChan)
  95. a.logger.Info("MQTT adapter stopped")
  96. return nil
  97. }
  98. // Name 返回适配器名称
  99. func (a *Adapter) Name() string {
  100. return "mqtt"
  101. }
  102. // GetDataChannel 获取数据通道
  103. func (a *Adapter) GetDataChannel() <-chan *device.DeviceData {
  104. return a.dataChan
  105. }
  106. // GetEventChannel 获取事件通道
  107. func (a *Adapter) GetEventChannel() <-chan *device.DeviceEvent {
  108. return a.eventChan
  109. }
  110. // SendCommand 发送命令到设备
  111. func (a *Adapter) SendCommand(deviceID, command string, payload map[string]interface{}) error {
  112. topic := fmt.Sprintf("%s/%s/%s", a.config.Topics.Config, deviceID, command)
  113. message, err := json.Marshal(payload)
  114. if err != nil {
  115. return fmt.Errorf("failed to marshal command: %w", err)
  116. }
  117. return a.client.Publish(topic, byte(a.config.QoS), false, message)
  118. }
  119. // SendFirmwareUpdate 发送固件更新命令
  120. func (a *Adapter) SendFirmwareUpdate(deviceID, version, url string) error {
  121. topic := fmt.Sprintf("%s/%s", a.config.Topics.Firmware, deviceID)
  122. payload := map[string]interface{}{
  123. "version": version,
  124. "url": url,
  125. "command": "update",
  126. }
  127. message, err := json.Marshal(payload)
  128. if err != nil {
  129. return fmt.Errorf("failed to marshal firmware update: %w", err)
  130. }
  131. return a.client.Publish(topic, byte(a.config.QoS), false, message)
  132. }
  133. // subscribeTopics 订阅MQTT主题
  134. func (a *Adapter) subscribeTopics() error {
  135. topics := []string{
  136. a.config.Topics.Data + "/+/+",
  137. a.config.Topics.Event + "/+",
  138. a.config.Topics.Status + "/+",
  139. }
  140. for _, topic := range topics {
  141. if token := a.client.Subscribe(topic, byte(a.config.QoS), nil); token.Wait() && token.Error() != nil {
  142. return fmt.Errorf("failed to subscribe to topic %s: %w", topic, token.Error())
  143. }
  144. }
  145. return nil
  146. }
  147. // messageHandler 消息处理器
  148. func (a *Adapter) messageHandler(client MQTT.Client, msg MQTT.Message) {
  149. topic := msg.Topic()
  150. payload := msg.Payload()
  151. a.logger.Debug("Received MQTT message",
  152. zap.String("topic", topic),
  153. zap.ByteString("payload", payload))
  154. // 解析主题
  155. parts := strings.Split(topic, "/")
  156. if len(parts) < 3 {
  157. a.logger.Warn("Invalid topic format", zap.String("topic", topic))
  158. return
  159. }
  160. rootTopic := parts[0]
  161. deviceID := parts[1]
  162. // 根据主题类型处理消息
  163. switch rootTopic {
  164. case a.config.Topics.Data:
  165. a.handleDataMessage(deviceID, parts[2], payload)
  166. case a.config.Topics.Event:
  167. a.handleEventMessage(deviceID, payload)
  168. case a.config.Topics.Status:
  169. a.handleStatusMessage(deviceID, payload)
  170. default:
  171. a.logger.Warn("Unknown topic", zap.String("topic", topic))
  172. }
  173. }
  174. // handleDataMessage 处理数据消息
  175. func (a *Adapter) handleDataMessage(deviceID, dataType string, payload []byte) {
  176. var data map[string]interface{}
  177. if err := json.Unmarshal(payload, &data); err != nil {
  178. a.logger.Error("Failed to unmarshal data message", zap.Error(err))
  179. return
  180. }
  181. // 更新设备在线状态
  182. a.updateDeviceStatus(deviceID, true)
  183. // 创建设备数据对象
  184. deviceData := &device.DeviceData{
  185. DeviceID: deviceID,
  186. Timestamp: time.Now(),
  187. Metrics: data,
  188. Quality: device.DataQuality{
  189. Good: true,
  190. Source: "mqtt",
  191. },
  192. }
  193. // 发送到数据通道
  194. select {
  195. case a.dataChan <- deviceData:
  196. default:
  197. a.logger.Warn("Data channel full, dropping message", zap.String("device_id", deviceID))
  198. }
  199. }
  200. // handleEventMessage 处理事件消息
  201. func (a *Adapter) handleEventMessage(deviceID string, payload []byte) {
  202. var eventData map[string]interface{}
  203. if err := json.Unmarshal(payload, &eventData); err != nil {
  204. a.logger.Error("Failed to unmarshal event message", zap.Error(err))
  205. return
  206. }
  207. // 解析事件类型和严重程度
  208. eventType, _ := eventData["type"].(string)
  209. severity, _ := eventData["severity"].(string)
  210. message, _ := eventData["message"].(string)
  211. // 创建设备事件对象
  212. deviceEvent := &device.DeviceEvent{
  213. ID: generateEventID(),
  214. DeviceID: deviceID,
  215. Type: device.EventType(eventType),
  216. Severity: device.EventSeverity(severity),
  217. Message: message,
  218. Data: eventData,
  219. Timestamp: time.Now(),
  220. }
  221. // 发送到事件通道
  222. select {
  223. case a.eventChan <- deviceEvent:
  224. default:
  225. a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID))
  226. }
  227. }
  228. // handleStatusMessage 处理状态消息
  229. func (a *Adapter) handleStatusMessage(deviceID string, payload []byte) {
  230. var statusData map[string]interface{}
  231. if err := json.Unmarshal(payload, &statusData); err != nil {
  232. a.logger.Error("Failed to unmarshal status message", zap.Error(err))
  233. return
  234. }
  235. // 解析状态
  236. status, _ := statusData["status"].(string)
  237. isOnline := status == "online"
  238. // 更新设备状态
  239. a.updateDeviceStatus(deviceID, isOnline)
  240. // 创建状态事件
  241. deviceEvent := &device.DeviceEvent{
  242. ID: generateEventID(),
  243. DeviceID: deviceID,
  244. Type: device.EventConnect,
  245. Severity: device.SeverityInfo,
  246. Message: fmt.Sprintf("设备状态: %s", status),
  247. Data: statusData,
  248. Timestamp: time.Now(),
  249. }
  250. // 发送到事件通道
  251. select {
  252. case a.eventChan <- deviceEvent:
  253. default:
  254. a.logger.Warn("Event channel full, dropping message", zap.String("device_id", deviceID))
  255. }
  256. }
  257. // updateDeviceStatus 更新设备状态
  258. func (a *Adapter) updateDeviceStatus(deviceID string, isOnline bool) {
  259. a.mu.Lock()
  260. defer a.mu.Unlock()
  261. a.devices[deviceID] = isOnline
  262. }
  263. // processMessages 处理消息
  264. func (a *Adapter) processMessages(ctx context.Context) {
  265. for {
  266. select {
  267. case <-ctx.Done():
  268. return
  269. default:
  270. // 这里可以添加消息批处理逻辑
  271. time.Sleep(100 * time.Millisecond)
  272. }
  273. }
  274. }
  275. // connectHandler 连接处理器
  276. func (a *Adapter) connectHandler(client MQTT.Client) {
  277. a.logger.Info("Connected to MQTT broker")
  278. }
  279. // connectLostHandler 连接丢失处理器
  280. func (a *Adapter) connectLostHandler(client MQTT.Client, err error) {
  281. a.logger.Error("Connection to MQTT broker lost", zap.Error(err))
  282. }
  283. // generateEventID 生成事件ID
  284. func generateEventID() string {
  285. return fmt.Sprintf("mqtt_event_%d", time.Now().UnixNano())
  286. }