diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..7284ac4 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,97 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct MqttConfig { + pub broker: String, + pub port: u16, + pub user: Option, + pub password: Option, + pub button_circulation: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct InfluxConfig { + pub bucket: String, + pub org: String, + pub token: String, + pub url: String, + pub location: Option, + pub measurement: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ModbusRegisterConfig { + pub addr: u16, + pub r#type: Option, + pub factor: Option, + pub mqtt: Option, + pub influxdb: Option, + pub comment: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ModbusConfig { + pub host: String, + pub port: u16, + pub max_coils_addr: Option, + pub max_input_addr: Option, + pub max_holding_addr: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DefaultConfig { + pub loglevel: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AppConfig { + pub default: DefaultConfig, + pub mqtt: MqttConfig, + pub influxdb: InfluxConfig, + pub modbus: ModbusConfig, + pub modbus_coils: Option>>, + pub modbus_input_register: Option>>, + pub modbus_holding_register: Option>>, +} + +#[derive(Debug, Default, Clone)] +pub struct ModbusValueMaps { + pub modbus_coils_values: HashMap, + pub modbus_input_register_values: HashMap, + pub modbus_holding_register_values: HashMap, +} + +impl ModbusValueMaps { + pub fn from_config(config: &AppConfig) -> Self { + let mut coils = HashMap::new(); + if let Some(ref vec) = config.modbus_coils { + for map in vec { + for key in map.keys() { + coils.entry(key.clone()).or_insert(0.0); + } + } + } + let mut input = HashMap::new(); + if let Some(ref vec) = config.modbus_input_register { + for map in vec { + for key in map.keys() { + input.entry(key.clone()).or_insert(0.0); + } + } + } + let mut holding = HashMap::new(); + if let Some(ref vec) = config.modbus_holding_register { + for map in vec { + for key in map.keys() { + holding.entry(key.clone()).or_insert(0.0); + } + } + } + ModbusValueMaps { + modbus_coils_values: coils, + modbus_input_register_values: input, + modbus_holding_register_values: holding, + } + } +} diff --git a/src/main.rs b/src/main.rs index f4568f5..71afa10 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,79 +1,23 @@ + use actix_web::{web, App, HttpResponse, HttpServer, Result}; use actix_files as actix_fs; -use serde::{Deserialize, Serialize}; -use std::sync::Mutex; +use std::sync::{Mutex, Arc}; use tera::{Context, Tera}; -// use config::{Config, File}; use std::fs; use serde_yaml; use std::collections::HashMap; +use serde::{Serialize, Deserialize}; +mod config; +mod modbus; +use crate::config::{AppConfig, ModbusConfig, ModbusRegisterConfig, DefaultConfig, MqttConfig, InfluxConfig, ModbusValueMaps}; -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct MqttConfig { - pub broker: String, - pub port: u16, - pub user: Option, - pub password: Option, - pub button_circulation: Option, -} -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct InfluxConfig { - pub bucket: String, - pub org: String, - pub token: String, - pub url: String, - pub location: Option, - pub measurement: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ModbusRegisterConfig { - pub addr: u16, - pub r#type: Option, - pub factor: Option, - pub mqtt: Option, - pub influxdb: Option, - pub comment: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ModbusConfig { - pub host: String, - pub port: u16, - pub max_coils_addr: Option, - pub max_input_addr: Option, - pub max_holding_addr: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct DefaultConfig { - pub loglevel: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct AppConfig { - pub default: DefaultConfig, - pub mqtt: MqttConfig, - pub influxdb: InfluxConfig, - pub modbus: ModbusConfig, - pub modbus_coils: Option>>, - pub modbus_input_register: Option>>, - pub modbus_holding_register: Option>>, -} - -/// Werthaltung für berechnete Modbus-Werte -#[derive(Debug, Default, Clone)] -pub struct ModbusValueMaps { - pub modbus_coils_values: std::collections::HashMap, - pub modbus_input_register_values: std::collections::HashMap, - pub modbus_holding_register_values: std::collections::HashMap, -} +// ...existing code... struct AppState { config: Mutex, - value_maps: Mutex, + value_maps: Arc>, templates: Tera, } @@ -82,7 +26,7 @@ impl AppState { let conf_str = std::fs::read_to_string(conf_path).expect("Config-Datei konnte nicht gelesen werden"); let config: AppConfig = serde_yaml::from_str(&conf_str).expect("Config-Deserialisierung fehlgeschlagen"); - let value_maps = Mutex::new(ModbusValueMaps::from_config(&config)); + let value_maps = Arc::new(Mutex::new(ModbusValueMaps::from_config(&config))); let tera = match Tera::new("templates/**/*") { Ok(t) => t, @@ -100,39 +44,8 @@ impl AppState { } } -impl ModbusValueMaps { - pub fn from_config(config: &AppConfig) -> Self { - let mut coils = HashMap::new(); - if let Some(ref vec) = config.modbus_coils { - for map in vec { - for key in map.keys() { - coils.entry(key.clone()).or_insert(0.0); - } - } - } - let mut input = HashMap::new(); - if let Some(ref vec) = config.modbus_input_register { - for map in vec { - for key in map.keys() { - input.entry(key.clone()).or_insert(0.0); - } - } - } - let mut holding = HashMap::new(); - if let Some(ref vec) = config.modbus_holding_register { - for map in vec { - for key in map.keys() { - holding.entry(key.clone()).or_insert(0.0); - } - } - } - ModbusValueMaps { - modbus_coils_values: coils, - modbus_input_register_values: input, - modbus_holding_register_values: holding, - } - } -} + +// ...existing code... async fn index(data: web::Data) -> Result { let config = data.config.lock().unwrap(); @@ -274,9 +187,24 @@ async fn save_settings( #[actix_web::main] async fn main() -> std::io::Result<()> { + let config_path = "paramod.yaml"; let app_state = web::Data::new(AppState::load_from_conf(config_path)); + // Starte Modbus-Polling-Thread + { + let config = app_state.config.lock().unwrap().clone(); + let value_maps = Arc::clone(&app_state.value_maps); + modbus::start_modbus_polling_thread( + &config.modbus, + &config.modbus_input_register, + &config.modbus_holding_register, + &config.modbus_coils, + value_maps, + std::time::Duration::from_secs(2), // Poll-Intervall + ); + } + println!("Server läuft auf http://0.0.0.0:8080"); HttpServer::new(move || { diff --git a/src/modbus.rs b/src/modbus.rs new file mode 100644 index 0000000..6fc4112 --- /dev/null +++ b/src/modbus.rs @@ -0,0 +1,69 @@ +use crate::config::{ModbusValueMaps, ModbusRegisterConfig, ModbusConfig}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +/// Startet einen Thread, der zyklisch Modbus-Register abfragt und ModbusValueMaps aktualisiert +pub fn start_modbus_polling_thread( + modbus_config: &ModbusConfig, + input_registers: &Option>>, + holding_registers: &Option>>, + coils: &Option>>, + value_maps: Arc>, + poll_interval: Duration, +) { + let input_registers = input_registers.clone(); + let holding_registers = holding_registers.clone(); + let coils = coils.clone(); + thread::spawn(move || { + loop { + // Input Register + if let Some(ref vec) = input_registers { + for map in vec { + for (key, reg) in map { + let typ = reg.r#type.as_deref().unwrap_or(""); + let value = match typ { + "INT16" => { let raw: i16 = 0; raw as f64 }, + "UINT16" => { let raw: u16 = 0; raw as f64 }, + "UINT32" => { let raw1: u16 = 0; let raw2: u16 = 0; let raw32: u32 = ((raw1 as u32) << 16) | (raw2 as u32); raw32 as f64 }, + _ => 0.0, + }; + if let Ok(mut maps) = value_maps.lock() { + maps.modbus_input_register_values.insert(key.clone(), value); + } + } + } + } + // Holding Register + if let Some(ref vec) = holding_registers { + for map in vec { + for (key, reg) in map { + let typ = reg.r#type.as_deref().unwrap_or(""); + let value = match typ { + "INT16" => { let raw: i16 = 0; raw as f64 }, + "UINT16" => { let raw: u16 = 0; raw as f64 }, + "UINT32" => { let raw1: u16 = 0; let raw2: u16 = 0; let raw32: u32 = ((raw1 as u32) << 16) | (raw2 as u32); raw32 as f64 }, + _ => 0.0, + }; + if let Ok(mut maps) = value_maps.lock() { + maps.modbus_holding_register_values.insert(key.clone(), value); + } + } + } + } + // Coils + if let Some(ref vec) = coils { + for map in vec { + for (key, _reg) in map { + let value = 0.0; // Hier Coil-Wert abfragen + if let Ok(mut maps) = value_maps.lock() { + maps.modbus_coils_values.insert(key.clone(), value); + } + } + } + } + thread::sleep(poll_interval); + } + }); +} diff --git a/src/modbus_thread.rs b/src/modbus_thread.rs deleted file mode 100644 index 7d3460e..0000000 --- a/src/modbus_thread.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::time::Duration; -use tokio::sync::Notify; -use tokio::task::JoinHandle; -use tokio_modbus::client::tcp; -use tokio_modbus::prelude::*; -use log::{info, error, debug}; - -// Platzhalter für Konfiguration -#[derive(Clone)] -pub struct ModbusConfig { - pub modbus_host: String, - pub modbus_port: u16, - pub modbus_max_holding_addr: u16, - pub modbus_max_input_addr: u16, - // Register-Konfigurationen als Map: key -> (addr, type, factor, mqtt, influxdb) - pub modbus_input_register: HashMap, - pub modbus_holding_register: HashMap, -} - -#[derive(Clone)] -pub struct RegisterConfig { - pub addr: u16, - pub reg_type: String, // z.B. "INT16", "UINT16" - pub factor: f64, - pub mqtt: bool, - pub influxdb: bool, -} - -const NONSET_INT16: i16 = -32768; -const NONSET_UINT16: u16 = 65535; - -pub struct ModbusThread { - config: ModbusConfig, - timeout: Duration, - polling_interval: Duration, - min_write_interval: Duration, - running: Arc, - stop_notify: Arc, - // Register-Cache - input_register: Arc>>, - holding_register: Arc>>, - // Schreib-Queue - write_holding_register: Arc>>, - // Async-Task-Handle - handle: Option>, -} - -impl ModbusThread { - pub fn new(config: ModbusConfig, timeout: f64, polling_interval: f64, min_write_interval: f64) -> Self { - Self { - config, - timeout: Duration::from_secs_f64(timeout), - polling_interval: Duration::from_secs_f64(polling_interval), - min_write_interval: Duration::from_secs_f64(min_write_interval), - running: Arc::new(Notify::new()), - stop_notify: Arc::new(Notify::new()), - input_register: Arc::new(Mutex::new(Vec::new())), - holding_register: Arc::new(Mutex::new(Vec::new())), - write_holding_register: Arc::new(Mutex::new(HashMap::new())), - handle: None, - } - } - - pub fn start(&mut self) { - let config = self.config.clone(); - let timeout = self.timeout; - let polling_interval = self.polling_interval; - let min_write_interval = self.min_write_interval; - let running = self.running.clone(); - let stop_notify = self.stop_notify.clone(); - let input_register = self.input_register.clone(); - let holding_register = self.holding_register.clone(); - let write_holding_register = self.write_holding_register.clone(); - - self.handle = Some(tokio::spawn(async move { - let mut write_countdown = min_write_interval; - let socket_addr = format!("{}:{}", config.modbus_host, config.modbus_port); - loop { - // Stop-Check - if stop_notify.is_notified() { - break; - } - // Connect - let mut ctx = match tcp::connect(socket_addr.clone()).await { - Ok(c) => { - info!("Modbus: Verbindung hergestellt"); - c - } - Err(e) => { - error!("Modbus: Verbindungsfehler: {:?}", e); - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - }; - // Polling-Loop - loop { - if stop_notify.is_notified() { - let _ = \ No newline at end of file