diff --git a/Cargo.lock b/Cargo.lock index 43c1bff..1ab8af3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,6 +486,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -648,6 +658,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -698,6 +719,7 @@ dependencies = [ "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1177,6 +1199,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -1481,6 +1509,20 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.16", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "ron" version = "0.8.1" @@ -1493,6 +1535,24 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "rumqttc" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki", + "thiserror", + "tokio", + "tokio-rustls", +] + [[package]] name = "rust-ini" version = "0.20.0" @@ -1503,6 +1563,49 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1524,12 +1627,54 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "321c8673b092a9a42605034a9879d73cb79101ed5fd117bc9a597b89b4e9e61a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.228" @@ -1698,6 +1843,15 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -1733,6 +1887,7 @@ dependencies = [ "actix-files", "actix-web", "config", + "rumqttc", "serde", "serde_derive", "serde_json", @@ -1765,6 +1920,26 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.44" @@ -1859,6 +2034,16 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -1987,6 +2172,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.7" diff --git a/Cargo.toml b/Cargo.toml index 990de34..61ac67a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,5 @@ config = "0.14" toml = "0.8" serde_derive = "1.0" tokio-modbus = "0.7" + +rumqttc = "0.23" diff --git a/paramod.yaml b/paramod.yaml index 9f750fd..5dea048 100644 --- a/paramod.yaml +++ b/paramod.yaml @@ -6,7 +6,7 @@ mqtt: port: 1883 user: admin password: 97sm3pHNSMZ4M5qUj0x8 - path: /heizung/paradigma + path: heizung/paradigma button_circulation: zigbee2mqtt/WirelessButton influxdb: bucket: Paradigma @@ -33,8 +33,8 @@ modbus_coils: - SHKpres: {addr: 8, write: false, mqtt: false, influxdb: false, comment: Schwimmbadheizkrei vorhanden} modbus_input_register: - TA: {addr: 0, type: INT16, factor: 0.1, mqtt: true, influxdb: true} - - TV: {addr: 1, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - - TR: {addr: 2, type: INT16, factor: 0.1, mqtt: false, influxdb: true} + - TV: {addr: 1, type: INT16, factor: 0.1, mqtt: true, influxdb: true} + - TR: {addr: 2, type: INT16, factor: 0.1, mqtt: true, influxdb: true} - TWO: {addr: 3, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TPO: {addr: 4, type: INT16, factor: 0.1, mqtt: false, influxdb: true} - TPU: {addr: 5, type: INT16, factor: 0.1, mqtt: false, influxdb: true} @@ -98,7 +98,7 @@ modbus_holding_register: - ErrWE1_4: {addr: 17, type: UINT16, factor: 1, mqtt: false, influxdb: false} - ErrWE1_5: {addr: 18, type: UINT16, factor: 1, mqtt: false, influxdb: false} - KollLei: {addr: 19, type: UINT16, factor: 0.1, mqtt: false, influxdb: true} - - TagesS: {addr: 20, type: UINT16, factor: 0.1, mqtt: false, influxdb: true} + - TagesS: {addr: 20, type: UINT16, factor: 0.1, mqtt: true, influxdb: true} - GesS: {addr: 21, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} - GesWW: {addr: 23, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} - GesZ: {addr: 25, type: UINT32, factor: 0.1, mqtt: false, influxdb: true} diff --git a/src/app_state.rs b/src/app_state.rs index 098ae38..c3cb70b 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -3,7 +3,7 @@ use tera::Tera; use crate::config::{AppConfig, ModbusValueMaps}; pub struct AppState { - pub config: Mutex, + pub config: Arc>, pub value_maps: Arc>, pub templates: Tera, } @@ -11,9 +11,10 @@ pub struct AppState { impl AppState { pub fn load_from_conf(conf_path: &str) -> Self { let conf_str = std::fs::read_to_string(conf_path).expect("Config-Datei konnte nicht gelesen werden"); - let config: AppConfig = serde_yaml::from_str(&conf_str).expect("Config-Deserialisierung fehlgeschlagen"); - let value_maps = Arc::new(Mutex::new(ModbusValueMaps::from_config(&config))); + let config: AppConfig = serde_yaml::from_str(&conf_str).expect("Config-Deserialisierung fehlgeschlagen"); + let config = Arc::new(Mutex::new(config)); + let value_maps = Arc::new(Mutex::new(ModbusValueMaps::from_config(&config.lock().unwrap()))); let tera = match Tera::new("templates/**/*") { Ok(t) => t, @@ -24,7 +25,7 @@ impl AppState { }; AppState { - config: Mutex::new(config), + config, value_maps, templates: tera, } diff --git a/src/main.rs b/src/main.rs index 6cb2e5c..13faf1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use serde::{Serialize, Deserialize}; mod config; mod modbus; mod app_state; +mod mqtt; pub mod modbus_types; use crate::config::{AppConfig, ModbusValueMaps}; use crate::modbus_types::{ModbusInputRegisterConfig, ModbusHoldingRegisterConfig, ModbusCoilsConfig}; @@ -247,6 +248,12 @@ async fn main() -> std::io::Result<()> { ); } + // Starte MQTT-Thread + { + let value_maps = Arc::clone(&app_state.value_maps); + mqtt::start_mqtt_thread(Arc::clone(&app_state.config), value_maps); + } + println!("Server läuft auf http://0.0.0.0:8080"); HttpServer::new(move || { diff --git a/src/mqtt.rs b/src/mqtt.rs new file mode 100644 index 0000000..44c4357 --- /dev/null +++ b/src/mqtt.rs @@ -0,0 +1,101 @@ +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; +use crate::config::{AppConfig, ModbusValueMaps}; +use rumqttc::{MqttOptions, Client, QoS}; + +pub fn start_mqtt_thread(config: Arc>, values: Arc>) { + let mqtt_config = { + let cfg = config.lock().unwrap(); + cfg.mqtt.clone() + }; + let broker = if mqtt_config.broker.is_empty() { "localhost".to_string() } else { mqtt_config.broker.clone() }; + let port = if mqtt_config.port == 0 { 1883 } else { mqtt_config.port }; + let path = if let Some(ref p) = mqtt_config.path { p.clone() } else { "paramod/values".to_string() }; + let user = mqtt_config.user.clone().unwrap_or_default(); + let password = mqtt_config.password.clone().unwrap_or_default(); + let user_is_empty = user.is_empty(); + + thread::spawn(move || { + let mut mqttoptions = MqttOptions::new("paramod-client", broker, port); + if !user_is_empty { + mqttoptions.set_credentials(user, password); + } + let (mut client, mut connection) = Client::new(mqttoptions, 10); + loop { + { + let values = values.lock().unwrap(); + // Input Register + for (name, val) in &values.modbus_input_register_values { + if let Some(v) = val { + if should_publish(&config, name, "input_register") { + let topic = format!("{}/{}", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } + } + } + // Holding Register + for (name, val) in &values.modbus_holding_register_values { + if let Some(v) = val { + if should_publish(&config, name, "holding_register") { + let topic = format!("{}/{}", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } + } + } + // Coils + for (name, val) in &values.modbus_coils_values { + if let Some(v) = val { + if should_publish(&config, name, "coils") { + let topic = format!("{}/{}", path, name); + let payload = format!("{}", v); + let _ = client.publish(topic, QoS::AtLeastOnce, false, payload); + } + } + } + } + // Handle MQTT connection events + for _event in connection.iter().take(1) { + // Optionally log or handle events + } + thread::sleep(Duration::from_secs(5)); + } + }); +} + +fn should_publish(config: &Arc>, name: &str, reg_type: &str) -> bool { + let cfg = config.lock().unwrap(); + match reg_type { + "input_register" => { + if let Some(regs) = &cfg.modbus_input_register { + for map in regs { + if let Some(reg) = map.get(name) { + return reg.mqtt.unwrap_or(false); + } + } + } + } + "holding_register" => { + if let Some(regs) = &cfg.modbus_holding_register { + for map in regs { + if let Some(reg) = map.get(name) { + return reg.mqtt.unwrap_or(false); + } + } + } + } + "coils" => { + if let Some(coils) = &cfg.modbus_coils { + for map in coils { + if let Some(coil) = map.get(name) { + return coil.mqtt.unwrap_or(false); + } + } + } + } + _ => {} + } + false +}