Added fpower house functionality

This commit is contained in:
ranjith
2025-07-25 12:55:48 +00:00
parent 8815872544
commit b1609bb4d4

View File

@@ -1,409 +1,113 @@
// package main
// // import (
// // "database/sql"
// // "encoding/json"
// // "fmt"
// // "log"
// // "os"
// // mqtt "github.com/eclipse/paho.mqtt.golang"
// // "github.com/joho/godotenv"
// // _ "github.com/lib/pq" // PostgreSQL driver
// // )
// import (
// "database/sql"
// "encoding/json"
// "fmt"
// "log"
// "os"
// mqtt "github.com/eclipse/paho.mqtt.golang"
// "github.com/joho/godotenv"
// _ "github.com/lib/pq"
// )
// var (
// mqttBroker string
// dbDriver string
// dbDataSource string
// clientID string
// )
// func init() {
// // Load environment variables from .env file
// if err := godotenv.Load(); err != nil {
// log.Fatalf("Error loading .env file: %v", err)
// }
// // Retrieve environment variables
// mqttBroker = os.Getenv("MQTT_BROKER")
// dbDriver = os.Getenv("DB_DRIVER")
// dbDataSource = os.Getenv("DB_DATASOURCE")
// clientID = os.Getenv("CLIENT_ID")
// // Validate required environment variables
// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
// log.Fatalf("Missing required environment variables")
// }
// }
// var (
// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
// db *sql.DB // Database connection
// )
// // Handles incoming MQTT messages
// func messageHandler(client mqtt.Client, msg mqtt.Message) {
// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
// errorTopic := "error_topic"
// // Validate JSON payload
// var jsonData map[string]interface{}
// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
// return
// }
// // Check for missing fields
// for _, field := range requiredFields {
// if _, exists := jsonData[field]; !exists {
// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
// return
// }
// }
// // Create table dynamically if it doesn't exist
// tableName := msg.Topic()
// createTableQuery := fmt.Sprintf(`
// CREATE TABLE IF NOT EXISTS %s (
// id SERIAL PRIMARY KEY,
// temperature FLOAT NOT NULL,
// humidity INT NOT NULL,
// pressure INT NOT NULL,
// timestamp TIMESTAMP NOT NULL
// )`, tableName)
// if _, err := db.Exec(createTableQuery); err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
// return
// }
// // Insert data into the table
// insertQuery := fmt.Sprintf(`
// INSERT INTO %s (temperature, humidity, pressure, timestamp)
// VALUES ($1, $2, $3, $4)`, tableName)
// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
// if err != nil {
// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
// } else {
// log.Printf("Data successfully inserted into table '%s'.", tableName)
// }
// }
// // Publishes error messages to a specific topic
// func publishError(client mqtt.Client, topic, message string) {
// token := client.Publish(topic, 1, false, message)
// token.Wait()
// if token.Error() != nil {
// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
// }
// }
// // Handles successful MQTT connection
// func connectHandler(client mqtt.Client) {
// log.Println("Connected to MQTT broker")
// }
// // Handles MQTT connection loss
// func connectionLostHandler(client mqtt.Client, err error) {
// log.Printf("Connection lost: %v. Attempting to reconnect...", err)
// }
// func main() {
// var err error
// godotenv.Load()
// // Connect to the database
// db, err = sql.Open(dbDriver, dbDataSource)
// if err != nil {
// log.Fatalf("Database connection failed: %v", err)
// }
// defer db.Close()
// // Validate database connection
// if err := db.Ping(); err != nil {
// log.Fatalf("Database ping failed: %v", err)
// }
// // Configure MQTT client
// opts := mqtt.NewClientOptions().
// AddBroker(mqttBroker).
// SetClientID(clientID).
// SetUsername("test"). // todo: Change to .env variable
// SetPassword("test"). // todo: Change to .env variable
// SetCleanSession(false).
// SetAutoReconnect(true).
// SetOrderMatters(false).
// SetOnConnectHandler(connectHandler).
// SetConnectionLostHandler(connectionLostHandler)
// client := mqtt.NewClient(opts)
// if token := client.Connect(); token.Wait() && token.Error() != nil {
// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
// }
// // Subscribe to topics
// for topic, qos := range topics {
// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
// }
// log.Printf("Subscribed to topic: %s", topic)
// }
// // Keep the program running
// sigChan := make(chan os.Signal, 1)
// <-sigChan
// }
//
package main package main
import ( import (
"encoding/json"
"database/sql" "database/sql"
"log" "encoding/json"
"net/http"
"os"
"fmt" "fmt"
"log"
"os"
"strings"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/joho/godotenv" "github.com/joho/godotenv"
_ "github.com/lib/pq" //Required for PostgreSQL driver _ "github.com/lib/pq"
) )
//JSON payload structure var (
// type Payload struct { mqttBroker string
// Temperature float64 `json:"temperature"` dbDriver string
// Humidity int `json:"humidity"` dbDataSource string
// Pressure int `json:"pressure"` clientID string
// Timestamp string `json:"timestamp"` )
// }
type Payload struct {
PlantCode string `json:"plant_id"` func init() {
// Load environment variables from .env file
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file: %v", err)
}
// Retrieve environment variables
mqttBroker = os.Getenv("MQTT_BROKER")
dbDriver = os.Getenv("DB_DRIVER")
dbDataSource = os.Getenv("DB_DATASOURCE")
clientID = os.Getenv("CLIENT_ID")
// Validate required environment variables
if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
log.Fatalf("Missing required environment variables")
}
}
var (
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
db *sql.DB // Database connection
)
// Handles incoming MQTT messages
func messageHandler(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("📥 Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
errorTopic := "error_topic"
// Define struct for payload
type Payload struct {
PlantCode string `json:"plant_code"`
MFMMeterID string `json:"mfm_meter_id"` MFMMeterID string `json:"mfm_meter_id"`
RegisterData string `json:"register_data"` RegisterData string `json:"register_data"`
CreatedAt string `json:"created_at"` CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"` UpdatedAt string `json:"updated_at"`
CreatedBy string `json:"created_by"` CreatedBy string `json:"created_by"`
}
//Global variables
var (
mqttClient mqtt.Client
mqttTopic = "test"
db *sql.DB
mqttBroker string
clientID string
mqttUser string
mqttPass string
dbDriver string
dbDataSource string
)
func main() {
//Load environment variables from .env
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file: %v", err)
}
//Get environment variables
mqttBroker = os.Getenv("MQTT_BROKER")
clientID = os.Getenv("CLIENT_ID")
mqttUser = os.Getenv("MQTT_USERNAME")
mqttPass = os.Getenv("MQTT_PASSWORD")
dbDriver = os.Getenv("DB_DRIVER") // should be "postgres"
dbDataSource = os.Getenv("DB_DATASOURCE") // Postgres connection string
if mqttBroker == "" || clientID == "" || dbDriver == "" || dbDataSource == "" {
log.Fatal("Missing required environment variables")
}
//Open DB connection
var err error
db, err = sql.Open(dbDriver, dbDataSource)
if err != nil {
log.Fatalf("Failed to connect to DB: %v", err)
}
if err := db.Ping(); err != nil {
log.Fatalf("DB ping failed: %v", err)
}
log.Println("Connected to the database")
//Setup and connect to MQTT
opts := mqtt.NewClientOptions().
AddBroker(mqttBroker).
SetClientID(clientID).
SetUsername(mqttUser).
SetPassword(mqttPass).
SetCleanSession(true).
SetAutoReconnect(true)
mqttClient = mqtt.NewClient(opts)
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("MQTT connection failed: %v", token.Error())
}
log.Println("Connected to MQTT Broker")
//Start HTTP server
http.HandleFunc("/api/send-data", handlePostData)
log.Println("HTTP server listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
// func handlePostData(w http.ResponseWriter, r *http.Request) {
// w.Header().Set("Content-Type", "application/json")
// if r.Method != http.MethodPost {
// http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed)
// return
// }
// // Decode payload
// var payload Payload
// var err error
// if err = json.NewDecoder(r.Body).Decode(&payload); err != nil {
// http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
// return
// }
// if payload.CreatedAt == "" {
// http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest)
// return
// }
// var plantID int
// err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantName).Scan(&plantID)
// if err == sql.ErrNoRows {
// w.WriteHeader(http.StatusBadRequest)
// fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantName)
// return
// } else if err != nil {
// log.Printf("Error getting plant ID: %v", err)
// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
// return
// }
// var mfmMeterID int
// query := `
// SELECT id FROM mfm_meters
// WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2
// LIMIT 1
// `
// err = db.QueryRow(query, payload.MFMMeterID, payload.PlantName).Scan(&mfmMeterID)
// if err == sql.ErrNoRows {
// w.WriteHeader(http.StatusBadRequest)
// w.Write([]byte(`{"error": "Sequence '` + payload.MFMMeterID + `' not found for plant_id ` + strconv.Itoa(payload.PlantName) + `"}`))
// return
// } else if err != nil {
// log.Printf("DB check error: %v", err)
// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
// return
// }
// insertQuery := `
// INSERT INTO temp_live_readings (
// plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by
// ) VALUES ($1, $2, $3, $4, $5, $6)
// `
// _, err = db.Exec(insertQuery,
// payload.PlantName,
// mfmMeterID,
// payload.RegisterData,
// payload.CreatedAt,
// payload.UpdatedAt,
// payload.CreatedBy,
// )
// if err != nil {
// log.Printf("DB insert error: %v", err)
// http.Error(w, `{"error":"DB insert failed"}`, http.StatusInternalServerError)
// return
// }
// msg, _ := json.Marshal(payload)
// token := mqttClient.Publish(mqttTopic, 1, false, msg)
// token.Wait()
// w.WriteHeader(http.StatusOK)
// w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`))
// }
func handlePostData(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed)
return
} }
var payload Payload var payload Payload
var err error if err := json.Unmarshal(msg.Payload(), &payload); err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Invalid JSON: %v", err))
if err = json.NewDecoder(r.Body).Decode(&payload); err != nil {
http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
return return
} }
// Validate required fields
if payload.CreatedAt == "" { if payload.CreatedAt == "" {
http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest) publishError(client, errorTopic, "❌ Missing created_at")
return return
} }
//Resolve plant ID from plant_code // Resolve plant ID
var plantID int var plantID int
err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID) err := db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
w.WriteHeader(http.StatusBadRequest) publishError(client, errorTopic, fmt.Sprintf("❌ Plant code '%s' not found", payload.PlantCode))
fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantCode)
return return
} else if err != nil { } else if err != nil {
log.Printf("Error getting plant ID: %v", err) publishError(client, errorTopic, fmt.Sprintf("Error fetching plant ID: %v", err))
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
return return
} }
// Check if MFM meter exists for the plant // Validate MFM meter
var mfmMeterID int var mfmMeterID int
query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1` query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1`
err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID) err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
w.WriteHeader(http.StatusBadRequest) publishError(client, errorTopic, fmt.Sprintf("❌ mfm_meter_id '%s' not found for plant_id %d", payload.MFMMeterID, plantID))
fmt.Fprintf(w, `{"error":"mfm_meter_id '%s' not found for plant_id %d"}`, payload.MFMMeterID, plantID)
return return
} else if err != nil { } else if err != nil {
log.Printf("Error checking MFM meter: %v", err) publishError(client, errorTopic, fmt.Sprintf("Error checking MFM meter: %v", err))
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
return return
} }
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1` // Check for duplicates
var exists int var exists int
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists) err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
if err == nil { if err == nil {
w.WriteHeader(http.StatusConflict) // 409 Conflict is semantically accurate publishError(client, errorTopic, fmt.Sprintf("⚠️ Duplicate entry for created_at '%s' and mfm_meter_id %d", payload.CreatedAt, mfmMeterID))
fmt.Fprintf(w, `{"error":"Duplicate entry for created_at '%s' and mfm_meter_id %d"}`, payload.CreatedAt, mfmMeterID)
return return
} else if err != sql.ErrNoRows { } else if err != sql.ErrNoRows {
log.Printf("Error checking for duplicate: %v", err) publishError(client, errorTopic, fmt.Sprintf("Error checking for duplicate: %v", err))
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
return return
} }
//Insert into temp_live_readings //
insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)` insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)`
_, err = db.Exec(insertQuery, _, err = db.Exec(insertQuery,
@@ -416,20 +120,194 @@ func handlePostData(w http.ResponseWriter, r *http.Request) {
) )
if err != nil { if err != nil {
log.Printf("DB insert error: %v", err) publishError(client, errorTopic, fmt.Sprintf("DB insert failed: %v", err))
http.Error(w, `{"error":"Insert into DB failed"}`, http.StatusInternalServerError)
return return
} }
//Publish to MQTT log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID)
msg, _ := json.Marshal(payload)
token := mqttClient.Publish(mqttTopic, 1, false, msg)
token.Wait()
w.WriteHeader(http.StatusOK) // values := strings.Split(payload.RegisterData, ",")
w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`))
// if len(values) < 1 {
// publishError(client, errorTopic,
// fmt.Sprintf("❌ Register data has insufficient fields: need at least 1, got %d", len(values)))
// return
// }
// fields := make([]interface{}, 22)
// for i := 0; i < 22; i++ {
// if i < len(values) {
// v := strings.TrimSpace(values[i])
// if v == "" {
// fields[i] = nil // Empty string becomes NULL
// } else {
// fields[i] = v
// }
// } else {
// fields[i] = nil // No value sent → NULL
// }
// }
// // Prepare and execute insert into mfm_readings
// insertReadingQuery := `INSERT INTO mfm_readings (plant_id, mfm_meter_id,apparent_energy_received, reactive_energy_received, active_energy_received, active_power_r, active_power_y, active_power_b, active_power_total,voltage_ry, voltage_yb, voltage_br,current_r, current_y, current_b, current_n,voltage_r_n, voltage_y_n, voltage_b_n,frequency,power_factor_r, power_factor_y, power_factor_b, power_factor_total,created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)`
// args := []interface{}{plantID, mfmMeterID}
// args = append(args, fields...)
// args = append(args, payload.CreatedAt)
// // Execute the insert safely
// _, err = db.Exec(insertReadingQuery, args...)
// if err != nil {
// publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err))
// return
// }
// log.Printf("✅ mfm_readings inserted for mfm_meter_id %d", mfmMeterID)
values := strings.Split(payload.RegisterData, ",")
// Step 1: Get expected parameter columns from mfm_parameters
paramQuery := `SELECT name FROM mfm_parameters WHERE mfm_meter_id = $1 AND plant_id = $2 ORDER BY id ASC`
rows, err := db.Query(paramQuery, mfmMeterID, plantID)
if err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err))
return
}
defer rows.Close()
var paramColumns []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err))
return
}
paramColumns = append(paramColumns, name)
}
if len(paramColumns) == 0 {
publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID))
return
}
// Validate values count
if len(values) < len(paramColumns) {
publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values)))
return
}
if len(values) > len(paramColumns){
publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values)))
return
}
// Build insert statement
columns := []string{"plant_id", "mfm_meter_id"}
placeholders := []string{"$1", "$2"}
args := []interface{}{plantID, mfmMeterID}
argIndex := 3
for i, col := range paramColumns {
columns = append(columns, col)
placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex))
argIndex++
v := strings.TrimSpace(values[i])
if v == "" {
args = append(args, nil)
} else {
args = append(args, v)
}
}
// Add created_at
columns = append(columns, "created_at")
placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex))
args = append(args, payload.CreatedAt)
insertQuery = fmt.Sprintf(
`INSERT INTO mfm_readings (%s) VALUES (%s)`,
strings.Join(columns, ", "),
strings.Join(placeholders, ", "),
)
_, err = db.Exec(insertQuery, args...)
if err != nil {
publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err))
return
}
log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns))
}
// Publishes error messages to a specific topic
func publishError(client mqtt.Client, topic, message string) {
token := client.Publish(topic, 1, false, message)
token.Wait()
if token.Error() != nil {
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
}
}
// Handles successful MQTT connection
func connectHandler(client mqtt.Client) {
log.Println("Connected to MQTT broker")
}
// Handles MQTT connection loss
func connectionLostHandler(client mqtt.Client, err error) {
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
}
func main() {
var err error
godotenv.Load()
// Connect to the database
db, err = sql.Open(dbDriver, dbDataSource)
if err != nil {
log.Fatalf("Database connection failed: %v", err)
}
defer db.Close()
// Validate database connection
if err := db.Ping(); err != nil {
log.Fatalf("Database ping failed: %v", err)
}
// Configure MQTT client
opts := mqtt.NewClientOptions().
AddBroker(mqttBroker).
SetClientID(clientID).
// SetUsername("test"). // todo: Change to .env variable
// SetPassword("test"). // todo: Change to .env variable
SetCleanSession(false).
SetAutoReconnect(true).
SetOrderMatters(false).
SetOnConnectHandler(connectHandler).
SetConnectionLostHandler(connectionLostHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
}
// Subscribe to topics
for topic, qos := range topics {
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
}
log.Printf("Subscribed to topic: %s", topic)
}
// Keep the program running
sigChan := make(chan os.Signal, 1)
<-sigChan
} }