package main import ( "database/sql" "encoding/json" "fmt" "log" "os" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/joho/godotenv" _ "github.com/lib/pq" // PostgreSQL driver ) var ( mqttBroker string dbDriver string dbDataSource string clientID string ) func init() { // Load environment variables from .env file if err := godotenv.Load(); err != nil { log.Fatalf("Error loading .env file: %v", err) } // Retrieve environment variables mqttBroker = os.Getenv("MQTT_BROKER") dbDriver = os.Getenv("DB_DRIVER") dbDataSource = os.Getenv("DB_DATASOURCE") clientID = os.Getenv("CLIENT_ID") // Validate required environment variables if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { log.Fatalf("Missing required environment variables") } } var ( topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 db *sql.DB // Database connection ) // Handles incoming MQTT messages func messageHandler(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} errorTopic := "error_topic" // Validate JSON payload var jsonData map[string]interface{} if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) return } // Check for missing fields for _, field := range requiredFields { if _, exists := jsonData[field]; !exists { publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) return } } // Create table dynamically if it doesn't exist tableName := msg.Topic() createTableQuery := fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, temperature FLOAT NOT NULL, humidity INT NOT NULL, pressure INT NOT NULL, timestamp TIMESTAMP NOT NULL )`, tableName) if _, err := db.Exec(createTableQuery); err != nil { publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) return } // Insert data into the table insertQuery := fmt.Sprintf(` INSERT INTO %s (temperature, humidity, pressure, timestamp) VALUES ($1, $2, $3, $4)`, tableName) _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) if err != nil { publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) } else { log.Printf("Data successfully inserted into table '%s'.", tableName) } } // Publishes error messages to a specific topic func publishError(client mqtt.Client, topic, message string) { token := client.Publish(topic, 1, false, message) token.Wait() if token.Error() != nil { log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) } } // Handles successful MQTT connection func connectHandler(client mqtt.Client) { log.Println("Connected to MQTT broker") } // Handles MQTT connection loss func connectionLostHandler(client mqtt.Client, err error) { log.Printf("Connection lost: %v. Attempting to reconnect...", err) } func main() { var err error godotenv.Load() // Connect to the database db, err = sql.Open(dbDriver, dbDataSource) if err != nil { log.Fatalf("Database connection failed: %v", err) } defer db.Close() // Validate database connection if err := db.Ping(); err != nil { log.Fatalf("Database ping failed: %v", err) } // Configure MQTT client opts := mqtt.NewClientOptions(). AddBroker(mqttBroker). SetClientID(clientID). SetUsername("test"). // todo: Change to .env variable SetPassword("test"). // todo: Change to .env variable SetCleanSession(false). SetAutoReconnect(true). SetOrderMatters(false). SetOnConnectHandler(connectHandler). SetConnectionLostHandler(connectionLostHandler) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) } // Subscribe to topics for topic, qos := range topics { if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) } log.Printf("Subscribed to topic: %s", topic) } // Keep the program running sigChan := make(chan os.Signal, 1) <-sigChan }