From 5fa8b09acc328aa1e019647e212aeb5e34564f90 Mon Sep 17 00:00:00 2001 From: ranjith Date: Tue, 15 Jul 2025 15:22:42 +0530 Subject: [PATCH] Initial commit --- .env | 4 ++ go.mod | 15 +++++ go.sum | 12 ++++ mqtt_client.go | 156 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 187 insertions(+) create mode 100644 .env create mode 100644 go.mod create mode 100644 go.sum create mode 100644 mqtt_client.go diff --git a/.env b/.env new file mode 100644 index 0000000..5012b13 --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +MQTT_BROKER=tcp://172.31.31.32:1883 +DB_DRIVER=postgres +DB_DATASOURCE=host=pgdb-pds user=pds password=H0ZiUEB1syaSCdHy dbname=pds sslmode=disable +CLIENT_ID= \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7bbb7e0 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module mqtt-client + +go 1.22.2 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/joho/godotenv v1.5.1 + github.com/lib/pq v1.10.9 +) + +require ( + github.com/gorilla/websocket v1.5.3 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.10.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5cd59db --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/mqtt_client.go b/mqtt_client.go new file mode 100644 index 0000000..b24f4bb --- /dev/null +++ b/mqtt_client.go @@ -0,0 +1,156 @@ +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/joho/godotenv" + _ "github.com/lib/pq" // PostgreSQL driver +) + +var ( + mqttBroker string + dbDriver string + dbDataSource string + clientID string +) + +func init() { + // Load environment variables from .env file + if err := godotenv.Load(); err != nil { + log.Fatalf("Error loading .env file: %v", err) + } + + // Retrieve environment variables + mqttBroker = os.Getenv("MQTT_BROKER") + dbDriver = os.Getenv("DB_DRIVER") + dbDataSource = os.Getenv("DB_DATASOURCE") + clientID = os.Getenv("CLIENT_ID") + + // Validate required environment variables + if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { + log.Fatalf("Missing required environment variables") + } +} + +var ( + topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 + db *sql.DB // Database connection +) + +// Handles incoming MQTT messages +func messageHandler(client mqtt.Client, msg mqtt.Message) { + fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) + + requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} + errorTopic := "error_topic" + + // Validate JSON payload + var jsonData map[string]interface{} + if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { + publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) + return + } + + // Check for missing fields + for _, field := range requiredFields { + if _, exists := jsonData[field]; !exists { + publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) + return + } + } + + // Create table dynamically if it doesn't exist + tableName := msg.Topic() + createTableQuery := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + temperature FLOAT NOT NULL, + humidity INT NOT NULL, + pressure INT NOT NULL, + timestamp TIMESTAMP NOT NULL + )`, tableName) + if _, err := db.Exec(createTableQuery); err != nil { + publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) + return + } + + // Insert data into the table + insertQuery := fmt.Sprintf(` + INSERT INTO %s (temperature, humidity, pressure, timestamp) + VALUES ($1, $2, $3, $4)`, tableName) + _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) + if err != nil { + publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) + } else { + log.Printf("Data successfully inserted into table '%s'.", tableName) + } +} + +// Publishes error messages to a specific topic +func publishError(client mqtt.Client, topic, message string) { + token := client.Publish(topic, 1, false, message) + token.Wait() + if token.Error() != nil { + log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) + } +} + +// Handles successful MQTT connection +func connectHandler(client mqtt.Client) { + log.Println("Connected to MQTT broker") +} + +// Handles MQTT connection loss +func connectionLostHandler(client mqtt.Client, err error) { + log.Printf("Connection lost: %v. Attempting to reconnect...", err) +} + +func main() { + var err error + godotenv.Load() + // Connect to the database + db, err = sql.Open(dbDriver, dbDataSource) + if err != nil { + log.Fatalf("Database connection failed: %v", err) + } + defer db.Close() + + // Validate database connection + if err := db.Ping(); err != nil { + log.Fatalf("Database ping failed: %v", err) + } + + // Configure MQTT client + opts := mqtt.NewClientOptions(). + AddBroker(mqttBroker). + SetClientID(clientID). + SetUsername("test"). // todo: Change to .env variable + SetPassword("test"). // todo: Change to .env variable + SetCleanSession(false). + SetAutoReconnect(true). + SetOrderMatters(false). + SetOnConnectHandler(connectHandler). + SetConnectionLostHandler(connectionLostHandler) + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) + } + + // Subscribe to topics + for topic, qos := range topics { + if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) + } + log.Printf("Subscribed to topic: %s", topic) + } + + // Keep the program running + sigChan := make(chan os.Signal, 1) + <-sigChan +}