Compare commits
13 Commits
860bea9f0e
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| ddd065e2a9 | |||
| acec9d72b4 | |||
|
|
64a8a32d1a | ||
| 89786c9dfa | |||
|
|
bd68af1f58 | ||
|
|
f6d3908088 | ||
|
|
5aad1d7d44 | ||
|
|
02fe9ea45e | ||
|
|
edfdc3699d | ||
|
|
dbcb05d017 | ||
|
|
3fd408d3ee | ||
|
|
36142de600 | ||
|
|
1c2e45541c |
198
mqtt_client.go
198
mqtt_client.go
@@ -1,174 +1,3 @@
|
|||||||
// package main
|
|
||||||
|
|
||||||
// // import (
|
|
||||||
// // "database/sql"
|
|
||||||
// // "encoding/json"
|
|
||||||
// // "fmt"
|
|
||||||
// // "log"
|
|
||||||
// // "os"
|
|
||||||
|
|
||||||
// // mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
||||||
// // "github.com/joho/godotenv"
|
|
||||||
// // _ "github.com/lib/pq" // PostgreSQL driver
|
|
||||||
// // )
|
|
||||||
// import (
|
|
||||||
// "database/sql"
|
|
||||||
// "encoding/json"
|
|
||||||
// "fmt"
|
|
||||||
// "log"
|
|
||||||
// "os"
|
|
||||||
|
|
||||||
// mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
||||||
// "github.com/joho/godotenv"
|
|
||||||
// _ "github.com/lib/pq"
|
|
||||||
// )
|
|
||||||
|
|
||||||
// var (
|
|
||||||
// mqttBroker string
|
|
||||||
// dbDriver string
|
|
||||||
// dbDataSource string
|
|
||||||
// clientID string
|
|
||||||
// )
|
|
||||||
|
|
||||||
|
|
||||||
// func init() {
|
|
||||||
// // Load environment variables from .env file
|
|
||||||
// if err := godotenv.Load(); err != nil {
|
|
||||||
// log.Fatalf("Error loading .env file: %v", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Retrieve environment variables
|
|
||||||
// mqttBroker = os.Getenv("MQTT_BROKER")
|
|
||||||
// dbDriver = os.Getenv("DB_DRIVER")
|
|
||||||
// dbDataSource = os.Getenv("DB_DATASOURCE")
|
|
||||||
// clientID = os.Getenv("CLIENT_ID")
|
|
||||||
|
|
||||||
// // Validate required environment variables
|
|
||||||
// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
|
|
||||||
// log.Fatalf("Missing required environment variables")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// var (
|
|
||||||
// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
|
|
||||||
// db *sql.DB // Database connection
|
|
||||||
// )
|
|
||||||
|
|
||||||
// // Handles incoming MQTT messages
|
|
||||||
// func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|
||||||
// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
|
||||||
|
|
||||||
// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
|
|
||||||
// errorTopic := "error_topic"
|
|
||||||
|
|
||||||
// // Validate JSON payload
|
|
||||||
// var jsonData map[string]interface{}
|
|
||||||
// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
|
|
||||||
// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Check for missing fields
|
|
||||||
// for _, field := range requiredFields {
|
|
||||||
// if _, exists := jsonData[field]; !exists {
|
|
||||||
// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Create table dynamically if it doesn't exist
|
|
||||||
// tableName := msg.Topic()
|
|
||||||
// createTableQuery := fmt.Sprintf(`
|
|
||||||
// CREATE TABLE IF NOT EXISTS %s (
|
|
||||||
// id SERIAL PRIMARY KEY,
|
|
||||||
// temperature FLOAT NOT NULL,
|
|
||||||
// humidity INT NOT NULL,
|
|
||||||
// pressure INT NOT NULL,
|
|
||||||
// timestamp TIMESTAMP NOT NULL
|
|
||||||
// )`, tableName)
|
|
||||||
// if _, err := db.Exec(createTableQuery); err != nil {
|
|
||||||
// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Insert data into the table
|
|
||||||
// insertQuery := fmt.Sprintf(`
|
|
||||||
// INSERT INTO %s (temperature, humidity, pressure, timestamp)
|
|
||||||
// VALUES ($1, $2, $3, $4)`, tableName)
|
|
||||||
// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
|
|
||||||
// if err != nil {
|
|
||||||
// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
|
|
||||||
// } else {
|
|
||||||
// log.Printf("Data successfully inserted into table '%s'.", tableName)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Publishes error messages to a specific topic
|
|
||||||
// func publishError(client mqtt.Client, topic, message string) {
|
|
||||||
// token := client.Publish(topic, 1, false, message)
|
|
||||||
// token.Wait()
|
|
||||||
// if token.Error() != nil {
|
|
||||||
// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Handles successful MQTT connection
|
|
||||||
// func connectHandler(client mqtt.Client) {
|
|
||||||
// log.Println("Connected to MQTT broker")
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Handles MQTT connection loss
|
|
||||||
// func connectionLostHandler(client mqtt.Client, err error) {
|
|
||||||
// log.Printf("Connection lost: %v. Attempting to reconnect...", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func main() {
|
|
||||||
// var err error
|
|
||||||
// godotenv.Load()
|
|
||||||
// // Connect to the database
|
|
||||||
// db, err = sql.Open(dbDriver, dbDataSource)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Fatalf("Database connection failed: %v", err)
|
|
||||||
// }
|
|
||||||
// defer db.Close()
|
|
||||||
|
|
||||||
// // Validate database connection
|
|
||||||
// if err := db.Ping(); err != nil {
|
|
||||||
// log.Fatalf("Database ping failed: %v", err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Configure MQTT client
|
|
||||||
// opts := mqtt.NewClientOptions().
|
|
||||||
// AddBroker(mqttBroker).
|
|
||||||
// SetClientID(clientID).
|
|
||||||
// SetUsername("test"). // todo: Change to .env variable
|
|
||||||
// SetPassword("test"). // todo: Change to .env variable
|
|
||||||
// SetCleanSession(false).
|
|
||||||
// SetAutoReconnect(true).
|
|
||||||
// SetOrderMatters(false).
|
|
||||||
// SetOnConnectHandler(connectHandler).
|
|
||||||
// SetConnectionLostHandler(connectionLostHandler)
|
|
||||||
|
|
||||||
// client := mqtt.NewClient(opts)
|
|
||||||
// if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
||||||
// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Subscribe to topics
|
|
||||||
// for topic, qos := range topics {
|
|
||||||
// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
|
||||||
// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
|
||||||
// }
|
|
||||||
// log.Printf("Subscribed to topic: %s", topic)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Keep the program running
|
|
||||||
// sigChan := make(chan os.Signal, 1)
|
|
||||||
// <-sigChan
|
|
||||||
// }
|
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -214,6 +43,10 @@ var (
|
|||||||
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
|
topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1
|
||||||
db *sql.DB // Database connection
|
db *sql.DB // Database connection
|
||||||
)
|
)
|
||||||
|
func publishError(client mqtt.Client, topic, errorMsg string) {
|
||||||
|
token := client.Publish(topic, 0, false, errorMsg)
|
||||||
|
token.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Handles incoming MQTT messages
|
// Handles incoming MQTT messages
|
||||||
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
@@ -266,7 +99,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for duplicates
|
|
||||||
var exists int
|
var exists int
|
||||||
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
|
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
|
||||||
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
|
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
|
||||||
@@ -294,6 +127,7 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err))
|
publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
//..
|
||||||
|
|
||||||
log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID)
|
log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID)
|
||||||
|
|
||||||
@@ -371,7 +205,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build insert statement
|
|
||||||
columns := []string{"plant_id", "mfm_meter_id"}
|
columns := []string{"plant_id", "mfm_meter_id"}
|
||||||
placeholders := []string{"$1", "$2"}
|
placeholders := []string{"$1", "$2"}
|
||||||
args := []interface{}{plantID, mfmMeterID}
|
args := []interface{}{plantID, mfmMeterID}
|
||||||
@@ -411,15 +244,6 @@ func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Publishes error messages to a specific topic
|
|
||||||
func publishError(client mqtt.Client, topic, message string) {
|
|
||||||
token := client.Publish(topic, 1, false, message)
|
|
||||||
token.Wait()
|
|
||||||
if token.Error() != nil {
|
|
||||||
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handles successful MQTT connection
|
// Handles successful MQTT connection
|
||||||
func connectHandler(client mqtt.Client) {
|
func connectHandler(client mqtt.Client) {
|
||||||
log.Println("Connected to MQTT broker")
|
log.Println("Connected to MQTT broker")
|
||||||
@@ -449,6 +273,8 @@ func main() {
|
|||||||
opts := mqtt.NewClientOptions().
|
opts := mqtt.NewClientOptions().
|
||||||
AddBroker(mqttBroker).
|
AddBroker(mqttBroker).
|
||||||
SetClientID(clientID).
|
SetClientID(clientID).
|
||||||
|
// SetUsername("test").
|
||||||
|
// SetPassword("test").
|
||||||
SetCleanSession(false).
|
SetCleanSession(false).
|
||||||
SetAutoReconnect(true).
|
SetAutoReconnect(true).
|
||||||
SetOrderMatters(false).
|
SetOrderMatters(false).
|
||||||
@@ -460,7 +286,6 @@ func main() {
|
|||||||
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to topics
|
|
||||||
for topic, qos := range topics {
|
for topic, qos := range topics {
|
||||||
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
||||||
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
||||||
@@ -468,15 +293,8 @@ func main() {
|
|||||||
log.Printf("Subscribed to topic: %s", topic)
|
log.Printf("Subscribed to topic: %s", topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep the program running
|
|
||||||
sigChan := make(chan os.Signal, 1)
|
sigChan := make(chan os.Signal, 1)
|
||||||
<-sigChan
|
<-sigChan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user