From d2c3a11ce2c5fe8bba739d481a345ea81168931f Mon Sep 17 00:00:00 2001 From: Etienne Chassaing <60154720+cetiennec@users.noreply.github.com> Date: Thu, 17 Jul 2025 16:12:17 +0200 Subject: [PATCH] Adds automatic tests logic --- main.py | 113 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 32 deletions(-) diff --git a/main.py b/main.py index fec2f61..e9558a9 100644 --- a/main.py +++ b/main.py @@ -22,6 +22,7 @@ import csv from collections import deque import numpy as np import aiohttp +import httpx if platform.system() in ["Darwin"]: # macOS or Windows from MockCAN import CANBackend @@ -66,20 +67,7 @@ flush_interval = 1.0 # flush every 1 second last_flush_time = datetime.datetime.now() -## LOGGING QSkid -async def update_latest_flow(): - async with aiohttp.ClientSession() as session: - while True: - try: - async with session.get("http://192.168.1.28:8000/instant_flow") as resp: - data = await resp.json() - latest_flow = int(data["log"]["flow"]) - logging.debug(f"Updated flow: {latest_flow}") - latest_data["PatientSkid"]["QSkid"] = latest_flow - - except Exception as e: - logging.error(f"Error fetching flow: {e}") - await asyncio.sleep(1.0) +## LOGGING def format_data(data): @@ -293,25 +281,7 @@ def feedvalve_control(MV01_opening: int = Query(...)): logging.info(f"Feed valve opening to {MV01_opening}") return {"status": "ok"} -# AUTOMATIC TESTING -@router.post("/test/auto/1") -async def auto_test_pu1(): - # Call the function for PU1 auto test - logging.info("Start auto test of 1 PU") - return {"status": "started", "pu": 1} - -@router.post("/test/auto/2") -async def auto_test_pu2(): - # Call the function for PU2 auto test - logging.info("Start auto test of 2 PU") - return {"status": "started", "pu": 2} - -@router.post("/test/auto/3") -async def auto_test_pu3(): - # Call the function for PU3 auto test - logging.info("Start auto test of 3 PU") - return {"status": "started", "pu": 3} # LOCAL RECORDER @app.post("/start_recording") @@ -380,6 +350,85 @@ async def record_data_loop(): await asyncio.sleep(0.1) # 10 Hz +## AUTOMATIC TESTING + +async def send_command_with_delay(state: str, pu: int, delay_s: int = 0, ploop_setpoint: float = 0.0): + await asyncio.sleep(delay_s) + logging.info(f"[AUTO TEST] Sending {state} to PU{pu} after {delay_s}s") + can_backend.send_state_command(state, pu, ploop_setpoint) + +async def set_patients_with_delay(count: int, delay_s: int): + await asyncio.sleep(delay_s) + logging.info(f"[AUTO TEST] Sending {count} patients to patient skid after {delay_s}s") + set_patient_skid_users(count) + +@router.post("/test/auto/1") +async def auto_test_pu1(ploop_setpoint: float = Query(0.0)): + pu = 1 + logging.info("[AUTO TEST] Starting automatic test for 1 PU") + asyncio.create_task(run_auto_test_pu1(pu, ploop_setpoint)) + return {"status": "started", "pu": pu} + +@router.post("/test/auto/2") +async def auto_test_pu2(ploop_setpoint: float = Query(0.0)): + logging.info("[AUTO TEST] Starting automatic test for 2 PUs") + asyncio.create_task(run_auto_test_pu2(ploop_setpoint)) + return {"status": "started", "pu": [1, 2]} + +async def run_auto_test_pu1(pu: int, ploop_setpoint: float): + await send_command_with_delay("PRE-PRODUCTION", pu, delay_s=0, ploop_setpoint=ploop_setpoint) + await send_command_with_delay("PRODUCTION", pu, delay_s=180, ploop_setpoint=ploop_setpoint) + await set_patients_with_delay(5, delay_s=60) + await set_patients_with_delay(10, delay_s=60) + await send_command_with_delay("IDLE", pu, delay_s=60, ploop_setpoint=ploop_setpoint) + logging.info("[AUTO TEST] Finished PU1 test") + +async def run_auto_test_pu2(ploop_setpoint: float): + # Step 1: Run PU1 test + await run_auto_test_pu1(1, ploop_setpoint) + + # Step 2: PU2 sequence + await send_command_with_delay("PRE-PRODUCTION", 2, delay_s=0, ploop_setpoint=ploop_setpoint) + await send_command_with_delay("PRODUCTION", 2, delay_s=180, ploop_setpoint=ploop_setpoint) + await set_patients_with_delay(15, delay_s=60) + await set_patients_with_delay(0, delay_s=60) + await send_command_with_delay("IDLE", 2, delay_s=60, ploop_setpoint=ploop_setpoint) + await send_command_with_delay("IDLE", 1, delay_s=60, ploop_setpoint=ploop_setpoint) + logging.info("[AUTO TEST] Finished PU1 + PU2 test") + +@router.post("/test/auto/3") +async def auto_test_pu3(): + # Call the function for PU3 auto test + logging.info("Start auto test of 3 PU") + return {"status": "started", "pu": 3} + +# PATIENT SKID HELPERS +async def update_latest_flow(): + async with aiohttp.ClientSession() as session: + while True: + try: + async with session.get("http://192.168.1.28:8000/instant_flow") as resp: + data = await resp.json() + latest_flow = int(data["log"]["flow"]) + logging.debug(f"Updated flow: {latest_flow}") + latest_data["PatientSkid"]["QSkid"] = latest_flow + + except Exception as e: + logging.error(f"Error fetching flow: {e}") + await asyncio.sleep(1.0) + +def set_patient_skid_users(count: int = 1): + try: + url = f"http://192.168.1.28:8000/set_users/{count}" + response = httpx.get(url, timeout=5.0) + + if response.status_code == 200: + return {"status": "success", "detail": response.json()} + else: + raise HTTPException(status_code=502, detail=f"Remote server error: {response.text}") + except httpx.RequestError as e: + raise HTTPException(status_code=500, detail=f"Request to external server failed: {str(e)}") + app.include_router(router)