use std::sync::{Arc, Mutex}; use std::process; use std::collections::HashMap; use std::thread; use std::time::{Duration, Instant}; use crate::config::{AppConfig, ModbusValueMaps}; use crate::modbus; use rumqttc::{Event, MqttOptions, Packet, Client, QoS, RecvTimeoutError}; pub fn start_mqtt_thread(config: Arc>, values: Arc>) { let mqtt_config = { let cfg = config.lock().unwrap(); cfg.mqtt.clone() }; let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() }; let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port }; let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() }; let leitsystem_path = mqtt_config .leitsystem_path .clone() .unwrap_or_else(|| format!("{}/leitsystem", path)); let set_write_interval = Duration::from_millis(mqtt_config.set_write_interval_ms.unwrap_or(2000).max(100)); let user = mqtt_config.user.clone().unwrap_or_default(); let password = mqtt_config.password.clone().unwrap_or_default(); let user_is_empty = user.is_empty(); thread::spawn(move || { let client_id = format!("paramod-client-{}", process::id()); let mut mqttoptions = MqttOptions::new(client_id, broker, port); mqttoptions.set_keep_alive(Duration::from_secs(5)); if !user_is_empty { mqttoptions.set_credentials(user, password); } let (mut client, mut connection) = Client::new(mqttoptions, 10); let set_topic = format!("{}/+/set", path); if let Err(e) = client.subscribe(set_topic, QoS::AtLeastOnce) { eprintln!("MQTT Subscribe fehlgeschlagen: {}", e); } let leitsystem_state_topic = format!("{}/active", leitsystem_path.trim_end_matches('/')); if let Err(e) = client.subscribe(leitsystem_state_topic.clone(), QoS::AtLeastOnce) { eprintln!("MQTT Subscribe Leitsystem fehlgeschlagen: {}", e); } let publish_interval = Duration::from_secs(5); let mut last_publish = Instant::now() - publish_interval; let mut last_set_write = Instant::now() - set_write_interval; let mut pending_set_values: HashMap = HashMap::new(); let mut leitsystem_enabled = false; loop { if last_publish.elapsed() >= publish_interval { { let values = values.lock().unwrap(); // Input Register for (name, val) in &values.modbus_input_register_values { if let Some(v) = val { if should_publish(&config, name, "input_register") { let topic = format!("{}/{}/state", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } // Holding Register for (name, val) in &values.modbus_holding_register_values { if let Some(v) = val { if should_publish(&config, name, "holding_register") { let topic = format!("{}/{}/state", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } // Coils for (name, val) in &values.modbus_coils_values { if let Some(v) = val { if should_publish(&config, name, "coils") { let topic = format!("{}/{}/state", path, name); let payload = format!("{}", v); let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); } } } } let state_payload = if leitsystem_enabled { "ON" } else { "OFF" }; let _ = client.publish( leitsystem_state_topic.clone(), QoS::AtLeastOnce, true, state_payload, ); last_publish = Instant::now(); } if last_set_write.elapsed() >= set_write_interval { let cfg = { let lock = config.lock().unwrap(); lock.clone() }; if leitsystem_enabled { for (name, payload) in &pending_set_values { if let Err(e) = modbus::write_value_by_name(&cfg, name, payload) { eprintln!("MQTT set -> Modbus Fehler ({}): {}", name, e); } else { mirror_set_value(&values, &cfg, name, payload); } } } last_set_write = Instant::now(); } match connection.recv_timeout(Duration::from_millis(200)) { Ok(Ok(Event::Incoming(Packet::Publish(publish)))) => { if publish.topic == leitsystem_state_topic { let payload = String::from_utf8_lossy(&publish.payload).trim().to_string(); match parse_boolish(&payload) { Some(v) => leitsystem_enabled = v, None => eprintln!("Ungültiger Leitsystem state '{}', erwartet ON/OFF", payload), } continue; } if let Ok(name) = extract_name_from_set_topic(&path, &publish.topic) { let payload = String::from_utf8_lossy(&publish.payload).trim().to_string(); pending_set_values.insert(name.clone(), payload.clone()); } } Ok(Ok(_)) => {} Ok(Err(e)) => { eprintln!("MQTT Connection Fehler: {}", e); eprintln!("Hinweis: Häufige Ursache ist eine zweite MQTT-Session mit derselben Client-ID."); } Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => { eprintln!("MQTT Verbindung getrennt (request channel geschlossen)"); thread::sleep(Duration::from_millis(500)); } } } }); } fn parse_boolish(payload: &str) -> Option { match payload.trim().to_ascii_lowercase().as_str() { "1" | "true" | "on" => Some(true), "0" | "false" | "off" => Some(false), _ => None, } } fn mirror_set_value(values: &Arc>, cfg: &AppConfig, name: &str, payload: &str) { if let Ok(mut maps) = values.lock() { if let Some(ref coils) = cfg.modbus_coils { for map in coils { if map.contains_key(name) { if let Some(v) = parse_boolish(payload) { maps.modbus_coils_values.insert(name.to_string(), Some(if v { 1.0 } else { 0.0 })); } return; } } } if let Some(ref holding) = cfg.modbus_holding_register { for map in holding { if map.contains_key(name) { if let Ok(v) = payload.parse::() { maps.modbus_holding_register_values.insert(name.to_string(), Some(v)); } return; } } } } } fn extract_name_from_set_topic(base_path: &str, topic: &str) -> Result { let prefix = format!("{}/", base_path.trim_end_matches('/')); if !topic.starts_with(&prefix) || !topic.ends_with("/set") { return Err("Topic passt nicht zum /set-Schema".to_string()); } let without_prefix = &topic[prefix.len()..]; let name = without_prefix.strip_suffix("/set").unwrap_or_default(); if name.is_empty() || name.contains('/') { return Err("Ungültiger Variablenname im Topic".to_string()); } Ok(name.to_string()) } fn should_publish(config: &Arc>, name: &str, reg_type: &str) -> bool { let cfg = config.lock().unwrap(); match reg_type { "input_register" => { if let Some(regs) = &cfg.modbus_input_register { for map in regs { if let Some(reg) = map.get(name) { return reg.mqtt.unwrap_or(false); } } } } "holding_register" => { if let Some(regs) = &cfg.modbus_holding_register { for map in regs { if let Some(reg) = map.get(name) { return reg.mqtt.unwrap_or(false); } } } } "coils" => { if let Some(coils) = &cfg.modbus_coils { for map in coils { if let Some(coil) = map.get(name) { return coil.mqtt.unwrap_or(false); } } } } _ => {} } false }