Added power house logic latest code
This commit is contained in:
@@ -43,6 +43,10 @@ var (
|
|||||||
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
|
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
|
||||||
db *sql.DB // Database connection
|
db *sql.DB // Database connection
|
||||||
)
|
)
|
||||||
|
func publishError(client mqtt.Client, topic, errorMsg string) {
|
||||||
|
token := client.Publish(topic, 0, false, errorMsg)
|
||||||
|
token.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Handles incoming MQTT messages
|
// Handles incoming MQTT messages
|
||||||
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
@@ -169,7 +173,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
|
|
||||||
rows, err := db.Query(paramQuery, mfmMeterID, plantID)
|
rows, err := db.Query(paramQuery, mfmMeterID, plantID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err))
|
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
@@ -178,25 +182,25 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var name string
|
var name string
|
||||||
if err := rows.Scan(&name); err != nil {
|
if err := rows.Scan(&name); err != nil {
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err))
|
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
paramColumns = append(paramColumns, name)
|
paramColumns = append(paramColumns, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(paramColumns) == 0 {
|
if len(paramColumns) == 0 {
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID))
|
publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate values count
|
// Validate values count
|
||||||
if len(values) < len(paramColumns) {
|
if len(values) < len(paramColumns) {
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values)))
|
publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(values) > len(paramColumns){
|
if len(values) > len(paramColumns){
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values)))
|
publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values)))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -231,10 +235,68 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
|
|
||||||
_, err = db.Exec(insertQuery, args...)
|
_, err = db.Exec(insertQuery, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err))
|
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns))
|
log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Handles successful MQTT connection
|
||||||
|
func connectHandler(client mqtt.Client) {
|
||||||
|
log.Println("Connected to MQTT broker")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handles MQTT connection loss
|
||||||
|
func connectionLostHandler(client mqtt.Client, err error) {
|
||||||
|
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
//..
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var err error
|
||||||
|
godotenv.Load()
|
||||||
|
// Connect to the database
|
||||||
|
db, err = sql.Open(dbDriver, dbDataSource)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Database connection failed: %v", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Validate database connection
|
||||||
|
if err := db.Ping(); err != nil {
|
||||||
|
log.Fatalf("Database ping failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure MQTT client
|
||||||
|
opts := mqtt.NewClientOptions().
|
||||||
|
AddBroker(mqttBroker).
|
||||||
|
SetClientID(clientID).
|
||||||
|
// SetUsername("test"). // todo: Change to .env variable
|
||||||
|
// SetPassword("test"). // todo: Change to .env variable
|
||||||
|
SetCleanSession(false).
|
||||||
|
SetAutoReconnect(true).
|
||||||
|
SetOrderMatters(false).
|
||||||
|
SetOnConnectHandler(connectHandler).
|
||||||
|
SetConnectionLostHandler(connectionLostHandler)
|
||||||
|
|
||||||
|
client := mqtt.NewClient(opts)
|
||||||
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||||
|
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to topics
|
||||||
|
for topic, qos := range topics {
|
||||||
|
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
||||||
|
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
||||||
|
}
|
||||||
|
log.Printf("Subscribed to topic: %s", topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep the program running
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
<-sigChan
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user