diff --git a/mqtt_client.go b/mqtt_client.go index cafc2d7..024e737 100644 --- a/mqtt_client.go +++ b/mqtt_client.go @@ -1,174 +1,3 @@ -// package main - -// // import ( -// // "database/sql" -// // "encoding/json" -// // "fmt" -// // "log" -// // "os" - -// // mqtt "github.com/eclipse/paho.mqtt.golang" -// // "github.com/joho/godotenv" -// // _ "github.com/lib/pq" // PostgreSQL driver -// // ) -// import ( -// "database/sql" -// "encoding/json" -// "fmt" -// "log" -// "os" - -// mqtt "github.com/eclipse/paho.mqtt.golang" -// "github.com/joho/godotenv" -// _ "github.com/lib/pq" -// ) - -// var ( -// mqttBroker string -// dbDriver string -// dbDataSource string -// clientID string -// ) - - -// func init() { -// // Load environment variables from .env file -// if err := godotenv.Load(); err != nil { -// log.Fatalf("Error loading .env file: %v", err) -// } - -// // Retrieve environment variables -// mqttBroker = os.Getenv("MQTT_BROKER") -// dbDriver = os.Getenv("DB_DRIVER") -// dbDataSource = os.Getenv("DB_DATASOURCE") -// clientID = os.Getenv("CLIENT_ID") - -// // Validate required environment variables -// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { -// log.Fatalf("Missing required environment variables") -// } -// } - -// var ( -// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 -// db *sql.DB // Database connection -// ) - -// // Handles incoming MQTT messages -// func messageHandler(client mqtt.Client, msg mqtt.Message) { -// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) - -// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} -// errorTopic := "error_topic" - -// // Validate JSON payload -// var jsonData map[string]interface{} -// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) -// return -// } - -// // Check for missing fields -// for _, field := range requiredFields { -// if _, exists := jsonData[field]; !exists { -// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) -// return -// } -// } - -// // Create table dynamically if it doesn't exist -// tableName := msg.Topic() -// createTableQuery := fmt.Sprintf(` -// CREATE TABLE IF NOT EXISTS %s ( -// id SERIAL PRIMARY KEY, -// temperature FLOAT NOT NULL, -// humidity INT NOT NULL, -// pressure INT NOT NULL, -// timestamp TIMESTAMP NOT NULL -// )`, tableName) -// if _, err := db.Exec(createTableQuery); err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) -// return -// } - -// // Insert data into the table -// insertQuery := fmt.Sprintf(` -// INSERT INTO %s (temperature, humidity, pressure, timestamp) -// VALUES ($1, $2, $3, $4)`, tableName) -// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) -// if err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) -// } else { -// log.Printf("Data successfully inserted into table '%s'.", tableName) -// } -// } - -// // Publishes error messages to a specific topic -// func publishError(client mqtt.Client, topic, message string) { -// token := client.Publish(topic, 1, false, message) -// token.Wait() -// if token.Error() != nil { -// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) -// } -// } - -// // Handles successful MQTT connection -// func connectHandler(client mqtt.Client) { -// log.Println("Connected to MQTT broker") -// } - -// // Handles MQTT connection loss -// func connectionLostHandler(client mqtt.Client, err error) { -// log.Printf("Connection lost: %v. Attempting to reconnect...", err) -// } - -// func main() { -// var err error -// godotenv.Load() -// // Connect to the database -// db, err = sql.Open(dbDriver, dbDataSource) -// if err != nil { -// log.Fatalf("Database connection failed: %v", err) -// } -// defer db.Close() - -// // Validate database connection -// if err := db.Ping(); err != nil { -// log.Fatalf("Database ping failed: %v", err) -// } - -// // Configure MQTT client -// opts := mqtt.NewClientOptions(). -// AddBroker(mqttBroker). -// SetClientID(clientID). -// SetUsername("test"). // todo: Change to .env variable -// SetPassword("test"). // todo: Change to .env variable -// SetCleanSession(false). -// SetAutoReconnect(true). -// SetOrderMatters(false). -// SetOnConnectHandler(connectHandler). -// SetConnectionLostHandler(connectionLostHandler) - -// client := mqtt.NewClient(opts) -// if token := client.Connect(); token.Wait() && token.Error() != nil { -// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) -// } - -// // Subscribe to topics -// for topic, qos := range topics { -// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { -// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) -// } -// log.Printf("Subscribed to topic: %s", topic) -// } - -// // Keep the program running -// sigChan := make(chan os.Signal, 1) -// <-sigChan -// } - -// - package main import ( @@ -340,7 +169,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { rows, err := db.Query(paramQuery, mfmMeterID, plantID) if err != nil { - publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) return } defer rows.Close() @@ -349,25 +178,25 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { for rows.Next() { var name string if err := rows.Scan(&name); err != nil { - publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) return } paramColumns = append(paramColumns, name) } if len(paramColumns) == 0 { - publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) return } // Validate values count if len(values) < len(paramColumns) { - publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) return } if len(values) > len(paramColumns){ - publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) return } @@ -402,81 +231,10 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) { _, err = db.Exec(insertQuery, args...) if err != nil { - publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) + logAndPublishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) return } log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns)) -} - - -// Publishes error messages to a specific topic -func publishError(client mqtt.Client, topic, message string) { - token := client.Publish(topic, 1, false, message) - token.Wait() - if token.Error() != nil { - log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) - } -} - -// Handles successful MQTT connection -func connectHandler(client mqtt.Client) { - log.Println("Connected to MQTT broker") -} - -// Handles MQTT connection loss -func connectionLostHandler(client mqtt.Client, err error) { - log.Printf("Connection lost: %v. Attempting to reconnect...", err) -} - -func main() { - var err error - godotenv.Load() - // Connect to the database - db, err = sql.Open(dbDriver, dbDataSource) - if err != nil { - log.Fatalf("Database connection failed: %v", err) - } - defer db.Close() - - // Validate database connection - if err := db.Ping(); err != nil { - log.Fatalf("Database ping failed: %v", err) - } - - // Configure MQTT client - opts := mqtt.NewClientOptions(). - AddBroker(mqttBroker). - SetClientID(clientID). - SetCleanSession(false). - SetAutoReconnect(true). - SetOrderMatters(false). - SetOnConnectHandler(connectHandler). - SetConnectionLostHandler(connectionLostHandler) - - client := mqtt.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) - } - - // Subscribe to topics - for topic, qos := range topics { - if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { - log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) - } - log.Printf("Subscribed to topic: %s", topic) - } - - // Keep the program running - sigChan := make(chan os.Signal, 1) - <-sigChan -} - - - - - - - - +} \ No newline at end of file