NorthStar-HMI/analysis/read_recording.py
2025-08-12 11:42:04 +02:00

154 lines
5.0 KiB
Python

# Cell 1: Import des bibliothèques
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# Plot styling: seaborn white grid and a wide default figure size.
sns.set(style="whitegrid")
plt.rcParams.update({'figure.figsize': (14, 6)})

# Load the recording; the "timestamp" column is parsed as datetimes.
RECORDING_CSV = "../recordings/recording_20250718_112807.csv"
df = pd.read_csv(RECORDING_CSV, parse_dates=["timestamp"])

# Rows whose 'pu' value is 'PatientSkid', kept apart because their 'QSkid'
# column is overlaid on the PU plots further down.
df_PatientSkid = df.loc[df['pu'] == 'PatientSkid'].copy()
# Overview figure: one stacked subplot per quantity, one curve per PU.
import matplotlib.dates as mdates

# Horizontal reference level drawn (dashed red) on the matching subplot.
reference_lines = {'Qperm': 1200, 'Pdilute': 2.5}
quantities = ['Qperm', 'Qdilute', 'Qdrain', 'Pro', 'Pdilute', 'MV07_sp']
n_quantities = len(quantities)
pus_all = ['PU_1', 'PU_2', 'PU_3']
pus = pus_all

fig, axes = plt.subplots(
    nrows=n_quantities,
    ncols=1,
    figsize=(14, 3 * n_quantities),
    sharex=True,
)
fig.suptitle("Évolution des grandeurs par PU", fontsize=16)

for i, (ax, quantity) in enumerate(zip(axes, quantities)):
    # One curve per PU, skipped when the recording has no such column.
    for pu in pus_all:
        df_pu = df.loc[df['pu'] == pu]
        if quantity not in df_pu.columns:
            continue
        ax.plot(df_pu['timestamp'], df_pu[quantity], label=pu)

    reference_level = reference_lines.get(quantity)
    if reference_level is not None:
        ax.axhline(reference_level, linestyle='--', color='red')

    # The patient-skid series is overlaid on the Qdilute subplot only.
    if quantity == 'Qdilute':
        ax.plot(df_PatientSkid['timestamp'], df_PatientSkid['QSkid'], label='QSkid')

    ax.set_ylabel(quantity)
    ax.grid(True)
    ax.legend(loc='upper right')
    # Only the bottom subplot carries the x-axis label.
    ax.set_xlabel("Timestamp" if i == n_quantities - 1 else "")
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))

# Leave the top 4% of the figure free for the suptitle.
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()
# Initial analysis for PU_1: distribution of the time elapsed between
# consecutive recorded rows.
df_pu_1 = df[df['pu'] == 'PU_1'].copy()
df_pu_1 = df_pu_1.sort_values('timestamp')
df_pu_1['delta_t'] = df_pu_1['timestamp'].diff().dt.total_seconds()
# The first row has no predecessor, so its delta is NaN: drop it.
df_pu_1 = df_pu_1.iloc[1:]

plt.figure('Time between messages', figsize=(10, 4))
# stat='probability' normalises the bar heights so that they sum to 1.
sns.histplot(df_pu_1['delta_t'], bins=10, stat='probability')
plt.title("Time between messages for PU_1")
# The histogram's x axis is the interval itself and its y axis the share of
# samples per bin (the labels were previously "Timestamp" / "Δt (seconds)").
plt.xlabel("Δt (seconds)")
plt.ylabel("Probability")
plt.grid(True)
plt.tight_layout()
# The figure is displayed by the plt.show() call at the end of the script.
print("Average time is ", df_pu_1['delta_t'].mean())
def _plot_signal_group(df_pu, columns, title, ylabel, overlays=()):
    """Draw one figure with every column of *columns* present in *df_pu*.

    Args:
        df_pu: Rows of a single PU; must contain a 'timestamp' column.
        columns: Candidate column names; names absent from *df_pu* are skipped.
        title: Title of the axes.
        ylabel: Label of the y axis.
        overlays: Iterable of ``(x, y, label)`` extra curves drawn after the
            PU's own columns.

    Returns:
        The created figure, or ``None`` when none of *columns* is available
        (no figure is created in that case).
    """
    available = [name for name in columns if name in df_pu.columns]
    if not available:
        return None

    fig, ax = plt.subplots(figsize=(10, 4))
    for name in available:
        ax.plot(df_pu['timestamp'], df_pu[name], label=name)
    for x_values, y_values, label in overlays:
        ax.plot(x_values, y_values, label=label)
    ax.set_title(title)
    ax.set_xlabel("Timestamp")
    ax.set_ylabel(ylabel)
    ax.legend(loc='upper right')
    ax.grid(True)
    fig.tight_layout()
    return fig


def _plot_motor_valves(df_pu, pu_name):
    """Draw a 3x3 grid comparing MV02..MV08 with their ``_sp`` columns.

    A valve is plotted only when both its ``MV0x`` and ``MV0x_sp`` columns
    exist in *df_pu*; otherwise its grid cell is hidden, as are the cells
    beyond the last valve.

    Returns:
        The created figure.
    """
    n_rows, n_cols = 3, 3
    valve_numbers = range(2, 9)  # MV02 .. MV08

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 10), sharex=True)
    fig.suptitle(f'{pu_name} - Motor Valve Positions vs Setpoints', fontsize=16)

    legend_source = None
    # axes.flat walks the grid row by row, one cell per valve.
    for slot, ax in enumerate(axes.flat):
        if slot >= len(valve_numbers):
            ax.set_visible(False)
            continue
        mv_real = f"MV{valve_numbers[slot]:02d}"
        mv_sp = f"{mv_real}_sp"
        if mv_real not in df_pu.columns or mv_sp not in df_pu.columns:
            ax.set_visible(False)
            continue
        ax.plot(df_pu['timestamp'], df_pu[mv_real], label='Actual', color='blue')
        ax.plot(df_pu['timestamp'], df_pu[mv_sp], label='Setpoint',
                linestyle='--', color='orange')
        ax.set_title(mv_real)
        ax.set_ylabel("Position (%)")
        ax.grid(True)
        if legend_source is None:
            # Take the legend entries from the first cell that really has
            # curves; the top-left cell may be hidden.
            legend_source = ax

    # With sharex=True only the bottom row gets x tick labels. Hidden cells in
    # that row would leave their column unlabelled, so label the lowest
    # visible cell of each column instead.
    for col in range(n_cols):
        shown = [ax for ax in axes[:, col] if ax.get_visible()]
        if shown:
            shown[-1].set_xlabel("Timestamp")
            shown[-1].tick_params(labelbottom=True)

    if legend_source is not None:
        handles, labels = legend_source.get_legend_handles_labels()
        fig.legend(handles, labels, loc='upper right')
    # Leave the top 4% of the figure free for the suptitle.
    fig.tight_layout(rect=[0, 0, 1, 0.96])
    return fig


def plot_pu_data(pu_name):
    """Create the flow, pressure and motor-valve figures for one PU.

    Reads the module-level ``df`` (all recorded rows) and ``df_PatientSkid``
    (whose 'QSkid' column, when present, is overlaid on the flow figure).
    The figures are created but not displayed; the caller is expected to
    call ``plt.show()``.

    Args:
        pu_name: Value of the 'pu' column selecting the rows to plot.

    Returns:
        None. When no row with a valid timestamp matches *pu_name*, a notice
        is printed and no figure is created.
    """
    df_pu = df[df['pu'] == pu_name].copy()
    # Rows whose timestamp cannot be interpreted become NaT and are dropped.
    df_pu['timestamp'] = pd.to_datetime(df_pu['timestamp'], errors='coerce')
    df_pu = df_pu.dropna(subset=['timestamp'])
    if df_pu.empty:
        print(f"No data for {pu_name}; nothing to plot.")
        return

    # --------- Plot 1: flow rates ---------
    overlays = []
    if 'QSkid' in df_PatientSkid.columns:
        overlays.append(
            (df_PatientSkid['timestamp'], df_PatientSkid['QSkid'], 'QSkid')
        )
    _plot_signal_group(
        df_pu,
        ['Qperm', 'Qdilute', 'Qdrain', 'Qrecirc'],
        f'{pu_name} - Flow Rates',
        "Flow (L/min)",
        overlays,
    )

    # --------- Plot 2: pressures ---------
    _plot_signal_group(
        df_pu,
        ['Pro', 'Pdilute', 'Pretentate'],
        f'{pu_name} - Pressures',
        "Pressure (bar)",
    )

    # --------- Plot 3: motor valve positions ---------
    _plot_motor_valves(df_pu, pu_name)
# Final cell: detailed per-PU figures.
# The PUs found in the recording are printed for information only; the list
# assigned to `pus` right after is what actually gets plotted.
pu_column = df['pu'].dropna()
pus = pu_column.unique()
print("PU disponibles :", pus)

pus = ['PU_1']
for pu in pus:
    print(f"\n--- Data for {pu} ---\n")
    plot_pu_data(pu)

# Display every figure still open, including those created with the
# plt.show() calls commented out earlier in the script.
plt.show()