Compare commits

...

4 Commits

8 changed files with 533 additions and 126 deletions

main.py

@@ -5,9 +5,7 @@ import logging
 import os
 from fastapi import Request, APIRouter
 import platform
-from fastapi.templating import (
-    Jinja2Templates,
-)  # pip install fastapi uvicorn jinja2 python-multipart passlib
+from fastapi.templating import Jinja2Templates
 from starlette.middleware.sessions import SessionMiddleware
 from starlette.exceptions import HTTPException as StarletteHTTPException
 from starlette.status import HTTP_302_FOUND
@@ -23,15 +21,16 @@ import numpy as np
 import aiohttp
 import httpx
+from serial_manager import SerialConfig, SerialStore, SerialReader
+from protocol_decoder import decode_frames
+from serial_csv_logger import SerialCsvLogger  # <-- CSV logger

 if platform.system() in ["Darwin"]:  # macOS
     from MockCAN import CANBackend
     logging.basicConfig(level=logging.INFO)
 else:
     from classCAN import CANBackend  # Your real backend
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=logging.DEBUG)

 logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
@@ -67,10 +66,12 @@ write_buffer = deque()
 flush_interval = 1.0  # flush every 1 second
 last_flush_time = datetime.datetime.now()

+# ---- Serial intake globals ----
+serial_store = SerialStore(capacity=5000)
+serial_reader: SerialReader | None = None
+serial_csv: SerialCsvLogger | None = None  # <-- added

 ## LOGGING
 def format_PU_data(data):
     return {
         "timestamp": datetime.datetime.now().isoformat(),
@@ -78,8 +79,7 @@ def format_PU_data(data):
         "Qdilute": np.round(data.get("FM1", 0.0), 1),
         "Qdrain": np.round(data.get("FM4", 0.0), 1),
         "Qrecirc": np.round(data.get("FM3", 0.0), 1),
-        "QdrainEDI": np.round(data.get("FM2", 0.0), 1)
-        - np.round(data.get("FM1", 0.0), 1),
+        "QdrainEDI": np.round(data.get("FM2", 0.0), 1) - np.round(data.get("FM1", 0.0), 1),
         "Pro": np.round(data.get("PS2", 0.0), 2),
         "Pdilute": np.round(data.get("PS3", 0.0), 2),
         "Pretentate": np.round(data.get("PS1", 0.0), 2),
@@ -102,7 +102,6 @@ def format_PU_data(data):
         "MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
     }

 def format_DS_data(data):
     return {
         "timestamp": datetime.datetime.now().isoformat(),
@@ -112,10 +111,7 @@ def format_DS_data(data):
         "Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1),
     }

 # CREDENTIALS
-# Load users from JSON file at startup
 CREDENTIAL_PATH = Path("credentials.json")
 if CREDENTIAL_PATH.exists():
     with CREDENTIAL_PATH.open("r") as f:
@@ -126,7 +122,6 @@ else:
 USERNAME = CREDENTIALS["username"]
 PASSWORD = CREDENTIALS["password"]

 # ======== LOGIN & SESSION HANDLING ========
 def require_login(request: Request):
     user = request.session.get("user")
@@ -135,12 +130,10 @@ def require_login(request: Request):
         raise StarletteHTTPException(status_code=302, detail="Redirect to login")
     return user

 @app.get("/", response_class=HTMLResponse)
 def login_form(request: Request):
     return templates.TemplateResponse("login.html", {"request": request})

 @app.post("/login")
 def login(request: Request, username: str = Form(...), password: str = Form(...)):
     if username == USERNAME and password == PASSWORD:
@@ -150,22 +143,46 @@ def login(request: Request, username: str = Form(...), password: str = Form(...)
             "login.html", {"request": request, "error": "Invalid credentials.json"}
         )

 @app.get("/logout")
 def logout(request: Request):
     request.session.clear()
     return RedirectResponse("/", status_code=HTTP_302_FOUND)

-# ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ========
+# ======== PROTECTED INTERFACE ========
 @app.on_event("startup")
 async def startup_event():
+    # ----- CSV logger -----
+    global serial_csv
+    serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True)
+
+    # ----- start the serial reader -----
+    global serial_reader
+    cfg = SerialConfig(
+        port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"),
+        baudrate=int(os.getenv("SERIAL_BAUD", "115200")),
+        csv_log_path=None,  # disable the generic CSV inside reader; use segregated logger instead
+        ring_capacity=int(os.getenv("SERIAL_RING", "5000")),
+    )
+    serial_reader = SerialReader(
+        cfg,
+        serial_store,
+        decoder=decode_frames,
+        on_message=(lambda p: serial_csv.log(p)),  # write CSV per message type
+    )
+    serial_reader.start()
+
+    # ----- your existing tasks -----
     asyncio.create_task(update_latest_data())
     asyncio.create_task(update_latest_flow())

+@app.on_event("shutdown")
+def _serial_stop():
+    if serial_reader:
+        serial_reader.stop()
+    if serial_csv:
+        serial_csv.close()

+# ======== PAGES ========
 @app.get("/control", response_class=HTMLResponse)
 def control_page(request: Request):
     can_backend.connect()
@@ -173,28 +190,35 @@ def control_page(request: Request):
         return RedirectResponse("/", status_code=HTTP_302_FOUND)
     return templates.TemplateResponse("control.html", {"request": request})

 @app.get("/monitor-DS", response_class=HTMLResponse)
 def monitor_page(request: Request):
     with open("static/monitor_DS.html") as f:
         return HTMLResponse(f.read())

 @app.get("/monitor-PU", response_class=HTMLResponse)
 def monitor_page(request: Request):
     with open("static/monitor_PU.html") as f:
         return HTMLResponse(f.read())

 @app.get("/multi-monitor-PU", response_class=HTMLResponse)
 def monitor_page(request: Request):
     with open("static/multi_pu_dashboard.html") as f:
         return HTMLResponse(f.read())

+# ======== SERIAL API ========
+@app.get("/serial/messages")
+def serial_messages(n: int = 100):
+    return serial_store.latest(min(max(n, 1), 1000))
+
+@app.get("/serial/stats")
+def serial_stats():
+    return serial_store.stats()
+
+@app.get("/serial/snapshot")
+def serial_snapshot():
+    return serial_store.latest_by_id()

 # ======== CAN + BACKEND ROUTES ========
 @app.post("/connect_toggle")
 def connect_toggle():
     logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
@@ -209,15 +233,11 @@ def connect_toggle():
         raise HTTPException(status_code=500, detail="Connection failed.")
     return {"connected": can_backend.connected}

 @app.get("/is_connected")
 def is_connected():
     return {"connected": can_backend.connected}

 # PU CONTROL
 @app.post("/command/{state}/pu/{pu_number}")
 def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
     global DEFAULT_FEED_VALVE
@@ -238,6 +258,16 @@ def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...))
     logging.info(f"Sending state '{state}' to PU {pu_number}")

+    if state == "IDLE":
+        set_patient_skid_users(0)
+        url = "http://192.168.1.28:8000/stop_test"
+        response = httpx.get(url, timeout=1.0)
+        logging.info(f"Stopping test on Patient Skid: {response.status_code}")
+        url = "http://192.168.1.28:8000/close_valves"
+        response = httpx.get(url, timeout=1.0)
+        logging.info(f"Closing valves on Patient Skid: {response.status_code}")
+
     try:
         can_backend.send_state_command(state, pu_number, ploop_setpoint)
         current_state = can_backend.read_current_state(pu_number)
@@ -252,13 +282,7 @@ def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...))
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))

-    if state == "IDLE":
-        set_patient_skid_users(0)

 ## MONITORING
 @app.get("/api/pu_status")
 def get_pu_status():
     global active_PUs
@@ -278,13 +302,12 @@ def get_pu_status():
     return JSONResponse(content=states)

 async def update_latest_data():
     global active_PUs
     while True:
         # DS
         data = can_backend.get_latest_data(pu_number=0)
-        latest_data[f"DS"] = format_DS_data(data)
+        latest_data["DS"] = format_DS_data(data)

         # PUs
         for pu in active_PUs:
@@ -294,13 +317,11 @@ async def update_latest_data():
         logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
         await asyncio.sleep(0.05)

 @app.get("/monitor")
 async def get_monitor_data():
     return latest_data
     # return JSONResponse(content=latest_data)

 # LOCAL RECORDER
 @app.post("/start_recording")
 async def start_recording():
@@ -315,11 +336,11 @@ async def start_recording():
     filepath = os.path.join("recordings", filename)
     recording_file = open(filepath, "w", newline="")

     fieldnames_common = ["timestamp", "pu", "QSkid"]
     fieldnames_DS = list(format_DS_data({}).keys())
-    fieldnames_DS.pop(0)  # .pop(0) removing extra timestamp
+    fieldnames_DS.pop(0)  # removing extra timestamp
     fieldnames_PUs = list(format_PU_data({}).keys())
-    fieldnames_PUs.pop(0)  # .pop(0) removing extra timestamp
+    fieldnames_PUs.pop(0)  # removing extra timestamp
     fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs
@@ -331,7 +352,6 @@ async def start_recording():
     logging.info(f"[RECORDING STARTED] File: {filepath}")
     return {"status": "recording started", "file": filename}

 @app.post("/stop_recording")
 async def stop_recording():
     global recording_flag, recording_task, recording_file
@@ -351,7 +371,6 @@ async def stop_recording():
     logging.info("[RECORDING STOPPED]")
     return {"status": "recording stopped"}

 async def record_data_loop():
     global recording_writer, recording_file, write_buffer, last_flush_time
@@ -363,18 +382,13 @@ async def record_data_loop():
             recording_writer.writerow(row)

         # Flush every flush_interval seconds
-        if (
-            datetime.datetime.now() - last_flush_time
-        ).total_seconds() >= flush_interval:
+        if (datetime.datetime.now() - last_flush_time).total_seconds() >= flush_interval:
             recording_file.flush()
             last_flush_time = datetime.datetime.now()

         await asyncio.sleep(0.05)  # 10 Hz

 ## AUTOMATIC TESTING
 async def send_command_with_delay(
     state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0
 ):
@@ -382,15 +396,11 @@ async def send_command_with_delay(
     logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
     can_backend.send_state_command(state, pu, ploop_setpoint)

 async def set_patients_with_delay(count: int, delay_s: int):
     await asyncio.sleep(delay_s)
-    logging.info(
-        f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s"
-    )
+    logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s")
     set_patient_skid_users(count)

 @router.post("/test/auto/1")
 async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
     pu = 1
@@ -398,52 +408,39 @@ async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
     asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
     return {"status": "started", "pu": pu}

 @router.post("/test/auto/2")
 async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
     logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
     asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
     return {"status": "started", "pu": [1, 2]}

 async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
-    await send_command_with_delay(
-        "PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint
-    )
-    await send_command_with_delay(
-        "PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint
-    )
+    await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint)
+    await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint)
     await set_patients_with_delay(5, delay_s=60)
     await set_patients_with_delay(10, delay_s=60)
     await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
     logging.info("[AUTO TEST] Finished PU1 test")

 async def run_auto_test_pu2(ploop_setpoint: float):
     # Step 1: Run PU1 test
     await run_auto_test_pu1(1, ploop_setpoint)

     # Step 2: PU2 sequence
-    await send_command_with_delay(
-        "PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint
-    )
-    await send_command_with_delay(
-        "PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint
-    )
+    await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint)
+    await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint)
     await set_patients_with_delay(15, delay_s=60)
     await set_patients_with_delay(0, delay_s=60)
     await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
     await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
     logging.info("[AUTO TEST] Finished PU1 + PU2 test")

 @router.post("/test/auto/3")
 async def auto_test_pu3():
     # Call the function for PU3 auto test
     logging.info("Start auto test of 3 PU")
     return {"status": "started", "pu": 3}

 # PATIENT SKID HELPERS
 async def update_latest_flow():
     global active_PUs
@@ -455,11 +452,21 @@ async def update_latest_flow():
             latest_flow = int(data["log"])
             logging.debug(f"Updated flow: {latest_flow}")
             latest_data["PatientSkid"]["QSkid"] = latest_flow
         except Exception as e:
             logging.error(f"Error fetching flow: {e}")
         await asyncio.sleep(1.0)

+def stop_patient_skid():
+    try:
+        url = "http://192.168.1.28:8000/stop_test"
+        response = httpx.get(url, timeout=5.0)
+        if response.status_code == 200:
+            return {"status": "success", "detail": response.json()}
+        else:
+            raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
+    except httpx.RequestError as e:
+        raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")

 def set_patient_skid_users(count: int = 1):
     try:
@@ -469,20 +476,14 @@ def set_patient_skid_users(count: int = 1):
         if response.status_code == 200:
             return {"status": "success", "detail": response.json()}
         else:
-            raise HTTPException(
-                status_code=502, detail=f"Remote server error: {response.text}"
-            )
+            raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
     except httpx.RequestError as e:
-        raise HTTPException(
-            status_code=500, detail=f"Request to external server failed: {str(e)}"
-        )
+        raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")

 app.include_router(router)

 if __name__ == "__main__":
     import uvicorn

     uvicorn.run(
         "main:app",
         host="127.0.0.1",
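
Once this branch is running under uvicorn, the three new read-only serial endpoints can be smoke-tested from a Python shell. A minimal sketch, assuming the default local host/port (adjust BASE for your deployment):

import httpx  # already a dependency of main.py

BASE = "http://127.0.0.1:8000"  # assumption: local uvicorn instance

# last 5 parsed frames from the ring buffer
print(httpx.get(f"{BASE}/serial/messages", params={"n": 5}).json())

# intake counters: frames_in / frames_ok / frames_bad / restarts / last_err
print(httpx.get(f"{BASE}/serial/stats").json())

# most recent frame per msg_id (e.g. "P1VP", "D0CO")
print(httpx.get(f"{BASE}/serial/snapshot").json())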

protocol_decoder.py Normal file

@@ -0,0 +1,94 @@
from typing import Dict, Any, List, Tuple
import re

RE_PU_VP = re.compile(r'^P(?P<pu>[1-3])VP$')
RE_PU_CO = re.compile(r'^P(?P<pu>[1-3])CO$')
RE_DOCK_VP = re.compile(r'^D0VP$')
RE_DOCK_CO = re.compile(r'^(D0CO|DOCO)$')  # be tolerant of the O/0 mixup

def _to_i(s: str) -> int:
    try:
        return int(s.strip())
    except Exception:
        return 0

def _to_pct(s: str) -> int:
    try:
        return int(s.strip())
    except Exception:
        try:
            return int(float(s))
        except Exception:
            return 0

def _to_bool(s: str) -> bool:
    return str(s).strip() in ("1", "true", "True", "TRUE")

def _dock_vp(vals: List[str]) -> Dict[str, Any]:
    names = ["mv01", "mv09", "mv10", "mv11", "mmv01", "mmv02", "mmv03", "sv01", "sv02", "sv03"]
    out: Dict[str, Any] = {}
    for k, v in zip(names, vals):
        out[k] = _to_bool(v) if k.startswith("sv") else _to_pct(v)
    return out

def _dock_co(vals: List[str]) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for name, v in zip(["cs01", "cs02"], vals):
        q = _to_i(v)  # raw value in 0.1 µS
        out[f"{name}_0p1uS"] = q
        out[f"{name}_uS"] = q * 0.1
    return out

def _pu_vp(pu: int, vals: List[str]) -> Dict[str, Any]:
    out: Dict[str, Any] = {"pu": pu}
    for k, v in zip(["mv02", "mv03", "mv04", "mv05", "mv06", "mv07", "mv08"], vals):
        out[k] = _to_pct(v)
    return out

def _pu_co(pu: int, vals: List[str]) -> Dict[str, Any]:
    out: Dict[str, Any] = {"pu": pu}
    for name, v in zip(["cs03", "cs04", "cs05"], vals):
        q = _to_i(v)
        out[f"{name}_0p1uS"] = q
        out[f"{name}_uS"] = q * 0.1
    return out

def decode_frames(buffer: bytes) -> Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]:
    msgs: List[Tuple[bytes, Dict[str, Any]]] = []
    errors = 0
    parts = buffer.split(b"\n")
    remaining = parts[-1]  # trailing partial line is returned for the next read
    for line in parts[:-1]:
        raw = line.strip().rstrip(b"\r")
        if not raw:
            continue
        try:
            t = raw.decode("utf-8")
            fields = [f.strip() for f in t.split(",")]
            if len(fields) < 3:
                raise ValueError("too few fields")
            version, msg_id, ts_ms = fields[0], fields[1], fields[2]
            data = fields[3:]
            parsed: Dict[str, Any] = {"version": version, "msg_id": msg_id, "ts_ms": int(ts_ms)}
            if RE_DOCK_VP.match(msg_id):
                parsed.update({"src": "dock", "type": "valves"})
                parsed.update(_dock_vp(data))
            elif RE_DOCK_CO.match(msg_id):
                parsed.update({"src": "dock", "type": "cond"})
                parsed.update(_dock_co(data))
            else:
                m = RE_PU_VP.match(msg_id)
                if m:
                    pu = int(m.group("pu"))
                    parsed.update({"src": "pu", "type": "valves", "pu": pu})
                    parsed.update(_pu_vp(pu, data))
                else:
                    m = RE_PU_CO.match(msg_id)
                    if m:
                        pu = int(m.group("pu"))
                        parsed.update({"src": "pu", "type": "cond", "pu": pu})
                        parsed.update(_pu_co(pu, data))
                    else:
                        parsed.update({"src": "unknown", "type": "raw", "data": data})
            msgs.append((raw, parsed))
        except Exception:
            errors += 1
    return msgs, remaining, errors
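
For reference, decode_frames expects newline-terminated, comma-separated ASCII frames of the form version,msg_id,ts_ms,field1,...; anything after the last newline is returned unconsumed so the caller can prepend it to the next read. A minimal sketch (the field values are made up for illustration):

from protocol_decoder import decode_frames

# two complete frames plus one partial frame that stays in the buffer
buf = (
    b"1,P1VP,1200,10,20,30,40,50,60,70\r\n"
    b"1,D0CO,1250,123,456\n"
    b"1,P2CO,13"  # incomplete: no newline yet
)

msgs, remaining, errors = decode_frames(buf)
assert remaining == b"1,P2CO,13" and errors == 0

raw, parsed = msgs[0]
# parsed == {"version": "1", "msg_id": "P1VP", "ts_ms": 1200,
#            "src": "pu", "type": "valves", "pu": 1,
#            "mv02": 10, "mv03": 20, ..., "mv08": 70}

raw, parsed = msgs[1]
# parsed == {"version": "1", "msg_id": "D0CO", "ts_ms": 1250,
#            "src": "dock", "type": "cond",
#            "cs01_0p1uS": 123, "cs01_uS": 12.3,
#            "cs02_0p1uS": 456, "cs02_uS": 45.6}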

serial_csv_logger.py Normal file

@@ -0,0 +1,120 @@
# serial_csv_logger.py
import os, csv, datetime, json
from typing import Dict, Any, Tuple

class SerialCsvLogger:
    """
    Writes parsed serial frames to CSV, segregated by message type:
      - D0VP_YYYY-MM-DD.csv   (Docking valves)
      - D0CO_YYYY-MM-DD.csv   (Docking conductivity)
      - P1VP_YYYY-MM-DD.csv   (PU1 valves), P2VP..., P3VP...
      - P1CO_YYYY-MM-DD.csv   (PU1 conductivity), etc.
      - Unknown_YYYY-MM-DD.csv (for anything unmatched)
    """

    def __init__(self, out_dir: str = "serial_logs", rotate_daily: bool = True):
        self.out_dir = out_dir
        self.rotate_daily = rotate_daily
        # key -> (writer, file, date_str)
        self._writers: Dict[str, Tuple[csv.DictWriter, Any, str]] = {}
        os.makedirs(self.out_dir, exist_ok=True)

    def close(self):
        for _, (_, f, _) in self._writers.items():
            try:
                f.close()
            except Exception:
                pass
        self._writers.clear()

    # ---------- public API ----------
    def log(self, parsed: Dict[str, Any]):
        msg_id = parsed.get("msg_id", "Unknown")
        date_str = datetime.date.today().isoformat() if self.rotate_daily else "all"
        key = f"{msg_id}"

        # rotate if the day changed
        if key in self._writers and self._writers[key][2] != date_str:
            self._writers[key][1].close()
            del self._writers[key]

        writer, _, _ = self._ensure_writer(key, msg_id, date_str)
        row = self._build_row(msg_id, parsed)
        writer.writerow(row)

    # ---------- internals ----------
    def _ensure_writer(self, key: str, msg_id: str, date_str: str):
        if key in self._writers:
            return self._writers[key]
        fname = f"{msg_id}_{date_str}.csv"
        path = os.path.join(self.out_dir, fname)
        f = open(path, "a", newline="")
        headers = self._headers_for(msg_id)
        writer = csv.DictWriter(f, fieldnames=headers)
        # write header only if file is empty
        if f.tell() == 0:
            writer.writeheader()
        self._writers[key] = (writer, f, date_str)
        return self._writers[key]

    def _headers_for(self, msg_id: str):
        # Common head columns
        base = ["ts_iso", "ts_ms", "version", "msg_id"]
        if msg_id == "D0VP":
            return base + ["mv01", "mv09", "mv10", "mv11", "mmv01", "mmv02", "mmv03", "sv01", "sv02", "sv03"]
        if msg_id in ("D0CO", "DOCO"):
            # write both scaled (uS) and raw (0.1 uS) for traceability
            return base + ["cs01_uS", "cs01_0p1uS", "cs02_uS", "cs02_0p1uS"]
        if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
            # P1VP / P2VP / P3VP
            return base + ["pu", "mv02", "mv03", "mv04", "mv05", "mv06", "mv07", "mv08"]
        if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
            # P1CO / P2CO / P3CO
            return base + ["pu", "cs03_uS", "cs03_0p1uS", "cs04_uS", "cs04_0p1uS", "cs05_uS", "cs05_0p1uS"]
        # fallback
        return base + ["payload_json"]

    def _build_row(self, msg_id: str, p: Dict[str, Any]) -> Dict[str, Any]:
        ts_iso = (
            datetime.datetime.fromtimestamp(p.get("ts_ms", 0) / 1000.0).isoformat()
            if "ts_ms" in p else ""
        )
        row = {"ts_iso": ts_iso, "ts_ms": p.get("ts_ms", ""), "version": p.get("version", ""), "msg_id": msg_id}
        if msg_id == "D0VP":
            row.update({
                "mv01": p.get("mv01"), "mv09": p.get("mv09"), "mv10": p.get("mv10"), "mv11": p.get("mv11"),
                "mmv01": p.get("mmv01"), "mmv02": p.get("mmv02"), "mmv03": p.get("mmv03"),
                "sv01": p.get("sv01"), "sv02": p.get("sv02"), "sv03": p.get("sv03"),
            })
            return row
        if msg_id in ("D0CO", "DOCO"):
            row.update({
                "cs01_uS": p.get("cs01_uS"), "cs01_0p1uS": p.get("cs01_0p1uS"),
                "cs02_uS": p.get("cs02_uS"), "cs02_0p1uS": p.get("cs02_0p1uS"),
            })
            return row
        if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
            row.update({
                "pu": p.get("pu"),
                "mv02": p.get("mv02"), "mv03": p.get("mv03"), "mv04": p.get("mv04"),
                "mv05": p.get("mv05"), "mv06": p.get("mv06"), "mv07": p.get("mv07"), "mv08": p.get("mv08"),
            })
            return row
        if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
            row.update({
                "pu": p.get("pu"),
                "cs03_uS": p.get("cs03_uS"), "cs03_0p1uS": p.get("cs03_0p1uS"),
                "cs04_uS": p.get("cs04_uS"), "cs04_0p1uS": p.get("cs04_0p1uS"),
                "cs05_uS": p.get("cs05_uS"), "cs05_0p1uS": p.get("cs05_0p1uS"),
            })
            return row
        # Unknown → keep full payload as JSON for later inspection
        pay = {k: v for k, v in p.items() if k not in ("version", "msg_id", "ts_ms")}
        row["payload_json"] = json.dumps(pay, separators=(",", ":"))
        return row
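
A minimal usage sketch (the parsed dict below is hypothetical but mirrors what decode_frames emits for a P1VP frame):

from serial_csv_logger import SerialCsvLogger

logger = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True)
logger.log({
    "version": "1", "msg_id": "P1VP", "ts_ms": 1200, "pu": 1,
    "mv02": 10, "mv03": 20, "mv04": 30, "mv05": 40,
    "mv06": 50, "mv07": 60, "mv08": 70,
})
logger.close()
# -> appends one row to serial_logs/P1VP_<today>.csv; the header row is
#    written only when the file is created empty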

serial_manager.py Normal file

@@ -0,0 +1,234 @@
# serial_manager.py
import threading
import time
import csv
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple

import serial  # provided by python3-serial

@dataclass
class SerialConfig:
    """
    Configuration for the read-only serial intake.
    """
    port: str = "/dev/ttyUSB0"
    baudrate: int = 115200
    bytesize: int = serial.EIGHTBITS
    parity: str = serial.PARITY_NONE
    stopbits: int = serial.STOPBITS_ONE
    timeout: float = 0.05
    rtscts: bool = False
    dsrdtr: bool = False
    xonxoff: bool = False
    ring_capacity: int = 5000
    # If set, a single "generic" CSV will be written here (append mode).
    # If you want segregated CSVs per message type, leave this as None and
    # supply an `on_message` callback that writes where you want.
    csv_log_path: Optional[str] = None  # e.g. "/home/pi/hmi/serial_log.csv"

class SerialStore:
    """
    Thread-safe store for recent parsed messages and intake stats.
    Stores parsed dicts as returned by the decoder.
    """

    def __init__(self, capacity: int):
        self._buf: Deque[Dict[str, Any]] = deque(maxlen=capacity)
        self._lock = threading.Lock()
        self._stats = {
            "frames_in": 0,
            "frames_ok": 0,
            "frames_bad": 0,
            "restarts": 0,
            "last_err": "",
        }
        self._latest_by_id: Dict[str, Dict[str, Any]] = {}

    def add(self, msg: Dict[str, Any], ok: bool = True):
        with self._lock:
            self._buf.append(msg)
            self._stats["frames_in"] += 1
            if ok:
                self._stats["frames_ok"] += 1
            else:
                self._stats["frames_bad"] += 1
            mid = msg.get("msg_id")
            if mid:
                self._latest_by_id[mid] = msg

    def latest(self, n: int = 100) -> List[Dict[str, Any]]:
        with self._lock:
            return list(self._buf)[-n:]

    def latest_by_id(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return dict(self._latest_by_id)

    def stats(self) -> Dict[str, Any]:
        with self._lock:
            return dict(self._stats)

    def set_error(self, err: str):
        with self._lock:
            self._stats["last_err"] = err

    def inc_restart(self):
        with self._lock:
            self._stats["restarts"] += 1

class SerialReader:
    """
    Background read-only serial reader.

    Args:
        cfg: SerialConfig
        store: SerialStore
        decoder: function(buffer: bytes) ->
            (messages: List[Tuple[raw_frame: bytes, parsed: Dict]], remaining: bytes, errors: int)
        on_message: optional callback called for each parsed dict (e.g., segregated CSV logger)
    """

    def __init__(
        self,
        cfg: SerialConfig,
        store: SerialStore,
        decoder: Callable[[bytes], Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]],
        on_message: Optional[Callable[[Dict[str, Any]], None]] = None,
    ):
        self.cfg = cfg
        self.store = store
        self.decoder = decoder
        self.on_message = on_message
        self._ser: Optional[serial.Serial] = None
        self._th: Optional[threading.Thread] = None
        self._stop = threading.Event()
        self._buffer = b""
        # Optional generic CSV (single file) if cfg.csv_log_path is set
        self._csv_file = None
        self._csv_writer = None

    # ---------- lifecycle ----------
    def start(self):
        self._stop.clear()
        self._open_serial()
        self._open_csv()
        self._th = threading.Thread(target=self._run, name="SerialReader", daemon=True)
        self._th.start()

    def stop(self):
        self._stop.set()
        if self._th and self._th.is_alive():
            self._th.join(timeout=2.0)
        self._close_serial()
        self._close_csv()

    # ---------- internals ----------
    def _open_serial(self):
        try:
            self._ser = serial.Serial(
                port=self.cfg.port,
                baudrate=self.cfg.baudrate,
                bytesize=self.cfg.bytesize,
                parity=self.cfg.parity,
                stopbits=self.cfg.stopbits,
                timeout=self.cfg.timeout,
                rtscts=self.cfg.rtscts,
                dsrdtr=self.cfg.dsrdtr,
                xonxoff=self.cfg.xonxoff,
            )
        except Exception as e:
            self.store.set_error(f"Open error: {e}")
            self._ser = None

    def _close_serial(self):
        try:
            if self._ser and self._ser.is_open:
                self._ser.close()
        except Exception:
            pass
        self._ser = None

    def _open_csv(self):
        if not self.cfg.csv_log_path:
            return
        try:
            self._csv_file = open(self.cfg.csv_log_path, "a", newline="")
            self._csv_writer = csv.writer(self._csv_file)
            # Write header only if file is empty (avoid duplicates on restart)
            if self._csv_file.tell() == 0:
                self._csv_writer.writerow(["ts_ms", "msg_id", "raw_hex", "parsed"])
                self._csv_file.flush()
        except Exception as e:
            self.store.set_error(f"CSV open error: {e}")
            self._csv_file = None
            self._csv_writer = None

    def _close_csv(self):
        try:
            if self._csv_file:
                self._csv_file.close()
        except Exception:
            pass
        self._csv_file = None
        self._csv_writer = None

    def _log_csv(self, raw: bytes, parsed: Dict[str, Any]):
        """Write to the optional single generic CSV."""
        if not self._csv_writer:
            return
        try:
            self._csv_writer.writerow(
                [parsed.get("ts_ms"), parsed.get("msg_id"), raw.hex(), parsed]
            )
            self._csv_file.flush()
        except Exception as e:
            self.store.set_error(f"CSV write error: {e}")

    def _run(self):
        backoff = 0.5
        while not self._stop.is_set():
            if not self._ser or not self._ser.is_open:
                # reconnect with exponential backoff (capped)
                self._close_serial()
                time.sleep(backoff)
                self.store.inc_restart()
                self._open_serial()
                backoff = min(backoff * 1.5, 5.0)
                continue
            backoff = 0.5
            try:
                data = self._ser.read(4096)  # returns quickly thanks to the read timeout
                if data:
                    self._buffer += data
                    frames, remaining, errors = self.decoder(self._buffer)
                    self._buffer = remaining
                    for raw, parsed in frames:
                        # store
                        self.store.add(parsed, ok=True)
                        # optional generic CSV
                        self._log_csv(raw, parsed)
                        # optional segregated sink
                        if self.on_message:
                            try:
                                self.on_message(parsed)
                            except Exception as e:
                                self.store.set_error(f"CSV sink error: {e}")
                    # count decode errors
                    for _ in range(errors):
                        self.store.add({"error": "decode"}, ok=False)
                else:
                    time.sleep(0.01)
            except Exception as e:
                self.store.set_error(f"Read/Decode error: {e}")
                self._close_serial()
                time.sleep(0.5)
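
The reader can also be exercised outside FastAPI; a bench-test sketch (port and baud rate are assumptions for a local setup):

import time

from protocol_decoder import decode_frames
from serial_manager import SerialConfig, SerialStore, SerialReader

store = SerialStore(capacity=5000)
cfg = SerialConfig(port="/dev/ttyUSB0", baudrate=115200)  # assumed device
reader = SerialReader(cfg, store, decoder=decode_frames,
                      on_message=lambda p: print(p["msg_id"]))
reader.start()
time.sleep(5.0)         # let a few frames arrive
print(store.stats())    # frames_in / frames_ok / frames_bad / restarts / last_err
print(store.latest(3))  # last three parsed frames
reader.stop()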

Deleted file

@@ -1,12 +0,0 @@
#!/bin/bash
set -e
echo "[UPDATE] Pulling latest code..."
cd /home/hmi/Desktop/HMI || exit 1
git reset --hard HEAD
git pull origin main
echo "[RESTART] Restarting HMI service..."
sudo /bin/systemctl restart hmi.service
echo "[DONE] HMI updated."

Deleted file

@@ -1,30 +0,0 @@
import canopen
import os

class ValveBackend:
    def __init__(self, eds_file: str, node_id: int = 0x0F):
        self.eds_file = eds_file
        self.node_id = node_id
        self.network = None
        self.node = None

    def connect(self):
        try:
            self.network = canopen.Network()
            self.network.connect(channel='can0', bustype='socketcan')
            self.node = canopen.RemoteNode(self.node_id, self.eds_file)
            self.network.add_node(self.node)
            return True
        except Exception as e:
            print(f"[VALVE CONNECT ERROR] {e}")
            return False

    def send_command(self, opening: int):
        try:
            if self.node is None:
                raise RuntimeError("Valve node not initialized")
            self.node.sdo[0x6000].raw = opening
            print(f"[VALVE] Opening set to {opening}")
        except Exception as e:
            print(f"[VALVE CMD ERROR] {e}")