437 lines
14 KiB
Python
437 lines
14 KiB
Python
from fastapi import FastAPI, HTTPException, Query, Form, Depends
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
import logging
|
|
import os
|
|
from fastapi import Request, APIRouter
|
|
import platform
|
|
from fastapi.templating import (
|
|
Jinja2Templates,
|
|
) # pip install fastapi uvicorn jinja2 python-multipart passlib
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from starlette.exceptions import HTTPException as StarletteHTTPException
|
|
from starlette.status import HTTP_302_FOUND
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
from fastapi import Query
|
|
import asyncio
|
|
import datetime
|
|
import csv
|
|
from collections import deque
|
|
import numpy as np
|
|
import aiohttp
|
|
import httpx
|
|
|
|
if platform.system() in ["Darwin"]: # macOS or Windows
|
|
from MockCAN import CANBackend
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
else:
|
|
from classCAN import CANBackend # Your real backend
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(SessionMiddleware, secret_key="your_super_secret_key")
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="templates")
|
|
can_backend = CANBackend()
|
|
|
|
# Serve static files (HTML, JS, CSS)
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
## BUFFER CREATION AND HELPER
|
|
# Global object to store the latest data
|
|
latest_data: Dict[str, Any] = {
|
|
"PU_1": None,
|
|
"PU_2": None,
|
|
"PU_3": None,
|
|
"PatientSkid": {"QSkid": 0.0},
|
|
}
|
|
|
|
active_PUs : list[int] = []
|
|
|
|
DEFAULT_FEED_VALVE = 0.0
|
|
|
|
# RECORDER
|
|
recording_flag = False
|
|
recording_task = None
|
|
recording_writer = None
|
|
recording_file = None
|
|
write_buffer = deque()
|
|
flush_interval = 1.0 # flush every 1 second
|
|
last_flush_time = datetime.datetime.now()
|
|
|
|
|
|
## LOGGING
|
|
|
|
|
|
def format_data(data):
|
|
return {
|
|
"timestamp": datetime.datetime.now().isoformat(),
|
|
"Qperm": np.round(data.get("FM2", 0.0), 1),
|
|
"Qdilute": np.round(data.get("FM1", 0.0), 1),
|
|
"Qdrain": np.round(data.get("FM4", 0.0), 1),
|
|
"Qrecirc": np.round(data.get("FM3", 0.0), 1),
|
|
"QdrainEDI": np.round(data.get("FM2", 0.0), 1)- np.round(data.get("FM1", 0.0), 1),
|
|
"Pro": np.round(data.get("PS2", 0.0), 2),
|
|
"Pdilute": np.round(data.get("PS3", 0.0), 2),
|
|
"Pretentate": np.round(data.get("PS1", 0.0), 2),
|
|
"Cfeed": data.get("Conductivity_Feed", 0.0),
|
|
"Cperm": data.get("Conductivity_Permeate", 0.0),
|
|
"Cdilute": data.get("Conductivity_Product", 0.0),
|
|
"MV02": np.round(data.get("MV02", 0.0), 1),
|
|
"MV02_sp": np.round(data.get("MV02_sp", 0.0), 1),
|
|
"MV03": np.round(data.get("MV03", 0.0), 1),
|
|
"MV03_sp": np.round(data.get("MV03_sp", 0.0), 1),
|
|
"MV04": np.round(data.get("MV04", 0.0), 1),
|
|
"MV04_sp": np.round(data.get("MV04_sp", 0.0), 1),
|
|
"MV05": np.round(data.get("MV05", 0.0), 1),
|
|
"MV05_sp": np.round(data.get("MV05_sp", 0.0), 1),
|
|
"MV06": np.round(data.get("MV06", 0.0), 1),
|
|
"MV06_sp": np.round(data.get("MV06_sp", 0.0), 1),
|
|
"MV07": np.round(data.get("MV07", 0.0), 1),
|
|
"MV07_sp": np.round(data.get("MV07_sp", 0.0), 1),
|
|
"MV08": np.round(data.get("MV08", 0.0), 1),
|
|
"MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
|
|
}
|
|
|
|
|
|
# CREDENTIALS
|
|
|
|
# Load users from JSON file at startup
|
|
CREDENTIAL_PATH = Path("credentials.json")
|
|
if CREDENTIAL_PATH.exists():
|
|
with CREDENTIAL_PATH.open("r") as f:
|
|
CREDENTIALS = json.load(f)
|
|
else:
|
|
CREDENTIALS = {}
|
|
|
|
USERNAME = CREDENTIALS["username"]
|
|
PASSWORD = CREDENTIALS["password"]
|
|
|
|
|
|
# ======== LOGIN & SESSION HANDLING ========
|
|
def require_login(request: Request):
|
|
user = request.session.get("user")
|
|
if user != USERNAME:
|
|
# raise 302 to trigger redirection manually (FastAPI doesn't support redirects from Depends directly)
|
|
raise StarletteHTTPException(status_code=302, detail="Redirect to login")
|
|
return user
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def login_form(request: Request):
|
|
return templates.TemplateResponse("login.html", {"request": request})
|
|
|
|
|
|
@app.post("/login")
|
|
def login(request: Request, username: str = Form(...), password: str = Form(...)):
|
|
if username == USERNAME and password == PASSWORD:
|
|
request.session["user"] = username
|
|
return RedirectResponse("/control", status_code=HTTP_302_FOUND)
|
|
return templates.TemplateResponse(
|
|
"login.html", {"request": request, "error": "Invalid credentials.json"}
|
|
)
|
|
|
|
|
|
@app.get("/logout")
|
|
def logout(request: Request):
|
|
request.session.clear()
|
|
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
|
|
|
|
|
# ======== PROTECTED INTERFACE ========
|
|
|
|
|
|
@app.get("/control", response_class=HTMLResponse)
|
|
def control_page(request: Request):
|
|
can_backend.connect()
|
|
if request.session.get("user") != USERNAME:
|
|
return RedirectResponse("/", status_code=HTTP_302_FOUND)
|
|
return templates.TemplateResponse("control.html", {"request": request})
|
|
|
|
|
|
@app.get("/monitor-page", response_class=HTMLResponse)
|
|
def monitor_page(request: Request):
|
|
with open("static/monitor.html") as f:
|
|
return HTMLResponse(f.read())
|
|
|
|
@app.get("/multi-monitor-page", response_class=HTMLResponse)
|
|
def monitor_page(request: Request):
|
|
with open("static/multi_pu_dashboard.html") as f:
|
|
return HTMLResponse(f.read())
|
|
|
|
|
|
# ======== CAN + BACKEND ROUTES ========
|
|
|
|
@app.post("/connect_toggle")
|
|
def connect_toggle():
|
|
logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
|
|
if can_backend.connected:
|
|
can_backend.shutdown()
|
|
logging.info("Shutting down CAN connection...")
|
|
return {"connected": False}
|
|
else:
|
|
success = can_backend.connect()
|
|
|
|
if not success:
|
|
raise HTTPException(status_code=500, detail="Connection failed.")
|
|
return {"connected": can_backend.connected}
|
|
|
|
@app.get("/is_connected")
|
|
def is_connected():
|
|
return {"connected": can_backend.connected}
|
|
|
|
|
|
@app.post("/command/{state}/pu/{pu_number}")
|
|
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
|
|
global DEFAULT_FEED_VALVE
|
|
VALID_STATES = {
|
|
"IDLE",
|
|
"PRE-PRODUCTION",
|
|
"PRODUCTION",
|
|
"FIRST_START",
|
|
"THERMALLOOPCLEANING",
|
|
"DISINFECTION",
|
|
"SLEEP",
|
|
}
|
|
|
|
state = state.upper()
|
|
|
|
if state not in VALID_STATES:
|
|
raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")
|
|
|
|
logging.info(f"Sending state '{state}' to PU {pu_number}")
|
|
|
|
try:
|
|
can_backend.send_state_command(state, pu_number, ploop_setpoint)
|
|
current_state = can_backend.read_current_state(pu_number)
|
|
return {
|
|
"status": "success",
|
|
"command": state,
|
|
"pu": pu_number,
|
|
"ploop_setpoint": ploop_setpoint,
|
|
"current_state": current_state,
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/api/pu_status")
|
|
def get_pu_status():
|
|
global active_PUs
|
|
states = {
|
|
"PU1": can_backend.read_current_state(1),
|
|
"PU2": can_backend.read_current_state(2),
|
|
"PU3": can_backend.read_current_state(3),
|
|
}
|
|
logging.info(f"[PU STATUS] {states}")
|
|
|
|
active_PUs = [index + 1 for index, (pu, status) in enumerate(states.items()) if status != 'Offline']
|
|
logging.info(f"[ACTIVE PU] {active_PUs}")
|
|
|
|
return JSONResponse(content=states)
|
|
|
|
|
|
async def update_latest_data():
|
|
global active_PUs
|
|
while True:
|
|
for pu in active_PUs: # TODO: test
|
|
data = can_backend.get_latest_data(pu_number=pu)
|
|
latest_data[f"PU_{pu}"] = format_data(data)
|
|
current_data = latest_data[f"PU_{pu}"]
|
|
logging.debug(f"[MONITOR BUFFER] PU{pu}: {current_data}")
|
|
# logging.info(f"[MONITOR BUFFER] latest_data: {latest_data}")
|
|
await asyncio.sleep(0.05)
|
|
|
|
|
|
@app.get("/monitor")
|
|
async def get_monitor_data(pu_number: Optional[float] = Query(None)):
|
|
if pu_number is not None:
|
|
return latest_data.get(f"PU_{pu_number}", {})
|
|
else:
|
|
# print(latest_data)
|
|
return latest_data
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
asyncio.create_task(update_latest_data())
|
|
asyncio.create_task(update_latest_flow())
|
|
|
|
|
|
@app.get("/can_status")
|
|
def can_status():
|
|
"""Return current CAN connection status."""
|
|
return {"connected": can_backend.connected}
|
|
|
|
|
|
# LOCAL RECORDER
|
|
@app.post("/start_recording")
|
|
async def start_recording():
|
|
global recording_flag, recording_task, recording_file, recording_writer
|
|
|
|
if recording_flag:
|
|
raise HTTPException(status_code=400, detail="Already recording.")
|
|
|
|
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"recording_{now}.csv"
|
|
os.makedirs("recordings", exist_ok=True)
|
|
filepath = os.path.join("recordings", filename)
|
|
|
|
recording_file = open(filepath, "w", newline="")
|
|
fieldnames = ["timestamp", "pu", "QSkid"] + list(format_data({}).keys())
|
|
recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames)
|
|
recording_writer.writeheader()
|
|
|
|
recording_flag = True
|
|
recording_task = asyncio.create_task(record_data_loop())
|
|
logging.info(f"[RECORDING STARTED] File: {filepath}")
|
|
return {"status": "recording started", "file": filename}
|
|
|
|
|
|
@app.post("/stop_recording")
|
|
async def stop_recording():
|
|
global recording_flag, recording_task, recording_file
|
|
|
|
if not recording_flag:
|
|
raise HTTPException(status_code=400, detail="Not recording.")
|
|
|
|
recording_flag = False
|
|
if recording_task:
|
|
await recording_task
|
|
recording_task = None
|
|
|
|
if recording_file:
|
|
recording_file.close()
|
|
recording_file = None
|
|
|
|
logging.info("[RECORDING STOPPED]")
|
|
return {"status": "recording stopped"}
|
|
|
|
|
|
async def record_data_loop():
|
|
global recording_writer, recording_file, write_buffer, last_flush_time
|
|
|
|
while recording_flag:
|
|
timestamp = datetime.datetime.now().isoformat()
|
|
for pu, data in latest_data.items():
|
|
if data:
|
|
row = {
|
|
"timestamp": timestamp,
|
|
"pu": pu,
|
|
**data
|
|
}
|
|
recording_writer.writerow(row)
|
|
|
|
# Flush every flush_interval seconds
|
|
if (
|
|
datetime.datetime.now() - last_flush_time
|
|
).total_seconds() >= flush_interval:
|
|
recording_file.flush()
|
|
last_flush_time = datetime.datetime.now()
|
|
|
|
await asyncio.sleep(0.05) # 10 Hz
|
|
|
|
## AUTOMATIC TESTING
|
|
|
|
async def send_command_with_delay(state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0):
|
|
await asyncio.sleep(delay_s)
|
|
logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
|
|
can_backend.send_state_command(state, pu, ploop_setpoint)
|
|
|
|
async def set_patients_with_delay(count: int, delay_s: int):
|
|
await asyncio.sleep(delay_s)
|
|
logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s")
|
|
set_patient_skid_users(count)
|
|
|
|
@router.post("/test/auto/1")
|
|
async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
|
|
pu = 1
|
|
logging.info("[AUTO TEST] Starting automatic test for 1 PU")
|
|
asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
|
|
return {"status": "started", "pu": pu}
|
|
|
|
@router.post("/test/auto/2")
|
|
async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
|
|
logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
|
|
asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
|
|
return {"status": "started", "pu": [1, 2]}
|
|
|
|
async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
|
|
await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint)
|
|
await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint)
|
|
await set_patients_with_delay(5, delay_s=60)
|
|
await set_patients_with_delay(10, delay_s=60)
|
|
await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
|
|
logging.info("[AUTO TEST] Finished PU1 test")
|
|
|
|
async def run_auto_test_pu2(ploop_setpoint: float):
|
|
# Step 1: Run PU1 test
|
|
await run_auto_test_pu1(1, ploop_setpoint)
|
|
|
|
# Step 2: PU2 sequence
|
|
await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint)
|
|
await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint)
|
|
await set_patients_with_delay(15, delay_s=60)
|
|
await set_patients_with_delay(0, delay_s=60)
|
|
await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
|
|
await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
|
|
logging.info("[AUTO TEST] Finished PU1 + PU2 test")
|
|
|
|
@router.post("/test/auto/3")
|
|
async def auto_test_pu3():
|
|
# Call the function for PU3 auto test
|
|
logging.info("Start auto test of 3 PU")
|
|
return {"status": "started", "pu": 3}
|
|
|
|
# PATIENT SKID HELPERS
|
|
async def update_latest_flow():
|
|
global active_PUs
|
|
async with aiohttp.ClientSession() as session:
|
|
while True:
|
|
try:
|
|
async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
|
|
data = await resp.json()
|
|
latest_flow = int(data["log"]["flow"])
|
|
logging.debug(f"Updated flow: {latest_flow}")
|
|
latest_data["PatientSkid"]["QSkid"] = latest_flow
|
|
# for index in active_PUs :
|
|
# logging.debug("PU_"+str(index))
|
|
# latest_data["PU_"+str(index)]["QSkid"] = latest_flow # Adding the data to all actives PUs
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error fetching flow: {e}")
|
|
await asyncio.sleep(1.0)
|
|
|
|
def set_patient_skid_users(count: int = 1):
|
|
try:
|
|
url = f"http://192.168.1.28:8000/set_users/{count}"
|
|
response = httpx.get(url, timeout=5.0)
|
|
|
|
if response.status_code == 200:
|
|
return {"status": "success", "detail": response.json()}
|
|
else:
|
|
raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
|
|
except httpx.RequestError as e:
|
|
raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")
|
|
|
|
|
|
app.include_router(router)
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
uvicorn.run(
|
|
"main:app",
|
|
host="127.0.0.1",
|
|
port=8080,
|
|
reload=True,
|
|
reload_dirs=["."],
|
|
)
|