Compare commits

..

14 Commits

Author SHA1 Message Date
acec9d72b4 Merge branch 'master' into dev-mqtt 2025-08-06 05:51:44 +00:00
jothi
64a8a32d1a removed unwanted line for checking branch 2025-08-06 05:40:39 +00:00
89786c9dfa Merge pull request 'Remove commented line for checking git branch' (#1) from dev-mqtt into master
Reviewed-on: #1
2025-08-06 05:28:29 +00:00
jothi
bd68af1f58 Remove commented line for checking git branch 2025-08-06 05:17:46 +00:00
jothi
f6d3908088 remove done comment line for checking 2025-07-31 08:19:51 +00:00
jothi
5aad1d7d44 checking git commits 2025-07-31 07:55:32 +00:00
jothi
02fe9ea45e Removed commented l;ines for checking 2025-07-31 04:23:58 +00:00
jothi
edfdc3699d removed commented line 2025-07-31 04:03:16 +00:00
jothi
dbcb05d017 Added comment line for checking 2025-07-30 04:59:36 +00:00
jothi
3fd408d3ee Added commented line for unwanted 2025-07-29 08:43:54 +00:00
jothi
36142de600 Added power house logic latest code 2025-07-29 06:51:57 +00:00
jothi
1c2e45541c Added correct power house logic 2025-07-29 06:20:28 +00:00
jothi
860bea9f0e removed unwanted lines 2025-07-29 05:56:29 +00:00
ranjith
516624fd8d added commented line 2025-07-29 05:27:09 +00:00

View File

@@ -43,6 +43,10 @@ var (
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1 topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
db *sql.DB // Database connection db *sql.DB // Database connection
) )
func publishError(client mqtt.Client, topic, errorMsg string) {
token := client.Publish(topic, 0, false, errorMsg)
token.Wait()
}
// Handles incoming MQTT messages // Handles incoming MQTT messages
func messageHandler(client mqtt.Client, msg mqtt.Message) { func messageHandler(client mqtt.Client, msg mqtt.Message) {
@@ -95,7 +99,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
return return
} }
// Check for duplicates
var exists int var exists int
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1` existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists) err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
@@ -123,6 +127,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err)) publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err))
return return
} }
//..
log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID) log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID)
@@ -200,7 +205,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
return return
} }
// Build insert statement
columns := []string{"plant_id", "mfm_meter_id"} columns := []string{"plant_id", "mfm_meter_id"}
placeholders := []string{"$1", "$2"} placeholders := []string{"$1", "$2"}
args := []interface{}{plantID, mfmMeterID} args := []interface{}{plantID, mfmMeterID}
@@ -240,15 +244,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
} }
// Publishes error messages to a specific topic
func publishError(client mqtt.Client, topic, message string) {
token := client.Publish(topic, 1, false, message)
token.Wait()
if token.Error() != nil {
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
}
}
// Handles successful MQTT connection // Handles successful MQTT connection
func connectHandler(client mqtt.Client) { func connectHandler(client mqtt.Client) {
log.Println("Connected to MQTT broker") log.Println("Connected to MQTT broker")
@@ -278,8 +273,8 @@ func main() {
opts := mqtt.NewClientOptions(). opts := mqtt.NewClientOptions().
AddBroker(mqttBroker). AddBroker(mqttBroker).
SetClientID(clientID). SetClientID(clientID).
// SetUsername("test"). // todo: Change to .env variable // SetUsername("test").
// SetPassword("test"). // todo: Change to .env variable // SetPassword("test").
SetCleanSession(false). SetCleanSession(false).
SetAutoReconnect(true). SetAutoReconnect(true).
SetOrderMatters(false). SetOrderMatters(false).
@@ -291,7 +286,6 @@ func main() {
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
} }
// Subscribe to topics
for topic, qos := range topics { for topic, qos := range topics {
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
@@ -299,15 +293,8 @@ func main() {
log.Printf("Subscribed to topic: %s", topic) log.Printf("Subscribed to topic: %s", topic)
} }
// Keep the program running
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
<-sigChan <-sigChan
} }