diff --git a/paramod.yaml b/paramod.yaml index f243099..4708c23 100644 --- a/paramod.yaml +++ b/paramod.yaml @@ -7,7 +7,7 @@ mqtt: user: admin password: 97sm3pHNSMZ4M5qUj0x8 path: heizung/paradigma - button_circulation: zigbee2mqtt/WirelessButton + leitsystem_path: heizung/leitsystem2 influxdb: bucket: Paradigma org: skaville @@ -22,7 +22,7 @@ modbus: max_input_addr: 45 max_holding_addr: 61 modbus_coils: - - MgtSystem: {addr: 0, write: false, mqtt: false, influxdb: false, comment: Leitsystem aktiv} + - MgtSystem: {addr: 0, write: false, mqtt: true, influxdb: false, comment: Leitsystem aktiv} - HK1pres: {addr: 1, write: false, mqtt: false, influxdb: false, comment: HK1 vorhanden} - HK2pres: {addr: 2, write: false, mqtt: false, influxdb: false, comment: HK2 vorhanden} - HK3pres: {addr: 3, write: false, mqtt: false, influxdb: false, comment: HK3 vorhanden} @@ -78,9 +78,9 @@ modbus_input_register: - VKW: {addr: 43, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - VSPm: {addr: 44, type: INT16, factor: 0.1, mqtt: false, influxdb: true} modbus_holding_register: - - nothing: {addr: 0, type: UINT16, factor: 1, write: true, mqtt: false, influxdb: false} + - nothing: {addr: 0, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: false} - ErrLS: {addr: 1, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true} - - TVsoll: {addr: 2, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: true} + - TVsoll: {addr: 2, type: INT16, factor: 0.1, write: true, mqtt: true, influxdb: true} - TV2soll: {addr: 3, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false} - TV3soll: {addr: 4, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false} - HK1soll: {addr: 5, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true} diff --git a/src/config.rs b/src/config.rs index 8b2725a..5d1040f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,7 +24,8 @@ pub struct MqttConfig { pub user: Option, pub password: Option, pub path: Option, - pub button_circulation: Option, + pub leitsystem_path: Option, + pub set_write_interval_ms: Option, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/modbus.rs b/src/modbus.rs index d363210..5f1d87e 100644 --- a/src/modbus.rs +++ b/src/modbus.rs @@ -1,5 +1,6 @@ use crate::config::{ModbusValueMaps, ModbusConfig}; use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig}; +use crate::config::AppConfig; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::thread; @@ -9,6 +10,112 @@ use tokio_modbus::prelude::*; use tokio_modbus::client::tcp; use std::net::SocketAddr; +pub fn write_value_by_name(config: &AppConfig, name: &str, payload: &str) -> Result<(), String> { + let addr: SocketAddr = format!("{}:{}", config.modbus.host, config.modbus.port) + .parse() + .map_err(|e| format!("Ungültige Modbus-Adresse: {}", e))?; + + if let Some(ref coils) = config.modbus_coils { + for map in coils { + if let Some(coil) = map.get(name) { + if !coil.write.unwrap_or(false) { + return Err(format!("Schreiben für '{}' nicht erlaubt", name)); + } + let bool_value = parse_bool_payload(payload)?; + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Tokio Runtime Fehler: {}", e))?; + rt.block_on(async { + let mut client = tcp::connect_slave(addr, 1u8.into()) + .await + .map_err(|e| format!("Modbus Verbindung fehlgeschlagen: {}", e))?; + client + .write_single_coil(coil.addr, bool_value) + .await + .map_err(|e| format!("Coil schreiben fehlgeschlagen: {}", e)) + })?; + return Ok(()); + } + } + } + + if let Some(ref holding_registers) = config.modbus_holding_register { + for map in holding_registers { + if let Some(reg) = map.get(name) { + if !reg.write.unwrap_or(false) { + return Err(format!("Schreiben für '{}' nicht erlaubt", name)); + } + let numeric_value = payload + .trim() + .parse::() + .map_err(|e| format!("Ungültiger Zahlenwert '{}': {}", payload, e))?; + let factor = reg.factor.unwrap_or(1.0); + let raw = if factor == 0.0 { + numeric_value + } else { + numeric_value / factor + }; + + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Tokio Runtime Fehler: {}", e))?; + + return rt.block_on(async { + let mut client = tcp::connect_slave(addr, 1u8.into()) + .await + .map_err(|e| format!("Modbus Verbindung fehlgeschlagen: {}", e))?; + + match reg.r#type.as_deref().unwrap_or("UINT16") { + "INT16" => { + let int_value = raw.round(); + if int_value < i16::MIN as f64 || int_value > i16::MAX as f64 { + return Err(format!("Wert außerhalb INT16-Bereich für '{}': {}", name, numeric_value)); + } + let register = int_value as i16 as u16; + client + .write_multiple_registers(reg.addr, &[register]) + .await + .map_err(|e| format!("Holding Register schreiben fehlgeschlagen: {}", e))?; + } + "UINT32" => { + let int_value = raw.round(); + if int_value < 0.0 || int_value > u32::MAX as f64 { + return Err(format!("Wert außerhalb UINT32-Bereich für '{}': {}", name, numeric_value)); + } + let val = int_value as u32; + let high = ((val >> 16) & 0xFFFF) as u16; + let low = (val & 0xFFFF) as u16; + client + .write_multiple_registers(reg.addr, &[high, low]) + .await + .map_err(|e| format!("Holding Register (UINT32) schreiben fehlgeschlagen: {}", e))?; + } + _ => { + let int_value = raw.round(); + if int_value < 0.0 || int_value > u16::MAX as f64 { + return Err(format!("Wert außerhalb UINT16-Bereich für '{}': {}", name, numeric_value)); + } + client + .write_multiple_registers(reg.addr, &[int_value as u16]) + .await + .map_err(|e| format!("Holding Register schreiben fehlgeschlagen: {}", e))?; + } + } + Ok(()) + }); + } + } + } + + Err(format!("Kein schreibbares Modbus-Mapping für '{}' gefunden", name)) +} + +fn parse_bool_payload(payload: &str) -> Result { + match payload.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "on" => Ok(true), + "0" | "false" | "off" => Ok(false), + _ => Err(format!("Ungültiger bool-Wert '{}', erwartet 0/1 oder true/false", payload)), + } +} + /// Startet einen Thread, der zyklisch Modbus-Register abfragt und ModbusValueMaps aktualisiert pub fn start_modbus_polling_thread( modbus_config: &ModbusConfig, diff --git a/src/mqtt.rs b/src/mqtt.rs index 0610dc4..61cf3f4 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -1,8 +1,11 @@ use std::sync::{Arc, Mutex}; +use std::process; +use std::collections::HashMap; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::config::{AppConfig, ModbusValueMaps}; -use rumqttc::{MqttOptions, Client, QoS}; +use crate::modbus; +use rumqttc::{Event, MqttOptions, Packet, Client, QoS, RecvTimeoutError}; pub fn start_mqtt_thread(config: Arc>, values: Arc>) { let mqtt_config = { @@ -12,59 +15,182 @@ pub fn start_mqtt_thread(config: Arc>, values: Arc = HashMap::new(); + let mut leitsystem_enabled = false; + loop { - { - let values = values.lock().unwrap(); - // Input Register - for (name, val) in &values.modbus_input_register_values { - if let Some(v) = val { - if should_publish(&config, name, "input_register") { - let topic = format!("{}/{}/state", path, name); - let payload = format!("{}", v); - let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + if last_publish.elapsed() >= publish_interval { + { + let values = values.lock().unwrap(); + // Input Register + for (name, val) in &values.modbus_input_register_values { + if let Some(v) = val { + if should_publish(&config, name, "input_register") { + let topic = format!("{}/{}/state", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } + } + } + // Holding Register + for (name, val) in &values.modbus_holding_register_values { + if let Some(v) = val { + if should_publish(&config, name, "holding_register") { + let topic = format!("{}/{}/state", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } + } + } + // Coils + for (name, val) in &values.modbus_coils_values { + if let Some(v) = val { + if should_publish(&config, name, "coils") { + let topic = format!("{}/{}/state", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } } } } - // Holding Register - for (name, val) in &values.modbus_holding_register_values { - if let Some(v) = val { - if should_publish(&config, name, "holding_register") { - let topic = format!("{}/{}/state", path, name); - let payload = format!("{}", v); - let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + + let state_payload = if leitsystem_enabled { "ON" } else { "OFF" }; + let _ = client.publish( + leitsystem_state_topic.clone(), + QoS::AtLeastOnce, + true, + state_payload, + ); + last_publish = Instant::now(); + } + + if last_set_write.elapsed() >= set_write_interval { + let cfg = { + let lock = config.lock().unwrap(); + lock.clone() + }; + + if leitsystem_enabled { + for (name, payload) in &pending_set_values { + if let Err(e) = modbus::write_value_by_name(&cfg, name, payload) { + eprintln!("MQTT set -> Modbus Fehler ({}): {}", name, e); + } else { + mirror_set_value(&values, &cfg, name, payload); } } } - // Coils - for (name, val) in &values.modbus_coils_values { - if let Some(v) = val { - if should_publish(&config, name, "coils") { - let topic = format!("{}/{}/state", path, name); - let payload = format!("{}", v); - let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + + last_set_write = Instant::now(); + } + + match connection.recv_timeout(Duration::from_millis(200)) { + Ok(Ok(Event::Incoming(Packet::Publish(publish)))) => { + if publish.topic == leitsystem_state_topic { + let payload = String::from_utf8_lossy(&publish.payload).trim().to_string(); + match parse_boolish(&payload) { + Some(v) => leitsystem_enabled = v, + None => eprintln!("Ungültiger Leitsystem state '{}', erwartet ON/OFF", payload), } + continue; } + + if let Ok(name) = extract_name_from_set_topic(&path, &publish.topic) { + let payload = String::from_utf8_lossy(&publish.payload).trim().to_string(); + pending_set_values.insert(name.clone(), payload.clone()); + } + } + Ok(Ok(_)) => {} + Ok(Err(e)) => { + eprintln!("MQTT Connection Fehler: {}", e); + eprintln!("Hinweis: Häufige Ursache ist eine zweite MQTT-Session mit derselben Client-ID."); + } + Err(RecvTimeoutError::Timeout) => {} + Err(RecvTimeoutError::Disconnected) => { + eprintln!("MQTT Verbindung getrennt (request channel geschlossen)"); + thread::sleep(Duration::from_millis(500)); } } - // Handle MQTT connection events - for _event in connection.iter().take(1) { - // Optionally log or handle events - } - thread::sleep(Duration::from_secs(5)); } }); } +fn parse_boolish(payload: &str) -> Option { + match payload.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "on" => Some(true), + "0" | "false" | "off" => Some(false), + _ => None, + } +} + +fn mirror_set_value(values: &Arc>, cfg: &AppConfig, name: &str, payload: &str) { + if let Ok(mut maps) = values.lock() { + if let Some(ref coils) = cfg.modbus_coils { + for map in coils { + if map.contains_key(name) { + if let Some(v) = parse_boolish(payload) { + maps.modbus_coils_values.insert(name.to_string(), Some(if v { 1.0 } else { 0.0 })); + } + return; + } + } + } + + if let Some(ref holding) = cfg.modbus_holding_register { + for map in holding { + if map.contains_key(name) { + if let Ok(v) = payload.parse::() { + maps.modbus_holding_register_values.insert(name.to_string(), Some(v)); + } + return; + } + } + } + } +} + +fn extract_name_from_set_topic(base_path: &str, topic: &str) -> Result { + let prefix = format!("{}/", base_path.trim_end_matches('/')); + if !topic.starts_with(&prefix) || !topic.ends_with("/set") { + return Err("Topic passt nicht zum /set-Schema".to_string()); + } + let without_prefix = &topic[prefix.len()..]; + let name = without_prefix.strip_suffix("/set").unwrap_or_default(); + if name.is_empty() || name.contains('/') { + return Err("Ungültiger Variablenname im Topic".to_string()); + } + Ok(name.to_string()) +} + fn should_publish(config: &Arc>, name: &str, reg_type: &str) -> bool { let cfg = config.lock().unwrap(); match reg_type { diff --git a/static/settings.js b/static/settings.js index 9e74e57..4d8f823 100644 --- a/static/settings.js +++ b/static/settings.js @@ -55,7 +55,8 @@ async function saveSettings() { user: document.getElementById('mqtt_user').value || null, password: document.getElementById('mqtt_password').value || null, path: document.getElementById('mqtt_path')?.value || null, - button_circulation: document.getElementById('mqtt_button_circulation').value || null + leitsystem_path: document.getElementById('mqtt_leitsystem_path')?.value || null, + set_write_interval_ms: parseInt(document.getElementById('mqtt_set_write_interval_ms')?.value, 10) || 2000 }; // InfluxDB diff --git a/templates/settings.html b/templates/settings.html index 0fa8b91..eba102e 100644 --- a/templates/settings.html +++ b/templates/settings.html @@ -98,8 +98,12 @@
- - + + +
+
+ +