Werte in den MQTT Broker übermitteln

This commit is contained in:
Eric Neuber 2026-02-17 21:01:10 +01:00
parent 483d27750b
commit 58bacd2f6d
6 changed files with 310 additions and 8 deletions

191
Cargo.lock generated
View File

@ -486,6 +486,16 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "core-foundation"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]] [[package]]
name = "core-foundation-sys" name = "core-foundation-sys"
version = "0.8.7" version = "0.8.7"
@ -648,6 +658,17 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "flume"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"futures-core",
"futures-sink",
"spin",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -698,6 +719,7 @@ dependencies = [
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@ -1177,6 +1199,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "openssl-probe"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]] [[package]]
name = "ordered-multimap" name = "ordered-multimap"
version = "0.7.3" version = "0.7.3"
@ -1481,6 +1509,20 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "ring"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.16",
"libc",
"untrusted",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "ron" name = "ron"
version = "0.8.1" version = "0.8.1"
@ -1493,6 +1535,24 @@ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]]
name = "rumqttc"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8"
dependencies = [
"bytes",
"flume",
"futures-util",
"log",
"rustls-native-certs",
"rustls-pemfile",
"rustls-webpki",
"thiserror",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "rust-ini" name = "rust-ini"
version = "0.20.0" version = "0.20.0"
@ -1503,6 +1563,49 @@ dependencies = [
"ordered-multimap", "ordered-multimap",
] ]
[[package]]
name = "rustls"
version = "0.21.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
dependencies = [
"log",
"ring",
"rustls-webpki",
"sct",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64 0.21.7",
]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring",
"untrusted",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.22" version = "1.0.22"
@ -1524,12 +1627,54 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "schannel"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1"
dependencies = [
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321c8673b092a9a42605034a9879d73cb79101ed5fd117bc9a597b89b4e9e61a"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.228" version = "1.0.228"
@ -1698,6 +1843,15 @@ dependencies = [
"windows-sys 0.60.2", "windows-sys 0.60.2",
] ]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]] [[package]]
name = "stable_deref_trait" name = "stable_deref_trait"
version = "1.2.1" version = "1.2.1"
@ -1733,6 +1887,7 @@ dependencies = [
"actix-files", "actix-files",
"actix-web", "actix-web",
"config", "config",
"rumqttc",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -1765,6 +1920,26 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.44" version = "0.3.44"
@ -1859,6 +2034,16 @@ dependencies = [
"tokio-util", "tokio-util",
] ]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.17" version = "0.7.17"
@ -1987,6 +2172,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]] [[package]]
name = "url" name = "url"
version = "2.5.7" version = "2.5.7"

View File

@ -17,3 +17,5 @@ config = "0.14"
toml = "0.8" toml = "0.8"
serde_derive = "1.0" serde_derive = "1.0"
tokio-modbus = "0.7" tokio-modbus = "0.7"
rumqttc = "0.23"

View File

@ -6,7 +6,7 @@ mqtt:
port: 1883 port: 1883
user: admin user: admin
password: 97sm3pHNSMZ4M5qUj0x8 password: 97sm3pHNSMZ4M5qUj0x8
path: /heizung/paradigma path: heizung/paradigma
button_circulation: zigbee2mqtt/WirelessButton button_circulation: zigbee2mqtt/WirelessButton
influxdb: influxdb:
bucket: Paradigma bucket: Paradigma
@ -33,8 +33,8 @@ modbus_coils:
- SHKpres: {addr: 8, write: false, mqtt: false, influxdb: false, comment: Schwimmbadheizkrei vorhanden} - SHKpres: {addr: 8, write: false, mqtt: false, influxdb: false, comment: Schwimmbadheizkrei vorhanden}
modbus_input_register: modbus_input_register:
- TA: {addr: 0, type: INT16, factor: 0.1, mqtt: true, influxdb: true} - TA: {addr: 0, type: INT16, factor: 0.1, mqtt: true, influxdb: true}
- TV: {addr: 1, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TV: {addr: 1, type: INT16, factor: 0.1, mqtt: true, influxdb: true}
- TR: {addr: 2, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TR: {addr: 2, type: INT16, factor: 0.1, mqtt: true, influxdb: true}
- TWO: {addr: 3, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TWO: {addr: 3, type: INT16, factor: 0.1, mqtt: false, influxdb: true}
- TPO: {addr: 4, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TPO: {addr: 4, type: INT16, factor: 0.1, mqtt: false, influxdb: true}
- TPU: {addr: 5, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TPU: {addr: 5, type: INT16, factor: 0.1, mqtt: false, influxdb: true}
@ -98,7 +98,7 @@ modbus_holding_register:
- ErrWE1_4: {addr: 17, type: UINT16, factor: 1, mqtt: false, influxdb: false} - ErrWE1_4: {addr: 17, type: UINT16, factor: 1, mqtt: false, influxdb: false}
- ErrWE1_5: {addr: 18, type: UINT16, factor: 1, mqtt: false, influxdb: false} - ErrWE1_5: {addr: 18, type: UINT16, factor: 1, mqtt: false, influxdb: false}
- KollLei: {addr: 19, type: UINT16, factor: 0.1, mqtt: false, influxdb: true} - KollLei: {addr: 19, type: UINT16, factor: 0.1, mqtt: false, influxdb: true}
- TagesS: {addr: 20, type: UINT16, factor: 0.1, mqtt: false, influxdb: true} - TagesS: {addr: 20, type: UINT16, factor: 0.1, mqtt: true, influxdb: true}
- GesS: {addr: 21, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} - GesS: {addr: 21, type: UINT32, factor: 0.1, mqtt: false, influxdb: true}
- GesWW: {addr: 23, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} - GesWW: {addr: 23, type: UINT32, factor: 0.1, mqtt: false, influxdb: true}
- GesZ: {addr: 25, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} - GesZ: {addr: 25, type: UINT32, factor: 0.1, mqtt: false, influxdb: true}

View File

@ -3,7 +3,7 @@ use tera::Tera;
use crate::config::{AppConfig, ModbusValueMaps}; use crate::config::{AppConfig, ModbusValueMaps};
pub struct AppState { pub struct AppState {
pub config: Mutex<AppConfig>, pub config: Arc<Mutex<AppConfig>>,
pub value_maps: Arc<Mutex<ModbusValueMaps>>, pub value_maps: Arc<Mutex<ModbusValueMaps>>,
pub templates: Tera, pub templates: Tera,
} }
@ -11,9 +11,10 @@ pub struct AppState {
impl AppState { impl AppState {
pub fn load_from_conf(conf_path: &str) -> Self { pub fn load_from_conf(conf_path: &str) -> Self {
let conf_str = std::fs::read_to_string(conf_path).expect("Config-Datei konnte nicht gelesen werden"); let conf_str = std::fs::read_to_string(conf_path).expect("Config-Datei konnte nicht gelesen werden");
let config: AppConfig = serde_yaml::from_str(&conf_str).expect("Config-Deserialisierung fehlgeschlagen");
let value_maps = Arc::new(Mutex::new(ModbusValueMaps::from_config(&config))); let config: AppConfig = serde_yaml::from_str(&conf_str).expect("Config-Deserialisierung fehlgeschlagen");
let config = Arc::new(Mutex::new(config));
let value_maps = Arc::new(Mutex::new(ModbusValueMaps::from_config(&config.lock().unwrap())));
let tera = match Tera::new("templates/**/*") { let tera = match Tera::new("templates/**/*") {
Ok(t) => t, Ok(t) => t,
@ -24,7 +25,7 @@ impl AppState {
}; };
AppState { AppState {
config: Mutex::new(config), config,
value_maps, value_maps,
templates: tera, templates: tera,
} }

View File

@ -9,6 +9,7 @@ use serde::{Serialize, Deserialize};
mod config; mod config;
mod modbus; mod modbus;
mod app_state; mod app_state;
mod mqtt;
pub mod modbus_types; pub mod modbus_types;
use crate::config::{AppConfig, ModbusValueMaps}; use crate::config::{AppConfig, ModbusValueMaps};
use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig}; use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig};
@ -247,6 +248,12 @@ async fn main() -> std::io::Result<()> {
); );
} }
// Starte MQTT-Thread
{
let value_maps = Arc::clone(&app_state.value_maps);
mqtt::start_mqtt_thread(Arc::clone(&app_state.config), value_maps);
}
println!("Server läuft auf http://0.0.0.0:8080"); println!("Server läuft auf http://0.0.0.0:8080");
HttpServer::new(move || { HttpServer::new(move || {

101
src/mqtt.rs Normal file
View File

@ -0,0 +1,101 @@
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use crate::config::{AppConfig, ModbusValueMaps};
use rumqttc::{MqttOptions, Client, QoS};
pub fn start_mqtt_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<ModbusValueMaps>>) {
let mqtt_config = {
let cfg = config.lock().unwrap();
cfg.mqtt.clone()
};
let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() };
let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port };
let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() };
let user = mqtt_config.user.clone().unwrap_or_default();
let password = mqtt_config.password.clone().unwrap_or_default();
let user_is_empty = user.is_empty();
thread::spawn(move || {
let mut mqttoptions = MqttOptions::new("paramod-client", broker, port);
if !user_is_empty {
mqttoptions.set_credentials(user, password);
}
let (mut client, mut connection) = Client::new(mqttoptions, 10);
loop {
{
let values = values.lock().unwrap();
// Input Register
for (name, val) in &values.modbus_input_register_values {
if let Some(v) = val {
if should_publish(&config, name, "input_register") {
let topic = format!("{}/{}", path, name);
let payload = format!("{}", v);
let _ = client.publish(topic, QoS::AtLeastOnce, false, payload);
}
}
}
// Holding Register
for (name, val) in &values.modbus_holding_register_values {
if let Some(v) = val {
if should_publish(&config, name, "holding_register") {
let topic = format!("{}/{}", path, name);
let payload = format!("{}", v);
let _ = client.publish(topic, QoS::AtLeastOnce, false, payload);
}
}
}
// Coils
for (name, val) in &values.modbus_coils_values {
if let Some(v) = val {
if should_publish(&config, name, "coils") {
let topic = format!("{}/{}", path, name);
let payload = format!("{}", v);
let _ = client.publish(topic, QoS::AtLeastOnce, false, payload);
}
}
}
}
// Handle MQTT connection events
for _event in connection.iter().take(1) {
// Optionally log or handle events
}
thread::sleep(Duration::from_secs(5));
}
});
}
fn should_publish(config: &Arc<Mutex<AppConfig>>, name: &str, reg_type: &str) -> bool {
let cfg = config.lock().unwrap();
match reg_type {
"input_register" => {
if let Some(regs) = &cfg.modbus_input_register {
for map in regs {
if let Some(reg) = map.get(name) {
return reg.mqtt.unwrap_or(false);
}
}
}
}
"holding_register" => {
if let Some(regs) = &cfg.modbus_holding_register {
for map in regs {
if let Some(reg) = map.get(name) {
return reg.mqtt.unwrap_or(false);
}
}
}
}
"coils" => {
if let Some(coils) = &cfg.modbus_coils {
for map in coils {
if let Some(coil) = map.get(name) {
return coil.mqtt.unwrap_or(false);
}
}
}
}
_ => {}
}
false
}