Initial commit
This commit is contained in:
4
.env
Normal file
4
.env
Normal file
@@ -0,0 +1,4 @@
|
||||
MQTT_BROKER=tcp://172.31.31.32:1883
|
||||
DB_DRIVER=postgres
|
||||
DB_DATASOURCE=host=pgdb-pds user=pds password=H0ZiUEB1syaSCdHy dbname=pds sslmode=disable
|
||||
CLIENT_ID=
|
||||
15
go.mod
Normal file
15
go.mod
Normal file
@@ -0,0 +1,15 @@
|
||||
module mqtt-client
|
||||
|
||||
go 1.22.2
|
||||
|
||||
require (
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/lib/pq v1.10.9
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/gorilla/websocket v1.5.3 // indirect
|
||||
golang.org/x/net v0.27.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
)
|
||||
12
go.sum
Normal file
12
go.sum
Normal file
@@ -0,0 +1,12 @@
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
||||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
156
mqtt_client.go
Normal file
156
mqtt_client.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/joho/godotenv"
|
||||
_ "github.com/lib/pq" // PostgreSQL driver
|
||||
)
|
||||
|
||||
var (
|
||||
mqttBroker string
|
||||
dbDriver string
|
||||
dbDataSource string
|
||||
clientID string
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Load environment variables from .env file
|
||||
if err := godotenv.Load(); err != nil {
|
||||
log.Fatalf("Error loading .env file: %v", err)
|
||||
}
|
||||
|
||||
// Retrieve environment variables
|
||||
mqttBroker = os.Getenv("MQTT_BROKER")
|
||||
dbDriver = os.Getenv("DB_DRIVER")
|
||||
dbDataSource = os.Getenv("DB_DATASOURCE")
|
||||
clientID = os.Getenv("CLIENT_ID")
|
||||
|
||||
// Validate required environment variables
|
||||
if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
|
||||
log.Fatalf("Missing required environment variables")
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
|
||||
db *sql.DB // Database connection
|
||||
)
|
||||
|
||||
// Handles incoming MQTT messages
|
||||
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||
fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
||||
|
||||
requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
|
||||
errorTopic := "error_topic"
|
||||
|
||||
// Validate JSON payload
|
||||
var jsonData map[string]interface{}
|
||||
if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
|
||||
publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
|
||||
return
|
||||
}
|
||||
|
||||
// Check for missing fields
|
||||
for _, field := range requiredFields {
|
||||
if _, exists := jsonData[field]; !exists {
|
||||
publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create table dynamically if it doesn't exist
|
||||
tableName := msg.Topic()
|
||||
createTableQuery := fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
id SERIAL PRIMARY KEY,
|
||||
temperature FLOAT NOT NULL,
|
||||
humidity INT NOT NULL,
|
||||
pressure INT NOT NULL,
|
||||
timestamp TIMESTAMP NOT NULL
|
||||
)`, tableName)
|
||||
if _, err := db.Exec(createTableQuery); err != nil {
|
||||
publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
|
||||
return
|
||||
}
|
||||
|
||||
// Insert data into the table
|
||||
insertQuery := fmt.Sprintf(`
|
||||
INSERT INTO %s (temperature, humidity, pressure, timestamp)
|
||||
VALUES ($1, $2, $3, $4)`, tableName)
|
||||
_, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
|
||||
if err != nil {
|
||||
publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
|
||||
} else {
|
||||
log.Printf("Data successfully inserted into table '%s'.", tableName)
|
||||
}
|
||||
}
|
||||
|
||||
// Publishes error messages to a specific topic
|
||||
func publishError(client mqtt.Client, topic, message string) {
|
||||
token := client.Publish(topic, 1, false, message)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Handles successful MQTT connection
|
||||
func connectHandler(client mqtt.Client) {
|
||||
log.Println("Connected to MQTT broker")
|
||||
}
|
||||
|
||||
// Handles MQTT connection loss
|
||||
func connectionLostHandler(client mqtt.Client, err error) {
|
||||
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
|
||||
}
|
||||
|
||||
func main() {
|
||||
var err error
|
||||
godotenv.Load()
|
||||
// Connect to the database
|
||||
db, err = sql.Open(dbDriver, dbDataSource)
|
||||
if err != nil {
|
||||
log.Fatalf("Database connection failed: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Validate database connection
|
||||
if err := db.Ping(); err != nil {
|
||||
log.Fatalf("Database ping failed: %v", err)
|
||||
}
|
||||
|
||||
// Configure MQTT client
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(mqttBroker).
|
||||
SetClientID(clientID).
|
||||
SetUsername("test"). // todo: Change to .env variable
|
||||
SetPassword("test"). // todo: Change to .env variable
|
||||
SetCleanSession(false).
|
||||
SetAutoReconnect(true).
|
||||
SetOrderMatters(false).
|
||||
SetOnConnectHandler(connectHandler).
|
||||
SetConnectionLostHandler(connectionLostHandler)
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
||||
}
|
||||
|
||||
// Subscribe to topics
|
||||
for topic, qos := range topics {
|
||||
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
||||
}
|
||||
log.Printf("Subscribed to topic: %s", topic)
|
||||
}
|
||||
|
||||
// Keep the program running
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
<-sigChan
|
||||
}
|
||||
Reference in New Issue
Block a user