From b1609bb4d4e817f2a0b982ec8811748d8e635e3d Mon Sep 17 00:00:00 2001 From: ranjith Date: Fri, 25 Jul 2025 12:55:48 +0000 Subject: [PATCH] Added fpower house functionality --- mqtt_client.go | 704 ++++++++++++++++++++----------------------------- 1 file changed, 291 insertions(+), 413 deletions(-) diff --git a/mqtt_client.go b/mqtt_client.go index 03cac2b..f4cfc05 100644 --- a/mqtt_client.go +++ b/mqtt_client.go @@ -1,435 +1,313 @@ -// package main - -// // import ( -// // "database/sql" -// // "encoding/json" -// // "fmt" -// // "log" -// // "os" - -// // mqtt "github.com/eclipse/paho.mqtt.golang" -// // "github.com/joho/godotenv" -// // _ "github.com/lib/pq" // PostgreSQL driver -// // ) -// import ( -// "database/sql" -// "encoding/json" -// "fmt" -// "log" -// "os" - -// mqtt "github.com/eclipse/paho.mqtt.golang" -// "github.com/joho/godotenv" -// _ "github.com/lib/pq" -// ) - -// var ( -// mqttBroker string -// dbDriver string -// dbDataSource string -// clientID string -// ) - - -// func init() { -// // Load environment variables from .env file -// if err := godotenv.Load(); err != nil { -// log.Fatalf("Error loading .env file: %v", err) -// } - -// // Retrieve environment variables -// mqttBroker = os.Getenv("MQTT_BROKER") -// dbDriver = os.Getenv("DB_DRIVER") -// dbDataSource = os.Getenv("DB_DATASOURCE") -// clientID = os.Getenv("CLIENT_ID") - -// // Validate required environment variables -// if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { -// log.Fatalf("Missing required environment variables") -// } -// } - -// var ( -// topics = map[string]byte{"test": 1} // Topics to subscribe with QoS 1 -// db *sql.DB // Database connection -// ) - -// // Handles incoming MQTT messages -// func messageHandler(client mqtt.Client, msg mqtt.Message) { -// fmt.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) - -// requiredFields := []string{"temp", "humidity", "pressure", "timestamp"} -// errorTopic := "error_topic" - -// // Validate JSON payload -// var jsonData map[string]interface{} -// if err := json.Unmarshal(msg.Payload(), &jsonData); err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Invalid JSON payload on topic %s: %s", msg.Topic(), msg.Payload())) -// return -// } - -// // Check for missing fields -// for _, field := range requiredFields { -// if _, exists := jsonData[field]; !exists { -// publishError(client, errorTopic, fmt.Sprintf("Missing field '%s' in payload on topic %s: %s", field, msg.Topic(), msg.Payload())) -// return -// } -// } - -// // Create table dynamically if it doesn't exist -// tableName := msg.Topic() -// createTableQuery := fmt.Sprintf(` -// CREATE TABLE IF NOT EXISTS %s ( -// id SERIAL PRIMARY KEY, -// temperature FLOAT NOT NULL, -// humidity INT NOT NULL, -// pressure INT NOT NULL, -// timestamp TIMESTAMP NOT NULL -// )`, tableName) -// if _, err := db.Exec(createTableQuery); err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Failed to create table '%s': %v", tableName, err)) -// return -// } - -// // Insert data into the table -// insertQuery := fmt.Sprintf(` -// INSERT INTO %s (temperature, humidity, pressure, timestamp) -// VALUES ($1, $2, $3, $4)`, tableName) -// _, err := db.Exec(insertQuery, jsonData["temperature"], jsonData["humidity"], jsonData["pressure"], jsonData["timestamp"]) -// if err != nil { -// publishError(client, errorTopic, fmt.Sprintf("Failed to insert data into table '%s': %v", tableName, err)) -// } else { -// log.Printf("Data successfully inserted into table '%s'.", tableName) -// } -// } - -// // Publishes error messages to a specific topic -// func publishError(client mqtt.Client, topic, message string) { -// token := client.Publish(topic, 1, false, message) -// token.Wait() -// if token.Error() != nil { -// log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) -// } -// } - -// // Handles successful MQTT connection -// func connectHandler(client mqtt.Client) { -// log.Println("Connected to MQTT broker") -// } - -// // Handles MQTT connection loss -// func connectionLostHandler(client mqtt.Client, err error) { -// log.Printf("Connection lost: %v. Attempting to reconnect...", err) -// } - -// func main() { -// var err error -// godotenv.Load() -// // Connect to the database -// db, err = sql.Open(dbDriver, dbDataSource) -// if err != nil { -// log.Fatalf("Database connection failed: %v", err) -// } -// defer db.Close() - -// // Validate database connection -// if err := db.Ping(); err != nil { -// log.Fatalf("Database ping failed: %v", err) -// } - -// // Configure MQTT client -// opts := mqtt.NewClientOptions(). -// AddBroker(mqttBroker). -// SetClientID(clientID). -// SetUsername("test"). // todo: Change to .env variable -// SetPassword("test"). // todo: Change to .env variable -// SetCleanSession(false). -// SetAutoReconnect(true). -// SetOrderMatters(false). -// SetOnConnectHandler(connectHandler). -// SetConnectionLostHandler(connectionLostHandler) - -// client := mqtt.NewClient(opts) -// if token := client.Connect(); token.Wait() && token.Error() != nil { -// log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) -// } - -// // Subscribe to topics -// for topic, qos := range topics { -// if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { -// log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) -// } -// log.Printf("Subscribed to topic: %s", topic) -// } - -// // Keep the program running -// sigChan := make(chan os.Signal, 1) -// <-sigChan -// } - -// - package main import ( - "encoding/json" - "database/sql" - "log" - "net/http" - "os" - "fmt" + "database/sql" + "encoding/json" + "fmt" + "log" + "os" + "strings" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/joho/godotenv" - _ "github.com/lib/pq" //Required for PostgreSQL driver + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/joho/godotenv" + _ "github.com/lib/pq" ) -//JSON payload structure -// type Payload struct { -// Temperature float64 `json:"temperature"` -// Humidity int `json:"humidity"` -// Pressure int `json:"pressure"` -// Timestamp string `json:"timestamp"` -// } -type Payload struct { - PlantCode string `json:"plant_id"` - MFMMeterID string `json:"mfm_meter_id"` - RegisterData string `json:"register_data"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` - CreatedBy string `json:"created_by"` +var ( + mqttBroker string + dbDriver string + dbDataSource string + clientID string +) + + +func init() { + // Load environment variables from .env file + if err := godotenv.Load(); err != nil { + log.Fatalf("Error loading .env file: %v", err) + } + + // Retrieve environment variables + mqttBroker = os.Getenv("MQTT_BROKER") + dbDriver = os.Getenv("DB_DRIVER") + dbDataSource = os.Getenv("DB_DATASOURCE") + clientID = os.Getenv("CLIENT_ID") + + // Validate required environment variables + if mqttBroker == "" || dbDriver == "" || dbDataSource == "" || clientID == "" { + log.Fatalf("Missing required environment variables") + } } -//Global variables var ( - mqttClient mqtt.Client - mqttTopic = "test" - db *sql.DB - mqttBroker string - clientID string - mqttUser string - mqttPass string - dbDriver string - dbDataSource string + topics = map[string]byte{"powerhouse": 1} // Topics to subscribe with QoS 1 + db *sql.DB // Database connection ) +// Handles incoming MQTT messages +func messageHandler(client mqtt.Client, msg mqtt.Message) { + fmt.Printf("📥 Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) + + errorTopic := "error_topic" + + // Define struct for payload + type Payload struct { + PlantCode string `json:"plant_code"` + MFMMeterID string `json:"mfm_meter_id"` + RegisterData string `json:"register_data"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + CreatedBy string `json:"created_by"` + } + + var payload Payload + if err := json.Unmarshal(msg.Payload(), &payload); err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Invalid JSON: %v", err)) + return + } + + // Validate required fields + if payload.CreatedAt == "" { + publishError(client, errorTopic, "❌ Missing created_at") + return + } + + // Resolve plant ID + var plantID int + err := db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID) + if err == sql.ErrNoRows { + publishError(client, errorTopic, fmt.Sprintf("❌ Plant code '%s' not found", payload.PlantCode)) + return + } else if err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Error fetching plant ID: %v", err)) + return + } + + // Validate MFM meter + var mfmMeterID int + query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1` + err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID) + if err == sql.ErrNoRows { + publishError(client, errorTopic, fmt.Sprintf("❌ mfm_meter_id '%s' not found for plant_id %d", payload.MFMMeterID, plantID)) + return + } else if err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Error checking MFM meter: %v", err)) + return + } + + // Check for duplicates + var exists int + existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1` + err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists) + if err == nil { + publishError(client, errorTopic, fmt.Sprintf("⚠️ Duplicate entry for created_at '%s' and mfm_meter_id %d", payload.CreatedAt, mfmMeterID)) + return + } else if err != sql.ErrNoRows { + publishError(client, errorTopic, fmt.Sprintf("❌ Error checking for duplicate: %v", err)) + return + } + + // + insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)` + + _, err = db.Exec(insertQuery, + plantID, + mfmMeterID, + payload.RegisterData, + payload.CreatedAt, + payload.UpdatedAt, + payload.CreatedBy, + ) + + if err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ DB insert failed: %v", err)) + return + } + + log.Printf("✅ Data inserted successfully for plant ID %d and mfm_meter_id %d", plantID, mfmMeterID) + + // values := strings.Split(payload.RegisterData, ",") + + // if len(values) < 1 { + // publishError(client, errorTopic, + // fmt.Sprintf("❌ Register data has insufficient fields: need at least 1, got %d", len(values))) + // return + // } + // fields := make([]interface{}, 22) + // for i := 0; i < 22; i++ { + // if i < len(values) { + // v := strings.TrimSpace(values[i]) + // if v == "" { + // fields[i] = nil // Empty string becomes NULL + // } else { + // fields[i] = v + // } + // } else { + // fields[i] = nil // No value sent → NULL + // } + // } + + // // Prepare and execute insert into mfm_readings + // insertReadingQuery := `INSERT INTO mfm_readings (plant_id, mfm_meter_id,apparent_energy_received, reactive_energy_received, active_energy_received, active_power_r, active_power_y, active_power_b, active_power_total,voltage_ry, voltage_yb, voltage_br,current_r, current_y, current_b, current_n,voltage_r_n, voltage_y_n, voltage_b_n,frequency,power_factor_r, power_factor_y, power_factor_b, power_factor_total,created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` + + // args := []interface{}{plantID, mfmMeterID} + // args = append(args, fields...) + // args = append(args, payload.CreatedAt) + + // // Execute the insert safely + // _, err = db.Exec(insertReadingQuery, args...) + // if err != nil { + // publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) + // return + // } + + // log.Printf("✅ mfm_readings inserted for mfm_meter_id %d", mfmMeterID) + values := strings.Split(payload.RegisterData, ",") + + // Step 1: Get expected parameter columns from mfm_parameters + paramQuery := `SELECT name FROM mfm_parameters WHERE mfm_meter_id = $1 AND plant_id = $2 ORDER BY id ASC` + + rows, err := db.Query(paramQuery, mfmMeterID, plantID) + if err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to fetch parameters for meter %d: %v", mfmMeterID, err)) + return + } + defer rows.Close() + + var paramColumns []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to scan parameter name: %v", err)) + return + } + paramColumns = append(paramColumns, name) + } + + if len(paramColumns) == 0 { + publishError(client, errorTopic, fmt.Sprintf("❌ No parameters found for meter_id %d", mfmMeterID)) + return + } + + // Validate values count + if len(values) < len(paramColumns) { + publishError(client, errorTopic, fmt.Sprintf("❌ Insufficient register values: expected %d, got %d", len(paramColumns), len(values))) + return + } + + if len(values) > len(paramColumns){ + publishError(client, errorTopic, fmt.Sprintf("❌ Too many register values: expected %d, got %d", len(paramColumns), len(values))) + return + } + + // Build insert statement + columns := []string{"plant_id", "mfm_meter_id"} + placeholders := []string{"$1", "$2"} + args := []interface{}{plantID, mfmMeterID} + argIndex := 3 + + for i, col := range paramColumns { + columns = append(columns, col) + placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex)) + argIndex++ + v := strings.TrimSpace(values[i]) + if v == "" { + args = append(args, nil) + } else { + args = append(args, v) + } + } + + // Add created_at + columns = append(columns, "created_at") + placeholders = append(placeholders, fmt.Sprintf("$%d", argIndex)) + args = append(args, payload.CreatedAt) + + insertQuery = fmt.Sprintf( + `INSERT INTO mfm_readings (%s) VALUES (%s)`, + strings.Join(columns, ", "), + strings.Join(placeholders, ", "), + ) + + _, err = db.Exec(insertQuery, args...) + if err != nil { + publishError(client, errorTopic, fmt.Sprintf("❌ Failed to insert into mfm_readings: %v", err)) + return + } + + log.Printf("✅ Inserted into mfm_readings for meter %d with %d parameters", mfmMeterID, len(paramColumns)) + +} + + +// Publishes error messages to a specific topic +func publishError(client mqtt.Client, topic, message string) { + token := client.Publish(topic, 1, false, message) + token.Wait() + if token.Error() != nil { + log.Printf("Failed to publish error to topic '%s': %v", topic, token.Error()) + } +} + +// Handles successful MQTT connection +func connectHandler(client mqtt.Client) { + log.Println("Connected to MQTT broker") +} + +// Handles MQTT connection loss +func connectionLostHandler(client mqtt.Client, err error) { + log.Printf("Connection lost: %v. Attempting to reconnect...", err) +} + func main() { - //Load environment variables from .env - if err := godotenv.Load(); err != nil { - log.Fatalf("Error loading .env file: %v", err) - } + var err error + godotenv.Load() + // Connect to the database + db, err = sql.Open(dbDriver, dbDataSource) + if err != nil { + log.Fatalf("Database connection failed: %v", err) + } + defer db.Close() - //Get environment variables - mqttBroker = os.Getenv("MQTT_BROKER") - clientID = os.Getenv("CLIENT_ID") - mqttUser = os.Getenv("MQTT_USERNAME") - mqttPass = os.Getenv("MQTT_PASSWORD") - dbDriver = os.Getenv("DB_DRIVER") // should be "postgres" - dbDataSource = os.Getenv("DB_DATASOURCE") // Postgres connection string + // Validate database connection + if err := db.Ping(); err != nil { + log.Fatalf("Database ping failed: %v", err) + } - if mqttBroker == "" || clientID == "" || dbDriver == "" || dbDataSource == "" { - log.Fatal("Missing required environment variables") - } + // Configure MQTT client + opts := mqtt.NewClientOptions(). + AddBroker(mqttBroker). + SetClientID(clientID). + // SetUsername("test"). // todo: Change to .env variable + // SetPassword("test"). // todo: Change to .env variable + SetCleanSession(false). + SetAutoReconnect(true). + SetOrderMatters(false). + SetOnConnectHandler(connectHandler). + SetConnectionLostHandler(connectionLostHandler) - //Open DB connection - var err error - db, err = sql.Open(dbDriver, dbDataSource) - if err != nil { - log.Fatalf("Failed to connect to DB: %v", err) - } + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to connect to MQTT broker: %v", token.Error()) + } - if err := db.Ping(); err != nil { - log.Fatalf("DB ping failed: %v", err) - } - log.Println("Connected to the database") + // Subscribe to topics + for topic, qos := range topics { + if token := client.Subscribe(topic, qos, messageHandler); token.Wait() && token.Error() != nil { + log.Fatalf("Failed to subscribe to topic '%s': %v", topic, token.Error()) + } + log.Printf("Subscribed to topic: %s", topic) + } - //Setup and connect to MQTT - opts := mqtt.NewClientOptions(). - AddBroker(mqttBroker). - SetClientID(clientID). - SetUsername(mqttUser). - SetPassword(mqttPass). - SetCleanSession(true). - SetAutoReconnect(true) - - mqttClient = mqtt.NewClient(opts) - if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { - log.Fatalf("MQTT connection failed: %v", token.Error()) - } - log.Println("Connected to MQTT Broker") - - //Start HTTP server - http.HandleFunc("/api/send-data", handlePostData) - log.Println("HTTP server listening on :8080") - log.Fatal(http.ListenAndServe(":8080", nil)) -} - -// func handlePostData(w http.ResponseWriter, r *http.Request) { -// w.Header().Set("Content-Type", "application/json") - -// if r.Method != http.MethodPost { -// http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed) -// return -// } - -// // Decode payload -// var payload Payload -// var err error -// if err = json.NewDecoder(r.Body).Decode(&payload); err != nil { -// http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest) -// return -// } - -// if payload.CreatedAt == "" { -// http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest) -// return -// } - -// var plantID int -// err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantName).Scan(&plantID) -// if err == sql.ErrNoRows { -// w.WriteHeader(http.StatusBadRequest) -// fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantName) -// return -// } else if err != nil { -// log.Printf("Error getting plant ID: %v", err) -// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError) -// return -// } - -// var mfmMeterID int -// query := ` -// SELECT id FROM mfm_meters -// WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 -// LIMIT 1 -// ` -// err = db.QueryRow(query, payload.MFMMeterID, payload.PlantName).Scan(&mfmMeterID) -// if err == sql.ErrNoRows { -// w.WriteHeader(http.StatusBadRequest) -// w.Write([]byte(`{"error": "Sequence '` + payload.MFMMeterID + `' not found for plant_id ` + strconv.Itoa(payload.PlantName) + `"}`)) -// return -// } else if err != nil { -// log.Printf("DB check error: %v", err) -// http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError) -// return -// } - -// insertQuery := ` -// INSERT INTO temp_live_readings ( -// plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by -// ) VALUES ($1, $2, $3, $4, $5, $6) -// ` -// _, err = db.Exec(insertQuery, -// payload.PlantName, -// mfmMeterID, -// payload.RegisterData, -// payload.CreatedAt, -// payload.UpdatedAt, -// payload.CreatedBy, -// ) -// if err != nil { -// log.Printf("DB insert error: %v", err) -// http.Error(w, `{"error":"DB insert failed"}`, http.StatusInternalServerError) -// return -// } - -// msg, _ := json.Marshal(payload) -// token := mqttClient.Publish(mqttTopic, 1, false, msg) -// token.Wait() - -// w.WriteHeader(http.StatusOK) -// w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`)) -// } - -func handlePostData(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - - if r.Method != http.MethodPost { - http.Error(w, `{"error":"Only POST allowed"}`, http.StatusMethodNotAllowed) - return - } - - var payload Payload - var err error - - if err = json.NewDecoder(r.Body).Decode(&payload); err != nil { - http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest) - return - } - - if payload.CreatedAt == "" { - http.Error(w, `{"error":"Missing created_at"}`, http.StatusBadRequest) - return - } - - //Resolve plant ID from plant_code - var plantID int - err = db.QueryRow(`SELECT id FROM plants WHERE code = $1`, payload.PlantCode).Scan(&plantID) - if err == sql.ErrNoRows { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprintf(w, `{"error":"Plant code '%s' not found"}`, payload.PlantCode) - return - } else if err != nil { - log.Printf("Error getting plant ID: %v", err) - http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError) - return - } - - // Check if MFM meter exists for the plant - var mfmMeterID int - query := `SELECT id FROM mfm_meters WHERE TRIM(sequence) = TRIM($1) AND plant_id = $2 LIMIT 1` - err = db.QueryRow(query, payload.MFMMeterID, plantID).Scan(&mfmMeterID) - if err == sql.ErrNoRows { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprintf(w, `{"error":"mfm_meter_id '%s' not found for plant_id %d"}`, payload.MFMMeterID, plantID) - return - } else if err != nil { - log.Printf("Error checking MFM meter: %v", err) - http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError) - return - } - - existsQuery := `SELECT 1 FROM temp_live_readings WHERE created_at = $1 AND mfm_meter_id = $2 LIMIT 1` - - var exists int - err = db.QueryRow(existsQuery, payload.CreatedAt, mfmMeterID).Scan(&exists) - if err == nil { - w.WriteHeader(http.StatusConflict) // 409 Conflict is semantically accurate - fmt.Fprintf(w, `{"error":"Duplicate entry for created_at '%s' and mfm_meter_id %d"}`, payload.CreatedAt, mfmMeterID) - return - } else if err != sql.ErrNoRows { - log.Printf("Error checking for duplicate: %v", err) - http.Error(w, `{"error":"Internal server error"}`, http.StatusInternalServerError) - return - } - - //Insert into temp_live_readings - insertQuery := `INSERT INTO temp_live_readings (plant_id, mfm_meter_id, register_data, created_at, updated_at, created_by) VALUES ($1, $2, $3, $4, $5, $6)` - - _, err = db.Exec(insertQuery, - plantID, - mfmMeterID, - payload.RegisterData, - payload.CreatedAt, - payload.UpdatedAt, - payload.CreatedBy, - ) - - if err != nil { - log.Printf("DB insert error: %v", err) - http.Error(w, `{"error":"Insert into DB failed"}`, http.StatusInternalServerError) - return - } - - //Publish to MQTT - msg, _ := json.Marshal(payload) - token := mqttClient.Publish(mqttTopic, 1, false, msg) - token.Wait() - - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"message":"Data inserted into DB and published to MQTT"}`)) + // Keep the program running + sigChan := make(chan os.Signal, 1) + <-sigChan } + + + +