# NorthStar-HMI/main.py
import asyncio
import csv
import datetime
import json
import logging
import os
import platform
from collections import deque
from pathlib import Path
from typing import Any, Dict

import aiohttp
import numpy as np
from fastapi import APIRouter, FastAPI, Form, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.middleware.sessions import SessionMiddleware
from starlette.status import HTTP_302_FOUND

from hardware.patient_skid import handle_patient_skid_for_idle, set_patient_skid_users
from serial_manager import SerialConfig, SerialReader, SerialStore
from protocol_decoder import decode_frames
from serial_csv_logger import SerialCsvLogger  # per-message-type CSV logger
if platform.system() == "Darwin":  # macOS: use the mock backend
    from MockCAN import CANBackend
else:
    from hardware.classCAN import CANBackend  # Real hardware backend

logging.basicConfig(level=logging.INFO)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)

app = FastAPI()
# Session secret: prefer an environment override (SESSION_SECRET is a suggested
# variable name); the literal is only a development fallback.
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "your_super_secret_key"))
router = APIRouter()
templates = Jinja2Templates(directory="templates")
can_backend = CANBackend()
# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")

## BUFFER CREATION AND HELPERS
# Global object to store the latest data
latest_data: Dict[str, Any] = {
    "PU_1": None,
    "PU_2": None,
    "PU_3": None,
    "DS": None,
    "PatientSkid": {"QSkid": 0.0},
}
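# Data flow: update_latest_data() polls the CAN backend (~20 Hz) and fills
# latest_data; the /monitor endpoint and the CSV recorder both read from this
# snapshot rather than touching the bus directly.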
latest_setpoints: Dict[str, Any] = {
    "PU_1": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
    "PU_2": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
    "PU_3": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
}
active_PUs: list[int] = []
VALID_STATES = {
    "IDLE",
    "PRE-PRODUCTION",
    "PRODUCTION",
    "FIRST_START",
    "THERMALLOOPCLEANING",
    "DISINFECTION",
    "SLEEP",
}
# Dictionary to hold running auto-test tasks
tasks: dict[str, asyncio.Task] = {}
# RECORDER
recording_flag = False
recording_task = None
recording_writer = None
recording_file = None
write_buffer = deque()
flush_interval = 1.0 # flush every 1 second
last_flush_time = datetime.datetime.now()
# ---- Serial intake globals ----
serial_store = SerialStore(capacity=5000)
serial_reader: SerialReader | None = None
serial_csv: SerialCsvLogger | None = None
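# Serial pipeline: SerialReader pulls bytes from the port, decode_frames()
# turns them into messages, the SerialStore ring buffer keeps the most recent
# ones for the /serial/* endpoints, and SerialCsvLogger mirrors each message
# to a per-type CSV (wired up in startup_event below).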

## LOGGING
def format_PU_data(data):
    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "Qperm": np.round(data.get("FM2", 0.0), 1),
        "Qdilute": np.round(data.get("FM1", 0.0), 1),
        "Qdrain": np.round(data.get("FM4", 0.0), 1),
        "Qrecirc": np.round(data.get("FM3", 0.0), 1),
        "QdrainEDI": np.round(data.get("FM2", 0.0), 1) - np.round(data.get("FM1", 0.0), 1),
        "Pro": np.round(data.get("PS2", 0.0), 2),
        "Pdilute": np.round(data.get("PS3", 0.0), 2),
        "Pretentate": np.round(data.get("PS1", 0.0), 2),
        "Cfeed": data.get("Conductivity_Feed", 0.0),
        "Cperm": data.get("Conductivity_Permeate", 0.0),
        "Cdilute": data.get("Conductivity_Product", 0.0),
        "MV02": np.round(data.get("MV02", 0.0), 1),
        "MV02_sp": np.round(data.get("MV02_sp", 0.0), 1),
        "MV03": np.round(data.get("MV03", 0.0), 1),
        "MV03_sp": np.round(data.get("MV03_sp", 0.0), 1),
        "MV04": np.round(data.get("MV04", 0.0), 1),
        "MV04_sp": np.round(data.get("MV04_sp", 0.0), 1),
        "MV05": np.round(data.get("MV05", 0.0), 1),
        "MV05_sp": np.round(data.get("MV05_sp", 0.0), 1),
        "MV06": np.round(data.get("MV06", 0.0), 1),
        "MV06_sp": np.round(data.get("MV06_sp", 0.0), 1),
        "MV07": np.round(data.get("MV07", 0.0), 1),
        "MV07_sp": np.round(data.get("MV07_sp", 0.0), 1),
        "MV08": np.round(data.get("MV08", 0.0), 1),
        "MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
        # 60x conversion, floored at 350.0
        "Qdrain_sp": max(60 * np.round(data.get("Qdrain_sp", 0.0), 2), 350.0),
    }

def format_DS_data(data):
    q_conso = max(np.round(data.get("Inlet_flow", 0.0), 1) - np.round(data.get("Outlet_flow", 0.0), 1), 0)
    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "Qconso": q_conso,
        "TankLevel": np.round(data.get("TankLevel", 0.0), 2),
        "Qinlet": np.round(data.get("Inlet_flow", 0.0), 1),
        "Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1),
    }
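# Both formatters tolerate an empty dict (every field falls back to 0.0);
# start_recording_internal() relies on this to derive the CSV header from
# format_DS_data({}) / format_PU_data({}).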

## Fetch setpoints
def update_setpoints(p_loop_setpoint: float, q_perm_setpoint: float, pu: int):
    global latest_setpoints
    pu_key = "PU_" + str(pu)
    latest_setpoints[pu_key]["Ploop_sp"] = p_loop_setpoint
    latest_setpoints[pu_key]["Qperm_sp"] = q_perm_setpoint


def format_setpoints(pu: int):
    # A bit convoluted (copies the setpoints from one object to another), but it works.
    global latest_setpoints, latest_data
    pu_key = "PU_" + str(pu)
    latest_data[pu_key]["Ploop_sp"] = latest_setpoints[pu_key]["Ploop_sp"]
    latest_data[pu_key]["Qperm_sp"] = latest_setpoints[pu_key]["Qperm_sp"]

# CREDENTIALS
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
    with CREDENTIAL_PATH.open("r") as f:
        CREDENTIALS = json.load(f)
else:
    logging.warning("credentials.json not found; login is disabled.")
    CREDENTIALS = {}
# .get() avoids a KeyError at import time when the file is missing.
USERNAME = CREDENTIALS.get("username")
PASSWORD = CREDENTIALS.get("password")
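# Expected credentials.json layout (keys match the lookups above; values are
# placeholders):
#   {"username": "<user>", "password": "<password>"}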

# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
    user = request.session.get("user")
    if USERNAME is None or user != USERNAME:
        # Raise a 302 to trigger redirection manually
        # (FastAPI doesn't support redirects from Depends directly).
        raise StarletteHTTPException(status_code=302, detail="Redirect to login")
    return user
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
if username == USERNAME and password == PASSWORD:
request.session["user"] = username
return RedirectResponse("/control", status_code=HTTP_302_FOUND)
return templates.TemplateResponse(
"login.html", {"request": request, "error": "Invalid credentials.json"}
)
@app.get("/logout")
def logout(request: Request):
request.session.clear()
return RedirectResponse("/", status_code=HTTP_302_FOUND)

# ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ========
@app.on_event("startup")
async def startup_event():
    # ----- CSV logger -----
    global serial_csv
    serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True)
    # ----- Start the serial reader -----
    global serial_reader
    cfg = SerialConfig(
        port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"),
        baudrate=int(os.getenv("SERIAL_BAUD", "115200")),
        csv_log_path=None,  # disable the generic CSV inside the reader; the segregated logger is used instead
        ring_capacity=int(os.getenv("SERIAL_RING", "5000")),
    )
    serial_reader = SerialReader(
        cfg,
        serial_store,
        decoder=decode_frames,
        on_message=lambda p: serial_csv.log(p),  # write one CSV per message type
    )
    serial_reader.start()
    # ----- Background polling tasks -----
    asyncio.create_task(update_latest_data())
    asyncio.create_task(update_latest_flow())


@app.on_event("shutdown")
def _serial_stop():
    if serial_reader:
        serial_reader.stop()
    if serial_csv:
        serial_csv.close()

# ======== PAGES ========
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    can_backend.connect()
    if USERNAME is None or request.session.get("user") != USERNAME:
        return RedirectResponse("/", status_code=HTTP_302_FOUND)
    return templates.TemplateResponse("control.html", {"request": request})


@app.get("/monitor-DS", response_class=HTMLResponse)
def monitor_ds_page(request: Request):
    with open("static/monitor_DS.html") as f:
        return HTMLResponse(f.read())


@app.get("/monitor-PU", response_class=HTMLResponse)
def monitor_pu_page(request: Request):
    with open("static/monitor_PU.html") as f:
        return HTMLResponse(f.read())


@app.get("/multi-monitor-PU", response_class=HTMLResponse)
def multi_monitor_pu_page(request: Request):
    with open("static/multi_pu_dashboard.html") as f:
        return HTMLResponse(f.read())

# ======== SERIAL API ========
@app.get("/serial/messages")
def serial_messages(n: int = 100):
    return serial_store.latest(min(max(n, 1), 1000))


@app.get("/serial/stats")
def serial_stats():
    return serial_store.stats()


@app.get("/serial/snapshot")
def serial_snapshot():
    return serial_store.latest_by_id()
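# Example: GET /serial/messages?n=50 returns the 50 most recent decoded
# messages (n is clamped to 1..1000); GET /serial/snapshot returns the latest
# message per ID.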

# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
    logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
    if can_backend.connected:
        can_backend.shutdown()
        logging.info("Shutting down CAN connection...")
        return {"connected": False}
    else:
        success = can_backend.connect()
        if not success:
            raise HTTPException(status_code=500, detail="Connection failed.")
        return {"connected": can_backend.connected}


@app.get("/is_connected")
def is_connected():
    return {"connected": can_backend.connected}

# PU CONTROL
def validate_state(state: str) -> str:
    """Normalize and validate the requested state."""
    state = state.upper()
    if state not in VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")
    return state


def expand_pu_number(pu_number: int) -> list[int]:
    """Temporary rule: if PU = 3 → run on [1, 2]."""
    return [pu_number] if pu_number != 3 else [1, 2]


def send_command_to_pu(
    pu: int, state: str, ploop_setpoint: float, qperm_setpoint: float
) -> dict:
    """Send a state command and update the setpoints for one PU."""
    state = validate_state(state)
    if state == "IDLE":
        handle_patient_skid_for_idle()
    update_setpoints(ploop_setpoint, qperm_setpoint, pu)
    can_backend.send_state_command(state, pu, ploop_setpoint, qperm_setpoint)
    current_state = can_backend.read_current_state(pu)
    return {
        "pu": pu,
        "command": state,
        "ploop_setpoint": ploop_setpoint,
        "qperm_setpoint": qperm_setpoint,
        "current_state": current_state,
    }
@app.post("/command/{state}/pu/{pu_number}")
def send_command_endpoint(
state: str,
pu_number: int,
ploop_setpoint: float = Query(...),
qperm_setpoint: float = Query(...),
):
logging.info(f"Sending state '{state}' to PU {pu_number}")
pus = expand_pu_number(pu_number)
try:
results = []
for pu in pus:
result = send_command_to_pu(pu, state, ploop_setpoint, qperm_setpoint)
results.append(result)
return {"status": "success", "results": results}
except Exception as e:
logging.error(str(e))
raise HTTPException(status_code=500, detail=str(e))
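# Example request (hypothetical setpoints):
#   POST /command/PRODUCTION/pu/1?ploop_setpoint=2.5&qperm_setpoint=1200.0
# Note that PU 3 currently expands to PUs [1, 2] via expand_pu_number().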

## MONITORING
@app.get("/api/pu_status")
def get_pu_status():
    global active_PUs, latest_setpoints
    states = {
        "PU1": can_backend.read_current_state(1),
        "PU2": can_backend.read_current_state(2),
        "PU3": can_backend.read_current_state(3),
    }
    logging.debug(f"[PU STATUS] {states}")
    # A PU reporting SYSTEM_MODE_READY is promoted straight to PRODUCTION with
    # its last known setpoints.
    for pu in (1, 2, 3):
        if states[f"PU{pu}"] == "SYSTEM_MODE_READY":
            setpoints = latest_setpoints[f"PU_{pu}"]
            send_command_to_pu(
                pu=pu,
                state="PRODUCTION",
                ploop_setpoint=setpoints["Ploop_sp"],
                qperm_setpoint=setpoints["Qperm_sp"],
            )
    active_PUs = [
        index + 1
        for index, (pu, status) in enumerate(states.items())
        if status != "Offline"
    ]
    logging.debug(f"[ACTIVE PU] {active_PUs}")
    return JSONResponse(content=states)

async def update_latest_data():
    global active_PUs
    while True:
        # DS
        data = can_backend.get_latest_data(pu_number=0)
        latest_data["DS"] = format_DS_data(data)
        # PUs
        for pu in active_PUs:
            data = can_backend.get_latest_data(pu_number=pu)
            latest_data[f"PU_{pu}"] = format_PU_data(data)
            format_setpoints(pu)
        logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
        await asyncio.sleep(0.05)  # ~20 Hz


@app.get("/monitor")
async def get_monitor_data():
    return latest_data
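# Example: GET /monitor returns the latest formatted snapshot for PU_1..PU_3,
# DS and PatientSkid in one JSON object.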

# LOCAL RECORDER
# --- Internal helpers (not endpoints) ---
async def start_recording_internal():
    global recording_flag, recording_task, recording_file, recording_writer
    if recording_flag:
        logging.warning("Recording already in progress.")
        return None
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recording_{now}.csv"
    os.makedirs("recordings", exist_ok=True)
    filepath = os.path.join("recordings", filename)
    recording_file = open(filepath, "w", newline="")
    fieldnames_common = ["timestamp", "pu", "QSkid"]
    fieldnames_DS = list(format_DS_data({}).keys())
    fieldnames_DS.pop(0)  # drop the per-subsystem timestamp; each row has its own
    fieldnames_PUs = list(format_PU_data({}).keys())
    fieldnames_PUs.pop(0)
    fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs + ["Qperm_sp", "Ploop_sp"]
    recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames)
    recording_writer.writeheader()
    recording_flag = True
    recording_task = asyncio.create_task(record_data_loop())
    logging.info(f"[RECORDING STARTED] File: {filepath}")
    return filename


async def stop_recording_internal():
    global recording_flag, recording_task, recording_file
    if not recording_flag:
        logging.warning("No active recording to stop.")
        return False
    recording_flag = False
    if recording_task:
        await recording_task
        recording_task = None
    if recording_file:
        recording_file.close()
        recording_file = None
    logging.info("[RECORDING STOPPED]")
    return True

# --- API endpoints ---
@app.post("/start_recording")
async def start_recording():
    filename = await start_recording_internal()
    if not filename:
        raise HTTPException(status_code=400, detail="Already recording.")
    return {"status": "recording started", "file": filename}


@app.post("/stop_recording")
async def stop_recording():
    success = await stop_recording_internal()
    if not success:
        raise HTTPException(status_code=400, detail="Not recording.")
    return {"status": "recording stopped"}


@app.get("/is_recording")
async def is_recording():
    """Return True if recording is on, False otherwise."""
    return JSONResponse(content={"recording": recording_flag})

async def record_data_loop():
    global recording_writer, recording_file, last_flush_time
    while recording_flag:
        timestamp = datetime.datetime.now().isoformat()
        for pu, data in latest_data.items():
            if data:
                row = {"timestamp": timestamp, "pu": pu, **data}
                recording_writer.writerow(row)
        # Flush every flush_interval seconds
        if (datetime.datetime.now() - last_flush_time).total_seconds() >= flush_interval:
            recording_file.flush()
            last_flush_time = datetime.datetime.now()
        await asyncio.sleep(0.05)  # ~20 Hz
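# Each iteration appends one row per subsystem (PU_1..PU_3, DS, PatientSkid);
# fields a subsystem does not report are left empty by DictWriter's default
# restval.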

## AUTOMATIC TESTING
async def send_command_with_delay(
    state: str,
    pu: int,
    delay_s: int = 0,
    ploop_setpoint: float = 2.5,
    qperm_setpoint: float = 1200.0,
):
    await asyncio.sleep(delay_s)
    logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
    try:
        result = send_command_to_pu(pu, state, ploop_setpoint, qperm_setpoint)
        return {"status": "ok", "result": result}
    except Exception as e:
        logging.error(f"[AUTO TEST] Failed to send {state} to PU{pu}: {e}")
        return {"status": "error", "detail": str(e)}


async def set_patients_with_delay(count: int, delay_s: int):
    await asyncio.sleep(delay_s)
    logging.info(f"[AUTO TEST] Setting {count} patients on the patient skid after {delay_s}s")
    set_patient_skid_users(count)
@router.post("/test/auto/{pu_number}")
async def auto_test(pu_number: int ):
"""
Start automatic test for PU1 or PU2.
"""
global tasks
logging.info(f"[AUTO TEST] Starting automatic test for PU{pu_number}")
key = f"pu{pu_number}"
if key in tasks and not tasks[key].done():
tasks[key].cancel()
logging.info(f"[AUTO TEST] PU{pu_number} Cancelled")
await start_recording_internal()
logging.info("[AUTO TEST] Recorder started")
if pu_number == 1:
task = asyncio.create_task(run_auto_test_1())
result = {"status": "started", "pu": 1}
elif pu_number == 2:
task = asyncio.create_task(run_auto_test_2())
result = {"status": "started", "pu": [2]}
elif pu_number == 3:
task = asyncio.create_task(run_auto_test_3())
result = {"status": "started", "pu": [2]}
else:
return {"status": "error", "message": "Invalid PU number"}
tasks[key] = task
return result
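# Example: POST /test/auto/1 cancels any running PU1 sequence, starts the CSV
# recorder, and launches run_auto_test_1() in the background.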
@router.post("/test/auto/stop/{pu}")
async def stop_auto_test(pu: int):
global tasks
key = f"pu{pu}"
logging.info(f"[AUTO TEST] Stopping {pu}")
await stop_recording_internal()
logging.info("[AUTO TEST] Recorder stopped")
if key in tasks and not tasks[key].done():
tasks[key].cancel()
await send_command_with_delay("IDLE", pu =pu, delay_s=0)
logging.info(f"[AUTO TEST] Test of {key} canceled and PU stopped")
return {"status": "stopped", "pu": pu}
logging.info(f"[AUTO TEST] Stopping {pu} No test Runining")
return {"status": "no task running", "pu": pu}

async def run_auto_test_1(pu: int = 1):
    try:
        await send_command_with_delay("PRE-PRODUCTION", pu=pu, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        await asyncio.sleep(180)  # machine start-up time
        await set_patients_with_delay(5, delay_s=10)
        await set_patients_with_delay(10, delay_s=20)
        await set_patients_with_delay(0, delay_s=20)
        await send_command_with_delay("IDLE", pu=pu, delay_s=20, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        logging.info("[AUTO TEST] Finished PU1 test")
        await stop_recording_internal()
        logging.info("[AUTO TEST] Recorder stopped")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU1 task cancelled")
        raise

async def run_auto_test_2():
    try:
        await send_command_with_delay("PRE-PRODUCTION", pu=1, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        # Bring the second PU online 90 s after the first (the sequence later
        # idles both PU1 and PU2).
        await send_command_with_delay("PRE-PRODUCTION", pu=2, delay_s=90, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        await asyncio.sleep(90)  # machine start-up time
        await set_patients_with_delay(5, delay_s=10)
        await set_patients_with_delay(10, delay_s=40)
        await asyncio.sleep(100)
        await send_command_with_delay("IDLE", pu=1, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        await send_command_with_delay("IDLE", pu=2, delay_s=10, ploop_setpoint=2.5, qperm_setpoint=1200.0)
        logging.info("[AUTO TEST] Finished PU1 + PU2 test")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU2 task cancelled")
        # Optional cleanup
        raise

async def run_auto_test_3():
    try:
        # Step 1: run the PU1 test
        # await run_auto_test_1()
        # TODO: implement the rest of the PU3 sequence
        logging.info("[AUTO TEST] Finished PU3 test")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU3 task cancelled")
        # Optional cleanup
        raise

# PATIENT SKID HELPERS
async def update_latest_flow():
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
                    data = await resp.json()
                    latest_flow = int(data["log"])
                    logging.debug(f"Updated flow: {latest_flow}")
                    latest_data["PatientSkid"]["QSkid"] = latest_flow
            except Exception as e:
                logging.error(f"Error fetching flow: {e}")
            await asyncio.sleep(1.0)
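# The patient skid service is polled at 1 Hz; fetch errors are logged and the
# loop simply retries on the next tick.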

app.include_router(router)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8080,
        reload=True,
        reload_dirs=["."],
    )