from fastapi import FastAPI, HTTPException, Query, Form, Depends from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse import logging import os from fastapi import Request, APIRouter import platform from fastapi.templating import Jinja2Templates from starlette.middleware.sessions import SessionMiddleware from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.status import HTTP_302_FOUND import json from pathlib import Path from typing import Optional, Dict, Any from fastapi import Query import asyncio import datetime import csv from collections import deque import numpy as np import aiohttp import httpx from serial_manager import SerialConfig, SerialStore, SerialReader from protocol_decoder import decode_frames from serial_csv_logger import SerialCsvLogger # <-- CSV logger if platform.system() in ["Darwin"]: # macOS or Windows from MockCAN import CANBackend logging.basicConfig(level=logging.INFO) else: from classCAN import CANBackend # Your real backend logging.basicConfig(level=logging.INFO) logging.getLogger("uvicorn.access").setLevel(logging.WARNING) app = FastAPI() app.add_middleware(SessionMiddleware, secret_key="your_super_secret_key") router = APIRouter() templates = Jinja2Templates(directory="templates") can_backend = CANBackend() # Serve static files (HTML, JS, CSS) app.mount("/static", StaticFiles(directory="static"), name="static") ## BUFFER CREATION AND HELPER # Global object to store the latest data latest_data: Dict[str, Any] = { "PU_1": {"Ploop_sp": 0.0, "Qperm_sp": 0.0}, "PU_2": {"Ploop_sp": 0.0, "Qperm_sp": 0.0}, "PU_3": {"Ploop_sp": 0.0, "Qperm_sp": 0.0}, "DS": None, "PatientSkid": {"QSkid": 0.0}, } latest_setpoints: Dict[str, Any] = { "PU_1": {"Ploop_sp": 2.5, "Qperm_sp": 1200}, "PU_2": {"Ploop_sp": 2.5, "Qperm_sp": 1200}, "PU_3": {"Ploop_sp": 2.5, "Qperm_sp": 1200}, } active_PUs: list[int] = [] # RECORDER recording_flag = False recording_task = None recording_writer = None recording_file = None write_buffer = deque() flush_interval = 1.0 # flush every 1 second last_flush_time = datetime.datetime.now() # ---- Serial intake globals ---- serial_store = SerialStore(capacity=5000) serial_reader: SerialReader | None = None serial_csv: SerialCsvLogger | None = None # <-- added ## LOGGING def format_PU_data(data): return { "timestamp": datetime.datetime.now().isoformat(), "Qperm": np.round(data.get("FM2", 0.0), 1), "Qdilute": np.round(data.get("FM1", 0.0), 1), "Qdrain": np.round(data.get("FM4", 0.0), 1), "Qrecirc": np.round(data.get("FM3", 0.0), 1), "QdrainEDI": np.round(data.get("FM2", 0.0), 1) - np.round(data.get("FM1", 0.0), 1), "Pro": np.round(data.get("PS2", 0.0), 2), "Pdilute": np.round(data.get("PS3", 0.0), 2), "Pretentate": np.round(data.get("PS1", 0.0), 2), "Cfeed": data.get("Conductivity_Feed", 0.0), "Cperm": data.get("Conductivity_Permeate", 0.0), "Cdilute": data.get("Conductivity_Product", 0.0), "MV02": np.round(data.get("MV02", 0.0), 1), "MV02_sp": np.round(data.get("MV02_sp", 0.0), 1), "MV03": np.round(data.get("MV03", 0.0), 1), "MV03_sp": np.round(data.get("MV03_sp", 0.0), 1), "MV04": np.round(data.get("MV04", 0.0), 1), "MV04_sp": np.round(data.get("MV04_sp", 0.0), 1), "MV05": np.round(data.get("MV05", 0.0), 1), "MV05_sp": np.round(data.get("MV05_sp", 0.0), 1), "MV06": np.round(data.get("MV06", 0.0), 1), "MV06_sp": np.round(data.get("MV06_sp", 0.0), 1), "MV07": np.round(data.get("MV07", 0.0), 1), "MV07_sp": np.round(data.get("MV07_sp", 0.0), 1), "MV08": np.round(data.get("MV08", 0.0), 1), "MV08_sp": np.round(data.get("MV08_sp", 0.0), 1), "Qdrain_sp" : 60*np.round(data.get("Qdrain_sp", 0.0), 2), } def format_DS_data(data): return { "timestamp": datetime.datetime.now().isoformat(), "Qconso": np.round(data.get("Qdrain_sp", 0.0), 1), "TankLevel": np.round(data.get("TankLevel", 0.0), 2), "Qinlet": np.round(data.get("Inlet_flow", 0.0), 1), "Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1), } ## Fetch setpoints def update_setpoints(p_loop_setpoint : float, q_perm_setpoint : float, pu : int): global latest_setpoints pu_key = "PU_"+str(pu) latest_setpoints[pu_key]["Ploop_sp"] = p_loop_setpoint latest_setpoints[pu_key]["Qperm_sp"] = q_perm_setpoint def format_setpoints(pu: int): # THis is a bit convoluted to pass from an object to another but it works global latest_setpoints, latest_data pu_key = "PU_" + str(pu) latest_data[pu_key]["Ploop_sp"] = latest_setpoints[pu_key]["Ploop_sp"] latest_data[pu_key]["Qperm_sp"] = latest_setpoints[pu_key]["Qperm_sp"] # CREDENTIALS CREDENTIAL_PATH = Path("credentials.json") if CREDENTIAL_PATH.exists(): with CREDENTIAL_PATH.open("r") as f: CREDENTIALS = json.load(f) else: CREDENTIALS = {} USERNAME = CREDENTIALS["username"] PASSWORD = CREDENTIALS["password"] # ======== LOGIN & SESSION HANDLING ======== def require_login(request: Request): user = request.session.get("user") if user != USERNAME: # raise 302 to trigger redirection manually (FastAPI doesn't support redirects from Depends directly) raise StarletteHTTPException(status_code=302, detail="Redirect to login") return user @app.get("/", response_class=HTMLResponse) def login_form(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") def login(request: Request, username: str = Form(...), password: str = Form(...)): if username == USERNAME and password == PASSWORD: request.session["user"] = username return RedirectResponse("/control", status_code=HTTP_302_FOUND) return templates.TemplateResponse( "login.html", {"request": request, "error": "Invalid credentials.json"} ) @app.get("/logout") def logout(request: Request): request.session.clear() return RedirectResponse("/", status_code=HTTP_302_FOUND) # ======== PROTECTED INTERFACE / STARTUP-SHUTDOWN ======== @app.on_event("startup") async def startup_event(): # ----- CSV logger ----- global serial_csv serial_csv = SerialCsvLogger(out_dir="serial_logs", rotate_daily=True) # ----- start the serial reader ----- global serial_reader cfg = SerialConfig( port=os.getenv("SERIAL_PORT", "/dev/ttyUSB0"), baudrate=int(os.getenv("SERIAL_BAUD", "115200")), csv_log_path=None, # disable the generic CSV inside reader; use segregated logger instead ring_capacity=int(os.getenv("SERIAL_RING", "5000")), ) serial_reader = SerialReader( cfg, serial_store, decoder=decode_frames, on_message=(lambda p: serial_csv.log(p)) # write CSV per message type ) serial_reader.start() # ----- your existing tasks ----- asyncio.create_task(update_latest_data()) asyncio.create_task(update_latest_flow()) @app.on_event("shutdown") def _serial_stop(): if serial_reader: serial_reader.stop() if serial_csv: serial_csv.close() # ======== PAGES ======== @app.get("/control", response_class=HTMLResponse) def control_page(request: Request): can_backend.connect() if request.session.get("user") != USERNAME: return RedirectResponse("/", status_code=HTTP_302_FOUND) return templates.TemplateResponse("control.html", {"request": request}) @app.get("/monitor-DS", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/monitor_DS.html") as f: return HTMLResponse(f.read()) @app.get("/monitor-PU", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/monitor_PU.html") as f: return HTMLResponse(f.read()) @app.get("/multi-monitor-PU", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/multi_pu_dashboard.html") as f: return HTMLResponse(f.read()) # ======== SERIAL API ======== @app.get("/serial/messages") def serial_messages(n: int = 100): return serial_store.latest(min(max(n, 1), 1000)) @app.get("/serial/stats") def serial_stats(): return serial_store.stats() @app.get("/serial/snapshot") def serial_snapshot(): return serial_store.latest_by_id() # ======== CAN + BACKEND ROUTES ======== @app.post("/connect_toggle") def connect_toggle(): logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}") if can_backend.connected: can_backend.shutdown() logging.info("Shutting down CAN connection...") return {"connected": False} else: success = can_backend.connect() if not success: raise HTTPException(status_code=500, detail="Connection failed.") return {"connected": can_backend.connected} @app.get("/is_connected") def is_connected(): return {"connected": can_backend.connected} # PU CONTROL @app.post("/command/{state}/pu/{pu_number}") def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...), qperm_setpoint: float = Query(...)): VALID_STATES = { "IDLE", "PRE-PRODUCTION", "PRODUCTION", "FIRST_START", "THERMALLOOPCLEANING", "DISINFECTION", "SLEEP", } state = state.upper() if state not in VALID_STATES: raise HTTPException(status_code=400, detail=f"Invalid state '{state}'") logging.info(f"Sending state '{state}' to PU {pu_number}") update_setpoints(ploop_setpoint, qperm_setpoint, pu_number) pu_number = [pu_number] if pu_number !=3 else [1,2] # Temporary way of starting two pus if state == "IDLE": set_patient_skid_users(0) url = f"http://192.168.1.28:8000/stop_test" response = httpx.get(url, timeout=1.0) logging.info(f"Stopping test on Patient Skid: {response.status_code}") url = f"http://192.168.1.28:8000/close_valves" response = httpx.get(url, timeout=1.0) logging.info(f"Closing valves on Patient Skid: {response.status_code}") try: for pu in pu_number: # current_state = can_backend.read_current_state(pu) # while current_state != state: can_backend.send_state_command(state, pu, ploop_setpoint, qperm_setpoint) current_state = can_backend.read_current_state(pu) return { "status": "success", "command": state, "pu": pu, "ploop_setpoint": ploop_setpoint, "qperm_setpoint": qperm_setpoint, "current_state": current_state, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) ## MONITORING @app.get("/api/pu_status") def get_pu_status(): global active_PUs states = { "PU1": can_backend.read_current_state(1), "PU2": can_backend.read_current_state(2), "PU3": can_backend.read_current_state(3), } logging.debug(f"[PU STATUS] {states}") active_PUs = [ index + 1 for index, (pu, status) in enumerate(states.items()) if status != "Offline" ] logging.debug(f"[ACTIVE PU] {active_PUs}") return JSONResponse(content=states) async def update_latest_data(): global active_PUs while True: # DS data = can_backend.get_latest_data(pu_number=0) latest_data["DS"] = format_DS_data(data) # PUs for pu in active_PUs: data = can_backend.get_latest_data(pu_number=pu) latest_data[f"PU_{pu}"] = format_PU_data(data) format_setpoints(pu) logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}") await asyncio.sleep(0.05) @app.get("/monitor") async def get_monitor_data(): return latest_data # LOCAL RECORDER @app.post("/start_recording") async def start_recording(): global recording_flag, recording_task, recording_file, recording_writer if recording_flag: raise HTTPException(status_code=400, detail="Already recording.") now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"recording_{now}.csv" os.makedirs("recordings", exist_ok=True) filepath = os.path.join("recordings", filename) recording_file = open(filepath, "w", newline="") fieldnames_common = ["timestamp", "pu", "QSkid"] fieldnames_DS = list(format_DS_data({}).keys()) fieldnames_DS.pop(0) # removing extra timestamp fieldnames_PUs = list(format_PU_data({}).keys()) fieldnames_PUs.pop(0) # removing extra timestamp fieldnames = fieldnames_common + fieldnames_DS + fieldnames_PUs recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames) recording_writer.writeheader() recording_flag = True recording_task = asyncio.create_task(record_data_loop()) logging.info(f"[RECORDING STARTED] File: {filepath}") return {"status": "recording started", "file": filename} @app.post("/stop_recording") async def stop_recording(): global recording_flag, recording_task, recording_file if not recording_flag: raise HTTPException(status_code=400, detail="Not recording.") recording_flag = False if recording_task: await recording_task recording_task = None if recording_file: recording_file.close() recording_file = None logging.info("[RECORDING STOPPED]") return {"status": "recording stopped"} async def record_data_loop(): global recording_writer, recording_file, write_buffer, last_flush_time while recording_flag: timestamp = datetime.datetime.now().isoformat() for pu, data in latest_data.items(): if data: row = {"timestamp": timestamp, "pu": pu, **data} recording_writer.writerow(row) # Flush every flush_interval seconds if (datetime.datetime.now() - last_flush_time).total_seconds() >= flush_interval: recording_file.flush() last_flush_time = datetime.datetime.now() await asyncio.sleep(0.05) # 10 Hz ## AUTOMATIC TESTING async def send_command_with_delay( state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0 ): await asyncio.sleep(delay_s) logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s") can_backend.send_state_command(state, pu, ploop_setpoint) async def set_patients_with_delay(count: int, delay_s: int): await asyncio.sleep(delay_s) logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s") set_patient_skid_users(count) @router.post("/test/auto/1") async def auto_test_pu1(ploop_setpoint: float = Query(0.0)): pu = 1 logging.info("[AUTO TEST] Starting automatic test for 1 PU") asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint)) return {"status": "started", "pu": pu} @router.post("/test/auto/2") async def auto_test_pu2(ploop_setpoint: float = Query(0.0)): logging.info("[AUTO TEST] Starting automatic test for 2 PUs") asyncio.create_task(run_auto_test_pu2(ploop_setpoint)) return {"status": "started", "pu": [1, 2]} async def run_auto_test_pu1(pu: int, ploop_setpoint: float): await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint) await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint) await set_patients_with_delay(5, delay_s=60) await set_patients_with_delay(10, delay_s=60) await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint) logging.info("[AUTO TEST] Finished PU1 test") async def run_auto_test_pu2(ploop_setpoint: float): # Step 1: Run PU1 test await run_auto_test_pu1(1, ploop_setpoint) # Step 2: PU2 sequence await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint) await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint) await set_patients_with_delay(15, delay_s=60) await set_patients_with_delay(0, delay_s=60) await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint) await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint) logging.info("[AUTO TEST] Finished PU1 + PU2 test") @router.post("/test/auto/3") async def auto_test_pu3(): # Call the function for PU3 auto test logging.info("Start auto test of 3 PU") return {"status": "started", "pu": 3} # PATIENT SKID HELPERS async def update_latest_flow(): global active_PUs async with aiohttp.ClientSession() as session: while True: try: async with session.get("http://192.168.1.28:8000/instant_flow") as resp: data = await resp.json() latest_flow = int(data["log"]) logging.debug(f"Updated flow: {latest_flow}") latest_data["PatientSkid"]["QSkid"] = latest_flow except Exception as e: logging.error(f"Error fetching flow: {e}") await asyncio.sleep(1.0) def stop_patient_skid(): try: url = f"http://192.168.1.28:8000/stop_test" response = httpx.get(url, timeout=5.0) if response.status_code == 200: return {"status": "success", "detail": response.json()} else: raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}") except httpx.RequestError as e: raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}") def set_patient_skid_users(count: int = 1): try: url = f"http://192.168.1.28:8000/set_users/{count}" response = httpx.get(url, timeout=5.0) if response.status_code == 200: return {"status": "success", "detail": response.json()} else: raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}") except httpx.RequestError as e: raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}") app.include_router(router) if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="127.0.0.1", port=8080, reload=True, reload_dirs=["."], )