NorthStar-Endurance-TestBench/EnduranceTestBench HMI/logger.py

48 lines
1.7 KiB
Python

import csv
import os
import threading
from datetime import datetime
LOG_INTERVAL_SECONDS = 0.5
VALVE_IDS = range(5, 25)
BASE_LOG_DIR = "logs"
def start_per_valve_logger(data_mgr):
def log_data():
threading.Timer(LOG_INTERVAL_SECONDS, log_data).start()
try:
now = datetime.now()
timestamp_str = now.strftime("%Y-%m-%d %H:%M:%S")
date_folder = now.strftime("%Y-%m-%d")
valve_data = data_mgr.get_valve_data()
flow = data_mgr.get_flow_data()
pressure = data_mgr.get_pressure_data()
# Aggregate flow and pressure
total_flow = sum(flow.get(f"0x{i:02X}", 0) for i in range(1, 5)) / 100.0
total_pressure = sum(pressure.get(f"0x{i:02X}", 0) for i in range(1, 4)) / 100.0
# Create daily folder
log_dir = os.path.join(BASE_LOG_DIR, date_folder)
os.makedirs(log_dir, exist_ok=True)
for node_id in VALVE_IDS:
valve = valve_data.get(node_id, {})
setpoint = valve.get("setpoint", 0)
feedback = valve.get("feedback", 0)
filepath = os.path.join(log_dir, f"Node{node_id:02}.csv")
is_new_file = not os.path.exists(filepath)
with open(filepath, "a", newline="") as f:
writer = csv.writer(f)
if is_new_file:
writer.writerow(["Timestamp", "Setpoint", "Feedback", "Flow (L/h)", "Pressure (bar)"])
writer.writerow([timestamp_str, setpoint, feedback, total_flow, total_pressure])
except Exception as e:
print(f"[LOG ERROR] {e}")
log_data()