Adds automatic tests logic

This commit is contained in:
Etienne Chassaing 2025-07-17 16:12:17 +02:00
parent 9da4d347e6
commit d2c3a11ce2

113
main.py
View File

@ -22,6 +22,7 @@ import csv
from collections import deque from collections import deque
import numpy as np import numpy as np
import aiohttp import aiohttp
import httpx
if platform.system() in ["Darwin"]: # macOS or Windows if platform.system() in ["Darwin"]: # macOS or Windows
from MockCAN import CANBackend from MockCAN import CANBackend
@ -66,20 +67,7 @@ flush_interval = 1.0 # flush every 1 second
last_flush_time = datetime.datetime.now() last_flush_time = datetime.datetime.now()
## LOGGING QSkid ## LOGGING
async def update_latest_flow():
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
data = await resp.json()
latest_flow = int(data["log"]["flow"])
logging.debug(f"Updated flow: {latest_flow}")
latest_data["PatientSkid"]["QSkid"] = latest_flow
except Exception as e:
logging.error(f"Error fetching flow: {e}")
await asyncio.sleep(1.0)
def format_data(data): def format_data(data):
@ -293,25 +281,7 @@ def feedvalve_control(MV01_opening: int = Query(...)):
logging.info(f"Feed valve opening to {MV01_opening}") logging.info(f"Feed valve opening to {MV01_opening}")
return {"status": "ok"} return {"status": "ok"}
# AUTOMATIC TESTING
@router.post("/test/auto/1")
async def auto_test_pu1():
# Call the function for PU1 auto test
logging.info("Start auto test of 1 PU")
return {"status": "started", "pu": 1}
@router.post("/test/auto/2")
async def auto_test_pu2():
# Call the function for PU2 auto test
logging.info("Start auto test of 2 PU")
return {"status": "started", "pu": 2}
@router.post("/test/auto/3")
async def auto_test_pu3():
# Call the function for PU3 auto test
logging.info("Start auto test of 3 PU")
return {"status": "started", "pu": 3}
# LOCAL RECORDER # LOCAL RECORDER
@app.post("/start_recording") @app.post("/start_recording")
@ -380,6 +350,85 @@ async def record_data_loop():
await asyncio.sleep(0.1) # 10 Hz await asyncio.sleep(0.1) # 10 Hz
## AUTOMATIC TESTING
async def send_command_with_delay(state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0):
await asyncio.sleep(delay_s)
logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s")
can_backend.send_state_command(state, pu, ploop_setpoint)
async def set_patients_with_delay(count: int, delay_s: int):
await asyncio.sleep(delay_s)
logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s")
set_patient_skid_users(count)
@router.post("/test/auto/1")
async def auto_test_pu1(ploop_setpoint: float = Query(0.0)):
pu = 1
logging.info("[AUTO TEST] Starting automatic test for 1 PU")
asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint))
return {"status": "started", "pu": pu}
@router.post("/test/auto/2")
async def auto_test_pu2(ploop_setpoint: float = Query(0.0)):
logging.info("[AUTO TEST] Starting automatic test for 2 PUs")
asyncio.create_task(run_auto_test_pu2(ploop_setpoint))
return {"status": "started", "pu": [1, 2]}
async def run_auto_test_pu1(pu: int, ploop_setpoint: float):
await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint)
await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint)
await set_patients_with_delay(5, delay_s=60)
await set_patients_with_delay(10, delay_s=60)
await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint)
logging.info("[AUTO TEST] Finished PU1 test")
async def run_auto_test_pu2(ploop_setpoint: float):
# Step 1: Run PU1 test
await run_auto_test_pu1(1, ploop_setpoint)
# Step 2: PU2 sequence
await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint)
await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint)
await set_patients_with_delay(15, delay_s=60)
await set_patients_with_delay(0, delay_s=60)
await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint)
await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint)
logging.info("[AUTO TEST] Finished PU1 + PU2 test")
@router.post("/test/auto/3")
async def auto_test_pu3():
# Call the function for PU3 auto test
logging.info("Start auto test of 3 PU")
return {"status": "started", "pu": 3}
# PATIENT SKID HELPERS
async def update_latest_flow():
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get("http://192.168.1.28:8000/instant_flow") as resp:
data = await resp.json()
latest_flow = int(data["log"]["flow"])
logging.debug(f"Updated flow: {latest_flow}")
latest_data["PatientSkid"]["QSkid"] = latest_flow
except Exception as e:
logging.error(f"Error fetching flow: {e}")
await asyncio.sleep(1.0)
def set_patient_skid_users(count: int = 1):
try:
url = f"http://192.168.1.28:8000/set_users/{count}"
response = httpx.get(url, timeout=5.0)
if response.status_code == 200:
return {"status": "success", "detail": response.json()}
else:
raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}")
except httpx.RequestError as e:
raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}")
app.include_router(router) app.include_router(router)