Files
tracking/mqtt_client.go
2025-09-16 16:37:17 +05:30

157 lines
4.5 KiB
Go

package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/joho/godotenv"
_ "github.com/lib/pq" // PostgreSQL driver
)
var (
mqttBroker string
dbDriver string
dbDataSource string
clientID string
)
func init() {
// Load environment variables from .env file
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file: %v", err)
}
// Retrieve environment variables
mqttBroker = os.Getenv("MQTT_BROKER")
dbDriver = os.Getenv("DB_DRIVER")
dbDataSource = os.Getenv("DB_DATASOURCE")
clientID = os.Getenv("CLIENT_ID")
// Validate required environment variables
if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
log.Fatalf("Missing required environment variables")
}
}
var (
topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
db *sql.DB // Database connection
)
// Handles incoming MQTT messages
func messageHandler(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
errorTopic := "error_topic"
// Validate JSON payload
var jsonData map[string]interface{}
if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
return
}
// Check for missing fields
for _, field := range requiredFields {
if _, exists := jsonData[field]; !exists {
publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
return
}
}
// Create table dynamically if it doesn't exist
tableName := msg.Topic()
createTableQuery := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
temperature FLOAT NOT NULL,
humidity INT NOT NULL,
pressure INT NOT NULL,
timestamp TIMESTAMP NOT NULL
)`, tableName)
if _, err := db.Exec(createTableQuery); err != nil {
publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
return
}
// Insert data into the table
insertQuery := fmt.Sprintf(`
INSERT INTO %s (temperature, humidity, pressure, timestamp)
VALUES ($1, $2, $3, $4)`, tableName)
_, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
if err != nil {
publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
} else {
log.Printf("Data successfully inserted into table '%s'.", tableName)
}
}
// Publishes error messages to a specific topic
func publishError(client mqtt.Client, topic, message string) {
token := client.Publish(topic, 1, false, message)
token.Wait()
if token.Error() != nil {
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
}
}
// Handles successful MQTT connection
func connectHandler(client mqtt.Client) {
log.Println("Connected to MQTT broker")
}
// Handles MQTT connection loss
func connectionLostHandler(client mqtt.Client, err error) {
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
}
func main() {
var err error
godotenv.Load()
// Connect to the database
db, err = sql.Open(dbDriver, dbDataSource)
if err != nil {
log.Fatalf("Database connection failed: %v", err)
}
defer db.Close()
// Validate database connection
if err := db.Ping(); err != nil {
log.Fatalf("Database ping failed: %v", err)
}
// Configure MQTT client
opts := mqtt.NewClientOptions().
AddBroker(mqttBroker).
SetClientID(clientID).
SetUsername("test"). // todo: Change to .env variable
SetPassword("test"). // todo: Change to .env variable
SetCleanSession(false).
SetAutoReconnect(true).
SetOrderMatters(false).
SetOnConnectHandler(connectHandler).
SetConnectionLostHandler(connectionLostHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
}
// Subscribe to topics
for topic, qos := range topics {
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
}
log.Printf("Subscribed to topic: %s", topic)
}
// Keep the program running
sigChan := make(chan os.Signal, 1)
<-sigChan
}