Added correct power house logic

This commit is contained in:
jothi
2025-07-29 06:20:28 +00:00
parent 860bea9f0e
commit 1c2e45541c

View File

@@ -1,174 +1,3 @@
// package main
// // import (
// // "database/sql"
// // "encoding/json"
// // "fmt"
// // "log"
// // "os"
// // mqtt "github.com/eclipse/paho.mqtt.golang"
// // "github.com/joho/godotenv"
// // _ "github.com/lib/pq" // PostgreSQL driver
// // )
// import (
// "database/sql"
// "encoding/json"
// "fmt"
// "log"
// "os"
// mqtt "github.com/eclipse/paho.mqtt.golang"
// "github.com/joho/godotenv"
// _ "github.com/lib/pq"
// )
// var (
// mqttBroker string
// dbDriver string
// dbDataSource string
// clientID string
// )
// func init() {
// // Load environment variables from .env file
// if err := godotenv.Load(); err != nil {
// log.Fatalf("Error loading .env file: %v", err)
// }
// // Retrieve environment variables
// mqttBroker = os.Getenv("MQTT_BROKER")
// dbDriver = os.Getenv("DB_DRIVER")
// dbDataSource = os.Getenv("DB_DATASOURCE")
// clientID = os.Getenv("CLIENT_ID")
// // Validate required environment variables
// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
// log.Fatalf("Missing required environment variables")
// }
// }
// var (
// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
// db *sql.DB // Database connection
// )
// // Handles incoming MQTT messages
// func messageHandler(client mqtt.Client, msg mqtt.Message) {
// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
// errorTopic := "error_topic"
// // Validate JSON payload
// var jsonData map[string]interface{}
// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
// return
// }
// // Check for missing fields
// for _, field := range requiredFields {
// if _, exists := jsonData[field]; !exists {
// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
// return
// }
// }
// // Create table dynamically if it doesn't exist
// tableName := msg.Topic()
// createTableQuery := fmt.Sprintf(`
// CREATE TABLE IF NOT EXISTS %s (
// id SERIAL PRIMARY KEY,
// temperature FLOAT NOT NULL,
// humidity INT NOT NULL,
// pressure INT NOT NULL,
// timestamp TIMESTAMP NOT NULL
// )`, tableName)
// if _, err := db.Exec(createTableQuery); err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
// return
// }
// // Insert data into the table
// insertQuery := fmt.Sprintf(`
// INSERT INTO %s (temperature, humidity, pressure, timestamp)
// VALUES ($1, $2, $3, $4)`, tableName)
// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
// if err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
// } else {
// log.Printf("Data successfully inserted into table '%s'.", tableName)
// }
// }
// // Publishes error messages to a specific topic
// func publishError(client mqtt.Client, topic, message string) {
// token := client.Publish(topic, 1, false, message)
// token.Wait()
// if token.Error() != nil {
// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
// }
// }
// // Handles successful MQTT connection
// func connectHandler(client mqtt.Client) {
// log.Println("Connected to MQTT broker")
// }
// // Handles MQTT connection loss
// func connectionLostHandler(client mqtt.Client, err error) {
// log.Printf("Connection lost: %v. Attempting to reconnect...", err)
// }
// func main() {
// var err error
// godotenv.Load()
// // Connect to the database
// db, err = sql.Open(dbDriver, dbDataSource)
// if err != nil {
// log.Fatalf("Database connection failed: %v", err)
// }
// defer db.Close()
// // Validate database connection
// if err := db.Ping(); err != nil {
// log.Fatalf("Database ping failed: %v", err)
// }
// // Configure MQTT client
// opts := mqtt.NewClientOptions().
// AddBroker(mqttBroker).
// SetClientID(clientID).
// SetUsername("test"). // todo: Change to .env variable
// SetPassword("test"). // todo: Change to .env variable
// SetCleanSession(false).
// SetAutoReconnect(true).
// SetOrderMatters(false).
// SetOnConnectHandler(connectHandler).
// SetConnectionLostHandler(connectionLostHandler)
// client := mqtt.NewClient(opts)
// if token := client.Connect(); token.Wait() && token.Error() != nil {
// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
// }
// // Subscribe to topics
// for topic, qos := range topics {
// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
// }
// log.Printf("Subscribed to topic: %s", topic)
// }
// // Keep the program running
// sigChan := make(chan os.Signal, 1)
// <-sigChan
// }
//
package main package main
import ( import (
@@ -340,7 +169,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
rows, err := db.Query(paramQuery, mfmMeterID, plantID) rows, err := db.Query(paramQuery, mfmMeterID, plantID)
if err != nil { if err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err))
return return
} }
defer rows.Close() defer rows.Close()
@@ -349,25 +178,25 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
for rows.Next() { for rows.Next() {
var name string var name string
if err := rows.Scan(&name); err != nil { if err := rows.Scan(&name); err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err))
return return
} }
paramColumns = append(paramColumns, name) paramColumns = append(paramColumns, name)
} }
if len(paramColumns) == 0 { if len(paramColumns) == 0 {
publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID))
return return
} }
// Validate values count // Validate values count
if len(values) < len(paramColumns) { if len(values) < len(paramColumns) {
publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values)))
return return
} }
if len(values) > len(paramColumns){ if len(values) > len(paramColumns){
publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values)))
return return
} }
@@ -402,81 +231,10 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
_, err = db.Exec(insertQuery, args...) _, err = db.Exec(insertQuery, args...)
if err != nil { if err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err))
return return
} }
log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns)) log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns))
} }
// Publishes error messages to a specific topic
func publishError(client mqtt.Client, topic, message string) {
token := client.Publish(topic, 1, false, message)
token.Wait()
if token.Error() != nil {
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
}
}
// Handles successful MQTT connection
func connectHandler(client mqtt.Client) {
log.Println("Connected to MQTT broker")
}
// Handles MQTT connection loss
func connectionLostHandler(client mqtt.Client, err error) {
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
}
func main() {
var err error
godotenv.Load()
// Connect to the database
db, err = sql.Open(dbDriver, dbDataSource)
if err != nil {
log.Fatalf("Database connection failed: %v", err)
}
defer db.Close()
// Validate database connection
if err := db.Ping(); err != nil {
log.Fatalf("Database ping failed: %v", err)
}
// Configure MQTT client
opts := mqtt.NewClientOptions().
AddBroker(mqttBroker).
SetClientID(clientID).
SetCleanSession(false).
SetAutoReconnect(true).
SetOrderMatters(false).
SetOnConnectHandler(connectHandler).
SetConnectionLostHandler(connectionLostHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
}
// Subscribe to topics
for topic, qos := range topics {
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
}
log.Printf("Subscribed to topic: %s", topic)
}
// Keep the program running
sigChan := make(chan os.Signal, 1)
<-sigChan
}