use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use crate::config::{AppConfig, ModbusValueMaps}; use rumqttc::{MqttOptions, Client, QoS}; pub fn start_mqtt_thread(config: Arc>, values: Arc>) { let mqtt_config = { let cfg = config.lock().unwrap(); cfg.mqtt.clone() }; let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() }; let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port }; let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() }; let user = mqtt_config.user.clone().unwrap_or_default(); let password = mqtt_config.password.clone().unwrap_or_default(); let user_is_empty = user.is_empty(); thread::spawn(move || { let mut mqttoptions = MqttOptions::new("paramod-client", broker, port); if !user_is_empty { mqttoptions.set_credentials(user, password); } let (mut client, mut connection) = Client::new(mqttoptions, 10); loop { { let values = values.lock().unwrap(); // Input Register for (name, val) in &values.modbus_input_register_values { if let Some(v) = val { if should_publish(&config, name, "input_register") { let topic = format!("{}/{}", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } // Holding Register for (name, val) in &values.modbus_holding_register_values { if let Some(v) = val { if should_publish(&config, name, "holding_register") { let topic = format!("{}/{}", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } // Coils for (name, val) in &values.modbus_coils_values { if let Some(v) = val { if should_publish(&config, name, "coils") { let topic = format!("{}/{}", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } } // Handle MQTT connection events for _event in connection.iter().take(1) { // Optionally log or handle events } thread::sleep(Duration::from_secs(5)); } }); } fn should_publish(config: &Arc>, name: &str, reg_type: &str) -> bool { let cfg = config.lock().unwrap(); match reg_type { "input_register" => { if let Some(regs) = &cfg.modbus_input_register { for map in regs { if let Some(reg) = map.get(name) { return reg.mqtt.unwrap_or(false); } } } } "holding_register" => { if let Some(regs) = &cfg.modbus_holding_register { for map in regs { if let Some(reg) = map.get(name) { return reg.mqtt.unwrap_or(false); } } } } "coils" => { if let Some(coils) = &cfg.modbus_coils { for map in coils { if let Some(coil) = map.get(name) { return coil.mqtt.unwrap_or(false); } } } } _ => {} } false }