NorthStar-HMI/main.py

488 lines
15 KiB
Python

from fastapi import FastAPI, HTTPException, Query, Form, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import os
from fastapi import Request, APIRouter
import platform
from fastapi.templating import (
Jinja2Templates,
) # pip install fastapi uvicorn jinja2 python-multipart passlib
from starlette.middleware.sessions import SessionMiddleware
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.status import HTTP_302_FOUND
import json
from pathlib import Path
from typing import Optional, Dict, Any
from fastapi import Query
import asyncio
import datetime
import csv
from collections import deque
import numpy as np
import aiohttp
import httpx
if platform.system() in ["Darwin"]: # macOS or Windows
from MockCAN import CANBackend
logging.basicConfig(level=logging.INFO)
else:
from classCAN import CANBackend # Your real backend
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="your_super_secret_key")
router = APIRouter()
templates = Jinja2Templates(directory="templates")
can_backend = CANBackend()
# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")
## BUFFER CREATION AND HELPER
# Global object to store the latest data
latest_data: Dict[str, Any] = {
"PU_1": None,
"PU_2": None,
"PU_3": None,
"DS": None,
"PatientSkid": {"QSkid": 0.0},
}
active_PUs: list[int] = []
DEFAULT_FEED_VALVE = 0.0
# RECORDER
recording_flag = False
recording_task = None
recording_writer = None
recording_file = None
write_buffer = deque()
flush_interval = 1.0 # flush every 1 second
last_flush_time = datetime.datetime.now()
## LOGGING
def format_PU_data(data):
return {
"timestamp": datetime.datetime.now().isoformat(),
"Qperm": np.round(data.get("FM2", 0.0), 1),
"Qdilute": np.round(data.get("FM1", 0.0), 1),
"Qdrain": np.round(data.get("FM4", 0.0), 1),
"Qrecirc": np.round(data.get("FM3", 0.0), 1),
"QdrainEDI": np.round(data.get("FM2", 0.0), 1)
- np.round(data.get("FM1", 0.0), 1),
"Pro": np.round(data.get("PS2", 0.0), 2),
"Pdilute": np.round(data.get("PS3", 0.0), 2),
"Pretentate": np.round(data.get("PS1", 0.0), 2),
"Cfeed": data.get("Conductivity_Feed", 0.0),
"Cperm": data.get("Conductivity_Permeate", 0.0),
"Cdilute": data.get("Conductivity_Product", 0.0),
"MV02": np.round(data.get("MV02", 0.0), 1),
"MV02_sp": np.round(data.get("MV02_sp", 0.0), 1),
"MV03": np.round(data.get("MV03", 0.0), 1),
"MV03_sp": np.round(data.get("MV03_sp", 0.0), 1),
"MV04": np.round(data.get("MV04", 0.0), 1),
"MV04_sp": np.round(data.get("MV04_sp", 0.0), 1),
"MV05": np.round(data.get("MV05", 0.0), 1),
"MV05_sp": np.round(data.get("MV05_sp", 0.0), 1),
"MV06": np.round(data.get("MV06", 0.0), 1),
"MV06_sp": np.round(data.get("MV06_sp", 0.0), 1),
"MV07": np.round(data.get("MV07", 0.0), 1),
"MV07_sp": np.round(data.get("MV07_sp", 0.0), 1),
"MV08": np.round(data.get("MV08", 0.0), 1),
"MV08_sp": np.round(data.get("MV08_sp", 0.0), 1),
}
def format_DS_data(data):
return {
"timestamp": datetime.datetime.now().isoformat(),
"Q_conso_filt": np.round(data.get("Qdrain_sp", 0.0), 1),
"TankLevel": np.round(data.get("TankLevel", 0.0), 2),
"Qinlet": np.round(data.get("Inlet_flow", 0.0), 1),
"Qoutlet": np.round(data.get("Outlet_flow", 0.0), 1),
"Q_conso_filt": np.round(data.get("Qdrain_sp", 0.0), 1),
"Q_conso_filt": np.round(data.get("Qdrain_sp", 0.0), 1),
"Q_conso_filt": np.round(data.get("Qdrain_sp", 0.0), 1),
}
# CREDENTIALS
# Load users from JSON file at startup
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
with CREDENTIAL_PATH.open("r") as f:
CREDENTIALS = json.load(f)
else:
CREDENTIALS = {}
USERNAME = CREDENTIALS["username"]
PASSWORD = CREDENTIALS["password"]
# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
user = request.session.get("user")
if user != USERNAME:
# raise 302 to trigger redirection manually (FastAPI doesn't support redirects from Depends directly)
raise StarletteHTTPException(status_code=302, detail="Redirect to login")
return user
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
if username == USERNAME and password == PASSWORD:
request.session["user"] = username
return RedirectResponse("/control", status_code=HTTP_302_FOUND)
return templates.TemplateResponse(
"login.html", {"request": request, "error": "Invalid credentials.json"}
)
@app.get("/logout")
def logout(request: Request):
request.session.clear()
return RedirectResponse("/", status_code=HTTP_302_FOUND)
# ======== PROTECTED INTERFACE ========
@app.on_event("startup")
async def startup_event():
asyncio.create_task(update_latest_data())
asyncio.create_task(update_latest_flow())
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
can_backend.connect()
if request.session.get("user") != USERNAME:
return RedirectResponse("/", status_code=HTTP_302_FOUND)
return templates.TemplateResponse("control.html", {"request": request})
@app.get("/monitor-DS", response_class=HTMLResponse)
def monitor_page(request: Request):
with open("static/monitor_DS.html") as f:
return HTMLResponse(f.read())
@app.get("/monitor-PU", response_class=HTMLResponse)
def monitor_page(request: Request):
with open("static/monitor_PU.html") as f:
return HTMLResponse(f.read())
@app.get("/multi-monitor-PU", response_class=HTMLResponse)
def monitor_page(request: Request):
with open("static/multi_pu_dashboard.html") as f:
return HTMLResponse(f.read())
# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
logging.info(f"Toggling CAN connection, CAN is {can_backend.connected}")
if can_backend.connected:
can_backend.shutdown()
logging.info("Shutting down CAN connection...")
return {"connected": False}
else:
success = can_backend.connect()
if not success:
raise HTTPException(status_code=500, detail="Connection failed.")
return {"connected": can_backend.connected}
@app.get("/is_connected")
def is_connected():
return {"connected": can_backend.connected}
# PU CONTROL
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
global DEFAULT_FEED_VALVE
VALID_STATES = {
"IDLE",
"PRE-PRODUCTION",
"PRODUCTION",
"FIRST_START",
"THERMALLOOPCLEANING",
"DISINFECTION",
"SLEEP",
}
state = state.upper()
if state not in VALID_STATES:
raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")
logging.info(f"Sending state '{state}' to PU {pu_number}")
try:
can_backend.send_state_command(state, pu_number, ploop_setpoint)
current_state = can_backend.read_current_state(pu_number)
return {
"status": "success",
"command": state,
"pu": pu_number,
"ploop_setpoint": ploop_setpoint,
"current_state": current_state,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
## MONITORING
@app.get("/api/pu_status")
def get_pu_status():
global active_PUs
states = {
"PU1": can_backend.read_current_state(1),
"PU2": can_backend.read_current_state(2),
"PU3": can_backend.read_current_state(3),
}
logging.info(f"[PU STATUS] {states}")
active_PUs = [
index + 1
for index, (pu, status) in enumerate(states.items())
if status != "Offline"
]
logging.info(f"[ACTIVE PU] {active_PUs}")
return JSONResponse(content=states)
async def update_latest_data():
global active_PUs
while True:
# DS
data = can_backend.get_latest_data(pu_number=0)
latest_data[f"DS"] = format_DS_data(data)
# PUs
for pu in active_PUs:
data = can_backend.get_latest_data(pu_number=pu)
latest_data[f"PU_{pu}"] = format_PU_data(data)
logging.debug(f"[MONITOR DS BUFFER] latest_data: {latest_data}")
await asyncio.sleep(0.05)
@app.get("/monitor")
async def get_monitor_data():
return latest_data
# return JSONResponse(content=latest_data)
# LOCAL RECORDER
@app.post("/start_recording")
async def start_recording():
global recording_flag, recording_task, recording_file, recording_writer
if recording_flag:
raise HTTPException(status_code=400, detail="Already recording.")
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recording_{now}.csv"
os.makedirs("recordings", exist_ok=True)
filepath = os.path.join("recordings", filename)
recording_file = open(filepath, "w", newline="")
fieldnames = ["timestamp", "pu", "QSkid"] + list(format_PU_data({}).keys())
recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames)
recording_writer.writeheader()
recording_flag = True
recording_task = asyncio.create_task(record_data_loop())
logging.info(f"[RECORDING STARTED] File: {filepath}")
return {"status": "recording started", "file": filename}
@app.post("/stop_recording")
async def stop_recording():
global recording_flag, recording_task, recording_file
if not recording_flag:
raise HTTPException(status_code=400, detail="Not recording.")
recording_flag = False
if recording_task:
await recording_task
recording_task = None
if recording_file:
recording_file.close()
recording_file = None
logging.info("[RECORDING STOPPED]")
return {"status": "recording stopped"}
async def record_data_loop():
global recording_writer, recording_file, write_buffer, last_flush_time
while recording_flag:
timestamp = datetime.datetime.now().isoformat()
for pu, data in latest_data.items():
if data:
row = {"timestamp": timestamp, "pu": pu, **data}
recording_writer.writerow(row)
# Flush every flush_interval seconds
if (
datetime.datetime.now() - last_flush_time
).total_seconds() >= flush_interval:
recording_file.flush()
last_flush_time = datetime.datetime.now()
await asyncio.sleep(0.05) # 10 Hz
## AUTOMATIC TESTING
async def send_command_with_delay(
state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0
):
await asyncio.sleep(delay_s)
logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
can_backend.send_state_command(state, pu, ploop_setpoint)
async def set_patients_with_delay(count: int, delay_s: int):
await asyncio.sleep(delay_s)
logging.info(
f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s"
)
set_patient_skid_users(count)
@router.post("/test/auto/1")
async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
pu = 1
logging.info("[AUTO TEST] Starting automatic test for 1 PU")
asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
return {"status": "started", "pu": pu}
@router.post("/test/auto/2")
async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
return {"status": "started", "pu": [1, 2]}
async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
await send_command_with_delay(
"PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint
)
await send_command_with_delay(
"PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint
)
await set_patients_with_delay(5, delay_s=60)
await set_patients_with_delay(10, delay_s=60)
await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
logging.info("[AUTO TEST] Finished PU1 test")
async def run_auto_test_pu2(ploop_setpoint: float):
# Step 1: Run PU1 test
await run_auto_test_pu1(1, ploop_setpoint)
# Step 2: PU2 sequence
await send_command_with_delay(
"PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint
)
await send_command_with_delay(
"PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint
)
await set_patients_with_delay(15, delay_s=60)
await set_patients_with_delay(0, delay_s=60)
await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
logging.info("[AUTO TEST] Finished PU1 + PU2 test")
@router.post("/test/auto/3")
async def auto_test_pu3():
# Call the function for PU3 auto test
logging.info("Start auto test of 3 PU")
return {"status": "started", "pu": 3}
# PATIENT SKID HELPERS
async def update_latest_flow():
global active_PUs
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
data = await resp.json()
latest_flow = int(data["log"]["flow"])
logging.debug(f"Updated flow: {latest_flow}")
latest_data["PatientSkid"]["QSkid"] = latest_flow
# for index in active_PUs :
# logging.debug("PU_"+str(index))
# latest_data["PU_"+str(index)]["QSkid"] = latest_flow # Adding the data to all actives PUs
except Exception as e:
logging.error(f"Error fetching flow: {e}")
await asyncio.sleep(1.0)
def set_patient_skid_users(count: int = 1):
try:
url = f"http://192.168.1.28:8000/set_users/{count}"
response = httpx.get(url, timeout=5.0)
if response.status_code == 200:
return {"status": "success", "detail": response.json()}
else:
raise HTTPException(
status_code=502, detail=f"Remote server error: {response.text}"
)
except httpx.RequestError as e:
raise HTTPException(
status_code=500, detail=f"Request to external server failed: {str(e)}"
)
app.include_router(router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="127.0.0.1",
port=8080,
reload=True,
reload_dirs=["."],
)