Stop tracking .env, update dependencies and MQTT logic
This commit is contained in:
4
.env
4
.env
@@ -1,4 +0,0 @@
|
|||||||
MQTT_BROKER=tcp://172.31.31.32:1883
|
|
||||||
DB_DRIVER=postgres
|
|
||||||
DB_DATASOURCE=host=pgdb-pds user=pds password=H0ZiUEB1syaSCdHy dbname=pds sslmode=disable
|
|
||||||
CLIENT_ID=
|
|
||||||
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
.env
|
||||||
2
go.mod
2
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module mqtt-client
|
module mqtt-client
|
||||||
|
|
||||||
go 1.22.2
|
go 1.22
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.mqtt.golang v1.5.0
|
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||||
|
|||||||
509
mqtt_client.go
509
mqtt_client.go
@@ -1,156 +1,435 @@
|
|||||||
|
// package main
|
||||||
|
|
||||||
|
// // import (
|
||||||
|
// // "database/sql"
|
||||||
|
// // "encoding/json"
|
||||||
|
// // "fmt"
|
||||||
|
// // "log"
|
||||||
|
// // "os"
|
||||||
|
|
||||||
|
// // mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
// // "github.com/joho/godotenv"
|
||||||
|
// // _ "github.com/lib/pq" // PostgreSQL driver
|
||||||
|
// // )
|
||||||
|
// import (
|
||||||
|
// "database/sql"
|
||||||
|
// "encoding/json"
|
||||||
|
// "fmt"
|
||||||
|
// "log"
|
||||||
|
// "os"
|
||||||
|
|
||||||
|
// mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
// "github.com/joho/godotenv"
|
||||||
|
// _ "github.com/lib/pq"
|
||||||
|
// )
|
||||||
|
|
||||||
|
// var (
|
||||||
|
// mqttBroker string
|
||||||
|
// dbDriver string
|
||||||
|
// dbDataSource string
|
||||||
|
// clientID string
|
||||||
|
// )
|
||||||
|
|
||||||
|
|
||||||
|
// func init() {
|
||||||
|
// // Load environment variables from .env file
|
||||||
|
// if err := godotenv.Load(); err != nil {
|
||||||
|
// log.Fatalf("Error loading .env file: %v", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Retrieve environment variables
|
||||||
|
// mqttBroker = os.Getenv("MQTT_BROKER")
|
||||||
|
// dbDriver = os.Getenv("DB_DRIVER")
|
||||||
|
// dbDataSource = os.Getenv("DB_DATASOURCE")
|
||||||
|
// clientID = os.Getenv("CLIENT_ID")
|
||||||
|
|
||||||
|
// // Validate required environment variables
|
||||||
|
// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
|
||||||
|
// log.Fatalf("Missing required environment variables")
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var (
|
||||||
|
// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
|
||||||
|
// db *sql.DB // Database connection
|
||||||
|
// )
|
||||||
|
|
||||||
|
// // Handles incoming MQTT messages
|
||||||
|
// func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
||||||
|
// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
||||||
|
|
||||||
|
// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
|
||||||
|
// errorTopic := "error_topic"
|
||||||
|
|
||||||
|
// // Validate JSON payload
|
||||||
|
// var jsonData map[string]interface{}
|
||||||
|
// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
|
||||||
|
// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Check for missing fields
|
||||||
|
// for _, field := range requiredFields {
|
||||||
|
// if _, exists := jsonData[field]; !exists {
|
||||||
|
// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Create table dynamically if it doesn't exist
|
||||||
|
// tableName := msg.Topic()
|
||||||
|
// createTableQuery := fmt.Sprintf(`
|
||||||
|
// CREATE TABLE IF NOT EXISTS %s (
|
||||||
|
// id SERIAL PRIMARY KEY,
|
||||||
|
// temperature FLOAT NOT NULL,
|
||||||
|
// humidity INT NOT NULL,
|
||||||
|
// pressure INT NOT NULL,
|
||||||
|
// timestamp TIMESTAMP NOT NULL
|
||||||
|
// )`, tableName)
|
||||||
|
// if _, err := db.Exec(createTableQuery); err != nil {
|
||||||
|
// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Insert data into the table
|
||||||
|
// insertQuery := fmt.Sprintf(`
|
||||||
|
// INSERT INTO %s (temperature, humidity, pressure, timestamp)
|
||||||
|
// VALUES ($1, $2, $3, $4)`, tableName)
|
||||||
|
// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
|
||||||
|
// if err != nil {
|
||||||
|
// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
|
||||||
|
// } else {
|
||||||
|
// log.Printf("Data successfully inserted into table '%s'.", tableName)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Publishes error messages to a specific topic
|
||||||
|
// func publishError(client mqtt.Client, topic, message string) {
|
||||||
|
// token := client.Publish(topic, 1, false, message)
|
||||||
|
// token.Wait()
|
||||||
|
// if token.Error() != nil {
|
||||||
|
// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Handles successful MQTT connection
|
||||||
|
// func connectHandler(client mqtt.Client) {
|
||||||
|
// log.Println("Connected to MQTT broker")
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Handles MQTT connection loss
|
||||||
|
// func connectionLostHandler(client mqtt.Client, err error) {
|
||||||
|
// log.Printf("Connection lost: %v. Attempting to reconnect...", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func main() {
|
||||||
|
// var err error
|
||||||
|
// godotenv.Load()
|
||||||
|
// // Connect to the database
|
||||||
|
// db, err = sql.Open(dbDriver, dbDataSource)
|
||||||
|
// if err != nil {
|
||||||
|
// log.Fatalf("Database connection failed: %v", err)
|
||||||
|
// }
|
||||||
|
// defer db.Close()
|
||||||
|
|
||||||
|
// // Validate database connection
|
||||||
|
// if err := db.Ping(); err != nil {
|
||||||
|
// log.Fatalf("Database ping failed: %v", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Configure MQTT client
|
||||||
|
// opts := mqtt.NewClientOptions().
|
||||||
|
// AddBroker(mqttBroker).
|
||||||
|
// SetClientID(clientID).
|
||||||
|
// SetUsername("test"). // todo: Change to .env variable
|
||||||
|
// SetPassword("test"). // todo: Change to .env variable
|
||||||
|
// SetCleanSession(false).
|
||||||
|
// SetAutoReconnect(true).
|
||||||
|
// SetOrderMatters(false).
|
||||||
|
// SetOnConnectHandler(connectHandler).
|
||||||
|
// SetConnectionLostHandler(connectionLostHandler)
|
||||||
|
|
||||||
|
// client := mqtt.NewClient(opts)
|
||||||
|
// if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||||
|
// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Subscribe to topics
|
||||||
|
// for topic, qos := range topics {
|
||||||
|
// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
||||||
|
// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
||||||
|
// }
|
||||||
|
// log.Printf("Subscribed to topic: %s", topic)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Keep the program running
|
||||||
|
// sigChan := make(chan os.Signal, 1)
|
||||||
|
// <-sigChan
|
||||||
|
// }
|
||||||
|
|
||||||
|
//
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
_ "github.com/lib/pq" // PostgreSQL driver
|
_ "github.com/lib/pq" //Required for PostgreSQL driver
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//JSON payload structure
|
||||||
|
// type Payload struct {
|
||||||
|
// Temperature float64 `json:"temperature"`
|
||||||
|
// Humidity int `json:"humidity"`
|
||||||
|
// Pressure int `json:"pressure"`
|
||||||
|
// Timestamp string `json:"timestamp"`
|
||||||
|
// }
|
||||||
|
type Payload struct {
|
||||||
|
PlantCode string `json:"plant_id"`
|
||||||
|
MFMMeterID string `json:"mfm_meter_id"`
|
||||||
|
RegisterData string `json:"register_data"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
UpdatedAt string `json:"updated_at"`
|
||||||
|
CreatedBy string `json:"created_by"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//Global variables
|
||||||
var (
|
var (
|
||||||
|
mqttClient mqtt.Client
|
||||||
|
mqttTopic = "test"
|
||||||
|
db *sql.DB
|
||||||
mqttBroker string
|
mqttBroker string
|
||||||
|
clientID string
|
||||||
|
mqttUser string
|
||||||
|
mqttPass string
|
||||||
dbDriver string
|
dbDriver string
|
||||||
dbDataSource string
|
dbDataSource string
|
||||||
clientID string
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func main() {
|
||||||
// Load environment variables from .env file
|
//Load environment variables from .env
|
||||||
if err := godotenv.Load(); err != nil {
|
if err := godotenv.Load(); err != nil {
|
||||||
log.Fatalf("Error loading .env file: %v", err)
|
log.Fatalf("Error loading .env file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve environment variables
|
//Get environment variables
|
||||||
mqttBroker = os.Getenv("MQTT_BROKER")
|
mqttBroker = os.Getenv("MQTT_BROKER")
|
||||||
dbDriver = os.Getenv("DB_DRIVER")
|
|
||||||
dbDataSource = os.Getenv("DB_DATASOURCE")
|
|
||||||
clientID = os.Getenv("CLIENT_ID")
|
clientID = os.Getenv("CLIENT_ID")
|
||||||
|
mqttUser = os.Getenv("MQTT_USERNAME")
|
||||||
|
mqttPass = os.Getenv("MQTT_PASSWORD")
|
||||||
|
dbDriver = os.Getenv("DB_DRIVER") // should be "postgres"
|
||||||
|
dbDataSource = os.Getenv("DB_DATASOURCE") // Postgres connection string
|
||||||
|
|
||||||
// Validate required environment variables
|
if mqttBroker == "" || clientID == "" || dbDriver == "" || dbDataSource == "" {
|
||||||
if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" {
|
log.Fatal("Missing required environment variables")
|
||||||
log.Fatalf("Missing required environment variables")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1
|
|
||||||
db *sql.DB // Database connection
|
|
||||||
)
|
|
||||||
|
|
||||||
// Handles incoming MQTT messages
|
|
||||||
func messageHandler(client mqtt.Client, msg mqtt.Message) {
|
|
||||||
fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
|
||||||
|
|
||||||
requiredFields := []string{"temp", "humidity", "pressure", "timestamp"}
|
|
||||||
errorTopic := "error_topic"
|
|
||||||
|
|
||||||
// Validate JSON payload
|
|
||||||
var jsonData map[string]interface{}
|
|
||||||
if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil {
|
|
||||||
publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload()))
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for missing fields
|
//Open DB connection
|
||||||
for _, field := range requiredFields {
|
|
||||||
if _, exists := jsonData[field]; !exists {
|
|
||||||
publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload()))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create table dynamically if it doesn't exist
|
|
||||||
tableName := msg.Topic()
|
|
||||||
createTableQuery := fmt.Sprintf(`
|
|
||||||
CREATE TABLE IF NOT EXISTS %s (
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
temperature FLOAT NOT NULL,
|
|
||||||
humidity INT NOT NULL,
|
|
||||||
pressure INT NOT NULL,
|
|
||||||
timestamp TIMESTAMP NOT NULL
|
|
||||||
)`, tableName)
|
|
||||||
if _, err := db.Exec(createTableQuery); err != nil {
|
|
||||||
publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert data into the table
|
|
||||||
insertQuery := fmt.Sprintf(`
|
|
||||||
INSERT INTO %s (temperature, humidity, pressure, timestamp)
|
|
||||||
VALUES ($1, $2, $3, $4)`, tableName)
|
|
||||||
_, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"])
|
|
||||||
if err != nil {
|
|
||||||
publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err))
|
|
||||||
} else {
|
|
||||||
log.Printf("Data successfully inserted into table '%s'.", tableName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publishes error messages to a specific topic
|
|
||||||
func publishError(client mqtt.Client, topic, message string) {
|
|
||||||
token := client.Publish(topic, 1, false, message)
|
|
||||||
token.Wait()
|
|
||||||
if token.Error() != nil {
|
|
||||||
log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handles successful MQTT connection
|
|
||||||
func connectHandler(client mqtt.Client) {
|
|
||||||
log.Println("Connected to MQTT broker")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handles MQTT connection loss
|
|
||||||
func connectionLostHandler(client mqtt.Client, err error) {
|
|
||||||
log.Printf("Connection lost: %v. Attempting to reconnect...", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var err error
|
var err error
|
||||||
godotenv.Load()
|
|
||||||
// Connect to the database
|
|
||||||
db, err = sql.Open(dbDriver, dbDataSource)
|
db, err = sql.Open(dbDriver, dbDataSource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Database connection failed: %v", err)
|
log.Fatalf("Failed to connect to DB: %v", err)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
// Validate database connection
|
|
||||||
if err := db.Ping(); err != nil {
|
if err := db.Ping(); err != nil {
|
||||||
log.Fatalf("Database ping failed: %v", err)
|
log.Fatalf("DB ping failed: %v", err)
|
||||||
}
|
}
|
||||||
|
log.Println("Connected to the database")
|
||||||
|
|
||||||
// Configure MQTT client
|
//Setup and connect to MQTT
|
||||||
opts := mqtt.NewClientOptions().
|
opts := mqtt.NewClientOptions().
|
||||||
AddBroker(mqttBroker).
|
AddBroker(mqttBroker).
|
||||||
SetClientID(clientID).
|
SetClientID(clientID).
|
||||||
SetUsername("test"). // todo: Change to .env variable
|
SetUsername(mqttUser).
|
||||||
SetPassword("test"). // todo: Change to .env variable
|
SetPassword(mqttPass).
|
||||||
SetCleanSession(false).
|
SetCleanSession(true).
|
||||||
SetAutoReconnect(true).
|
SetAutoReconnect(true)
|
||||||
SetOrderMatters(false).
|
|
||||||
SetOnConnectHandler(connectHandler).
|
|
||||||
SetConnectionLostHandler(connectionLostHandler)
|
|
||||||
|
|
||||||
client := mqtt.NewClient(opts)
|
mqttClient = mqtt.NewClient(opts)
|
||||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
|
||||||
log.Fatalf("Failed to connect to MQTT broker: %v", token.Error())
|
log.Fatalf("MQTT connection failed: %v", token.Error())
|
||||||
}
|
}
|
||||||
|
log.Println("Connected to MQTT Broker")
|
||||||
|
|
||||||
// Subscribe to topics
|
//Start HTTP server
|
||||||
for topic, qos := range topics {
|
http.HandleFunc("/api/send-data", handlePostData)
|
||||||
if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil {
|
log.Println("HTTP server listening on :8080")
|
||||||
log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error())
|
log.Fatal(http.ListenAndServe(":8080", nil))
|
||||||
}
|
|
||||||
log.Printf("Subscribed to topic: %s", topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep the program running
|
|
||||||
sigChan := make(chan os.Signal, 1)
|
|
||||||
<-sigChan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// func handlePostData(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
// if r.Method != http.MethodPost {
|
||||||
|
// http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Decode payload
|
||||||
|
// var payload Payload
|
||||||
|
// var err error
|
||||||
|
// if err = json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||||
|
// http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if payload.CreatedAt == "" {
|
||||||
|
// http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var plantID int
|
||||||
|
// err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantName).Scan(&plantID)
|
||||||
|
// if err == sql.ErrNoRows {
|
||||||
|
// w.WriteHeader(http.StatusBadRequest)
|
||||||
|
// fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantName)
|
||||||
|
// return
|
||||||
|
// } else if err != nil {
|
||||||
|
// log.Printf("Error getting plant ID: %v", err)
|
||||||
|
// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// var mfmMeterID int
|
||||||
|
// query := `
|
||||||
|
// SELECT id FROM mfm_meters
|
||||||
|
// WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2
|
||||||
|
// LIMIT 1
|
||||||
|
// `
|
||||||
|
// err = db.QueryRow(query, payload.MFMMeterID, payload.PlantName).Scan(&mfmMeterID)
|
||||||
|
// if err == sql.ErrNoRows {
|
||||||
|
// w.WriteHeader(http.StatusBadRequest)
|
||||||
|
// w.Write([]byte(`{"error": "Sequence '` + payload.MFMMeterID + `' not found for plant_id ` + strconv.Itoa(payload.PlantName) + `"}`))
|
||||||
|
// return
|
||||||
|
// } else if err != nil {
|
||||||
|
// log.Printf("DB check error: %v", err)
|
||||||
|
// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// insertQuery := `
|
||||||
|
// INSERT INTO temp_live_readings (
|
||||||
|
// plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by
|
||||||
|
// ) VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
// `
|
||||||
|
// _, err = db.Exec(insertQuery,
|
||||||
|
// payload.PlantName,
|
||||||
|
// mfmMeterID,
|
||||||
|
// payload.RegisterData,
|
||||||
|
// payload.CreatedAt,
|
||||||
|
// payload.UpdatedAt,
|
||||||
|
// payload.CreatedBy,
|
||||||
|
// )
|
||||||
|
// if err != nil {
|
||||||
|
// log.Printf("DB insert error: %v", err)
|
||||||
|
// http.Error(w, `{"error":"DB insert failed"}`, http.StatusInternalServerError)
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// msg, _ := json.Marshal(payload)
|
||||||
|
// token := mqttClient.Publish(mqttTopic, 1, false, msg)
|
||||||
|
// token.Wait()
|
||||||
|
|
||||||
|
// w.WriteHeader(http.StatusOK)
|
||||||
|
// w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`))
|
||||||
|
// }
|
||||||
|
|
||||||
|
func handlePostData(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload Payload
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if err = json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||||
|
http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.CreatedAt == "" {
|
||||||
|
http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//Resolve plant ID from plant_code
|
||||||
|
var plantID int
|
||||||
|
err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantCode)
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
log.Printf("Error getting plant ID: %v", err)
|
||||||
|
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if MFM meter exists for the plant
|
||||||
|
var mfmMeterID int
|
||||||
|
query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1`
|
||||||
|
err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
fmt.Fprintf(w, `{"error":"mfm_meter_id '%s' not found for plant_id %d"}`, payload.MFMMeterID, plantID)
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
log.Printf("Error checking MFM meter: %v", err)
|
||||||
|
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1`
|
||||||
|
|
||||||
|
var exists int
|
||||||
|
err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists)
|
||||||
|
if err == nil {
|
||||||
|
w.WriteHeader(http.StatusConflict) // 409 Conflict is semantically accurate
|
||||||
|
fmt.Fprintf(w, `{"error":"Duplicate entry for created_at '%s' and mfm_meter_id %d"}`, payload.CreatedAt, mfmMeterID)
|
||||||
|
return
|
||||||
|
} else if err != sql.ErrNoRows {
|
||||||
|
log.Printf("Error checking for duplicate: %v", err)
|
||||||
|
http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//Insert into temp_live_readings
|
||||||
|
insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)`
|
||||||
|
|
||||||
|
_, err = db.Exec(insertQuery,
|
||||||
|
plantID,
|
||||||
|
mfmMeterID,
|
||||||
|
payload.RegisterData,
|
||||||
|
payload.CreatedAt,
|
||||||
|
payload.UpdatedAt,
|
||||||
|
payload.CreatedBy,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("DB insert error: %v", err)
|
||||||
|
http.Error(w, `{"error":"Insert into DB failed"}`, http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//Publish to MQTT
|
||||||
|
msg, _ := json.Marshal(payload)
|
||||||
|
token := mqttClient.Publish(mqttTopic, 1, false, msg)
|
||||||
|
token.Wait()
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user