PS_BLE_Tracker/Python/Xiao_BLE_tools/xiao_calibration_tool.py

278 lines
10 KiB
Python

"""
XIAO Airsoft Calibration Tool v3
- Rendu optimisé : set_data() sans redraw complet
- Fenêtre glissante fixe (pas de grossissement des buffers)
- Aucune latence même après des heures
"""
import asyncio
import struct
import threading
from collections import deque
from bleak import BleakClient, BleakScanner
# --- BLE target -------------------------------------------------------------
DEVICE_NAME = "XIAO Airsoft Pro"
DEBUG_CHAR_UUID = "6E400005-B5A3-F393-E0A9-E50E24DCCA9E"
SHOT_CHAR_UUID = "6E400004-B5A3-F393-E0A9-E50E24DCCA9E"

# Fixed window: exactly WINDOW_SIZE points are displayed, never more.
WINDOW_SIZE = 200  # ~10 s at 20 Hz — adjust if needed

# Fixed-size ring buffers, pre-filled so they always hold WINDOW_SIZE items.
accel_buf, gyro_buf = (
    deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) for _ in range(2)
)
audio_buf = deque([0] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
accel_trig, gyro_trig, audio_trig = (
    deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE) for _ in range(3)
)

# Detection thresholds edited from the keyboard (PDM audio range: 0-32767).
thresholds = dict(accel=2.5, gyro=200.0, audio=3000)

# State shared between the BLE thread and the plotting code.
shot_count = 0
ble_status = "🔍 Connexion..."
ble_running = True
audio_max_global = 1000  # largest audio level seen so far

import numpy as np

X = np.arange(WINDOW_SIZE)  # fixed X axis, never changes
# ─── BLE ────────────────────────────────────────────────
def debug_callback(sender, data):
    """Decode one debug notification and push its samples into the plot buffers.

    Packets shorter than 14 bytes are ignored. Byte 0 is not read. Bytes 1-10
    are decoded as two little-endian float32 values (accelerometer, gyroscope)
    followed by a little-endian uint16 audio level; bytes 11-13 are the
    accel / gyro / audio trigger flags (non-zero means set).

    Args:
        sender: Notification source supplied by the BLE stack (unused).
        data: Raw notification payload.
    """
    global audio_max_global
    if len(data) < 14:
        return

    # One unpack for the three packed values, starting right after byte 0.
    accel, gyro, level = struct.unpack_from('<ffH', data, 1)
    accel_buf.append(accel)
    gyro_buf.append(gyro)
    audio_buf.append(level)

    # Remember the highest audio level ever received.
    audio_max_global = max(audio_max_global, level)

    for flags, raw in zip((accel_trig, gyro_trig, audio_trig), data[11:14]):
        flags.append(bool(raw))
def shot_callback(sender, data):
    """Count a shot when the notification's first byte equals 1.

    An empty payload is ignored instead of raising ``IndexError`` inside the
    BLE notification handler.

    Args:
        sender: Notification source supplied by the BLE stack (unused).
        data: Raw notification payload.
    """
    global shot_count
    if data and data[0] == 1:
        shot_count += 1
async def find_device():
    """Scan for a device whose name contains DEVICE_NAME (case-insensitive).

    The scan is polled every 0.5 s for at most 20 rounds (about 10 s) and ends
    early as soon as a matching advertisement has been seen.

    Returns:
        The last matching device reported by the scanner, or ``None`` when
        nothing matched before the scan ended.
    """
    found = None

    def on_detect(device, adv):
        # Detection callback: remember any device whose name matches.
        nonlocal found
        if device.name and DEVICE_NAME.lower() in device.name.lower():
            found = device

    scanner = BleakScanner(on_detect)
    await scanner.start()
    try:
        for _ in range(20):
            if found:
                break
            await asyncio.sleep(0.5)
    finally:
        # Always release the scanner, even if the wait is cancelled or fails.
        await scanner.stop()
    return found
async def ble_loop():
    """Scan, connect and stream notifications until ``ble_running`` is cleared.

    Each round looks for the device, connects, subscribes to the debug and
    shot characteristics, then idles while the link is up. Progress and
    errors are published through the module-level ``ble_status`` string. A
    failed scan or any exception is followed by a 5 s pause before retrying.
    """
    global ble_status
    while ble_running:
        try:
            ble_status = f"🔍 Recherche '{DEVICE_NAME}'..."
            device = await find_device()
            if not device:
                ble_status = "⚠️ XIAO non trouvé — réessai..."
                await asyncio.sleep(5)
                continue

            ble_status = "📡 Connexion..."
            async with BleakClient(device, timeout=15.0) as client:
                await client.start_notify(DEBUG_CHAR_UUID, debug_callback)
                await client.start_notify(SHOT_CHAR_UUID, shot_callback)
                ble_status = f"{device.name} ({device.address})"
                # Poll so that both a dropped link and a shutdown request
                # are noticed within half a second.
                while client.is_connected and ble_running:
                    await asyncio.sleep(0.5)
            ble_status = "❌ Déconnecté — reconnexion..."
        except Exception as e:
            # Top-level boundary of the BLE thread: report and retry.
            # Some exceptions (e.g. timeouts) carry no message; fall back to
            # the exception class name so the status line is never blank.
            ble_status = (str(e) or type(e).__name__)[:50]
            await asyncio.sleep(5)
def run_ble():
    """Thread entry point: drive ble_loop() to completion with asyncio.run()."""
    loop_coro = ble_loop()
    asyncio.run(loop_coro)
# ─── MATPLOTLIB optimisé ────────────────────────────────
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.animation import FuncAnimation
# Colour palette (hex RGB strings) used by the figure below.
BG = "#1a1a2e"
PANEL = "#16213e"
TEXT = "#e0e0e0"
GRID = "#2a2a4a"

# One colour per curve, plus a shared one for triggers and thresholds.
CA = "#4fc3f7"  # accelerometer
CG = "#81c784"  # gyroscope
CM = "#ce93d8"  # audio
CT = "#ef5350"  # trigger / threshold
# Figure with three stacked panels sharing the same fixed X range.
fig = plt.figure(figsize=(13, 9), facecolor=BG)
gs = gridspec.GridSpec(3, 1, hspace=0.5, top=0.92, bottom=0.06)
axes = [fig.add_subplot(gs[row]) for row in range(3)]

# Common dark styling applied to every panel.
for ax in axes:
    ax.set_facecolor(PANEL)
    ax.tick_params(colors=TEXT, labelsize=8)
    for sp in ax.spines.values():
        sp.set_edgecolor('#444466')
    ax.grid(True, alpha=0.2, color=GRID)
    ax.set_xlim(0, WINDOW_SIZE - 1)

# Per-panel axis labels.
axes[0].set_ylabel("G", color=TEXT, fontsize=9)
axes[1].set_ylabel("°/s", color=TEXT, fontsize=9)
axes[2].set_ylabel("Niveau", color=TEXT, fontsize=9)
axes[2].set_xlabel(
    "Échantillons (fenêtre glissante)", color=TEXT, fontsize=8
)
# Create every artist exactly ONCE; they are mutated later, never recreated.
line_a = axes[0].plot(X, list(accel_buf), color=CA, lw=1.5)[0]
line_g = axes[1].plot(X, list(gyro_buf), color=CG, lw=1.5)[0]
line_m = axes[2].plot(X, list(audio_buf), color=CM, lw=1.5)[0]

# Dashed horizontal lines marking the current thresholds.
thr_a = axes[0].axhline(thresholds["accel"], color=CT, linestyle='--', linewidth=1.5)
thr_g = axes[1].axhline(thresholds["gyro"], color=CT, linestyle='--', linewidth=1.5)
thr_m = axes[2].axhline(thresholds["audio"], color=CT, linestyle='--', linewidth=1.5)

# NOTE(review): these two names are not referenced in the code shown here.
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection

# Dynamic titles: created empty, their text is filled in on every frame.
titles = [
    panel.set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5)
    for panel in axes
]

# Figure-level texts: status line (top left) and key help (top right).
status_txt = fig.text(0.01, 0.97, "", color=TEXT, fontsize=9, va='top')
help_txt = fig.text(
    0.99, 0.97,
    "Q/A Accel±0.1G W/S Gyro±10°/s E/D Audio±500 R Reset ESC Quitter",
    color='#8888aa', fontsize=8, va='top', ha='right',
)

# Trigger highlights: one vertical span per sample slot, created once and
# fully transparent by default; only their alpha is changed afterwards.
trig_spans_a = [axes[0].axvspan(x0, x0 + 1, alpha=0, color=CT) for x0 in range(WINDOW_SIZE)]
trig_spans_g = [axes[1].axvspan(x0, x0 + 1, alpha=0, color=CT) for x0 in range(WINDOW_SIZE)]
trig_spans_m = [axes[2].axvspan(x0, x0 + 1, alpha=0, color=CT) for x0 in range(WINDOW_SIZE)]
def update_spans(spans, trig_list):
    """Set the alpha of existing spans from trigger flags; create nothing.

    Span ``i`` gets alpha 0.25 when ``trig_list[i]`` exists and is truthy,
    and alpha 0 otherwise (including spans beyond the end of ``trig_list``).

    Args:
        spans: Sequence of artists exposing ``set_alpha``.
        trig_list: Iterable of trigger flags, one per span position.
    """
    flags = list(trig_list)
    count = len(flags)
    for index, span in enumerate(spans):
        triggered = index < count and flags[index]
        span.set_alpha(0.25 if triggered else 0)
def update(frame):
    """Animation callback: refresh curves, thresholds, limits and texts.

    Args:
        frame: Frame value passed by FuncAnimation (unused).

    Returns:
        Tuple of the artists touched here: the three curves, the three
        threshold lines, the three titles and the status text.
    """
    # Snapshot the ring buffers into arrays.
    accel = np.array(accel_buf)
    gyro = np.array(gyro_buf)
    audio = np.array(audio_buf)

    accel_thr = thresholds["accel"]
    gyro_thr = thresholds["gyro"]
    audio_thr = thresholds["audio"]

    # Curves: only the Y data changes, X is fixed.
    line_a.set_ydata(accel)
    line_g.set_ydata(gyro)
    line_m.set_ydata(audio)

    # Threshold lines follow the current settings.
    thr_a.set_ydata([accel_thr, accel_thr])
    thr_g.set_ydata([gyro_thr, gyro_thr])
    thr_m.set_ydata([audio_thr, audio_thr])

    # Adaptive Y limits. The audio panel scales with the global maximum
    # (audio_max_global), so it is never shrunk automatically.
    axes[0].set_ylim(0, max(accel_thr * 1.8, accel.max() * 1.2, 2.0))
    axes[1].set_ylim(0, max(gyro_thr * 1.8, gyro.max() * 1.2, 100))
    axes[2].set_ylim(0, max(audio_max_global * 1.2, audio_thr * 2.0, 1000))

    # Trigger highlights.
    for spans, flags in (
        (trig_spans_a, accel_trig),
        (trig_spans_g, gyro_trig),
        (trig_spans_m, audio_trig),
    ):
        update_spans(spans, flags)

    # Titles showing the latest value and the threshold of each panel.
    accel_title = (
        f"Accéléromètre "
        f"val: {accel[-1]:.2f} G "
        f"seuil: {accel_thr:.1f} G "
        f"[Q=+0.1 A=-0.1]"
    )
    gyro_title = (
        f"Gyroscope "
        f"val: {gyro[-1]:.0f} °/s "
        f"seuil: {gyro_thr:.0f} °/s "
        f"[W=+10 S=-10]"
    )
    audio_title = (
        f"Microphone PDM "
        f"val: {audio[-1]:.0f} "
        f"seuil: {audio_thr} "
        f"[E=+500 D=-500]"
    )
    titles[0].set_text(accel_title)
    titles[1].set_text(gyro_title)
    titles[2].set_text(audio_title)

    status_txt.set_text(f"{ble_status} | 💥 Tirs détectés : {shot_count}")

    return (line_a, line_g, line_m, thr_a, thr_g, thr_m, *titles, status_txt)
def on_key(event):
    """Keyboard handler: adjust thresholds, reset the curves, or quit.

    Keys (case-insensitive, so Caps Lock does not disable them):
        q / a   accel threshold +0.1 / -0.1 (floor 0.3)
        w / s   gyro threshold +10 / -10 (floor 20)
        e / d   audio threshold +500 / -500 (floor 200)
        r       refill every buffer with zeros and reset the audio maximum
        escape  stop the BLE loop and close all figures

    Args:
        event: Key event; only its ``key`` attribute is read (may be None).
    """
    global audio_max_global, ble_running

    # The on-screen help shows uppercase letters, so accept both cases.
    k = (event.key or "").lower()

    if k == 'q':
        thresholds["accel"] = round(thresholds["accel"] + 0.1, 1)
    elif k == 'a':
        thresholds["accel"] = round(max(0.3, thresholds["accel"] - 0.1), 1)
    elif k == 'w':
        thresholds["gyro"] = thresholds["gyro"] + 10
    elif k == 's':
        thresholds["gyro"] = max(20, thresholds["gyro"] - 10)
    elif k == 'e':
        thresholds["audio"] = thresholds["audio"] + 500
    elif k == 'd':
        thresholds["audio"] = max(200, thresholds["audio"] - 500)
    elif k == 'r':
        audio_max_global = 1000
        # Refill each buffer with a zero of its own element type so the
        # reset state matches the initial state of the buffers.
        for buf, zero in (
            (accel_buf, 0.0), (gyro_buf, 0.0), (audio_buf, 0),
            (accel_trig, False), (gyro_trig, False), (audio_trig, False),
        ):
            buf.clear()
            buf.extend([zero] * WINDOW_SIZE)
        print("Courbes réinitialisées")
    elif k == 'escape':
        ble_running = False
        plt.close('all')

    # Console feedback for threshold changes.
    if k in ('q', 'a'):
        print(f"Accel seuil → {thresholds['accel']:.1f} G")
    elif k in ('w', 's'):
        print(f"Gyro seuil → {thresholds['gyro']:.0f} °/s")
    elif k in ('e', 'd'):
        print(f"Audio seuil → {thresholds['audio']}")
def on_close(event):
    """Window-close handler: clear ``ble_running`` so the BLE loop stops.

    Args:
        event: Close event supplied by Matplotlib (unused).
    """
    global ble_running
    ble_running = False
# Matplotlib's default key bindings claim some of the calibration keys
# (by default 'q' quits the window, 's' opens the save dialog and 'r' resets
# the view). Remove those keys from every keymap.* entry so a key press only
# reaches on_key and "Q = accel +0.1" no longer closes the figure.
_CALIBRATION_KEYS = frozenset('qawsedr')
for _rc_name in [name for name in plt.rcParams if name.startswith('keymap.')]:
    plt.rcParams[_rc_name] = [
        key for key in plt.rcParams[_rc_name] if key not in _CALIBRATION_KEYS
    ]

fig.canvas.mpl_connect('key_press_event', on_key)
fig.canvas.mpl_connect('close_event', on_close)
def main():
    """Print the banner, start the BLE thread and run the live plot.

    Blocks in ``plt.show()`` until the window is closed. On the way out the
    BLE loop is asked to stop and is given a few seconds to finish, so an
    open connection can be left through its ``async with`` block rather than
    being dropped when the daemon thread dies with the interpreter.
    """
    global ble_running

    print("╔══════════════════════════════════════════════╗")
    print("║ XIAO Airsoft Calibration Tool v3 ║")
    print("║ Rendu optimisé — aucune latence ║")
    print("╚══════════════════════════════════════════════╝")
    print(f"\n🎯 Cible : '{DEVICE_NAME}'")
    print("⚠️ Tapez 'D' dans le Moniteur Série Arduino")
    print(" jusqu'à voir 'Debug : FULL'\n")

    ble_thread = threading.Thread(target=run_ble, daemon=True)
    ble_thread.start()

    # Keep a reference: the animation stops if the object is garbage-collected.
    anim = FuncAnimation(
        fig, update,
        interval=80,  # ~12 fps, more than enough
        blit=False,  # blit=True causes glitches with the spans
        cache_frame_data=False,
    )
    try:
        plt.show()
    finally:
        ble_running = False
        # Bounded wait: the thread is a daemon, so exit is never blocked for
        # longer than this even if it is in the middle of a scan or a retry.
        ble_thread.join(timeout=3.0)


if __name__ == "__main__":
    main()