Compare commits
14 Commits
b1609bb4d4
...
dev-mqtt
| Author | SHA1 | Date | |
|---|---|---|---|
| acec9d72b4 | |||
|
|
64a8a32d1a | ||
| 89786c9dfa | |||
|
|
bd68af1f58 | ||
|
|
f6d3908088 | ||
|
|
5aad1d7d44 | ||
|
|
02fe9ea45e | ||
|
|
edfdc3699d | ||
|
|
dbcb05d017 | ||
|
|
3fd408d3ee | ||
|
|
36142de600 | ||
|
|
1c2e45541c | ||
|
|
860bea9f0e | ||
|
|
516624fd8d |
@@ -43,6 +43,10 @@ var (
|
||||
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
|
||||
db *sql.DB // Database connection
|
||||
)
|
||||
func publishError(client mqtt.Client, topic, errorMsg string) {
|
||||
token := client.Publish(topic, 0, false, errorMsg)
|
||||
token.Wait()
|
||||
}
|
||||
|
||||
// Handles incoming MQTT messages
|
||||
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
@@ -95,7 +99,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check for duplicates
|
||||
|
||||
var exists int
|
||||
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
|
||||
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
|
||||
@@ -123,6 +127,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err))
|
||||
return
|
||||
}
|
||||
//..
|
||||
|
||||
log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID)
|
||||
|
||||
@@ -200,7 +205,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
return
|
||||
}
|
||||
|
||||
// Build insert statement
|
||||
columns := []string{"plant_id", "mfm_meter_id"}
|
||||
placeholders := []string{"$1", "$2"}
|
||||
args := []interface{}{plantID, mfmMeterID}
|
||||
@@ -240,15 +244,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
}
|
||||
|
||||
|
||||
// Publishes error messages to a specific topic
|
||||
func publishError(client mqtt.Client, topic, message string) {
|
||||
token := client.Publish(topic, 1, false, message)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Handles successful MQTT connection
|
||||
func connectHandler(client mqtt.Client) {
|
||||
log.Println("Connected to MQTT broker")
|
||||
@@ -278,8 +273,8 @@ func main() {
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(mqttBroker).
|
||||
SetClientID(clientID).
|
||||
// SetUsername("test"). // todo: Change to .env variable
|
||||
// SetPassword("test"). // todo: Change to .env variable
|
||||
// SetUsername("test").
|
||||
// SetPassword("test").
|
||||
SetCleanSession(false).
|
||||
SetAutoReconnect(true).
|
||||
SetOrderMatters(false).
|
||||
@@ -291,7 +286,6 @@ func main() {
|
||||
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
||||
}
|
||||
|
||||
// Subscribe to topics
|
||||
for topic, qos := range topics {
|
||||
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
||||
@@ -299,15 +293,8 @@ func main() {
|
||||
log.Printf("Subscribed to topic: %s", topic)
|
||||
}
|
||||
|
||||
// Keep the program running
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
<-sigChan
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user