From 516624fd8df99de985fe29a5de68ad0c0bd4f416 Mon Sep 17 00:00:00 2001 From: ranjith Date: Tue, 29 Jul 2025 05:23:21 +0000 Subject: [PATCH] added commented line --- mqtt_client.go | 171 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/mqtt_client.go b/mqtt_client.go index f4cfc05..20147d1 100644 --- a/mqtt_client.go +++ b/mqtt_client.go @@ -1,3 +1,174 @@ +// package main + +// // import ( +// // "database/sql" +// // "encoding/json" +// // "fmt" +// // "log" +// // "os" + +// // mqtt "github.com/eclipse/paho.mqtt.golang" +// // "github.com/joho/godotenv" +// // _ "github.com/lib/pq" // PostgreSQL driver +// // ) +// import ( +// "database/sql" +// "encoding/json" +// "fmt" +// "log" +// "os" + +// mqtt "github.com/eclipse/paho.mqtt.golang" +// "github.com/joho/godotenv" +// _ "github.com/lib/pq" +// ) + +// var ( +// mqttBroker string +// dbDriver string +// dbDataSource string +// clientID string +// ) + + +// func init() { +// // Load environment variables from .env file +// if err := godotenv.Load(); err != nil { +// log.Fatalf("Error loading .env file: %v", err) +// } + +// // Retrieve environment variables +// mqttBroker = os.Getenv("MQTT_BROKER") +// dbDriver = os.Getenv("DB_DRIVER") +// dbDataSource = os.Getenv("DB_DATASOURCE") +// clientID = os.Getenv("CLIENT_ID") + +// // Validate required environment variables +// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { +// log.Fatalf("Missing required environment variables") +// } +// } + +// var ( +// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 +// db *sql.DB // Database connection +// ) + +// // Handles incoming MQTT messages +// func messageHandler(client mqtt.Client, msg mqtt.Message) { +// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) + +// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} +// errorTopic := "error_topic" + +// // Validate JSON payload +// var jsonData map[string]interface{} +// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { +// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) +// return +// } + +// // Check for missing fields +// for _, field := range requiredFields { +// if _, exists := jsonData[field]; !exists { +// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) +// return +// } +// } + +// // Create table dynamically if it doesn't exist +// tableName := msg.Topic() +// createTableQuery := fmt.Sprintf(` +// CREATE TABLE IF NOT EXISTS %s ( +// id SERIAL PRIMARY KEY, +// temperature FLOAT NOT NULL, +// humidity INT NOT NULL, +// pressure INT NOT NULL, +// timestamp TIMESTAMP NOT NULL +// )`, tableName) +// if _, err := db.Exec(createTableQuery); err != nil { +// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) +// return +// } + +// // Insert data into the table +// insertQuery := fmt.Sprintf(` +// INSERT INTO %s (temperature, humidity, pressure, timestamp) +// VALUES ($1, $2, $3, $4)`, tableName) +// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) +// if err != nil { +// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) +// } else { +// log.Printf("Data successfully inserted into table '%s'.", tableName) +// } +// } + +// // Publishes error messages to a specific topic +// func publishError(client mqtt.Client, topic, message string) { +// token := client.Publish(topic, 1, false, message) +// token.Wait() +// if token.Error() != nil { +// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) +// } +// } + +// // Handles successful MQTT connection +// func connectHandler(client mqtt.Client) { +// log.Println("Connected to MQTT broker") +// } + +// // Handles MQTT connection loss +// func connectionLostHandler(client mqtt.Client, err error) { +// log.Printf("Connection lost: %v. Attempting to reconnect...", err) +// } + +// func main() { +// var err error +// godotenv.Load() +// // Connect to the database +// db, err = sql.Open(dbDriver, dbDataSource) +// if err != nil { +// log.Fatalf("Database connection failed: %v", err) +// } +// defer db.Close() + +// // Validate database connection +// if err := db.Ping(); err != nil { +// log.Fatalf("Database ping failed: %v", err) +// } + +// // Configure MQTT client +// opts := mqtt.NewClientOptions(). +// AddBroker(mqttBroker). +// SetClientID(clientID). +// SetUsername("test"). // todo: Change to .env variable +// SetPassword("test"). // todo: Change to .env variable +// SetCleanSession(false). +// SetAutoReconnect(true). +// SetOrderMatters(false). +// SetOnConnectHandler(connectHandler). +// SetConnectionLostHandler(connectionLostHandler) + +// client := mqtt.NewClient(opts) +// if token := client.Connect(); token.Wait() && token.Error() != nil { +// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) +// } + +// // Subscribe to topics +// for topic, qos := range topics { +// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { +// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) +// } +// log.Printf("Subscribed to topic: %s", topic) +// } + +// // Keep the program running +// sigChan := make(chan os.Signal, 1) +// <-sigChan +// } + +// + package main import (