import asyncio
import csv
import datetime
import json
import logging
import os
import platform
from collections import deque
from pathlib import Path
from typing import Any, Dict

import aiohttp
import numpy as np
from fastapi import APIRouter, FastAPI, Form, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.middleware.sessions import SessionMiddleware
from starlette.status import HTTP_302_FOUND

from hardware.patient_skid import handle_patient_skid_for_idle, set_patient_skid_users
from serial_manager import SerialConfig, SerialStore, SerialReader
from protocol_decoder import decode_frames
from serial_csv_logger import SerialCsvLogger  # per-message-type CSV logger

# Only macOS ("Darwin") selects the mock CAN backend; every other platform
# imports the real hardware backend.
if platform.system() == "Darwin":
    from MockCAN import CANBackend
else:
    from hardware.classCAN import CANBackend

logging.basicConfig(level=logging.INFO)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)

app = FastAPI()
# NOTE(review): the session secret is a hard-coded placeholder committed to the
# source; consider loading it from the environment or a secrets file.
app.add_middleware(SessionMiddleware, secret_key="your_super_secret_key")
router = APIRouter()
templates = Jinja2Templates(directory="templates")
can_backend = CANBackend()

# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")

## BUFFER CREATION AND HELPER
# Global object to store the latest data
latest_data: Dict[str, Any] = {
    "PU_1": None,
    "PU_2": None,
    "PU_3": None,
    "DS": None,
    "PatientSkid": {"QSkid": 0.0},
}

latest_setpoints: Dict[str, Any] = {
    "PU_1": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
    "PU_2": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
    "PU_3": {"Ploop_sp": 0.0, "Qperm_sp": 0.0},
}

active_PUs: list[int] = []

VALID_STATES = {
    "IDLE",
    "PRE-PRODUCTION",
    "PRODUCTION",
    "FIRST_START",
    "THERMALLOOPCLEANING",
    "DISINFECTION",
    "SLEEP",
}

# Dictionary to hold running auto-test tasks, keyed by "pu<number>"
tasks: dict[str, asyncio.Task] = {}

# Strong references to long-running background tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()

# RECORDER
recording_flag = False
recording_task = None
recording_writer = None
recording_file = None
write_buffer = deque()
flush_interval = 1.0  # flush every 1 second
last_flush_time = datetime.datetime.now()

# ---- Serial intake globals ----
serial_store = SerialStore(capacity=5000)
serial_reader: SerialReader | None = None
serial_csv: SerialCsvLogger | None = None


## LOGGING
def format_PU_data(data):
    """Build the monitoring record for one PU from a raw data mapping.

    Raw keys (``FM1``..``FM4``, ``PS1``..``PS3``, ``MV02``..``MV08`` ...) are
    read with a default of 0.0, rounded and stored under the names used by the
    monitoring pages and the CSV recorder. The key order of the returned dict
    matters: the recorder derives its CSV column order from it.

    Args:
        data: mapping of raw values; missing keys default to 0.0.

    Returns:
        dict holding a fresh ISO timestamp plus the formatted values.
    """
    q_perm = np.round(data.get("FM2", 0.0), 1)
    q_dilute = np.round(data.get("FM1", 0.0), 1)

    record = {
        "timestamp": datetime.datetime.now().isoformat(),
        "Qperm": q_perm,
        "Qdilute": q_dilute,
        "Qdrain": np.round(data.get("FM4", 0.0), 1),
        "Qrecirc": np.round(data.get("FM3", 0.0), 1),
        "QdrainEDI": q_perm - q_dilute,
        "Pro": np.round(data.get("PS2", 0.0), 2),
        "Pdilute": np.round(data.get("PS3", 0.0), 2),
        "Pretentate": np.round(data.get("PS1", 0.0), 2),
        "Cfeed": data.get("Conductivity_Feed", 0.0),
        "Cperm": data.get("Conductivity_Permeate", 0.0),
        "Cdilute": data.get("Conductivity_Product", 0.0),
    }

    # MV02..MV08: each value is followed by its "_sp" counterpart.
    for valve in range(2, 9):
        name = f"MV0{valve}"
        record[name] = np.round(data.get(name, 0.0), 1)
        record[f"{name}_sp"] = np.round(data.get(f"{name}_sp", 0.0), 1)

    # NOTE(review): max() makes 350.0 a lower bound on the scaled setpoint;
    # confirm a floor (and not a cap via min()) is what is intended.
    record["Qdrain_sp"] = max(60 * np.round(data.get("Qdrain_sp", 0.0), 2), 350.0)
    return record


def format_DS_data(data):
    """Build the monitoring record for the DS from a raw data mapping.

    ``Qconso`` is the rounded inlet flow minus the rounded outlet flow,
    clamped so it is never negative.

    Args:
        data: mapping of raw values; missing keys default to 0.0.

    Returns:
        dict holding a fresh ISO timestamp plus the formatted values.
    """
    q_inlet = np.round(data.get("Inlet_flow", 0.0), 1)
    q_outlet = np.round(data.get("Outlet_flow", 0.0), 1)
    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "Qconso": max(q_inlet - q_outlet, 0),
        "TankLevel": np.round(data.get("TankLevel", 0.0), 2),
        "Qinlet": q_inlet,
        "Qoutlet": q_outlet,
    }


## Fetch setpoints
def update_setpoints(p_loop_setpoint: float, q_perm_setpoint: float, pu: int):
    """Store the latest setpoints requested for PU ``pu`` in ``latest_setpoints``."""
    setpoints = latest_setpoints[f"PU_{pu}"]
    setpoints["Ploop_sp"] = p_loop_setpoint
    setpoints["Qperm_sp"] = q_perm_setpoint


def format_setpoints(pu: int):
    """Copy the stored setpoints of PU ``pu`` into its ``latest_data`` record.

    The ``latest_data`` entry for this PU must already be a dict (it is
    ``None`` until the first record has been stored).
    """
    pu_key = f"PU_{pu}"
    latest_data[pu_key]["Ploop_sp"] = latest_setpoints[pu_key]["Ploop_sp"]
    latest_data[pu_key]["Qperm_sp"] = latest_setpoints[pu_key]["Qperm_sp"]


# CREDENTIALS
CREDENTIAL_PATH = Path("credentials.json")


def _load_credentials(path: Path) -> Dict[str, Any]:
    """Load the login credentials from ``path``.

    Fails fast with an explicit message instead of an opaque ``KeyError``
    when the file or one of the required keys is missing.

    Raises:
        RuntimeError: if the file does not exist, is not a JSON object, or
            lacks the "username" or "password" key.
    """
    if not path.exists():
        raise RuntimeError(f"Credential file '{path}' not found.")
    with path.open("r", encoding="utf-8") as f:
        credentials = json.load(f)
    if not isinstance(credentials, dict):
        raise RuntimeError(f"Credential file '{path}' must contain a JSON object.")
    missing = [key for key in ("username", "password") if key not in credentials]
    if missing:
        raise RuntimeError(
            f"Credential file '{path}' is missing key(s): {', '.join(missing)}"
        )
    return credentials


CREDENTIALS = _load_credentials(CREDENTIAL_PATH)
USERNAME = CREDENTIALS["username"]
PASSWORD = CREDENTIALS["password"]


# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
    """Return the session user, or raise a 302 when nobody is logged in.

    Raises:
        StarletteHTTPException: status 302 when the session user is not
            ``USERNAME``.
    """
    user = request.session.get("user")
    if user != USERNAME:
        # raise 302 to trigger redirection manually (FastAPI doesn't support
        # redirects from Depends directly)
        raise StarletteHTTPException(status_code=302, detail="Redirect to login")
    return user


@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the login page."""
    return templates.TemplateResponse("login.html", {"request": request})


@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and open a session on success.

    Redirects to ``/control`` when the credentials match; otherwise renders
    the login page again with an error message.
    """
    if username == USERNAME and password == PASSWORD:
        request.session["user"] = username
        return RedirectResponse("/control", status_code=HTTP_302_FOUND)
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": "Invalid credentials.json"}
    )


@app.get("/logout")
def logout(request: Request):
    """Clear the session and redirect to the login page."""
    request.session.clear()
    return RedirectResponse("/", status_code=HTTP_302_FOUND)


# ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ========
@app.on_event("startup")
async def startup_event():
    """Start the serial CSV logger, the serial reader and the polling tasks."""
    global serial_csv, serial_reader

    # ----- CSV logger -----
    serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True)

    # ----- start the serial reader -----
    cfg = SerialConfig(
        port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"),
        baudrate=int(os.getenv("SERIAL_BAUD", "115200")),
        csv_log_path=None,  # disable the generic CSV inside reader; use segregated logger instead
        ring_capacity=int(os.getenv("SERIAL_RING", "5000")),
    )
    serial_reader = SerialReader(
        cfg,
        serial_store,
        decoder=decode_frames,
        on_message=(lambda p: serial_csv.log(p)),  # write CSV per message type
    )
    serial_reader.start()

    # ----- background polling tasks -----
    # Keep a reference to each task until it completes so it cannot be
    # garbage-collected mid-execution.
    for coro in (update_latest_data(), update_latest_flow()):
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)


@app.on_event("shutdown")
def _serial_stop():
    """Stop the serial reader and close the serial CSV logger, if started."""
    if serial_reader:
        serial_reader.stop()
    if serial_csv:
        serial_csv.close()


# ======== PAGES ========
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    """Render the control page for a logged-in user.

    The session is checked before the CAN backend is touched, so an
    unauthenticated request is redirected to the login page without
    triggering a CAN connection.
    """
    if request.session.get("user") != USERNAME:
        return RedirectResponse("/", status_code=HTTP_302_FOUND)
    can_backend.connect()
    return templates.TemplateResponse("control.html", {"request": request})


def _static_html_response(path: str) -> HTMLResponse:
    """Read the HTML file at ``path`` and wrap it in an ``HTMLResponse``."""
    with open(path, encoding="utf-8") as f:
        return HTMLResponse(f.read())


@app.get("/monitor-DS", response_class=HTMLResponse)
def monitor_ds_page(request: Request):
    """Serve the DS monitoring page."""
    return _static_html_response("static/monitor_DS.html")


@app.get("/monitor-PU", response_class=HTMLResponse)
def monitor_pu_page(request: Request):
    """Serve the single-PU monitoring page."""
    return _static_html_response("static/monitor_PU.html")


@app.get("/multi-monitor-PU", response_class=HTMLResponse)
def multi_monitor_pu_page(request: Request):
    """Serve the multi-PU dashboard page."""
    return _static_html_response("static/multi_pu_dashboard.html")


# The three page handlers used to share the name ``monitor_page``; the module
# attribute ended up bound to the last one, so keep that binding as an alias.
monitor_page = multi_monitor_pu_page


# ======== SERIAL API ========
@app.get("/serial/messages")
def serial_messages(n: int = 100):
    """Return the latest ``n`` serial messages, with ``n`` clamped to 1..1000."""
    return serial_store.latest(min(max(n, 1), 1000))


@app.get("/serial/stats")
def serial_stats():
    """Return the statistics reported by the serial store."""
    return serial_store.stats()
@app.get("/serial/snapshot")
def serial_snapshot():
    """Return the serial store's ``latest_by_id()`` result."""
    return serial_store.latest_by_id()


# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
    """Toggle the CAN connection and report the resulting state.

    Raises:
        HTTPException: status 500 when connecting fails.
    """
    logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
    if can_backend.connected:
        can_backend.shutdown()
        logging.info("Shutting down CAN connection...")
        return {"connected": False}

    if not can_backend.connect():
        raise HTTPException(status_code=500, detail="Connection failed.")
    return {"connected": can_backend.connected}


@app.get("/is_connected")
def is_connected():
    """Report whether the CAN backend is currently connected."""
    return {"connected": can_backend.connected}


# PU CONTROL
def validate_state(state: str) -> str:
    """Normalize and validate the requested state.

    Raises:
        HTTPException: status 400 when the upper-cased state is not in
            ``VALID_STATES``.
    """
    state = state.upper()
    if state not in VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")
    return state


def expand_pu_number(pu_number: int) -> list[int]:
    """Temporary rule: if PU = 3 → run on [1, 2]."""
    return [1, 2] if pu_number == 3 else [pu_number]


def send_command_to_pu(
    pu: int, state: str, ploop_setpoint: float, qperm_setpoint: float
) -> dict:
    """Send a state command + update setpoints for one PU.

    When the validated state is "IDLE", ``handle_patient_skid_for_idle()`` is
    called before the command is sent.

    Returns:
        dict with the PU, the command sent, both setpoints and the state read
        back from the CAN backend.

    Raises:
        HTTPException: status 400 when ``state`` is not a valid state.
    """
    state = validate_state(state)
    if state == "IDLE":
        handle_patient_skid_for_idle()

    update_setpoints(ploop_setpoint, qperm_setpoint, pu)
    can_backend.send_state_command(state, pu, ploop_setpoint, qperm_setpoint)
    current_state = can_backend.read_current_state(pu)
    return {
        "pu": pu,
        "command": state,
        "ploop_setpoint": ploop_setpoint,
        "qperm_setpoint": qperm_setpoint,
        "current_state": current_state,
    }


@app.post("/command/{state}/pu/{pu_number}")
def send_command_endpoint(
    state: str,
    pu_number: int,
    ploop_setpoint: float = Query(...),
    qperm_setpoint: float = Query(...),
):
    """Send ``state`` to the PU(s) designated by ``pu_number``.

    Raises:
        HTTPException: status 400 for an invalid state, status 500 for any
            other failure while sending.
    """
    logging.info(f"Sending state '{state}' to PU {pu_number}")
    try:
        results = [
            send_command_to_pu(pu, state, ploop_setpoint, qperm_setpoint)
            for pu in expand_pu_number(pu_number)
        ]
    except HTTPException:
        # Keep the status chosen further down (e.g. 400 for an invalid state)
        # instead of masking it as a 500.
        raise
    except Exception as e:
        logging.error(str(e))
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success", "results": results}


## MONITORING
@app.get("/api/pu_status")
def get_pu_status():
    """Return the current state of each PU and refresh ``active_PUs``.

    Any PU reporting "SYSTEM_MODE_READY" is sent the "PRODUCTION" command
    with its last stored setpoints. A PU counts as active when its state is
    not "Offline".
    """
    global active_PUs

    pu_numbers = (1, 2, 3)
    states = {f"PU{pu}": can_backend.read_current_state(pu) for pu in pu_numbers}
    logging.debug(f"[PU STATUS] {states}")

    for pu in pu_numbers:
        if states[f"PU{pu}"] == "SYSTEM_MODE_READY":
            setpoints = latest_setpoints[f"PU_{pu}"]
            # send_command_to_pu's parameter is named ``pu`` (not ``pu_number``).
            send_command_to_pu(
                pu=pu,
                state="PRODUCTION",
                ploop_setpoint=setpoints["Ploop_sp"],
                qperm_setpoint=setpoints["Qperm_sp"],
            )

    active_PUs = [
        pu for pu in pu_numbers if states[f"PU{pu}"] != "Offline"
    ]
    logging.debug(f"[ACTIVE PU] {active_PUs}")
    return JSONResponse(content=states)


async def update_latest_data():
    """Poll the CAN backend forever and refresh ``latest_data``.

    Runs every 0.05 s. A failure in one iteration is logged and the loop
    continues, so a single bad read does not stop monitoring for good.
    """
    while True:
        try:
            # DS
            data = can_backend.get_latest_data(pu_number=0)
            latest_data["DS"] = format_DS_data(data)

            # PUs
            for pu in active_PUs:
                data = can_backend.get_latest_data(pu_number=pu)
                latest_data[f"PU_{pu}"] = format_PU_data(data)
                format_setpoints(pu)

            logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
        except Exception:
            logging.exception("[MONITOR] Failed to update latest data")
        await asyncio.sleep(0.05)


@app.get("/monitor")
async def get_monitor_data():
    """Return the latest monitoring snapshot."""
    return latest_data


# LOCAL RECORDER
# --- internal helpers (not endpoints) ---
async def start_recording_internal():
    """Open a new CSV file under ``recordings/`` and start the recorder task.

    Returns:
        The file name of the new recording, or ``None`` when a recording is
        already in progress.
    """
    global recording_flag, recording_task, recording_file, recording_writer

    if recording_flag:
        logging.warning("Recording already in progress.")
        return None

    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recording_{now}.csv"
    os.makedirs("recordings", exist_ok=True)
    filepath = os.path.join("recordings", filename)

    # Columns: common fields, then the DS and PU record keys (their leading
    # "timestamp" key is already part of the common fields), then setpoints.
    fieldnames_common = ["timestamp", "pu", "QSkid"]
    fieldnames_DS = list(format_DS_data({}))[1:]
    fieldnames_PUs = list(format_PU_data({}))[1:]
    fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs + ["Qperm_sp", "Ploop_sp"]

    recording_file = open(filepath, "w", newline="")
    recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames)
    recording_writer.writeheader()

    recording_flag = True
    recording_task = asyncio.create_task(record_data_loop())
    logging.info(f"[RECORDING STARTED] File: {filepath}")
    return filename


async def stop_recording_internal():
    """Stop the recorder task and close the recording file.

    The file is closed even when the recorder task ended with an error.

    Returns:
        ``True`` when a recording was stopped, ``False`` when none was active.
    """
    global recording_flag, recording_task, recording_file, recording_writer

    if not recording_flag:
        logging.warning("No active recording to stop.")
        return False

    recording_flag = False  # makes record_data_loop() exit
    try:
        if recording_task:
            await recording_task
    except Exception:
        logging.exception("[RECORDING] Recorder task ended with an error")
    finally:
        recording_task = None
        if recording_file:
            recording_file.close()
            recording_file = None
        recording_writer = None

    logging.info("[RECORDING STOPPED]")
    return True


# --- API endpoints ---
@app.post("/start_recording")
async def start_recording():
    """Start a recording.

    Raises:
        HTTPException: status 400 when a recording is already running.
    """
    filename = await start_recording_internal()
    if not filename:
        raise HTTPException(status_code=400, detail="Already recording.")
    return {"status": "recording started", "file": filename}


@app.post("/stop_recording")
async def stop_recording():
    """Stop the current recording.

    Raises:
        HTTPException: status 400 when no recording is running.
    """
    success = await stop_recording_internal()
    if not success:
        raise HTTPException(status_code=400, detail="Not recording.")
    return {"status": "recording stopped"}


@app.get("/is_recording")
async def is_recording():
    """Return True if recording is on, False otherwise"""
    return JSONResponse(content={"recording": recording_flag})


async def record_data_loop():
    """Write one CSV row per non-empty ``latest_data`` entry until stopped.

    The loop runs while ``recording_flag`` is set and flushes the file at
    most once every ``flush_interval`` seconds.
    """
    global last_flush_time

    while recording_flag:
        timestamp = datetime.datetime.now().isoformat()
        for pu, data in latest_data.items():
            if data:
                # A "timestamp" key inside ``data`` overrides the loop's own.
                row = {"timestamp": timestamp, "pu": pu, **data}
                recording_writer.writerow(row)

        # Flush every flush_interval seconds
        now = datetime.datetime.now()
        if (now - last_flush_time).total_seconds() >= flush_interval:
            recording_file.flush()
            last_flush_time = datetime.datetime.now()

        await asyncio.sleep(0.05)  # 20 Hz


## AUTOMATIC TESTING
async def send_command_with_delay(
    state: str,
    pu: int,
    delay_s: int = 0,
    ploop_setpoint: float = 2.5,
    qperm_setpoint: float = 1200.0,
):
    """Wait ``delay_s`` seconds, then send ``state`` to PU ``pu``.

    Returns:
        The result dict of ``send_command_to_pu`` on success, or
        ``{"status": "error", "detail": ...}`` when sending failed.
    """
    await asyncio.sleep(delay_s)
    logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
    try:
        return send_command_to_pu(pu, state, ploop_setpoint, qperm_setpoint)
    except Exception as e:
        logging.error(f"[AUTO TEST] Failed to send {state} to PU{pu}: {e}")
        return {"status": "error", "detail": str(e)}


async def set_patients_with_delay(count: int, delay_s: int):
    """Wait ``delay_s`` seconds, then set ``count`` users on the patient skid."""
    await asyncio.sleep(delay_s)
    logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s")
    set_patient_skid_users(count)


@router.post("/test/auto/{pu_number}")
async def auto_test(pu_number: int):
    """Start the automatic test registered for ``pu_number`` (1, 2 or 3).

    The PU number is validated first, so an invalid request neither cancels
    a running test nor starts the recorder. A test already running for the
    same PU is cancelled before the new one starts.
    """
    # pu_number -> (test coroutine function, value reported under "pu")
    runners = {
        1: (run_auto_test_1, 1),
        2: (run_auto_test_2, [2]),
        3: (run_auto_test_3, [3]),
    }
    if pu_number not in runners:
        return {"status": "error", "message": "Invalid PU number"}

    logging.info(f"[AUTO TEST] Starting automatic test for PU{pu_number}")
    key = f"pu{pu_number}"

    if key in tasks and not tasks[key].done():
        tasks[key].cancel()
        logging.info(f"[AUTO TEST] PU{pu_number} Cancelled")

    await start_recording_internal()
    logging.info("[AUTO TEST] Recorder started")

    runner, reported_pu = runners[pu_number]
    tasks[key] = asyncio.create_task(runner())
    return {"status": "started", "pu": reported_pu}


@router.post("/test/auto/stop/{pu}")
async def stop_auto_test(pu: int):
    """Stop the recorder and, if a test is running for ``pu``, cancel it.

    When a test was cancelled, the PU is also sent the "IDLE" command.
    """
    key = f"pu{pu}"
    logging.info(f"[AUTO TEST] Stopping {pu}")

    await stop_recording_internal()
    logging.info("[AUTO TEST] Recorder stopped")

    if key in tasks and not tasks[key].done():
        tasks[key].cancel()
        await send_command_with_delay("IDLE", pu=pu, delay_s=0)
        logging.info(f"[AUTO TEST] Test of {key} canceled and PU stopped")
        return {"status": "stopped", "pu": pu}

    logging.info(f"[AUTO TEST] Stopping {pu} No test Runining")
    return {"status": "no task running", "pu": pu}


async def run_auto_test_1(pu: int = 1):
    """Automatic test sequence for a single PU; stops the recorder at the end."""
    try:
        await send_command_with_delay(
            "PRE-PRODUCTION", pu=pu, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        await asyncio.sleep(180)  # Starting time of the machine
        await set_patients_with_delay(5, delay_s=10)
        await set_patients_with_delay(10, delay_s=20)
        await set_patients_with_delay(0, delay_s=20)
        await send_command_with_delay(
            "IDLE", pu=pu, delay_s=20, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        logging.info("[AUTO TEST] Finished PU1 test")
        await stop_recording_internal()
        logging.info("[AUTO TEST] Recorder stopped")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU 1 task cancelled")
        raise


async def run_auto_test_2():
    """Automatic test sequence labelled "PU1 + PU2" in its log messages."""
    try:
        await send_command_with_delay(
            "PRE-PRODUCTION", pu=1, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        # NOTE(review): PRE-PRODUCTION is sent to PU 1 twice and never to
        # PU 2, although IDLE is later sent to both — confirm whether this
        # second command was meant for pu=2.
        await send_command_with_delay(
            "PRE-PRODUCTION", pu=1, delay_s=90, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        await asyncio.sleep(90)  # Starting time of the machine
        await set_patients_with_delay(5, delay_s=10)
        await set_patients_with_delay(10, delay_s=40)
        await asyncio.sleep(100)
        await send_command_with_delay(
            "IDLE", pu=1, delay_s=0, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        await send_command_with_delay(
            "IDLE", pu=2, delay_s=10, ploop_setpoint=2.5, qperm_setpoint=1200.0
        )
        logging.info("[AUTO TEST] Finished PU1 + PU2 test")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU 2 task cancelled")
        # optional cleanup
        raise


async def run_auto_test_3():
    """Placeholder test for PU 3: only logs, no command is sent yet."""
    try:
        # Step 1: Run PU1 test
        # await run_auto_test_1()
        # TODO: implement the PU 3 test sequence
        logging.info("[AUTO TEST] Finished PU1 + PU2 test")
    except asyncio.CancelledError:
        logging.info("[AUTO TEST] PU 2 task cancelled")
        # optional cleanup
        raise


# PATIENT SKID HELPERS
async def update_latest_flow():
    """Poll the patient skid's instant flow once per second, forever.

    The "log" field of the JSON response is stored as an int under
    ``latest_data["PatientSkid"]["QSkid"]``. Errors are logged and the loop
    keeps going.
    """
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
                    data = await resp.json()
                    latest_flow = int(data["log"])
                    logging.debug(f"Updated flow: {latest_flow}")
                    latest_data["PatientSkid"]["QSkid"] = latest_flow
            except Exception as e:
                logging.error(f"Error fetching flow: {e}")
            await asyncio.sleep(1.0)


app.include_router(router)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8080,
        reload=True,
        reload_dirs=["."],
    )