// package main // // import ( // // "database/sql" // // "encoding/json" // // "fmt" // // "log" // // "os" // // mqtt "github.com/eclipse/paho.mqtt.golang" // // "github.com/joho/godotenv" // // _ "github.com/lib/pq" // PostgreSQL driver // // ) // import ( // "database/sql" // "encoding/json" // "fmt" // "log" // "os" // mqtt "github.com/eclipse/paho.mqtt.golang" // "github.com/joho/godotenv" // _ "github.com/lib/pq" // ) // var ( // mqttBroker string // dbDriver string // dbDataSource string // clientID string // ) // func init() { // // Load environment variables from .env file // if err := godotenv.Load(); err != nil { // log.Fatalf("Error loading .env file: %v", err) // } // // Retrieve environment variables // mqttBroker = os.Getenv("MQTT_BROKER") // dbDriver = os.Getenv("DB_DRIVER") // dbDataSource = os.Getenv("DB_DATASOURCE") // clientID = os.Getenv("CLIENT_ID") // // Validate required environment variables // if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { // log.Fatalf("Missing required environment variables") // } // } // var ( // topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 // db *sql.DB // Database connection // ) // // Handles incoming MQTT messages // func messageHandler(client mqtt.Client, msg mqtt.Message) { // fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) // requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} // errorTopic := "error_topic" // // Validate JSON payload // var jsonData map[string]interface{} // if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { // publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) // return // } // // Check for missing fields // for _, field := range requiredFields { // if _, exists := jsonData[field]; !exists { // publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) // return // } // } // // Create table dynamically if it doesn't exist // tableName := msg.Topic() // createTableQuery := fmt.Sprintf(` // CREATE TABLE IF NOT EXISTS %s ( // id SERIAL PRIMARY KEY, // temperature FLOAT NOT NULL, // humidity INT NOT NULL, // pressure INT NOT NULL, // timestamp TIMESTAMP NOT NULL // )`, tableName) // if _, err := db.Exec(createTableQuery); err != nil { // publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) // return // } // // Insert data into the table // insertQuery := fmt.Sprintf(` // INSERT INTO %s (temperature, humidity, pressure, timestamp) // VALUES ($1, $2, $3, $4)`, tableName) // _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) // if err != nil { // publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) // } else { // log.Printf("Data successfully inserted into table '%s'.", tableName) // } // } // // Publishes error messages to a specific topic // func publishError(client mqtt.Client, topic, message string) { // token := client.Publish(topic, 1, false, message) // token.Wait() // if token.Error() != nil { // log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) // } // } // // Handles successful MQTT connection // func connectHandler(client mqtt.Client) { // log.Println("Connected to MQTT broker") // } // // Handles MQTT connection loss // func connectionLostHandler(client mqtt.Client, err error) { // log.Printf("Connection lost: %v. Attempting to reconnect...", err) // } // func main() { // var err error // godotenv.Load() // // Connect to the database // db, err = sql.Open(dbDriver, dbDataSource) // if err != nil { // log.Fatalf("Database connection failed: %v", err) // } // defer db.Close() // // Validate database connection // if err := db.Ping(); err != nil { // log.Fatalf("Database ping failed: %v", err) // } // // Configure MQTT client // opts := mqtt.NewClientOptions(). // AddBroker(mqttBroker). // SetClientID(clientID). // SetUsername("test"). // todo: Change to .env variable // SetPassword("test"). // todo: Change to .env variable // SetCleanSession(false). // SetAutoReconnect(true). // SetOrderMatters(false). // SetOnConnectHandler(connectHandler). // SetConnectionLostHandler(connectionLostHandler) // client := mqtt.NewClient(opts) // if token := client.Connect(); token.Wait() && token.Error() != nil { // log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) // } // // Subscribe to topics // for topic, qos := range topics { // if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { // log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) // } // log.Printf("Subscribed to topic: %s", topic) // } // // Keep the program running // sigChan := make(chan os.Signal, 1) // <-sigChan // } // package main import ( "database/sql" "encoding/json" "fmt" "log" "os" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/joho/godotenv" _ "github.com/lib/pq" ) var ( mqttBroker string dbDriver string dbDataSource string clientID string ) func init() { // Load environment variables from .env file if err := godotenv.Load(); err != nil { log.Fatalf("Error loading .env file: %v", err) } // Retrieve environment variables mqttBroker = os.Getenv("MQTT_BROKER") dbDriver = os.Getenv("DB_DRIVER") dbDataSource = os.Getenv("DB_DATASOURCE") clientID = os.Getenv("CLIENT_ID") // Validate required environment variables if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { log.Fatalf("Missing required environment variables") } } var ( topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1 db *sql.DB // Database connection ) // Handles incoming MQTT messages func messageHandler(client mqtt.Client, msg mqtt.Message) { fmt.Printf("📥 Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) errorTopic := "error_topic" // Define struct for payload type Payload struct { PlantCode string `json:"plant_code"` MFMMeterID string `json:"mfm_meter_id"` RegisterData string `json:"register_data"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` CreatedBy string `json:"created_by"` } var payload Payload if err := json.Unmarshal(msg.Payload(), &payload); err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Invalid JSON: %v", err)) return } // Validate required fields if payload.CreatedAt == "" { publishError(client, errorTopic, "❌ Missing created_at") return } // Resolve plant ID var plantID int err := db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID) if err == sql.ErrNoRows { publishError(client, errorTopic, fmt.Sprintf("❌ Plant code '%s' not found", payload.PlantCode)) return } else if err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Error fetching plant ID: %v", err)) return } // Validate MFM meter var mfmMeterID int query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1` err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID) if err == sql.ErrNoRows { publishError(client, errorTopic, fmt.Sprintf("❌ mfm_meter_id '%s' not found for plant_id %d", payload.MFMMeterID, plantID)) return } else if err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Error checking MFM meter: %v", err)) return } // Check for duplicates var exists int existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1` err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists) if err == nil { publishError(client, errorTopic, fmt.Sprintf("⚠️ Duplicate entry for created_at '%s' and mfm_meter_id %d", payload.CreatedAt, mfmMeterID)) return } else if err != sql.ErrNoRows { publishError(client, errorTopic, fmt.Sprintf("❌ Error checking for duplicate: %v", err)) return } // insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)` _, err = db.Exec(insertQuery, plantID, mfmMeterID, payload.RegisterData, payload.CreatedAt, payload.UpdatedAt, payload.CreatedBy, ) if err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err)) return } log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID) // values := strings.Split(payload.RegisterData, ",") // if len(values) < 1 { // publishError(client, errorTopic, // fmt.Sprintf("❌ Register data has insufficient fields: need at least 1, got %d", len(values))) // return // } // fields := make([]interface{}, 22) // for i := 0; i < 22; i++ { // if i < len(values) { // v := strings.TrimSpace(values[i]) // if v == "" { // fields[i] = nil // Empty string becomes NULL // } else { // fields[i] = v // } // } else { // fields[i] = nil // No value sent → NULL // } // } // // Prepare and execute insert into mfm_readings // insertReadingQuery := `INSERT INTO mfm_readings (plant_id, mfm_meter_id,apparent_energy_received, reactive_energy_received, active_energy_received, active_power_r, active_power_y, active_power_b, active_power_total,voltage_ry, voltage_yb, voltage_br,current_r, current_y, current_b, current_n,voltage_r_n, voltage_y_n, voltage_b_n,frequency,power_factor_r, power_factor_y, power_factor_b, power_factor_total,created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` // args := []interface{}{plantID, mfmMeterID} // args = append(args, fields...) // args = append(args, payload.CreatedAt) // // Execute the insert safely // _, err = db.Exec(insertReadingQuery, args...) // if err != nil { // publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) // return // } // log.Printf("✅ mfm_readings inserted for mfm_meter_id %d", mfmMeterID) values := strings.Split(payload.RegisterData, ",") // Step 1: Get expected parameter columns from mfm_parameters paramQuery := `SELECT name FROM mfm_parameters WHERE mfm_meter_id = $1 AND plant_id = $2 ORDER BY id ASC` rows, err := db.Query(paramQuery, mfmMeterID, plantID) if err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) return } defer rows.Close() var paramColumns []string for rows.Next() { var name string if err := rows.Scan(&name); err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) return } paramColumns = append(paramColumns, name) } if len(paramColumns) == 0 { publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) return } // Validate values count if len(values) < len(paramColumns) { publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) return } if len(values) > len(paramColumns){ publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) return } // Build insert statement columns := []string{"plant_id", "mfm_meter_id"} placeholders := []string{"$1", "$2"} args := []interface{}{plantID, mfmMeterID} argIndex := 3 for i, col := range paramColumns { columns = append(columns, col) placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex)) argIndex++ v := strings.TrimSpace(values[i]) if v == "" { args = append(args, nil) } else { args = append(args, v) } } // Add created_at columns = append(columns, "created_at") placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex)) args = append(args, payload.CreatedAt) insertQuery = fmt.Sprintf( `INSERT INTO mfm_readings (%s) VALUES (%s)`, strings.Join(columns, ", "), strings.Join(placeholders, ", "), ) _, err = db.Exec(insertQuery, args...) if err != nil { publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) return } log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns)) } // Publishes error messages to a specific topic func publishError(client mqtt.Client, topic, message string) { token := client.Publish(topic, 1, false, message) token.Wait() if token.Error() != nil { log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) } } // Handles successful MQTT connection func connectHandler(client mqtt.Client) { log.Println("Connected to MQTT broker") } // Handles MQTT connection loss func connectionLostHandler(client mqtt.Client, err error) { log.Printf("Connection lost: %v. Attempting to reconnect...", err) } func main() { var err error godotenv.Load() // Connect to the database db, err = sql.Open(dbDriver, dbDataSource) if err != nil { log.Fatalf("Database connection failed: %v", err) } defer db.Close() // Validate database connection if err := db.Ping(); err != nil { log.Fatalf("Database ping failed: %v", err) } // Configure MQTT client opts := mqtt.NewClientOptions(). AddBroker(mqttBroker). SetClientID(clientID). // SetUsername("test"). // todo: Change to .env variable // SetPassword("test"). // todo: Change to .env variable SetCleanSession(false). SetAutoReconnect(true). SetOrderMatters(false). SetOnConnectHandler(connectHandler). SetConnectionLostHandler(connectionLostHandler) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) } // Subscribe to topics for topic, qos := range topics { if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) } log.Printf("Subscribed to topic: %s", topic) } // Keep the program running sigChan := make(chan os.Signal, 1) <-sigChan }