NorthStar-HMI/main.py

344 lines
10 KiB
Python

from fastapi import FastAPI, HTTPException, Query, Form, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import os
from fastapi import Request, APIRouter
import platform
from fastapi.templating import (
Jinja2Templates,
) # pip install fastapi uvicorn jinja2 python-multipart passlib
from starlette.middleware.sessions import SessionMiddleware
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.status import HTTP_302_FOUND
import json
from pathlib import Path
from typing import Optional, Dict, Any
from fastapi import Query
import asyncio
import datetime
from valveBackend import ValveBackend
import csv
from collections import deque
import numpy as np
import aiohttp
# Choose the CAN backend by host OS. Only macOS ("Darwin") gets the mock
# backend with INFO logging; every other platform loads the real backend
# with ERROR-level logging.
# NOTE(review): an earlier comment here said "macOS or Windows", but Windows
# has never been part of this check — confirm whether it should use the mock.
if platform.system() == "Darwin":
    from MockCAN import CANBackend

    logging.basicConfig(level=logging.INFO)
else:
    from classCAN import CANBackend  # real backend

    logging.basicConfig(level=logging.ERROR)
app = FastAPI()

# Key used by SessionMiddleware to sign the session cookie. It can be
# supplied through the HMI_SESSION_SECRET environment variable so that a
# deployment does not depend on a value published in the source tree.
# SECURITY NOTE: the fallback below is the legacy hard-coded key, kept only
# so existing setups keep working; anyone who knows it can forge a session.
_DEFAULT_SESSION_SECRET = "your_super_secret_key"
SESSION_SECRET_KEY = os.environ.get("HMI_SESSION_SECRET", _DEFAULT_SESSION_SECRET)
if SESSION_SECRET_KEY == _DEFAULT_SESSION_SECRET:
    logging.warning(
        "HMI_SESSION_SECRET is not set; using the built-in default session key."
    )
app.add_middleware(SessionMiddleware, secret_key=SESSION_SECRET_KEY)

router = APIRouter()
templates = Jinja2Templates(directory="templates")

# Hardware backends: CAN bus for the production units, plus the inlet valve
# board described by the EDS file.
can_backend = CANBackend()
valve_backend = ValveBackend(eds_file="/home/hmi/Desktop/HMI/eds_file/inletvalveboard.eds")

# Serve static files (HTML, JS, CSS)
app.mount("/static", StaticFiles(directory="static"), name="static")
## SHARED STATE: MONITOR BUFFER AND RECORDER
# Most recent sample per production unit plus the patient-skid flow.
# The PU entries start as None; "PatientSkid" starts with a flow of 0.0.
latest_data: Dict[str, Any] = {
    "PU_1": None,
    "PU_2": None,
    "PU_3": None,
    "PatientSkid": {"QSkid": 0.0},
}

# RECORDER
# Nothing is being recorded at import time, so the flag is off and the
# task / CSV writer / file handles are all unset.
recording_flag = False
recording_task = recording_writer = recording_file = None
write_buffer = deque()
flush_interval = 1.0  # seconds between flushes of the recording file
last_flush_time = datetime.datetime.now()
## LOGGING QSkid
async def update_latest_flow():
    """Poll the patient-skid flow endpoint forever and cache the latest value.

    Once per second the JSON document at ``/instant_flow`` is fetched, its
    ``log.flow`` entry is converted to ``int`` and stored in
    ``latest_data["PatientSkid"]["QSkid"]``. Any failure is logged and the
    previously stored value is kept; the loop never exits on its own.
    """
    # Bound each request explicitly. Without this aiohttp applies its default
    # total timeout of 5 minutes, so one hung request would freeze the 1 Hz
    # poll for that long.
    request_timeout = aiohttp.ClientTimeout(total=2.0)
    async with aiohttp.ClientSession(timeout=request_timeout) as session:
        while True:
            try:
                async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
                    data = await resp.json()
                    latest_flow = int(data["log"]["flow"])
                    logging.debug(f"Updated flow: {latest_flow}")
                    latest_data["PatientSkid"]["QSkid"] = latest_flow
            except Exception as e:
                # repr() because timeout errors have an empty str().
                logging.error(f"Error fetching flow: {e!r}")
            await asyncio.sleep(1.0)
def format_data(data):
    """Translate a raw sample dict into the field names used by the HMI.

    Args:
        data: Mapping of raw signal names (``FM1``..``FM4``, ``PS1``..``PS3``,
            ``Cond``, ``MV02``..``MV08`` and their ``_sp`` variants) to values.

    Returns:
        A new dict that starts with a ``timestamp`` (local time, ISO format)
        followed by the renamed flow / pressure / conductivity fields and,
        for each valve, its position and ``_sp`` value. Signals absent from
        ``data`` are reported as ``0.0``.
    """
    formatted = {
        "timestamp": datetime.datetime.now().isoformat(),
        "Qperm": data.get("FM2", 0.0),
        "Qdilute": data.get("FM1", 0.0),
        "Qdrain": data.get("FM4", 0.0),
        "Qrecirc": data.get("FM3", 0.0),
        "Pro": data.get("PS1", 0.0),
        "Pdilute": data.get("PS3", 0.0),
        "Prentate": data.get("PS2", 0.0),
        "Conductivity": data.get("Cond", 0.0),
    }
    # Valve fields keep their raw names. Building them in a loop guarantees
    # each valve reads its own signal (MV04 used to be copied from MV05).
    for valve in ("MV02", "MV03", "MV04", "MV05", "MV06", "MV07", "MV08"):
        setpoint = f"{valve}_sp"
        formatted[valve] = data.get(valve, 0.0)
        formatted[setpoint] = data.get(setpoint, 0.0)
    return formatted
# CREDENTIALS
# Load the login account from credentials.json (resolved against the current
# working directory) once, at import time.
CREDENTIAL_PATH = Path("credentials.json")
if CREDENTIAL_PATH.exists():
    with CREDENTIAL_PATH.open("r") as f:
        CREDENTIALS = json.load(f)
else:
    CREDENTIALS = {}

# Fail fast with an actionable message. Falling back to empty/None values is
# not an option: an unauthenticated session has user=None, which would then
# compare equal to a None USERNAME.
_missing_credential_keys = [
    key for key in ("username", "password") if key not in CREDENTIALS
]
if _missing_credential_keys:
    raise RuntimeError(
        f"{CREDENTIAL_PATH} is missing or incomplete; "
        f"required key(s) not found: {', '.join(_missing_credential_keys)}"
    )
USERNAME = CREDENTIALS["username"]
PASSWORD = CREDENTIALS["password"]
# ======== LOGIN & SESSION HANDLING ========
def require_login(request: Request):
    """Return the logged-in user, or abort the request with a redirect.

    Args:
        request: Incoming request; its session is checked for a ``user``
            entry equal to ``USERNAME``.

    Returns:
        The user name stored in the session.

    Raises:
        HTTPException: Status 302 pointing at the login form ("/") when the
            session does not belong to the configured user.
    """
    user = request.session.get("user")
    if user != USERNAME:
        # A dependency cannot return a RedirectResponse, so the redirect is
        # expressed as an exception. The Location header is what makes the
        # client actually follow it; a bare 302 goes nowhere. FastAPI's
        # HTTPException subclasses Starlette's, so existing handlers still
        # match.
        raise HTTPException(
            status_code=HTTP_302_FOUND,
            detail="Redirect to login",
            headers={"Location": "/"},
        )
    return user
@app.get("/", response_class=HTMLResponse)
def login_form(request: Request):
    """Render the login page template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and open a session on success.

    Args:
        request: Incoming request whose session receives the ``user`` entry.
        username: Value of the ``username`` form field.
        password: Value of the ``password`` form field.

    Returns:
        A 302 redirect to ``/control`` when both values match the configured
        account, otherwise the login page re-rendered with an error message.
    """
    import secrets

    def _matches(submitted: str, expected) -> bool:
        # Constant-time comparison so response timing does not reveal how
        # much of a guess was correct. Bytes are compared because
        # compare_digest rejects non-ASCII str. A non-string configured value
        # can never equal a form string, matching the previous `==` result.
        if not isinstance(expected, str):
            return False
        return secrets.compare_digest(
            submitted.encode("utf-8"), expected.encode("utf-8")
        )

    # Evaluate both checks unconditionally; short-circuiting would leak
    # whether the username alone was right.
    username_ok = _matches(username, USERNAME)
    password_ok = _matches(password, PASSWORD)
    if username_ok and password_ok:
        request.session["user"] = username
        return RedirectResponse("/control", status_code=HTTP_302_FOUND)
    return templates.TemplateResponse(
        "login.html", {"request": request, "error": "Invalid credentials."}
    )
@app.get("/logout")
def logout(request: Request):
    """Empty the session and send the browser back to the login page."""
    session = request.session
    session.clear()
    return RedirectResponse("/", status_code=HTTP_302_FOUND)
# ======== PROTECTED INTERFACE ========
@app.get("/control", response_class=HTMLResponse)
def control_page(request: Request):
    """Serve the control page to the logged-in user.

    Sessions whose ``user`` entry is not the configured USERNAME are
    redirected (302) to the login form instead.
    """
    logged_in = request.session.get("user") == USERNAME
    if logged_in:
        return templates.TemplateResponse("control.html", {"request": request})
    return RedirectResponse("/", status_code=HTTP_302_FOUND)
@app.get("/monitor-page", response_class=HTMLResponse)
def monitor_page(request: Request):
    """Return the contents of static/monitor.html as an HTML response.

    The file is read from disk on every request. No session check is done
    here.
    """
    page_source = Path("static/monitor.html").read_text()
    return HTMLResponse(page_source)
# ======== CAN + BACKEND ROUTES ========
@app.post("/connect_toggle")
def connect_toggle():
    """Toggle the CAN connection and report the resulting state.

    When the CAN backend is connected it is shut down. Otherwise the CAN
    backend and then the valve backend are connected.

    Returns:
        ``{"connected": False}`` after a shutdown, ``{"connected": True}``
        after a successful CAN connect.

    Raises:
        HTTPException: Status 500 when the CAN backend reports that the
            connection attempt failed.
    """
    logging.info("Toggling CAN connection...")
    if can_backend.connected:
        can_backend.shutdown()
        return {"connected": False}

    success = can_backend.connect()
    try:
        valve_backend.connect()
    except Exception as e:
        # Best-effort: a valve-board failure is reported but does not fail
        # the request. It goes through logging like the rest of the file, so
        # it is still visible at the ERROR level used in production.
        logging.error(f"Connection error : {e}")
    if not success:
        raise HTTPException(status_code=500, detail="Connection failed.")
    return {"connected": True}
@app.post("/command/{state}/pu/{pu_number}")
def send_command(state: str, pu_number: int, ploop_setpoint: float = Query(...)):
    """Send a state command to one production unit over CAN.

    Args:
        state: Requested state; matched case-insensitively against the
            accepted state names.
        pu_number: Number of the production unit to command.
        ploop_setpoint: Setpoint forwarded to the backend with the command.

    Returns:
        A dict echoing the command, the PU and the setpoint, plus the state
        read back from the backend right after sending.

    Raises:
        HTTPException: Status 400 for an unknown state, status 500 when the
            backend raises while sending or reading back.
    """
    VALID_STATES = {
        "IDLE",
        "PRE-PRODUCTION",
        "PRODUCTION",
        "FIRST_START",
        "THERMALLOOPCLEANING",
        "DISINFECTION",
        "SLEEP",
    }
    state = state.upper()
    if state not in VALID_STATES:
        raise HTTPException(status_code=400, detail=f"Invalid state '{state}'")

    logging.info(f"Sending state '{state}' to PU {pu_number}")
    try:
        can_backend.send_state_command(state, pu_number, ploop_setpoint)
        current_state = can_backend.read_current_state(pu_number)
    except Exception as e:
        # Keep the traceback in the server log and chain the cause, so the
        # 500 response is not the only trace of what went wrong.
        logging.exception(f"Failed to send state '{state}' to PU {pu_number}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {
        "status": "success",
        "command": state,
        "pu": pu_number,
        "ploop_setpoint": ploop_setpoint,
        "current_state": current_state,
    }
@app.get("/api/pu_status")
def get_pu_status():
    """Read the current state of PU 1, 2 and 3 and return them as JSON.

    The response maps "PU1", "PU2" and "PU3" to whatever the CAN backend
    reports for that unit.
    """
    states = {
        f"PU{number}": can_backend.read_current_state(number)
        for number in (1, 2, 3)
    }
    logging.info(f"[PU STATUS] {states}")
    return JSONResponse(content=states)
async def update_latest_data():
    """Refresh the cached sample of each polled PU roughly ten times a second.

    For every polled unit the newest backend data is formatted and stored in
    ``latest_data["PU_<n>"]``. The loop runs until the task is cancelled.
    """
    while True:
        for pu in [1, 2]:  # TODO: REPLACE THIS WITH CONNECTED PUs, IS GET PU STATUS SLOW?
            try:
                data = can_backend.get_latest_data(pu_number=pu)
                current_data = format_data(data)
            except Exception as e:
                # One bad read must not end the background task, otherwise
                # monitoring would stop for good. Keep the previous sample
                # and retry on the next cycle, as the flow poller does.
                logging.error(f"Error updating PU{pu} data: {e!r}")
                continue
            latest_data[f"PU_{pu}"] = current_data
            logging.debug(f"[MONITOR BUFFER] PU{pu}: {current_data}")
        await asyncio.sleep(0.1)
@app.get("/monitor")
async def get_monitor_data(pu_number: Optional[float] = Query(None)):
    """Return the cached monitor data, optionally for a single PU.

    Args:
        pu_number: PU to report. It is received as a float for backward
            compatibility but must hold a whole number (``1`` or ``1.0``).
            When omitted, the whole cache is returned.

    Returns:
        The cached entry for ``PU_<n>``, or ``{}`` when no such entry exists
        or ``pu_number`` is not a whole number; the full ``latest_data``
        dict when ``pu_number`` is omitted.
    """
    logging.debug(f"pu_number is {pu_number}")
    if pu_number is None:
        return latest_data
    # Cache keys look like "PU_1". Formatting the float directly gives
    # "PU_1.0", which never matches, so convert to int first. is_integer()
    # also rejects nan/inf and fractional values.
    if not float(pu_number).is_integer():
        return {}
    return latest_data.get(f"PU_{int(pu_number)}", {})
# Strong references to the background pollers. The event loop only keeps weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the background pollers for PU data and patient-skid flow."""
    for poller in (update_latest_data, update_latest_flow):
        task = asyncio.create_task(poller())
        _background_tasks.add(task)
        # Drop the reference once the task ends so the set does not grow.
        task.add_done_callback(_background_tasks.discard)
@app.get("/can_status")
def can_status():
    """Report whether the CAN backend is currently connected."""
    is_connected = can_backend.connected
    return {"connected": is_connected}
@app.post("/command/feed_valve")
def feedvalve_control(MV01_opening: int = Query(...)):
    """Forward the requested MV01 feed-valve opening to the valve backend.

    The value is passed to the backend unchanged and logged afterwards; the
    response is always ``{"status": "ok"}`` when the backend call returns.
    """
    valve_backend.send_command(MV01_opening)
    logging.info(f"Feed valve opening to {MV01_opening}")
    response = {"status": "ok"}
    return response
#LOCAL RECORDER
@app.post("/start_recording")
async def start_recording():
    """Create a timestamped CSV under ``recordings/`` and start recording.

    Returns:
        A dict with the status text and the name of the new file.

    Raises:
        HTTPException: Status 400 when a recording is already running.
    """
    global recording_flag, recording_task, recording_file, recording_writer
    if recording_flag:
        raise HTTPException(status_code=400, detail="Already recording.")

    now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recording_{now}.csv"
    os.makedirs("recordings", exist_ok=True)
    filepath = os.path.join("recordings", filename)

    # format_data() already yields a "timestamp" key. Skip names that are
    # already listed so the CSV header has no duplicate columns.
    fieldnames = ["timestamp", "pu", "QSkid"]
    fieldnames += [key for key in format_data({}) if key not in fieldnames]

    recording_file = open(filepath, "w", newline="")
    try:
        recording_writer = csv.DictWriter(recording_file, fieldnames=fieldnames)
        recording_writer.writeheader()
    except Exception:
        # Do not leak the handle or leave half-initialised recorder state.
        recording_file.close()
        recording_file = None
        recording_writer = None
        raise

    recording_flag = True
    recording_task = asyncio.create_task(record_data_loop())
    logging.info(f"[RECORDING STARTED] File: {filepath}")
    return {"status": "recording started", "file": filename}
@app.post("/stop_recording")
async def stop_recording():
    """Stop the running recording and close its CSV file.

    Clearing the flag makes the recording loop exit; the loop task is then
    awaited so no row is written after the file is closed.

    Returns:
        A dict with the status text.

    Raises:
        HTTPException: Status 400 when no recording is running.
        Exception: Whatever the recording task raised, re-raised after the
            file has been closed.
    """
    global recording_flag, recording_task, recording_file
    if not recording_flag:
        raise HTTPException(status_code=400, detail="Not recording.")

    recording_flag = False
    try:
        if recording_task:
            await recording_task
    finally:
        # Runs even when the task failed, so the file handle is always
        # released and a new recording can be started afterwards.
        recording_task = None
        if recording_file:
            recording_file.close()
            recording_file = None
    logging.info("[RECORDING STOPPED]")
    return {"status": "recording stopped"}
async def record_data_loop():
    """Append the cached data to the open recording about ten times a second.

    While ``recording_flag`` is set, every non-empty entry of ``latest_data``
    becomes one CSV row tagged with its key in the ``pu`` column. The file is
    flushed once ``flush_interval`` seconds have passed since the last flush.

    An entry that carries its own ``timestamp`` key overrides the loop
    timestamp, because the entry is unpacked last into the row.
    """
    # Only last_flush_time is rebound here; the other module globals are
    # merely read, so they need no `global` declaration.
    global last_flush_time
    while recording_flag:
        timestamp = datetime.datetime.now().isoformat()
        for pu, data in latest_data.items():
            if not data:
                continue
            # Debug-level logging rather than print(): this runs for every
            # entry at 10 Hz.
            logging.debug(f"record_data {data}")
            recording_writer.writerow({"timestamp": timestamp, "pu": pu, **data})

        # Flush every flush_interval seconds
        now = datetime.datetime.now()
        if (now - last_flush_time).total_seconds() >= flush_interval:
            recording_file.flush()
            last_flush_time = now
        await asyncio.sleep(0.1)  # 10 Hz
app.include_router(router)

if __name__ == "__main__":
    import uvicorn

    # Direct-run entry point: serve "main:app" on 127.0.0.1:8080 with
    # auto-reload watching the current directory.
    server_options = {
        "host": "127.0.0.1",
        "port": 8080,
        "reload": True,
        "reload_dirs": ["."],
    }
    uvicorn.run("main:app", **server_options)