Compare commits
4 Commits
f808b88f93
...
2446fb7b59
| Author | SHA1 | Date | |
|---|---|---|---|
| 2446fb7b59 | |||
| 41c8d49d31 | |||
| e8755bd1de | |||
| 117265586c |
169
main.py
169
main.py
|
|
@ -5,9 +5,7 @@ import logging
|
||||||
import os
|
import os
|
||||||
from fastapi import Request, APIRouter
|
from fastapi import Request, APIRouter
|
||||||
import platform
|
import platform
|
||||||
from fastapi.templating import (
|
from fastapi.templating import Jinja2Templates
|
||||||
Jinja2Templates,
|
|
||||||
) # pip install fastapi uvicorn jinja2 python-multipart passlib
|
|
||||||
from starlette.middleware.sessions import SessionMiddleware
|
from starlette.middleware.sessions import SessionMiddleware
|
||||||
from starlette.exceptions import HTTPException as StarletteHTTPException
|
from starlette.exceptions import HTTPException as StarletteHTTPException
|
||||||
from starlette.status import HTTP_302_FOUND
|
from starlette.status import HTTP_302_FOUND
|
||||||
|
|
@ -23,15 +21,16 @@ import numpy as np
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
|
from serial_manager import SerialConfig, SerialStore, SerialReader
|
||||||
|
from protocol_decoder import decode_frames
|
||||||
|
from serial_csv_logger import SerialCsvLogger # <-- CSV logger
|
||||||
|
|
||||||
if platform.system() in ["Darwin"]: # macOS or Windows
|
if platform.system() in ["Darwin"]: # macOS or Windows
|
||||||
from MockCAN import CANBackend
|
from MockCAN import CANBackend
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
from classCAN import CANBackend # Your real backend
|
from classCAN import CANBackend # Your real backend
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
|
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
|
@ -67,10 +66,12 @@ write_buffer = deque()
|
||||||
flush_interval = 1.0 # flush every 1 second
|
flush_interval = 1.0 # flush every 1 second
|
||||||
last_flush_time = datetime.datetime.now()
|
last_flush_time = datetime.datetime.now()
|
||||||
|
|
||||||
|
# ---- Serial intake globals ----
|
||||||
|
serial_store = SerialStore(capacity=5000)
|
||||||
|
serial_reader: SerialReader | None = None
|
||||||
|
serial_csv: SerialCsvLogger | None = None # <-- added
|
||||||
|
|
||||||
## LOGGING
|
## LOGGING
|
||||||
|
|
||||||
|
|
||||||
def format_PU_data(data):
|
def format_PU_data(data):
|
||||||
return {
|
return {
|
||||||
"timestamp": datetime.datetime.now().isoformat(),
|
"timestamp": datetime.datetime.now().isoformat(),
|
||||||
|
|
@ -78,8 +79,7 @@ def format_PU_data(data):
|
||||||
"Qdilute": np.round(data.get("FM1", 0.0), 1),
|
"Qdilute": np.round(data.get("FM1", 0.0), 1),
|
||||||
"Qdrain": np.round(data.get("FM4", 0.0), 1),
|
"Qdrain": np.round(data.get("FM4", 0.0), 1),
|
||||||
"Qrecirc": np.round(data.get("FM3", 0.0), 1),
|
"Qrecirc": np.round(data.get("FM3", 0.0), 1),
|
||||||
"QdrainEDI": np.round(data.get("FM2", 0.0), 1)
|
"QdrainEDI": np.round(data.get("FM2", 0.0), 1) - np.round(data.get("FM1", 0.0), 1),
|
||||||
- np.round(data.get("FM1", 0.0), 1),
|
|
||||||
"Pro": np.round(data.get("PS2", 0.0), 2),
|
"Pro": np.round(data.get("PS2", 0.0), 2),
|
||||||
"Pdilute": np.round(data.get("PS3", 0.0), 2),
|
"Pdilute": np.round(data.get("PS3", 0.0), 2),
|
||||||
"Pretentate": np.round(data.get("PS1", 0.0), 2),
|
"Pretentate": np.round(data.get("PS1", 0.0), 2),
|
||||||
|
|
@ -102,7 +102,6 @@ def format_PU_data(data):
|
||||||
"MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
|
"MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def format_DS_data(data):
|
def format_DS_data(data):
|
||||||
return {
|
return {
|
||||||
"timestamp": datetime.datetime.now().isoformat(),
|
"timestamp": datetime.datetime.now().isoformat(),
|
||||||
|
|
@ -112,10 +111,7 @@ def format_DS_data(data):
|
||||||
"Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1),
|
"Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# CREDENTIALS
|
# CREDENTIALS
|
||||||
|
|
||||||
# Load users from JSON file at startup
|
|
||||||
CREDENTIAL_PATH = Path("credentials.json")
|
CREDENTIAL_PATH = Path("credentials.json")
|
||||||
if CREDENTIAL_PATH.exists():
|
if CREDENTIAL_PATH.exists():
|
||||||
with CREDENTIAL_PATH.open("r") as f:
|
with CREDENTIAL_PATH.open("r") as f:
|
||||||
|
|
@ -126,7 +122,6 @@ else:
|
||||||
USERNAME = CREDENTIALS["username"]
|
USERNAME = CREDENTIALS["username"]
|
||||||
PASSWORD = CREDENTIALS["password"]
|
PASSWORD = CREDENTIALS["password"]
|
||||||
|
|
||||||
|
|
||||||
# ======== LOGIN & SESSION HANDLING ========
|
# ======== LOGIN & SESSION HANDLING ========
|
||||||
def require_login(request: Request):
|
def require_login(request: Request):
|
||||||
user = request.session.get("user")
|
user = request.session.get("user")
|
||||||
|
|
@ -135,12 +130,10 @@ def require_login(request: Request):
|
||||||
raise StarletteHTTPException(status_code=302, detail="Redirect to login")
|
raise StarletteHTTPException(status_code=302, detail="Redirect to login")
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
@app.get("/", response_class=HTMLResponse)
|
@app.get("/", response_class=HTMLResponse)
|
||||||
def login_form(request: Request):
|
def login_form(request: Request):
|
||||||
return templates.TemplateResponse("login.html", {"request": request})
|
return templates.TemplateResponse("login.html", {"request": request})
|
||||||
|
|
||||||
|
|
||||||
@app.post("/login")
|
@app.post("/login")
|
||||||
def login(request: Request, username: str = Form(...), password: str = Form(...)):
|
def login(request: Request, username: str = Form(...), password: str = Form(...)):
|
||||||
if username == USERNAME and password == PASSWORD:
|
if username == USERNAME and password == PASSWORD:
|
||||||
|
|
@ -150,22 +143,46 @@ def login(request: Request, username: str = Form(...), password: str = Form(...)
|
||||||
"login.html", {"request": request, "error": "Invalid credentials.json"}
|
"login.html", {"request": request, "error": "Invalid credentials.json"}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/logout")
|
@app.get("/logout")
|
||||||
def logout(request: Request):
|
def logout(request: Request):
|
||||||
request.session.clear()
|
request.session.clear()
|
||||||
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
||||||
|
|
||||||
|
# ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ========
|
||||||
# ======== PROTECTED INTERFACE ========
|
|
||||||
|
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def startup_event():
|
async def startup_event():
|
||||||
|
# ----- CSV logger -----
|
||||||
|
global serial_csv
|
||||||
|
serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True)
|
||||||
|
|
||||||
|
# ----- start the serial reader -----
|
||||||
|
global serial_reader
|
||||||
|
cfg = SerialConfig(
|
||||||
|
port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"),
|
||||||
|
baudrate=int(os.getenv("SERIAL_BAUD", "115200")),
|
||||||
|
csv_log_path=None, # disable the generic CSV inside reader; use segregated logger instead
|
||||||
|
ring_capacity=int(os.getenv("SERIAL_RING", "5000")),
|
||||||
|
)
|
||||||
|
serial_reader = SerialReader(
|
||||||
|
cfg,
|
||||||
|
serial_store,
|
||||||
|
decoder=decode_frames,
|
||||||
|
on_message=(lambda p: serial_csv.log(p)) # write CSV per message type
|
||||||
|
)
|
||||||
|
serial_reader.start()
|
||||||
|
|
||||||
|
# ----- your existing tasks -----
|
||||||
asyncio.create_task(update_latest_data())
|
asyncio.create_task(update_latest_data())
|
||||||
asyncio.create_task(update_latest_flow())
|
asyncio.create_task(update_latest_flow())
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
def _serial_stop():
|
||||||
|
if serial_reader:
|
||||||
|
serial_reader.stop()
|
||||||
|
if serial_csv:
|
||||||
|
serial_csv.close()
|
||||||
|
|
||||||
|
# ======== PAGES ========
|
||||||
@app.get("/control", response_class=HTMLResponse)
|
@app.get("/control", response_class=HTMLResponse)
|
||||||
def control_page(request: Request):
|
def control_page(request: Request):
|
||||||
can_backend.connect()
|
can_backend.connect()
|
||||||
|
|
@ -173,28 +190,35 @@ def control_page(request: Request):
|
||||||
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
||||||
return templates.TemplateResponse("control.html", {"request": request})
|
return templates.TemplateResponse("control.html", {"request": request})
|
||||||
|
|
||||||
|
|
||||||
@app.get("/monitor-DS", response_class=HTMLResponse)
|
@app.get("/monitor-DS", response_class=HTMLResponse)
|
||||||
def monitor_page(request: Request):
|
def monitor_page(request: Request):
|
||||||
with open("static/monitor_DS.html") as f:
|
with open("static/monitor_DS.html") as f:
|
||||||
return HTMLResponse(f.read())
|
return HTMLResponse(f.read())
|
||||||
|
|
||||||
|
|
||||||
@app.get("/monitor-PU", response_class=HTMLResponse)
|
@app.get("/monitor-PU", response_class=HTMLResponse)
|
||||||
def monitor_page(request: Request):
|
def monitor_page(request: Request):
|
||||||
with open("static/monitor_PU.html") as f:
|
with open("static/monitor_PU.html") as f:
|
||||||
return HTMLResponse(f.read())
|
return HTMLResponse(f.read())
|
||||||
|
|
||||||
|
|
||||||
@app.get("/multi-monitor-PU", response_class=HTMLResponse)
|
@app.get("/multi-monitor-PU", response_class=HTMLResponse)
|
||||||
def monitor_page(request: Request):
|
def monitor_page(request: Request):
|
||||||
with open("static/multi_pu_dashboard.html") as f:
|
with open("static/multi_pu_dashboard.html") as f:
|
||||||
return HTMLResponse(f.read())
|
return HTMLResponse(f.read())
|
||||||
|
|
||||||
|
# ======== SERIAL API ========
|
||||||
|
@app.get("/serial/messages")
|
||||||
|
def serial_messages(n: int = 100):
|
||||||
|
return serial_store.latest(min(max(n, 1), 1000))
|
||||||
|
|
||||||
|
@app.get("/serial/stats")
|
||||||
|
def serial_stats():
|
||||||
|
return serial_store.stats()
|
||||||
|
|
||||||
|
@app.get("/serial/snapshot")
|
||||||
|
def serial_snapshot():
|
||||||
|
return serial_store.latest_by_id()
|
||||||
|
|
||||||
# ======== CAN + BACKEND ROUTES ========
|
# ======== CAN + BACKEND ROUTES ========
|
||||||
|
|
||||||
|
|
||||||
@app.post("/connect_toggle")
|
@app.post("/connect_toggle")
|
||||||
def connect_toggle():
|
def connect_toggle():
|
||||||
logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
|
logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
|
||||||
|
|
@ -209,15 +233,11 @@ def connect_toggle():
|
||||||
raise HTTPException(status_code=500, detail="Connection failed.")
|
raise HTTPException(status_code=500, detail="Connection failed.")
|
||||||
return {"connected": can_backend.connected}
|
return {"connected": can_backend.connected}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/is_connected")
|
@app.get("/is_connected")
|
||||||
def is_connected():
|
def is_connected():
|
||||||
return {"connected": can_backend.connected}
|
return {"connected": can_backend.connected}
|
||||||
|
|
||||||
|
|
||||||
# PU CONTROL
|
# PU CONTROL
|
||||||
|
|
||||||
|
|
||||||
@app.post("/command/{state}/pu/{pu_number}")
|
@app.post("/command/{state}/pu/{pu_number}")
|
||||||
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
|
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
|
||||||
global DEFAULT_FEED_VALVE
|
global DEFAULT_FEED_VALVE
|
||||||
|
|
@ -238,6 +258,16 @@ def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...))
|
||||||
|
|
||||||
logging.info(f"Sending state '{state}' to PU {pu_number}")
|
logging.info(f"Sending state '{state}' to PU {pu_number}")
|
||||||
|
|
||||||
|
if state == "IDLE":
|
||||||
|
set_patient_skid_users(0)
|
||||||
|
url = f"http://192.168.1.28:8000/stop_test"
|
||||||
|
response = httpx.get(url, timeout=1.0)
|
||||||
|
logging.info(f"Stopping test on Patient Skid: {response.status_code}")
|
||||||
|
|
||||||
|
url = f"http://192.168.1.28:8000/close_valves"
|
||||||
|
response = httpx.get(url, timeout=1.0)
|
||||||
|
logging.info(f"Closing valves on Patient Skid: {response.status_code}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
can_backend.send_state_command(state, pu_number, ploop_setpoint)
|
can_backend.send_state_command(state, pu_number, ploop_setpoint)
|
||||||
current_state = can_backend.read_current_state(pu_number)
|
current_state = can_backend.read_current_state(pu_number)
|
||||||
|
|
@ -252,13 +282,7 @@ def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
raise HTTPException(status_code=500, detail=str(e))
|
||||||
|
|
||||||
if state == "IDLE":
|
|
||||||
set_patient_skid_users(0)
|
|
||||||
|
|
||||||
|
|
||||||
## MONITORING
|
## MONITORING
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/pu_status")
|
@app.get("/api/pu_status")
|
||||||
def get_pu_status():
|
def get_pu_status():
|
||||||
global active_PUs
|
global active_PUs
|
||||||
|
|
@ -278,13 +302,12 @@ def get_pu_status():
|
||||||
|
|
||||||
return JSONResponse(content=states)
|
return JSONResponse(content=states)
|
||||||
|
|
||||||
|
|
||||||
async def update_latest_data():
|
async def update_latest_data():
|
||||||
global active_PUs
|
global active_PUs
|
||||||
while True:
|
while True:
|
||||||
# DS
|
# DS
|
||||||
data = can_backend.get_latest_data(pu_number=0)
|
data = can_backend.get_latest_data(pu_number=0)
|
||||||
latest_data[f"DS"] = format_DS_data(data)
|
latest_data["DS"] = format_DS_data(data)
|
||||||
|
|
||||||
# PUs
|
# PUs
|
||||||
for pu in active_PUs:
|
for pu in active_PUs:
|
||||||
|
|
@ -294,13 +317,11 @@ async def update_latest_data():
|
||||||
logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
|
logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
|
||||||
await asyncio.sleep(0.05)
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/monitor")
|
@app.get("/monitor")
|
||||||
async def get_monitor_data():
|
async def get_monitor_data():
|
||||||
return latest_data
|
return latest_data
|
||||||
# return JSONResponse(content=latest_data)
|
# return JSONResponse(content=latest_data)
|
||||||
|
|
||||||
|
|
||||||
# LOCAL RECORDER
|
# LOCAL RECORDER
|
||||||
@app.post("/start_recording")
|
@app.post("/start_recording")
|
||||||
async def start_recording():
|
async def start_recording():
|
||||||
|
|
@ -315,11 +336,11 @@ async def start_recording():
|
||||||
filepath = os.path.join("recordings", filename)
|
filepath = os.path.join("recordings", filename)
|
||||||
|
|
||||||
recording_file = open(filepath, "w", newline="")
|
recording_file = open(filepath, "w", newline="")
|
||||||
fieldnames_common = ["timestamp", "pu", "QSkid"]
|
fieldnames_common = ["timestamp", "pu", "QSkid"]
|
||||||
fieldnames_DS = list(format_DS_data({}).keys())
|
fieldnames_DS = list(format_DS_data({}).keys())
|
||||||
fieldnames_DS.pop(0) # .pop(0) removing extra timestamp
|
fieldnames_DS.pop(0) # removing extra timestamp
|
||||||
fieldnames_PUs = list(format_PU_data({}).keys())
|
fieldnames_PUs = list(format_PU_data({}).keys())
|
||||||
fieldnames_PUs.pop(0) # .pop(0) removing extra timestamp
|
fieldnames_PUs.pop(0) # removing extra timestamp
|
||||||
|
|
||||||
fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs
|
fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs
|
||||||
|
|
||||||
|
|
@ -331,7 +352,6 @@ async def start_recording():
|
||||||
logging.info(f"[RECORDING STARTED] File: {filepath}")
|
logging.info(f"[RECORDING STARTED] File: {filepath}")
|
||||||
return {"status": "recording started", "file": filename}
|
return {"status": "recording started", "file": filename}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/stop_recording")
|
@app.post("/stop_recording")
|
||||||
async def stop_recording():
|
async def stop_recording():
|
||||||
global recording_flag, recording_task, recording_file
|
global recording_flag, recording_task, recording_file
|
||||||
|
|
@ -351,7 +371,6 @@ async def stop_recording():
|
||||||
logging.info("[RECORDING STOPPED]")
|
logging.info("[RECORDING STOPPED]")
|
||||||
return {"status": "recording stopped"}
|
return {"status": "recording stopped"}
|
||||||
|
|
||||||
|
|
||||||
async def record_data_loop():
|
async def record_data_loop():
|
||||||
global recording_writer, recording_file, write_buffer, last_flush_time
|
global recording_writer, recording_file, write_buffer, last_flush_time
|
||||||
|
|
||||||
|
|
@ -363,18 +382,13 @@ async def record_data_loop():
|
||||||
recording_writer.writerow(row)
|
recording_writer.writerow(row)
|
||||||
|
|
||||||
# Flush every flush_interval seconds
|
# Flush every flush_interval seconds
|
||||||
if (
|
if (datetime.datetime.now() - last_flush_time).total_seconds() >= flush_interval:
|
||||||
datetime.datetime.now() - last_flush_time
|
|
||||||
).total_seconds() >= flush_interval:
|
|
||||||
recording_file.flush()
|
recording_file.flush()
|
||||||
last_flush_time = datetime.datetime.now()
|
last_flush_time = datetime.datetime.now()
|
||||||
|
|
||||||
await asyncio.sleep(0.05) # 10 Hz
|
await asyncio.sleep(0.05) # 10 Hz
|
||||||
|
|
||||||
|
|
||||||
## AUTOMATIC TESTING
|
## AUTOMATIC TESTING
|
||||||
|
|
||||||
|
|
||||||
async def send_command_with_delay(
|
async def send_command_with_delay(
|
||||||
state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0
|
state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0
|
||||||
):
|
):
|
||||||
|
|
@ -382,15 +396,11 @@ async def send_command_with_delay(
|
||||||
logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
|
logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
|
||||||
can_backend.send_state_command(state, pu, ploop_setpoint)
|
can_backend.send_state_command(state, pu, ploop_setpoint)
|
||||||
|
|
||||||
|
|
||||||
async def set_patients_with_delay(count: int, delay_s: int):
|
async def set_patients_with_delay(count: int, delay_s: int):
|
||||||
await asyncio.sleep(delay_s)
|
await asyncio.sleep(delay_s)
|
||||||
logging.info(
|
logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s")
|
||||||
f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s"
|
|
||||||
)
|
|
||||||
set_patient_skid_users(count)
|
set_patient_skid_users(count)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/test/auto/1")
|
@router.post("/test/auto/1")
|
||||||
async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
|
async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
|
||||||
pu = 1
|
pu = 1
|
||||||
|
|
@ -398,52 +408,39 @@ async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
|
||||||
asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
|
asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
|
||||||
return {"status": "started", "pu": pu}
|
return {"status": "started", "pu": pu}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/test/auto/2")
|
@router.post("/test/auto/2")
|
||||||
async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
|
async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
|
||||||
logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
|
logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
|
||||||
asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
|
asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
|
||||||
return {"status": "started", "pu": [1, 2]}
|
return {"status": "started", "pu": [1, 2]}
|
||||||
|
|
||||||
|
|
||||||
async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
|
async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
|
||||||
await send_command_with_delay(
|
await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint)
|
||||||
"PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint
|
await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint)
|
||||||
)
|
|
||||||
await send_command_with_delay(
|
|
||||||
"PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint
|
|
||||||
)
|
|
||||||
await set_patients_with_delay(5, delay_s=60)
|
await set_patients_with_delay(5, delay_s=60)
|
||||||
await set_patients_with_delay(10, delay_s=60)
|
await set_patients_with_delay(10, delay_s=60)
|
||||||
await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
|
await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
|
||||||
logging.info("[AUTO TEST] Finished PU1 test")
|
logging.info("[AUTO TEST] Finished PU1 test")
|
||||||
|
|
||||||
|
|
||||||
async def run_auto_test_pu2(ploop_setpoint: float):
|
async def run_auto_test_pu2(ploop_setpoint: float):
|
||||||
# Step 1: Run PU1 test
|
# Step 1: Run PU1 test
|
||||||
await run_auto_test_pu1(1, ploop_setpoint)
|
await run_auto_test_pu1(1, ploop_setpoint)
|
||||||
|
|
||||||
# Step 2: PU2 sequence
|
# Step 2: PU2 sequence
|
||||||
await send_command_with_delay(
|
await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint)
|
||||||
"PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint
|
await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint)
|
||||||
)
|
|
||||||
await send_command_with_delay(
|
|
||||||
"PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint
|
|
||||||
)
|
|
||||||
await set_patients_with_delay(15, delay_s=60)
|
await set_patients_with_delay(15, delay_s=60)
|
||||||
await set_patients_with_delay(0, delay_s=60)
|
await set_patients_with_delay(0, delay_s=60)
|
||||||
await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
|
await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
|
||||||
await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
|
await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
|
||||||
logging.info("[AUTO TEST] Finished PU1 + PU2 test")
|
logging.info("[AUTO TEST] Finished PU1 + PU2 test")
|
||||||
|
|
||||||
|
|
||||||
@router.post("/test/auto/3")
|
@router.post("/test/auto/3")
|
||||||
async def auto_test_pu3():
|
async def auto_test_pu3():
|
||||||
# Call the function for PU3 auto test
|
# Call the function for PU3 auto test
|
||||||
logging.info("Start auto test of 3 PU")
|
logging.info("Start auto test of 3 PU")
|
||||||
return {"status": "started", "pu": 3}
|
return {"status": "started", "pu": 3}
|
||||||
|
|
||||||
|
|
||||||
# PATIENT SKID HELPERS
|
# PATIENT SKID HELPERS
|
||||||
async def update_latest_flow():
|
async def update_latest_flow():
|
||||||
global active_PUs
|
global active_PUs
|
||||||
|
|
@ -455,11 +452,21 @@ async def update_latest_flow():
|
||||||
latest_flow = int(data["log"])
|
latest_flow = int(data["log"])
|
||||||
logging.debug(f"Updated flow: {latest_flow}")
|
logging.debug(f"Updated flow: {latest_flow}")
|
||||||
latest_data["PatientSkid"]["QSkid"] = latest_flow
|
latest_data["PatientSkid"]["QSkid"] = latest_flow
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error fetching flow: {e}")
|
logging.error(f"Error fetching flow: {e}")
|
||||||
await asyncio.sleep(1.0)
|
await asyncio.sleep(1.0)
|
||||||
|
|
||||||
|
def stop_patient_skid():
|
||||||
|
try:
|
||||||
|
url = f"http://192.168.1.28:8000/stop_test"
|
||||||
|
response = httpx.get(url, timeout=5.0)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
return {"status": "success", "detail": response.json()}
|
||||||
|
else:
|
||||||
|
raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
|
||||||
|
except httpx.RequestError as e:
|
||||||
|
raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")
|
||||||
|
|
||||||
def set_patient_skid_users(count: int = 1):
|
def set_patient_skid_users(count: int = 1):
|
||||||
try:
|
try:
|
||||||
|
|
@ -469,20 +476,14 @@ def set_patient_skid_users(count: int = 1):
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
return {"status": "success", "detail": response.json()}
|
return {"status": "success", "detail": response.json()}
|
||||||
else:
|
else:
|
||||||
raise HTTPException(
|
raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
|
||||||
status_code=502, detail=f"Remote server error: {response.text}"
|
|
||||||
)
|
|
||||||
except httpx.RequestError as e:
|
except httpx.RequestError as e:
|
||||||
raise HTTPException(
|
raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")
|
||||||
status_code=500, detail=f"Request to external server failed: {str(e)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
app.include_router(router)
|
app.include_router(router)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
"main:app",
|
"main:app",
|
||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
|
|
|
||||||
94
protocol_decoder.py
Normal file
94
protocol_decoder.py
Normal file
|
|
@ -0,0 +1,94 @@
|
||||||
|
from typing import Dict, Any, List, Tuple
|
||||||
|
import re
|
||||||
|
|
||||||
|
RE_PU_VP = re.compile(r'^P(?P<pu>[1-3])VP$')
|
||||||
|
RE_PU_CO = re.compile(r'^P(?P<pu>[1-3])CO$')
|
||||||
|
RE_DOCK_VP = re.compile(r'^D0VP$')
|
||||||
|
RE_DOCK_CO = re.compile(r'^(D0CO|DOCO)$') # be tolerant
|
||||||
|
|
||||||
|
def _to_i(s: str) -> int:
|
||||||
|
try: return int(s.strip())
|
||||||
|
except: return 0
|
||||||
|
|
||||||
|
def _to_pct(s: str) -> int:
|
||||||
|
try: return int(s.strip())
|
||||||
|
except:
|
||||||
|
try: return int(float(s))
|
||||||
|
except: return 0
|
||||||
|
|
||||||
|
def _to_bool(s: str) -> bool:
|
||||||
|
return str(s).strip() in ("1","true","True","TRUE")
|
||||||
|
|
||||||
|
def _dock_vp(vals: List[str]) -> Dict[str, Any]:
|
||||||
|
names = ["mv01","mv09","mv10","mv11","mmv01","mmv02","mmv03","sv01","sv02","sv03"]
|
||||||
|
out: Dict[str, Any] = {}
|
||||||
|
for k, v in zip(names, vals):
|
||||||
|
out[k] = _to_bool(v) if k.startswith("sv") else _to_pct(v)
|
||||||
|
return out
|
||||||
|
|
||||||
|
def _dock_co(vals: List[str]) -> Dict[str, Any]:
|
||||||
|
out: Dict[str, Any] = {}
|
||||||
|
for name, v in zip(["cs01","cs02"], vals):
|
||||||
|
q = _to_i(v) # 0.1 µS
|
||||||
|
out[f"{name}_0p1uS"] = q
|
||||||
|
out[f"{name}_uS"] = q*0.1
|
||||||
|
return out
|
||||||
|
|
||||||
|
def _pu_vp(pu: int, vals: List[str]) -> Dict[str, Any]:
|
||||||
|
out: Dict[str, Any] = {"pu": pu}
|
||||||
|
for k, v in zip(["mv02","mv03","mv04","mv05","mv06","mv07","mv08"], vals):
|
||||||
|
out[k] = _to_pct(v)
|
||||||
|
return out
|
||||||
|
|
||||||
|
def _pu_co(pu: int, vals: List[str]) -> Dict[str, Any]:
|
||||||
|
out: Dict[str, Any] = {"pu": pu}
|
||||||
|
for name, v in zip(["cs03","cs04","cs05"], vals):
|
||||||
|
q = _to_i(v)
|
||||||
|
out[f"{name}_0p1uS"] = q
|
||||||
|
out[f"{name}_uS"] = q*0.1
|
||||||
|
return out
|
||||||
|
|
||||||
|
def decode_frames(buffer: bytes) -> Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]:
|
||||||
|
msgs: List[Tuple[bytes, Dict[str, Any]]] = []
|
||||||
|
errors = 0
|
||||||
|
parts = buffer.split(b"\n")
|
||||||
|
remaining = parts[-1]
|
||||||
|
|
||||||
|
for line in parts[:-1]:
|
||||||
|
raw = line.strip().rstrip(b"\r")
|
||||||
|
if not raw: continue
|
||||||
|
try:
|
||||||
|
t = raw.decode("utf-8")
|
||||||
|
fields = [f.strip() for f in t.split(",")]
|
||||||
|
if len(fields) < 3: raise ValueError("too few fields")
|
||||||
|
version, msg_id, ts_ms = fields[0], fields[1], fields[2]
|
||||||
|
data = fields[3:]
|
||||||
|
|
||||||
|
parsed: Dict[str, Any] = {"version":version, "msg_id":msg_id, "ts_ms": int(ts_ms)}
|
||||||
|
|
||||||
|
if RE_DOCK_VP.match(msg_id):
|
||||||
|
parsed.update({"src":"dock","type":"valves"})
|
||||||
|
parsed.update(_dock_vp(data))
|
||||||
|
elif RE_DOCK_CO.match(msg_id):
|
||||||
|
parsed.update({"src":"dock","type":"cond"})
|
||||||
|
parsed.update(_dock_co(data))
|
||||||
|
else:
|
||||||
|
m = RE_PU_VP.match(msg_id)
|
||||||
|
if m:
|
||||||
|
pu = int(m.group("pu"))
|
||||||
|
parsed.update({"src":"pu","type":"valves","pu":pu})
|
||||||
|
parsed.update(_pu_vp(pu, data))
|
||||||
|
else:
|
||||||
|
m = RE_PU_CO.match(msg_id)
|
||||||
|
if m:
|
||||||
|
pu = int(m.group("pu"))
|
||||||
|
parsed.update({"src":"pu","type":"cond","pu":pu})
|
||||||
|
parsed.update(_pu_co(pu, data))
|
||||||
|
else:
|
||||||
|
parsed.update({"src":"unknown","type":"raw","data":data})
|
||||||
|
|
||||||
|
msgs.append((raw, parsed))
|
||||||
|
except Exception:
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
return msgs, remaining, errors
|
||||||
120
serial_csv_logger.py
Normal file
120
serial_csv_logger.py
Normal file
|
|
@ -0,0 +1,120 @@
|
||||||
|
# serial_csv_logger.py
|
||||||
|
import os, csv, datetime, json
|
||||||
|
from typing import Dict, Any, Tuple, Optional
|
||||||
|
|
||||||
|
class SerialCsvLogger:
|
||||||
|
"""
|
||||||
|
Writes parsed serial frames to CSV, segregated by message type:
|
||||||
|
- D0VP_YYYY-MM-DD.csv (Docking valves)
|
||||||
|
- D0CO_YYYY-MM-DD.csv (Docking conductivity)
|
||||||
|
- P1VP_YYYY-MM-DD.csv (PU1 valves), P2VP..., P3VP...
|
||||||
|
- P1CO_YYYY-MM-DD.csv (PU1 conductivity), etc.
|
||||||
|
- Unknown_YYYY-MM-DD.csv (for anything unmatched)
|
||||||
|
"""
|
||||||
|
def __init__(self, out_dir: str = "serial_logs", rotate_daily: bool = True):
|
||||||
|
self.out_dir = out_dir
|
||||||
|
self.rotate_daily = rotate_daily
|
||||||
|
self._writers: Dict[str, Tuple[csv.DictWriter, Any, str]] = {} # key -> (writer, file, date_str)
|
||||||
|
os.makedirs(self.out_dir, exist_ok=True)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
for _, (_, f, _) in self._writers.items():
|
||||||
|
try: f.close()
|
||||||
|
except: pass
|
||||||
|
self._writers.clear()
|
||||||
|
|
||||||
|
# ---------- public API ----------
|
||||||
|
def log(self, parsed: Dict[str, Any]):
|
||||||
|
msg_id = parsed.get("msg_id", "Unknown")
|
||||||
|
date_str = datetime.date.today().isoformat() if self.rotate_daily else "all"
|
||||||
|
key = f"{msg_id}"
|
||||||
|
|
||||||
|
# rotate if day changed
|
||||||
|
if key in self._writers and self._writers[key][2] != date_str:
|
||||||
|
self._writers[key][1].close()
|
||||||
|
del self._writers[key]
|
||||||
|
|
||||||
|
writer, _, _ = self._ensure_writer(key, msg_id, date_str)
|
||||||
|
row = self._build_row(msg_id, parsed)
|
||||||
|
writer.writerow(row)
|
||||||
|
|
||||||
|
# ---------- internals ----------
|
||||||
|
def _ensure_writer(self, key: str, msg_id: str, date_str: str):
|
||||||
|
if key in self._writers:
|
||||||
|
return self._writers[key]
|
||||||
|
|
||||||
|
fname = f"{msg_id}_{date_str}.csv"
|
||||||
|
path = os.path.join(self.out_dir, fname)
|
||||||
|
f = open(path, "a", newline="")
|
||||||
|
headers = self._headers_for(msg_id)
|
||||||
|
writer = csv.DictWriter(f, fieldnames=headers)
|
||||||
|
|
||||||
|
# write header only if file is empty
|
||||||
|
if f.tell() == 0:
|
||||||
|
writer.writeheader()
|
||||||
|
|
||||||
|
self._writers[key] = (writer, f, date_str)
|
||||||
|
return self._writers[key]
|
||||||
|
|
||||||
|
def _headers_for(self, msg_id: str):
|
||||||
|
# Common heads
|
||||||
|
base = ["ts_iso", "ts_ms", "version", "msg_id"]
|
||||||
|
|
||||||
|
if msg_id == "D0VP":
|
||||||
|
return base + ["mv01","mv09","mv10","mv11","mmv01","mmv02","mmv03","sv01","sv02","sv03"]
|
||||||
|
|
||||||
|
if msg_id in ("D0CO", "DOCO"):
|
||||||
|
# write both scaled (uS) and raw (0.1 uS) for traceability
|
||||||
|
return base + ["cs01_uS","cs01_0p1uS","cs02_uS","cs02_0p1uS"]
|
||||||
|
|
||||||
|
if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
|
||||||
|
# P1VP / P2VP / P3VP
|
||||||
|
return base + ["pu","mv02","mv03","mv04","mv05","mv06","mv07","mv08"]
|
||||||
|
|
||||||
|
if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
|
||||||
|
# P1CO / P2CO / P3CO
|
||||||
|
return base + ["pu","cs03_uS","cs03_0p1uS","cs04_uS","cs04_0p1uS","cs05_uS","cs05_0p1uS"]
|
||||||
|
|
||||||
|
# fallback
|
||||||
|
return base + ["payload_json"]
|
||||||
|
|
||||||
|
def _build_row(self, msg_id: str, p: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
ts_iso = datetime.datetime.fromtimestamp(p.get("ts_ms", 0)/1000.0).isoformat() if "ts_ms" in p else ""
|
||||||
|
row = {"ts_iso": ts_iso, "ts_ms": p.get("ts_ms", ""), "version": p.get("version",""), "msg_id": msg_id}
|
||||||
|
|
||||||
|
if msg_id == "D0VP":
|
||||||
|
row.update({
|
||||||
|
"mv01": p.get("mv01"), "mv09": p.get("mv09"), "mv10": p.get("mv10"), "mv11": p.get("mv11"),
|
||||||
|
"mmv01": p.get("mmv01"), "mmv02": p.get("mmv02"), "mmv03": p.get("mmv03"),
|
||||||
|
"sv01": p.get("sv01"), "sv02": p.get("sv02"), "sv03": p.get("sv03"),
|
||||||
|
})
|
||||||
|
return row
|
||||||
|
|
||||||
|
if msg_id in ("D0CO", "DOCO"):
|
||||||
|
row.update({
|
||||||
|
"cs01_uS": p.get("cs01_uS"), "cs01_0p1uS": p.get("cs01_0p1uS"),
|
||||||
|
"cs02_uS": p.get("cs02_uS"), "cs02_0p1uS": p.get("cs02_0p1uS"),
|
||||||
|
})
|
||||||
|
return row
|
||||||
|
|
||||||
|
if msg_id.endswith("VP") and len(msg_id) == 4 and msg_id[0] == "P":
|
||||||
|
row.update({
|
||||||
|
"pu": p.get("pu"),
|
||||||
|
"mv02": p.get("mv02"), "mv03": p.get("mv03"), "mv04": p.get("mv04"),
|
||||||
|
"mv05": p.get("mv05"), "mv06": p.get("mv06"), "mv07": p.get("mv07"), "mv08": p.get("mv08"),
|
||||||
|
})
|
||||||
|
return row
|
||||||
|
|
||||||
|
if msg_id.endswith("CO") and len(msg_id) == 4 and msg_id[0] == "P":
|
||||||
|
row.update({
|
||||||
|
"pu": p.get("pu"),
|
||||||
|
"cs03_uS": p.get("cs03_uS"), "cs03_0p1uS": p.get("cs03_0p1uS"),
|
||||||
|
"cs04_uS": p.get("cs04_uS"), "cs04_0p1uS": p.get("cs04_0p1uS"),
|
||||||
|
"cs05_uS": p.get("cs05_uS"), "cs05_0p1uS": p.get("cs05_0p1uS"),
|
||||||
|
})
|
||||||
|
return row
|
||||||
|
|
||||||
|
# Unknown → keep full payload as JSON for later inspection
|
||||||
|
pay = {k:v for k,v in p.items() if k not in ("version","msg_id","ts_ms")}
|
||||||
|
row["payload_json"] = json.dumps(pay, separators=(",",":"))
|
||||||
|
return row
|
||||||
234
serial_manager.py
Normal file
234
serial_manager.py
Normal file
|
|
@ -0,0 +1,234 @@
|
||||||
|
# serial_manager.py
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import csv
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
import serial # provided by python3-serial
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SerialConfig:
|
||||||
|
"""
|
||||||
|
Configuration for the read-only serial intake.
|
||||||
|
"""
|
||||||
|
port: str = "/dev/ttyUSB0"
|
||||||
|
baudrate: int = 115200
|
||||||
|
bytesize: int = serial.EIGHTBITS
|
||||||
|
parity: str = serial.PARITY_NONE
|
||||||
|
stopbits: int = serial.STOPBITS_ONE
|
||||||
|
timeout: float = 0.05
|
||||||
|
rtscts: bool = False
|
||||||
|
dsrdtr: bool = False
|
||||||
|
xonxoff: bool = False
|
||||||
|
ring_capacity: int = 5000
|
||||||
|
# If set, a single "generic" CSV will be written here (append mode).
|
||||||
|
# If you want segregated CSVs per message type, leave this as None and
|
||||||
|
# supply an `on_message` callback that writes where you want.
|
||||||
|
csv_log_path: Optional[str] = None # e.g. "/home/pi/hmi/serial_log.csv"
|
||||||
|
|
||||||
|
|
||||||
|
class SerialStore:
|
||||||
|
"""
|
||||||
|
Thread-safe store for recent parsed messages and intake stats.
|
||||||
|
Stores parsed dicts as returned by the decoder.
|
||||||
|
"""
|
||||||
|
def __init__(self, capacity: int):
|
||||||
|
self._buf: Deque[Dict[str, Any]] = deque(maxlen=capacity)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._stats = {
|
||||||
|
"frames_in": 0,
|
||||||
|
"frames_ok": 0,
|
||||||
|
"frames_bad": 0,
|
||||||
|
"restarts": 0,
|
||||||
|
"last_err": "",
|
||||||
|
}
|
||||||
|
self._latest_by_id: Dict[str, Dict[str, Any]] = {}
|
||||||
|
|
||||||
|
def add(self, msg: Dict[str, Any], ok: bool = True):
|
||||||
|
with self._lock:
|
||||||
|
self._buf.append(msg)
|
||||||
|
self._stats["frames_in"] += 1
|
||||||
|
if ok:
|
||||||
|
self._stats["frames_ok"] += 1
|
||||||
|
else:
|
||||||
|
self._stats["frames_bad"] += 1
|
||||||
|
mid = msg.get("msg_id")
|
||||||
|
if mid:
|
||||||
|
self._latest_by_id[mid] = msg
|
||||||
|
|
||||||
|
def latest(self, n: int = 100) -> List[Dict[str, Any]]:
|
||||||
|
with self._lock:
|
||||||
|
return list(self._buf)[-n:]
|
||||||
|
|
||||||
|
def latest_by_id(self) -> Dict[str, Dict[str, Any]]:
|
||||||
|
with self._lock:
|
||||||
|
return dict(self._latest_by_id)
|
||||||
|
|
||||||
|
def stats(self) -> Dict[str, Any]:
|
||||||
|
with self._lock:
|
||||||
|
return dict(self._stats)
|
||||||
|
|
||||||
|
def set_error(self, err: str):
|
||||||
|
with self._lock:
|
||||||
|
self._stats["last_err"] = err
|
||||||
|
|
||||||
|
def inc_restart(self):
|
||||||
|
with self._lock:
|
||||||
|
self._stats["restarts"] += 1
|
||||||
|
|
||||||
|
|
||||||
|
class SerialReader:
|
||||||
|
"""
|
||||||
|
Background read-only serial reader.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cfg: SerialConfig
|
||||||
|
store: SerialStore
|
||||||
|
decoder: function(buffer: bytes) ->
|
||||||
|
(messages: List[Tuple[raw_frame: bytes, parsed: Dict]], remaining: bytes, errors: int)
|
||||||
|
on_message: optional callback called for each parsed dict (e.g., segregated CSV logger)
|
||||||
|
"""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cfg: SerialConfig,
|
||||||
|
store: SerialStore,
|
||||||
|
decoder: Callable[[bytes], Tuple[List[Tuple[bytes, Dict[str, Any]]], bytes, int]],
|
||||||
|
on_message: Optional[Callable[[Dict[str, Any]], None]] = None,
|
||||||
|
):
|
||||||
|
self.cfg = cfg
|
||||||
|
self.store = store
|
||||||
|
self.decoder = decoder
|
||||||
|
self.on_message = on_message
|
||||||
|
|
||||||
|
self._ser: Optional[serial.Serial] = None
|
||||||
|
self._th: Optional[threading.Thread] = None
|
||||||
|
self._stop = threading.Event()
|
||||||
|
self._buffer = b""
|
||||||
|
|
||||||
|
# Optional generic CSV (single file) if cfg.csv_log_path is set
|
||||||
|
self._csv_file = None
|
||||||
|
self._csv_writer = None
|
||||||
|
|
||||||
|
# ---------- lifecycle ----------
|
||||||
|
def start(self):
|
||||||
|
self._stop.clear()
|
||||||
|
self._open_serial()
|
||||||
|
self._open_csv()
|
||||||
|
self._th = threading.Thread(target=self._run, name="SerialReader", daemon=True)
|
||||||
|
self._th.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stop.set()
|
||||||
|
if self._th and self._th.is_alive():
|
||||||
|
self._th.join(timeout=2.0)
|
||||||
|
self._close_serial()
|
||||||
|
self._close_csv()
|
||||||
|
|
||||||
|
# ---------- internals ----------
|
||||||
|
def _open_serial(self):
|
||||||
|
try:
|
||||||
|
self._ser = serial.Serial(
|
||||||
|
port=self.cfg.port,
|
||||||
|
baudrate=self.cfg.baudrate,
|
||||||
|
bytesize=self.cfg.bytesize,
|
||||||
|
parity=self.cfg.parity,
|
||||||
|
stopbits=self.cfg.stopbits,
|
||||||
|
timeout=self.cfg.timeout,
|
||||||
|
rtscts=self.cfg.rtscts,
|
||||||
|
dsrdtr=self.cfg.dsrdtr,
|
||||||
|
xonxoff=self.cfg.xonxoff,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.store.set_error(f"Open error: {e}")
|
||||||
|
self._ser = None
|
||||||
|
|
||||||
|
def _close_serial(self):
|
||||||
|
try:
|
||||||
|
if self._ser and self._ser.is_open:
|
||||||
|
self._ser.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._ser = None
|
||||||
|
|
||||||
|
def _open_csv(self):
|
||||||
|
if not self.cfg.csv_log_path:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._csv_file = open(self.cfg.csv_log_path, "a", newline="")
|
||||||
|
self._csv_writer = csv.writer(self._csv_file)
|
||||||
|
# Write header only if file is empty (avoid duplicates on restart)
|
||||||
|
if self._csv_file.tell() == 0:
|
||||||
|
self._csv_writer.writerow(["ts_ms", "msg_id", "raw_hex", "parsed"])
|
||||||
|
self._csv_file.flush()
|
||||||
|
except Exception as e:
|
||||||
|
self.store.set_error(f"CSV open error: {e}")
|
||||||
|
self._csv_file = None
|
||||||
|
self._csv_writer = None
|
||||||
|
|
||||||
|
def _close_csv(self):
|
||||||
|
try:
|
||||||
|
if self._csv_file:
|
||||||
|
self._csv_file.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._csv_file = None
|
||||||
|
self._csv_writer = None
|
||||||
|
|
||||||
|
def _log_csv(self, raw: bytes, parsed: Dict[str, Any]):
|
||||||
|
"""Write to the optional single generic CSV."""
|
||||||
|
if not self._csv_writer:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._csv_writer.writerow(
|
||||||
|
[parsed.get("ts_ms"), parsed.get("msg_id"), raw.hex(), parsed]
|
||||||
|
)
|
||||||
|
self._csv_file.flush()
|
||||||
|
except Exception as e:
|
||||||
|
self.store.set_error(f"CSV write error: {e}")
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
backoff = 0.5
|
||||||
|
while not self._stop.is_set():
|
||||||
|
if not self._ser or not self._ser.is_open:
|
||||||
|
# reconnect with exponential backoff (capped)
|
||||||
|
self._close_serial()
|
||||||
|
time.sleep(backoff)
|
||||||
|
self.store.inc_restart()
|
||||||
|
self._open_serial()
|
||||||
|
backoff = min(backoff * 1.5, 5.0)
|
||||||
|
continue
|
||||||
|
|
||||||
|
backoff = 0.5
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = self._ser.read(4096) # non-blocking due to timeout
|
||||||
|
if data:
|
||||||
|
self._buffer += data
|
||||||
|
frames, remaining, errors = self.decoder(self._buffer)
|
||||||
|
self._buffer = remaining
|
||||||
|
|
||||||
|
for raw, parsed in frames:
|
||||||
|
# store
|
||||||
|
self.store.add(parsed, ok=True)
|
||||||
|
# optional generic CSV
|
||||||
|
self._log_csv(raw, parsed)
|
||||||
|
# optional segregated sink
|
||||||
|
if self.on_message:
|
||||||
|
try:
|
||||||
|
self.on_message(parsed)
|
||||||
|
except Exception as e:
|
||||||
|
self.store.set_error(f"CSV sink error: {e}")
|
||||||
|
|
||||||
|
# count decode errors
|
||||||
|
for _ in range(errors):
|
||||||
|
self.store.add({"error": "decode"}, ok=False)
|
||||||
|
else:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.store.set_error(f"Read/Decode error: {e}")
|
||||||
|
self._close_serial()
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
set -e
|
|
||||||
|
|
||||||
echo "[UPDATE] Pulling latest code..."
|
|
||||||
cd /home/hmi/Desktop/HMI || exit 1
|
|
||||||
git reset --hard HEAD
|
|
||||||
git pull origin main
|
|
||||||
|
|
||||||
echo "[RESTART] Restarting HMI service..."
|
|
||||||
sudo /bin/systemctl restart hmi.service
|
|
||||||
|
|
||||||
echo "[DONE] HMI updated."
|
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
import canopen
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
class ValveBackend:
|
|
||||||
def __init__(self, eds_file: str, node_id: int = 0x0F):
|
|
||||||
self.eds_file = eds_file
|
|
||||||
self.node_id = node_id
|
|
||||||
self.network = None
|
|
||||||
self.node = None
|
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
try:
|
|
||||||
self.network = canopen.Network()
|
|
||||||
self.network.connect(channel='can0', bustype='socketcan')
|
|
||||||
self.node = canopen.RemoteNode(self.node_id, self.eds_file)
|
|
||||||
self.network.add_node(self.node)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[VALVE CONNECT ERROR] {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def send_command(self, opening: int):
|
|
||||||
try:
|
|
||||||
if self.node is None:
|
|
||||||
raise RuntimeError("Valve node not initialized")
|
|
||||||
self.node.sdo[0x6000].raw = opening
|
|
||||||
print(f"[VALVE] Opening set to {opening}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"[VALVE CMD ERROR] {e}")
|
|
||||||
Loading…
Reference in New Issue
Block a user