// paramod-rust/src/influx.rs (113 lines, 4.4 KiB, Rust)

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use crate::config::{AppConfig, ModbusValueMaps};
use influxdb::{Client, WriteQuery, Timestamp};
use chrono::Utc;
use tokio::runtime::Runtime;
pub fn start_influx_thread(config: Arc<Mutex<AppConfig>>, values: Arc<Mutex<ModbusValueMaps>>) {
let influx_config = {
let cfg = config.lock().unwrap();
cfg.influxdb.clone()
};
let url = influx_config.url.clone();
let bucket = influx_config.bucket.clone();
let _org = influx_config.org.clone();
let token = influx_config.token.clone();
let measurement = influx_config.measurement.clone().unwrap_or_else(|| "modbus".to_string());
let location = influx_config.location.clone().unwrap_or_else(|| "default".to_string());
thread::spawn(move || {
let client = Client::new(url, bucket).with_token(token);
let rt = Runtime::new().unwrap();
loop {
let mut points = Vec::new();
{
let values = values.lock().unwrap();
// Input Register
for (name, val) in &values.modbus_input_register_values {
if let Some(v) = val {
if should_write(&config, name, "input_register") {
let point = WriteQuery::new(Timestamp::from(Utc::now()), measurement.clone())
.add_field(name, *v)
.add_tag("type", "input_register")
.add_tag("location", location.as_str())
.to_owned();
points.push(point);
}
}
}
// Holding Register
for (name, val) in &values.modbus_holding_register_values {
if let Some(v) = val {
if should_write(&config, name, "holding_register") {
let point = WriteQuery::new(Timestamp::from(Utc::now()), measurement.clone())
.add_field(name, *v)
.add_tag("type", "holding_register")
.add_tag("location", location.as_str())
.to_owned();
points.push(point);
}
}
}
// Coils
for (name, val) in &values.modbus_coils_values {
if let Some(v) = val {
if should_write(&config, name, "coils") {
let point = WriteQuery::new(Timestamp::from(Utc::now()), measurement.clone())
.add_field(name, *v)
.add_tag("type", "coils")
.add_tag("location", location.as_str())
.to_owned();
points.push(point);
}
}
}
}
if !points.is_empty() {
let res = rt.block_on(client.query(points));
if let Err(e) = res {
eprintln!("Influx write error: {}", e);
}
}
thread::sleep(Duration::from_secs(10));
}
});
}
/// Reports whether the entry called `name` in the configuration section selected by
/// `reg_type` is flagged for InfluxDB export.
///
/// `reg_type` is one of `"input_register"`, `"holding_register"` or `"coils"`. The maps of
/// the selected section are searched in order and the first one that contains `name`
/// decides the result: its `influxdb` flag is returned, with an unset flag counting as
/// `false`. An unknown `reg_type`, a missing section, or a name that is not found in any
/// map also yields `false`.
///
/// # Panics
///
/// Panics if the `config` mutex is poisoned.
fn should_write(config: &Arc<Mutex<AppConfig>>, name: &str, reg_type: &str) -> bool {
    let guard = config.lock().unwrap();

    // `None` means "no decision found"; it is mapped to `false` at the end.
    let flag: Option<bool> = match reg_type {
        "input_register" => guard
            .modbus_input_register
            .as_ref()
            .and_then(|maps| maps.iter().find_map(|m| m.get(name)))
            .and_then(|entry| entry.influxdb),
        "holding_register" => guard
            .modbus_holding_register
            .as_ref()
            .and_then(|maps| maps.iter().find_map(|m| m.get(name)))
            .and_then(|entry| entry.influxdb),
        "coils" => guard
            .modbus_coils
            .as_ref()
            .and_then(|maps| maps.iter().find_map(|m| m.get(name)))
            .and_then(|entry| entry.influxdb),
        _ => None,
    };

    flag.unwrap_or(false)
}