diff --git a/mqtt_client.go b/mqtt_client.go index 024e737..4ba1049 100644 --- a/mqtt_client.go +++ b/mqtt_client.go @@ -43,6 +43,10 @@ var ( topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1 db *sql.DB // Database connection ) +func publishError(client mqtt.Client, topic, errorMsg string) { + token := client.Publish(topic, 0, false, errorMsg) + token.Wait() +} // Handles incoming MQTT messages func messageHandler(client mqtt.Client, msg mqtt.Message) { @@ -169,7 +173,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { rows, err := db.Query(paramQuery, mfmMeterID, plantID) if err != nil { - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) return } defer rows.Close() @@ -178,25 +182,25 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { for rows.Next() { var name string if err := rows.Scan(&name); err != nil { - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) return } paramColumns = append(paramColumns, name) } if len(paramColumns) == 0 { - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) + publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) return } // Validate values count if len(values) < len(paramColumns) { - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) + publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) return } if len(values) > len(paramColumns){ - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) + publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) return } @@ -231,10 +235,68 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { _, err = db.Exec(insertQuery, args...) if err != nil { - logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) return } log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns)) -} \ No newline at end of file +} + + +// Handles successful MQTT connection +func connectHandler(client mqtt.Client) { + log.Println("Connected to MQTT broker") +} + +// Handles MQTT connection loss +func connectionLostHandler(client mqtt.Client, err error) { + log.Printf("Connection lost: %v. Attempting to reconnect...", err) +} + +//.. + +func main() { + var err error + godotenv.Load() + // Connect to the database + db, err = sql.Open(dbDriver, dbDataSource) + if err != nil { + log.Fatalf("Database connection failed: %v", err) + } + defer db.Close() + + // Validate database connection + if err := db.Ping(); err != nil { + log.Fatalf("Database ping failed: %v", err) + } + + // Configure MQTT client + opts := mqtt.NewClientOptions(). + AddBroker(mqttBroker). + SetClientID(clientID). + // SetUsername("test"). // todo: Change to .env variable + // SetPassword("test"). // todo: Change to .env variable + SetCleanSession(false). + SetAutoReconnect(true). + SetOrderMatters(false). + SetOnConnectHandler(connectHandler). + SetConnectionLostHandler(connectionLostHandler) + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) + } + + // Subscribe to topics + for topic, qos := range topics { + if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) + } + log.Printf("Subscribed to topic: %s", topic) + } + + // Keep the program running + sigChan := make(chan os.Signal, 1) + <-sigChan +}