from fastapi import FastAPI, HTTPException, Query, Form, Depends from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse import logging import os from fastapi import Request, APIRouter import platform from fastapi.templating import ( Jinja2Templates, ) # pip install fastapi uvicorn jinja2 python-multipart passlib from starlette.middleware.sessions import SessionMiddleware from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.status import HTTP_302_FOUND import json from pathlib import Path from typing import Optional, Dict, Any from fastapi import Query import asyncio import datetime from valveBackend import ValveBackend import csv from collections import deque import numpy as np import aiohttp if platform.system() in ["Darwin"]: # macOS or Windows from MockCAN import CANBackend logging.basicConfig(level=logging.INFO) else: from classCAN import CANBackend # Your real backend logging.basicConfig(level=logging.ERROR) app = FastAPI() app.add_middleware(SessionMiddleware, secret_key="your_super_secret_key") router = APIRouter() templates = Jinja2Templates(directory="templates") can_backend = CANBackend() valve_backend = ValveBackend( eds_file="/home/hmi/Desktop/HMI/eds_file/inletvalveboard.eds" ) # Serve static files (HTML, JS, CSS) app.mount("/static", StaticFiles(directory="static"), name="static") ## BUFFER CREATION AND HELPER # Global object to store the latest data latest_data: Dict[str, Any] = { "PU_1": None, "PU_2": None, "PU_3": None, "PatientSkid": {"QSkid": 0.0}, } DEFAULT_FEED_VALVE = 0.0 # RECORDER recording_flag = False recording_task = None recording_writer = None recording_file = None write_buffer = deque() flush_interval = 1.0 # flush every 1 second last_flush_time = datetime.datetime.now() ## LOGGING QSkid async def update_latest_flow(): async with aiohttp.ClientSession() as session: while True: try: async with session.get("http://192.168.1.28:8000/instant_flow") as resp: data = await resp.json() latest_flow = int(data["log"]["flow"]) logging.debug(f"Updated flow: {latest_flow}") latest_data["PatientSkid"]["QSkid"] = latest_flow except Exception as e: logging.error(f"Error fetching flow: {e}") await asyncio.sleep(1.0) def format_data(data): return { "timestamp": datetime.datetime.now().isoformat(), "Qperm": np.round(data.get("FM2", 0.0), 1), "Qdilute": np.round(data.get("FM1", 0.0), 1), "Qdrain": np.round(data.get("FM4", 0.0), 1), "Qrecirc": np.round(data.get("FM3", 0.0), 1), "Pro": np.round(data.get("PS2", 0.0), 1), "Pdilute": np.round(data.get("PS3", 0.0), 1), "Pretentate": np.round(data.get("PS1", 0.0), 1), "Conductivity": np.round(data.get("Cond", 0.0), 1), "MV02": np.round(data.get("MV02", 0.0), 1), "MV02_sp": np.round(data.get("MV02_sp", 0.0), 1), "MV03": np.round(data.get("MV03", 0.0), 1), "MV03_sp": np.round(data.get("MV03_sp", 0.0), 1), "MV04": np.round(data.get("MV05", 0.0), 1), "MV04_sp": np.round(data.get("MV04_sp", 0.0), 1), "MV05": np.round(data.get("MV05", 0.0), 1), "MV05_sp": np.round(data.get("MV05_sp", 0.0), 1), "MV06": np.round(data.get("MV06", 0.0), 1), "MV06_sp": np.round(data.get("MV06_sp", 0.0), 1), "MV07": np.round(data.get("MV07", 0.0), 1), "MV07_sp": np.round(data.get("MV07_sp", 0.0), 1), "MV08": np.round(data.get("MV08", 0.0), 1), "MV08_sp": np.round(data.get("MV08_sp", 0.0), 1), } # CREDENTIALS # Load users from JSON file at startup CREDENTIAL_PATH = Path("credentials.json") if CREDENTIAL_PATH.exists(): with CREDENTIAL_PATH.open("r") as f: CREDENTIALS = json.load(f) else: CREDENTIALS = {} USERNAME = CREDENTIALS["username"] PASSWORD = CREDENTIALS["password"] # ======== LOGIN & SESSION HANDLING ======== def require_login(request: Request): user = request.session.get("user") if user != USERNAME: # raise 302 to trigger redirection manually (FastAPI doesn't support redirects from Depends directly) raise StarletteHTTPException(status_code=302, detail="Redirect to login") return user @app.get("/", response_class=HTMLResponse) def login_form(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") def login(request: Request, username: str = Form(...), password: str = Form(...)): if username == USERNAME and password == PASSWORD: request.session["user"] = username return RedirectResponse("/control", status_code=HTTP_302_FOUND) return templates.TemplateResponse( "login.html", {"request": request, "error": "Invalid credentials.json"} ) @app.get("/logout") def logout(request: Request): request.session.clear() return RedirectResponse("/", status_code=HTTP_302_FOUND) # ======== PROTECTED INTERFACE ======== @app.get("/control", response_class=HTMLResponse) def control_page(request: Request): if request.session.get("user") != USERNAME: return RedirectResponse("/", status_code=HTTP_302_FOUND) return templates.TemplateResponse("control.html", {"request": request}) @app.get("/monitor-page", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/monitor.html") as f: return HTMLResponse(f.read()) @app.get("/multi-monitor-page", response_class=HTMLResponse) def monitor_page(request: Request): with open("static/multi_pu_dashboard.html") as f: return HTMLResponse(f.read()) # ======== CAN + BACKEND ROUTES ======== @app.post("/connect_toggle") def connect_toggle(): logging.info("Toggling CAN connection...") if can_backend.connected: can_backend.shutdown() return {"connected": False} else: success = can_backend.connect() try: valve_backend.connect() except Exception as e: print(f"Connection error : {e}") if not success: raise HTTPException(status_code=500, detail="Connection failed.") return {"connected": True} @app.post("/command/{state}/pu/{pu_number}") def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)): global DEFAULT_FEED_VALVE VALID_STATES = { "IDLE", "PRE-PRODUCTION", "PRODUCTION", "FIRST_START", "THERMALLOOPCLEANING", "DISINFECTION", "SLEEP", } state = state.upper() if state not in VALID_STATES: raise HTTPException(status_code=400, detail=f"Invalid state '{state}'") logging.info(f"Sending state '{state}' to PU {pu_number}") if state == "PRE-PRODUCTION": valve_backend.send_command(70) elif "IDLE": valve_backend.send_command(DEFAULT_FEED_VALVE) try: can_backend.send_state_command(state, pu_number, ploop_setpoint) current_state = can_backend.read_current_state(pu_number) return { "status": "success", "command": state, "pu": pu_number, "ploop_setpoint": ploop_setpoint, "current_state": current_state, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/pu_status") def get_pu_status(): states = { "PU1": can_backend.read_current_state(1), "PU2": can_backend.read_current_state(2), "PU3": can_backend.read_current_state(3), } logging.info(f"[PU STATUS] {states}") return JSONResponse(content=states) async def update_latest_data(): while True: for pu in [ 1, 2, ]: # TODO: REPLACE THIS WITH CONNECTED PUs, IS GET PU STATUS SLOW? data = can_backend.get_latest_data(pu_number=pu) latest_data[f"PU_{pu}"] = format_data(data) current_data = latest_data[f"PU_{pu}"] logging.debug(f"[MONITOR BUFFER] PU{pu}: {current_data}") # logging.info(f"[MONITOR BUFFER] latest_data: {latest_data}") await asyncio.sleep(0.1) @app.get("/monitor") async def get_monitor_data(pu_number: Optional[float] = Query(None)): print(f"pu_number is {pu_number}") if pu_number is not None: return latest_data.get(f"PU_{pu_number}", {}) else: # print(latest_data) return latest_data @app.on_event("startup") async def startup_event(): asyncio.create_task(update_latest_data()) asyncio.create_task(update_latest_flow()) @app.get("/can_status") def can_status(): """Return current CAN connection status.""" return {"connected": can_backend.connected} @app.post("/command/feed_valve") def feedvalve_control(MV01_opening: int = Query(...)): """Control MV01 feed valve""" global DEFAULT_FEED_VALVE DEFAULT_FEED_VALVE = MV01_opening valve_backend.send_command(MV01_opening) logging.info(f"Feed valve opening to {MV01_opening}") return {"status": "ok"} # LOCAL RECORDER @app.post("/start_recording") async def start_recording(): global recording_flag, recording_task, recording_file, recording_writer if recording_flag: raise HTTPException(status_code=400, detail="Already recording.") now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"recording_{now}.csv" os.makedirs("recordings", exist_ok=True) filepath = os.path.join("recordings", filename) recording_file = open(filepath, "w", newline="") fieldnames = ["timestamp", "pu", "QSkid"] + list(format_data({}).keys()) recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames) recording_writer.writeheader() recording_flag = True recording_task = asyncio.create_task(record_data_loop()) logging.info(f"[RECORDING STARTED] File: {filepath}") return {"status": "recording started", "file": filename} @app.post("/stop_recording") async def stop_recording(): global recording_flag, recording_task, recording_file if not recording_flag: raise HTTPException(status_code=400, detail="Not recording.") recording_flag = False if recording_task: await recording_task recording_task = None if recording_file: recording_file.close() recording_file = None logging.info("[RECORDING STOPPED]") return {"status": "recording stopped"} async def record_data_loop(): global recording_writer, recording_file, write_buffer, last_flush_time while recording_flag: timestamp = datetime.datetime.now().isoformat() for pu, data in latest_data.items(): if data: row = { "timestamp": timestamp, "pu": pu, **data } recording_writer.writerow(row) # Flush every flush_interval seconds if ( datetime.datetime.now() - last_flush_time ).total_seconds() >= flush_interval: recording_file.flush() last_flush_time = datetime.datetime.now() await asyncio.sleep(0.1) # 10 Hz app.include_router(router) if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="127.0.0.1", port=8080, reload=True, reload_dirs=["."], )