Schreibe MQTT.Set Werte, wenn das Leitsystem aktiv ist
Some checks failed
Build Docker Image (Podman) / build (push) Failing after 15m52s

This commit is contained in:
Eric Neuber 2026-03-13 22:56:06 +01:00
parent 497a0e48e3
commit 3ed80f4b68
6 changed files with 278 additions and 39 deletions

View File

@ -7,7 +7,7 @@ mqtt:
user: admin user: admin
password: 97sm3pHNSMZ4M5qUj0x8 password: 97sm3pHNSMZ4M5qUj0x8
path: heizung/paradigma path: heizung/paradigma
button_circulation: zigbee2mqtt/WirelessButton leitsystem_path: heizung/leitsystem2
influxdb: influxdb:
bucket: Paradigma bucket: Paradigma
org: skaville org: skaville
@ -22,7 +22,7 @@ modbus:
max_input_addr: 45 max_input_addr: 45
max_holding_addr: 61 max_holding_addr: 61
modbus_coils: modbus_coils:
- MgtSystem: {addr: 0, write: false, mqtt: false, influxdb: false, comment: Leitsystem aktiv} - MgtSystem: {addr: 0, write: false, mqtt: true, influxdb: false, comment: Leitsystem aktiv}
- HK1pres: {addr: 1, write: false, mqtt: false, influxdb: false, comment: HK1 vorhanden} - HK1pres: {addr: 1, write: false, mqtt: false, influxdb: false, comment: HK1 vorhanden}
- HK2pres: {addr: 2, write: false, mqtt: false, influxdb: false, comment: HK2 vorhanden} - HK2pres: {addr: 2, write: false, mqtt: false, influxdb: false, comment: HK2 vorhanden}
- HK3pres: {addr: 3, write: false, mqtt: false, influxdb: false, comment: HK3 vorhanden} - HK3pres: {addr: 3, write: false, mqtt: false, influxdb: false, comment: HK3 vorhanden}
@ -78,9 +78,9 @@ modbus_input_register:
- VKW: {addr: 43, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - VKW: {addr: 43, type: INT16, factor: 0.1, mqtt: false, influxdb: true}
- VSPm: {addr: 44, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - VSPm: {addr: 44, type: INT16, factor: 0.1, mqtt: false, influxdb: true}
modbus_holding_register: modbus_holding_register:
- nothing: {addr: 0, type: UINT16, factor: 1, write: true, mqtt: false, influxdb: false} - nothing: {addr: 0, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: false}
- ErrLS: {addr: 1, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true} - ErrLS: {addr: 1, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true}
- TVsoll: {addr: 2, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: true} - TVsoll: {addr: 2, type: INT16, factor: 0.1, write: true, mqtt: true, influxdb: true}
- TV2soll: {addr: 3, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false} - TV2soll: {addr: 3, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false}
- TV3soll: {addr: 4, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false} - TV3soll: {addr: 4, type: INT16, factor: 0.1, write: false, mqtt: false, influxdb: false}
- HK1soll: {addr: 5, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true} - HK1soll: {addr: 5, type: UINT16, factor: 1, write: false, mqtt: false, influxdb: true}

View File

@ -24,7 +24,8 @@ pub struct MqttConfig {
pub user: Option<String>, pub user: Option<String>,
pub password: Option<String>, pub password: Option<String>,
pub path: Option<String>, pub path: Option<String>,
pub button_circulation: Option<String>, pub leitsystem_path: Option<String>,
pub set_write_interval_ms: Option<u64>,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]

View File

@ -1,5 +1,6 @@
use crate::config::{ModbusValueMaps, ModbusConfig}; use crate::config::{ModbusValueMaps, ModbusConfig};
use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig}; use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig};
use crate::config::AppConfig;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
@ -9,6 +10,112 @@ use tokio_modbus::prelude::*;
use tokio_modbus::client::tcp; use tokio_modbus::client::tcp;
use std::net::SocketAddr; use std::net::SocketAddr;
pub fn write_value_by_name(config: &AppConfig, name: &str, payload: &str) -> Result<(), String> {
let addr: SocketAddr = format!("{}:{}", config.modbus.host, config.modbus.port)
.parse()
.map_err(|e| format!("Ungültige Modbus-Adresse: {}", e))?;
if let Some(ref coils) = config.modbus_coils {
for map in coils {
if let Some(coil) = map.get(name) {
if !coil.write.unwrap_or(false) {
return Err(format!("Schreiben für '{}' nicht erlaubt", name));
}
let bool_value = parse_bool_payload(payload)?;
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Tokio Runtime Fehler: {}", e))?;
rt.block_on(async {
let mut client = tcp::connect_slave(addr, 1u8.into())
.await
.map_err(|e| format!("Modbus Verbindung fehlgeschlagen: {}", e))?;
client
.write_single_coil(coil.addr, bool_value)
.await
.map_err(|e| format!("Coil schreiben fehlgeschlagen: {}", e))
})?;
return Ok(());
}
}
}
if let Some(ref holding_registers) = config.modbus_holding_register {
for map in holding_registers {
if let Some(reg) = map.get(name) {
if !reg.write.unwrap_or(false) {
return Err(format!("Schreiben für '{}' nicht erlaubt", name));
}
let numeric_value = payload
.trim()
.parse::<f64>()
.map_err(|e| format!("Ungültiger Zahlenwert '{}': {}", payload, e))?;
let factor = reg.factor.unwrap_or(1.0);
let raw = if factor == 0.0 {
numeric_value
} else {
numeric_value / factor
};
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Tokio Runtime Fehler: {}", e))?;
return rt.block_on(async {
let mut client = tcp::connect_slave(addr, 1u8.into())
.await
.map_err(|e| format!("Modbus Verbindung fehlgeschlagen: {}", e))?;
match reg.r#type.as_deref().unwrap_or("UINT16") {
"INT16" => {
let int_value = raw.round();
if int_value < i16::MIN as f64 || int_value > i16::MAX as f64 {
return Err(format!("Wert außerhalb INT16-Bereich für '{}': {}", name, numeric_value));
}
let register = int_value as i16 as u16;
client
.write_multiple_registers(reg.addr, &[register])
.await
.map_err(|e| format!("Holding Register schreiben fehlgeschlagen: {}", e))?;
}
"UINT32" => {
let int_value = raw.round();
if int_value < 0.0 || int_value > u32::MAX as f64 {
return Err(format!("Wert außerhalb UINT32-Bereich für '{}': {}", name, numeric_value));
}
let val = int_value as u32;
let high = ((val >> 16) & 0xFFFF) as u16;
let low = (val & 0xFFFF) as u16;
client
.write_multiple_registers(reg.addr, &[high, low])
.await
.map_err(|e| format!("Holding Register (UINT32) schreiben fehlgeschlagen: {}", e))?;
}
_ => {
let int_value = raw.round();
if int_value < 0.0 || int_value > u16::MAX as f64 {
return Err(format!("Wert außerhalb UINT16-Bereich für '{}': {}", name, numeric_value));
}
client
.write_multiple_registers(reg.addr, &[int_value as u16])
.await
.map_err(|e| format!("Holding Register schreiben fehlgeschlagen: {}", e))?;
}
}
Ok(())
});
}
}
}
Err(format!("Kein schreibbares Modbus-Mapping für '{}' gefunden", name))
}
fn parse_bool_payload(payload: &str) -> Result<bool, String> {
match payload.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "on" => Ok(true),
"0" | "false" | "off" => Ok(false),
_ => Err(format!("Ungültiger bool-Wert '{}', erwartet 0/1 oder true/false", payload)),
}
}
/// Startet einen Thread, der zyklisch Modbus-Register abfragt und ModbusValueMaps aktualisiert /// Startet einen Thread, der zyklisch Modbus-Register abfragt und ModbusValueMaps aktualisiert
pub fn start_modbus_polling_thread( pub fn start_modbus_polling_thread(
modbus_config: &ModbusConfig, modbus_config: &ModbusConfig,

View File

@ -1,8 +1,11 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::process;
use std::collections::HashMap;
use std::thread; use std::thread;
use std::time::Duration; use std::time::{Duration, Instant};
use crate::config::{AppConfig, ModbusValueMaps}; use crate::config::{AppConfig, ModbusValueMaps};
use rumqttc::{MqttOptions, Client, QoS}; use crate::modbus;
use rumqttc::{Event, MqttOptions, Packet, Client, QoS, RecvTimeoutError};
pub fn start_mqtt_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<ModbusValueMaps>>) { pub fn start_mqtt_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<ModbusValueMaps>>) {
let mqtt_config = { let mqtt_config = {
@ -12,17 +15,41 @@ pub fn start_mqtt_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<Modbus
let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() }; let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() };
let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port }; let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port };
let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() }; let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() };
let leitsystem_path = mqtt_config
.leitsystem_path
.clone()
.unwrap_or_else(|| format!("{}/leitsystem", path));
let set_write_interval = Duration::from_millis(mqtt_config.set_write_interval_ms.unwrap_or(2000).max(100));
let user = mqtt_config.user.clone().unwrap_or_default(); let user = mqtt_config.user.clone().unwrap_or_default();
let password = mqtt_config.password.clone().unwrap_or_default(); let password = mqtt_config.password.clone().unwrap_or_default();
let user_is_empty = user.is_empty(); let user_is_empty = user.is_empty();
thread::spawn(move || { thread::spawn(move || {
let mut mqttoptions = MqttOptions::new("paramod-client", broker, port); let client_id = format!("paramod-client-{}", process::id());
let mut mqttoptions = MqttOptions::new(client_id, broker, port);
mqttoptions.set_keep_alive(Duration::from_secs(5));
if !user_is_empty { if !user_is_empty {
mqttoptions.set_credentials(user, password); mqttoptions.set_credentials(user, password);
} }
let (mut client, mut connection) = Client::new(mqttoptions, 10); let (mut client, mut connection) = Client::new(mqttoptions, 10);
let set_topic = format!("{}/+/set", path);
if let Err(e) = client.subscribe(set_topic, QoS::AtLeastOnce) {
eprintln!("MQTT Subscribe fehlgeschlagen: {}", e);
}
let leitsystem_state_topic = format!("{}/state", leitsystem_path.trim_end_matches('/'));
if let Err(e) = client.subscribe(leitsystem_state_topic.clone(), QoS::AtLeastOnce) {
eprintln!("MQTT Subscribe Leitsystem fehlgeschlagen: {}", e);
}
let publish_interval = Duration::from_secs(5);
let mut last_publish = Instant::now() - publish_interval;
let mut last_set_write = Instant::now() - set_write_interval;
let mut pending_set_values: HashMap<String, String> = HashMap::new();
let mut leitsystem_enabled = false;
loop { loop {
if last_publish.elapsed() >= publish_interval {
{ {
let values = values.lock().unwrap(); let values = values.lock().unwrap();
// Input Register // Input Register
@ -56,15 +83,114 @@ pub fn start_mqtt_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<Modbus
} }
} }
} }
// Handle MQTT connection events
for _event in connection.iter().take(1) { let state_payload = if leitsystem_enabled { "ON" } else { "OFF" };
// Optionally log or handle events let _ = client.publish(
leitsystem_state_topic.clone(),
QoS::AtLeastOnce,
true,
state_payload,
);
last_publish = Instant::now();
}
if last_set_write.elapsed() >= set_write_interval {
let cfg = {
let lock = config.lock().unwrap();
lock.clone()
};
if leitsystem_enabled {
for (name, payload) in &pending_set_values {
if let Err(e) = modbus::write_value_by_name(&cfg, name, payload) {
eprintln!("MQTT set -> Modbus Fehler ({}): {}", name, e);
} else {
mirror_set_value(&values, &cfg, name, payload);
}
}
}
last_set_write = Instant::now();
}
match connection.recv_timeout(Duration::from_millis(200)) {
Ok(Ok(Event::Incoming(Packet::Publish(publish)))) => {
if publish.topic == leitsystem_state_topic {
let payload = String::from_utf8_lossy(&publish.payload).trim().to_string();
match parse_boolish(&payload) {
Some(v) => leitsystem_enabled = v,
None => eprintln!("Ungültiger Leitsystem state '{}', erwartet ON/OFF", payload),
}
continue;
}
if let Ok(name) = extract_name_from_set_topic(&path, &publish.topic) {
let payload = String::from_utf8_lossy(&publish.payload).trim().to_string();
pending_set_values.insert(name.clone(), payload.clone());
}
}
Ok(Ok(_)) => {}
Ok(Err(e)) => {
eprintln!("MQTT Connection Fehler: {}", e);
eprintln!("Hinweis: Häufige Ursache ist eine zweite MQTT-Session mit derselben Client-ID.");
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => {
eprintln!("MQTT Verbindung getrennt (request channel geschlossen)");
thread::sleep(Duration::from_millis(500));
}
} }
thread::sleep(Duration::from_secs(5));
} }
}); });
} }
fn parse_boolish(payload: &str) -> Option<bool> {
match payload.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "on" => Some(true),
"0" | "false" | "off" => Some(false),
_ => None,
}
}
fn mirror_set_value(values: &Arc<Mutex<ModbusValueMaps>>, cfg: &AppConfig, name: &str, payload: &str) {
if let Ok(mut maps) = values.lock() {
if let Some(ref coils) = cfg.modbus_coils {
for map in coils {
if map.contains_key(name) {
if let Some(v) = parse_boolish(payload) {
maps.modbus_coils_values.insert(name.to_string(), Some(if v { 1.0 } else { 0.0 }));
}
return;
}
}
}
if let Some(ref holding) = cfg.modbus_holding_register {
for map in holding {
if map.contains_key(name) {
if let Ok(v) = payload.parse::<f64>() {
maps.modbus_holding_register_values.insert(name.to_string(), Some(v));
}
return;
}
}
}
}
}
fn extract_name_from_set_topic(base_path: &str, topic: &str) -> Result<String, String> {
let prefix = format!("{}/", base_path.trim_end_matches('/'));
if !topic.starts_with(&prefix) || !topic.ends_with("/set") {
return Err("Topic passt nicht zum /set-Schema".to_string());
}
let without_prefix = &topic[prefix.len()..];
let name = without_prefix.strip_suffix("/set").unwrap_or_default();
if name.is_empty() || name.contains('/') {
return Err("Ungültiger Variablenname im Topic".to_string());
}
Ok(name.to_string())
}
fn should_publish(config: &Arc<Mutex<AppConfig>>, name: &str, reg_type: &str) -> bool { fn should_publish(config: &Arc<Mutex<AppConfig>>, name: &str, reg_type: &str) -> bool {
let cfg = config.lock().unwrap(); let cfg = config.lock().unwrap();
match reg_type { match reg_type {

View File

@ -55,7 +55,8 @@ async function saveSettings() {
user: document.getElementById('mqtt_user').value || null, user: document.getElementById('mqtt_user').value || null,
password: document.getElementById('mqtt_password').value || null, password: document.getElementById('mqtt_password').value || null,
path: document.getElementById('mqtt_path')?.value || null, path: document.getElementById('mqtt_path')?.value || null,
button_circulation: document.getElementById('mqtt_button_circulation').value || null leitsystem_path: document.getElementById('mqtt_leitsystem_path')?.value || null,
set_write_interval_ms: parseInt(document.getElementById('mqtt_set_write_interval_ms')?.value, 10) || 2000
}; };
// InfluxDB // InfluxDB

View File

@ -98,8 +98,12 @@
<input type="text" id="mqtt_path" class="text-input" value="{{ mqtt.path | default(value="") }}" /> <input type="text" id="mqtt_path" class="text-input" value="{{ mqtt.path | default(value="") }}" />
</div> </div>
<div class="form-group"> <div class="form-group">
<label for="mqtt_button_circulation">Button Circulation:</label> <label for="mqtt_leitsystem_path">Leitsystem Pfad:</label>
<input type="text" id="mqtt_button_circulation" class="text-input" value="{{ mqtt.button_circulation | default(value="") }}" /> <input type="text" id="mqtt_leitsystem_path" class="text-input" value="{{ mqtt.leitsystem_path | default(value="") }}" />
</div>
<div class="form-group">
<label for="mqtt_set_write_interval_ms">Set-Sendeintervall (ms):</label>
<input type="number" min="100" id="mqtt_set_write_interval_ms" class="text-input" value="{{ mqtt.set_write_interval_ms | default(value="2000") }}" />
</div> </div>
</div> </div>