PS_BLE_Tracker/Python/Xiao_BLE_tools/xiao_calibration_tool.py
j.foucher f825695658 Python calibration tool: simplify graphs, fix CancelledError
- Remove orange "last shot peak" line (unreliable at 20Hz)
- Remove rolling max curve (not useful in practice)
- Add specific asyncio.CancelledError catch in ble_loop with retry
  message when device is already connected elsewhere
- Clean up titles: show only current val + threshold

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 10:15:51 +01:00

404 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
XIAO Airsoft Calibration Tool v3
- Rendu optimisé : set_data() sans redraw complet
- Fenêtre glissante fixe (pas de grossissement des buffers)
- Aucune latence même après des heures
- 4ème courbe : timeline des tirs détectés
"""
import asyncio
import struct
import threading
from collections import deque
from bleak import BleakClient, BleakScanner
DEVICE_NAME = "XIAO Airsoft Pro"
DEVICE_ADDRESS = "46:35:F1:51:51:5A" # Adresse MAC directe — plus fiable que le scan par nom
DEBUG_CHAR_UUID = "6E400005-B5A3-F393-E0A9-E50E24DCCA9E"
SHOT_CHAR_UUID = "6E400004-B5A3-F393-E0A9-E50E24DCCA9E"
CONFIG_CHAR_UUID = "6E400006-B5A3-F393-E0A9-E50E24DCCA9E"
# Fenêtre fixe : WINDOW_SIZE points affichés, jamais plus
WINDOW_SIZE = 200 # ~10s à 20Hz — ajustez si besoin
# Buffers circulaires de taille fixe
accel_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
gyro_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
audio_buf = deque([0] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
accel_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
gyro_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
audio_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE)
shot_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) # 1.0 au moment d'un tir
thresholds = {"accel": 2.5, "gyro": 200.0, "audio": 3000} # PDM : 0-32767
min_sensors = 3 # Nb de capteurs requis simultanément (1-3) — NUM6=+1 NUM4=-1
shot_count = 0
shot_pending = False # Flag : un tir reçu, à enregistrer dans shot_buf au prochain debug tick
# Pic max au moment du dernier tir (mis à jour au moment où shot_pending devient True)
last_shot_peak = {"accel": 0.0, "gyro": 0.0, "audio": 0}
ble_status = "[..] Connexion..."
ble_running = True
audio_max_global = 1000 # Tracks le max absolu jamais vu
ble_client = None # Référence au client BLE actif
config_pending = False # Flag : config à envoyer au prochain cycle
debug_mode = 3 # 0=OFF, 1=RAW, 2=TRIGGERS, 3=FULL (actif par défaut)
import numpy as np
X = np.arange(WINDOW_SIZE) # axe X fixe, ne change jamais
# Fenêtre glissante pour afficher le pic max récent dans le titre (informatif seulement)
PEAK_WINDOW = 20 # ~1s à 20Hz
# ─── BLE ────────────────────────────────────────────────
def debug_callback(sender, data):
global audio_max_global, shot_pending
if len(data) < 14:
return
accel_buf.append( struct.unpack('<f', data[1:5])[0] )
gyro_buf.append( struct.unpack('<f', data[5:9])[0] )
val = struct.unpack('<H', data[9:11])[0]
audio_buf.append(val)
if val > audio_max_global:
audio_max_global = val
accel_trig.append(bool(data[11]))
gyro_trig.append( bool(data[12]))
audio_trig.append(bool(data[13]))
# Enregistrer le tir dans le buffer synchronisé (1.0 si tir reçu depuis le dernier tick)
shot_buf.append(1.0 if shot_pending else 0.0)
shot_pending = False
def shot_callback(sender, data):
global shot_count, shot_pending, last_shot_peak
if data[0] == 1:
shot_count += 1
shot_pending = True
# Capturer le pic courant au moment exact du tir (meilleure approximation possible en BLE)
last_shot_peak["accel"] = accel_buf[-1] if accel_buf else 0.0
last_shot_peak["gyro"] = gyro_buf[-1] if gyro_buf else 0.0
last_shot_peak["audio"] = audio_buf[-1] if audio_buf else 0
async def find_device():
"""Scan par nom (toutes les 0.5s pendant 30s max).
Si DEVICE_ADDRESS est renseignée, connexion directe sans scan.
"""
if DEVICE_ADDRESS:
return DEVICE_ADDRESS # connexion directe — BleakClient accepte une string
found = None
def cb(device, adv):
nonlocal found
if device.name and DEVICE_NAME.lower() in device.name.lower():
found = device
scanner = BleakScanner(cb)
await scanner.start()
for _ in range(60): # 30s max (60 × 0.5s)
if found: break
await asyncio.sleep(0.5)
await scanner.stop()
return found
DEBUG_BYTE_OFFSET = 28 # Position fixe dans le payload 32 bytes — après toute la struct
def build_config_payload(include_debug=False):
"""Construit le payload ShotConfig (32 bytes) à envoyer au XIAO.
Le byte debug_mode est placé à l'offset fixe DEBUG_BYTE_OFFSET=28,
bien après la fin de ShotConfig quelle que soit la taille réelle en C++.
0xFF = ne pas changer le mode debug actuel du XIAO.
"""
# struct ShotConfig : float, float, uint16, uint16, uint16, uint16, uint8, uint16, bool, bool, bool
payload = struct.pack('<ffHHHHBHBBB',
thresholds["accel"], # accelThreshold (float)
thresholds["gyro"], # gyroThreshold (float)
int(thresholds["audio"]), # audioThreshold (uint16)
60, # accelWindow (uint16) ms — >DEBUG_RATE 50ms
60, # gyroWindow (uint16) ms — >DEBUG_RATE 50ms
60, # audioWindow (uint16) ms — >DEBUG_RATE 50ms
min_sensors, # minSensors (uint8) — 1, 2 ou 3
80, # shotCooldown (uint16) ms
1, # useAccel (bool)
1, # useGyro (bool)
1, # useAudio (bool)
)
# Padder à 32 bytes avec 0xFF (= "ne pas changer")
buf = bytearray(payload + b'\xff' * (32 - len(payload)))
# Placer le debug_mode à l'offset fixe 28
if include_debug:
buf[DEBUG_BYTE_OFFSET] = debug_mode & 0xFF
return bytes(buf)
DEBUG_MODE_NAMES = ["OFF", "RAW", "TRIGGERS", "FULL"]
async def ble_loop():
global ble_status, ble_running, ble_client, config_pending
while ble_running:
try:
ble_status = f"[?] Recherche '{DEVICE_NAME}'..."
device = await find_device()
if not device:
ble_status = "[!] XIAO non trouve — reessai..."
await asyncio.sleep(5)
continue
# device peut être une string (adresse directe) ou un BLEDevice
if isinstance(device, str):
addr = device
display_name = DEVICE_NAME
else:
addr = device.address
display_name = device.name or DEVICE_NAME
ble_status = f"[..] Connexion {display_name}..."
async with BleakClient(addr, timeout=15.0) as client:
ble_client = client
await client.start_notify(DEBUG_CHAR_UUID, debug_callback)
await client.start_notify(SHOT_CHAR_UUID, shot_callback)
ble_status = f"[OK] {display_name} ({addr})"
# À la connexion : lire la config sauvegardée dans le XIAO (flash)
try:
raw = await client.read_gatt_char(CONFIG_CHAR_UUID)
if len(raw) >= 22:
vals = struct.unpack_from('<ffH', raw, 0)
thresholds["accel"] = round(vals[0], 1)
thresholds["gyro"] = round(vals[1], 1)
thresholds["audio"] = int(vals[2])
print(f"📥 Config lue depuis XIAO → Accel:{thresholds['accel']:.1f}G Gyro:{thresholds['gyro']:.0f}°/s Audio:{thresholds['audio']}")
except Exception as ex:
print(f"⚠️ Lecture config XIAO impossible: {ex}")
# Envoyer debug FULL (sans écraser les seuils du XIAO)
try:
payload = build_config_payload(include_debug=True)
await client.write_gatt_char(CONFIG_CHAR_UUID, payload, response=True)
print(f"📤 Config initiale + Debug={DEBUG_MODE_NAMES[debug_mode]} envoyés")
except Exception as ex:
print(f"⚠️ Erreur envoi config initiale: {ex}")
while client.is_connected and ble_running:
if config_pending:
try:
payload = build_config_payload(include_debug=True)
await client.write_gatt_char(CONFIG_CHAR_UUID, payload, response=True)
print(f"📤 Config → Accel:{thresholds['accel']:.1f}G Gyro:{thresholds['gyro']:.0f}°/s Audio:{thresholds['audio']} Debug:{DEBUG_MODE_NAMES[debug_mode]}")
except Exception as ex:
print(f"⚠️ Erreur envoi config: {ex}")
finally:
config_pending = False
await asyncio.sleep(0.1)
ble_client = None
ble_status = "[X] Deconnecte — reconnexion..."
except asyncio.CancelledError:
ble_client = None
ble_status = "[!] Connexion annulee (device deja connecte?) — reessai..."
await asyncio.sleep(3)
except Exception as e:
ble_client = None
ble_status = f"[X] {str(e)[:50]}"
await asyncio.sleep(5)
def run_ble():
asyncio.run(ble_loop())
# ─── MATPLOTLIB optimisé ────────────────────────────────
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib.animation import FuncAnimation
BG = '#1a1a2e'
PANEL = '#16213e'
TEXT = '#e0e0e0'
CA = '#4fc3f7' # accel brut
CG = '#81c784' # gyro brut
CM = '#ce93d8' # audio brut
CT = '#ef5350' # trigger / seuil
CS = '#ff9800' # shot (orange vif)
GRID = '#2a2a4a'
fig = plt.figure(figsize=(13, 10), facecolor=BG)
# 4 lignes : 3 capteurs (hauteur 3) + 1 timeline tirs (hauteur 1)
gs = gridspec.GridSpec(4, 1, hspace=0.5, top=0.92, bottom=0.05,
height_ratios=[3, 3, 3, 1])
axes = [fig.add_subplot(gs[i]) for i in range(4)]
for ax in axes:
ax.set_facecolor(PANEL)
ax.tick_params(colors=TEXT, labelsize=8)
for sp in ax.spines.values():
sp.set_edgecolor('#444466')
ax.grid(True, alpha=0.2, color=GRID)
ax.set_xlim(0, WINDOW_SIZE - 1)
axes[0].set_ylabel("G", color=TEXT, fontsize=9)
axes[1].set_ylabel("°/s", color=TEXT, fontsize=9)
axes[2].set_ylabel("Niveau",color=TEXT, fontsize=9)
axes[3].set_ylabel("Tirs", color=CS, fontsize=9)
axes[3].set_xlabel("Echantillons (fenetre glissante)", color=TEXT, fontsize=8)
axes[3].set_ylim(-0.1, 1.5)
axes[3].set_yticks([]) # pas de graduations Y sur la timeline
# Créer les artistes UNE SEULE FOIS — on ne les recrée jamais
line_a, = axes[0].plot(X, list(accel_buf), color=CA, lw=1.5, label='Valeur brute')
line_g, = axes[1].plot(X, list(gyro_buf), color=CG, lw=1.5, label='Valeur brute')
line_m, = axes[2].plot(X, list(audio_buf), color=CM, lw=1.5, label='Valeur brute')
line_s, = axes[3].plot(X, list(shot_buf), color=CS, lw=0, marker='|',
markersize=20, markeredgewidth=2.5)
thr_a = axes[0].axhline(thresholds["accel"], color=CT, ls='--', lw=1.5, label='Seuil (trigger XIAO)')
thr_g = axes[1].axhline(thresholds["gyro"], color=CT, ls='--', lw=1.5, label='Seuil (trigger XIAO)')
thr_m = axes[2].axhline(thresholds["audio"], color=CT, ls='--', lw=1.5, label='Seuil (trigger XIAO)')
# Légendes
for ax in axes[:3]:
lines = ax.get_lines()
ax.legend(loc='upper left', fontsize=7, facecolor=PANEL, edgecolor='#444466',
labelcolor=TEXT, framealpha=0.8,
handles=[lines[0],
plt.Rectangle((0,0),1,1, fc=CT, alpha=0.25, label='Trigger actif (XIAO)')])
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
# Titres dynamiques
titles = [
axes[0].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5),
axes[1].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5),
axes[2].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5),
axes[3].set_title("", color=CS, fontsize=10, fontweight='bold', pad=5),
]
status_txt = fig.text(0.01, 0.97, "", color=TEXT, fontsize=9, va='top')
debug_txt = fig.text(0.01, 0.945, "", color='#ffcc44', fontsize=8, va='top')
help_txt = fig.text(0.99, 0.97,
"NUM7/1 Accel+-0.1G NUM8/2 Gyro+-10dps NUM9/3 Audio+-500 NUM6/4 MinSensors+-1 NUM0 Debug NUM5 Reset ESC Quitter",
color='#8888aa', fontsize=8, va='top', ha='right')
# Fonds de trigger (spans) — créés une fois, rendus invisibles par défaut
trig_spans_a = [axes[0].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)]
trig_spans_g = [axes[1].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)]
trig_spans_m = [axes[2].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)]
# Spans de tir sur tous les graphiques (ligne verticale orange traversant tout)
shot_spans = [axes[3].axvspan(i, i+1, alpha=0, color=CS) for i in range(0, WINDOW_SIZE, 1)]
def update_spans(spans, trig_list):
"""Met à jour l'alpha des spans sans en créer de nouveaux"""
tl = list(trig_list)
for i, span in enumerate(spans):
span.set_alpha(0.25 if i < len(tl) and tl[i] else 0)
def update(frame):
# Snapshot des buffers (rapide)
a = np.array(accel_buf)
g = np.array(gyro_buf)
m = np.array(audio_buf, dtype=float)
s = np.array(shot_buf)
# Mise à jour des données des lignes brutes
line_a.set_ydata(a)
line_g.set_ydata(g)
line_m.set_ydata(m)
line_s.set_ydata(s)
# Mise à jour des seuils
thr_a.set_ydata([thresholds["accel"], thresholds["accel"]])
thr_g.set_ydata([thresholds["gyro"], thresholds["gyro"]])
thr_m.set_ydata([thresholds["audio"], thresholds["audio"]])
# Ylim adaptatif
axes[0].set_ylim(0, max(thresholds["accel"] * 1.8, a.max() * 1.2, 2.0))
axes[1].set_ylim(0, max(thresholds["gyro"] * 1.8, g.max() * 1.2, 100))
axes[2].set_ylim(0, max(audio_max_global * 1.2, thresholds["audio"] * 2.0, 1000))
# Triggers capteurs
update_spans(trig_spans_a, accel_trig)
update_spans(trig_spans_g, gyro_trig)
update_spans(trig_spans_m, audio_trig)
# Spans tirs (orange plein sur la timeline)
sl = list(shot_buf)
for i, span in enumerate(shot_spans):
span.set_alpha(0.85 if i < len(sl) and sl[i] > 0 else 0)
# Titres avec valeurs courantes
titles[0].set_text(f"Accelerometre val: {a[-1]:.2f} G seuil: {thresholds['accel']:.1f} G [NUM7=+0.1 NUM1=-0.1]")
titles[1].set_text(f"Gyroscope val: {g[-1]:.0f} dps seuil: {thresholds['gyro']:.0f} dps [NUM8=+10 NUM2=-10]")
titles[2].set_text(f"Microphone PDM val: {m[-1]:.0f} seuil: {thresholds['audio']} [NUM9=+500 NUM3=-500]")
titles[3].set_text(f"Tirs detectes : {shot_count} (fenetre glissante)")
status_txt.set_text(
f"{ble_status} | Tirs total : {shot_count}")
debug_txt.set_text(
f"Debug XIAO : {DEBUG_MODE_NAMES[debug_mode]} [NUM0 = cycle OFF->RAW->TRIGGERS->FULL] | "
f"MinSensors : {min_sensors}/3 [NUM6=+1 NUM4=-1]")
return (line_a, line_g, line_m, line_s,
thr_a, thr_g, thr_m,
*titles, status_txt, debug_txt)
def on_key(event):
global audio_max_global, ble_running, config_pending, debug_mode, min_sensors
k = event.key
changed = False
# Pavé numérique (Num Lock ON : '7','1'... / Num Lock OFF : 'num7','num1'...)
if k in ('7', 'num7'): thresholds["accel"] = round(thresholds["accel"] + 0.1, 1); changed = True
elif k in ('1', 'num1'): thresholds["accel"] = round(max(0.3, thresholds["accel"] - 0.1), 1); changed = True
elif k in ('8', 'num8'): thresholds["gyro"] = thresholds["gyro"] + 10; changed = True
elif k in ('2', 'num2'): thresholds["gyro"] = max(20, thresholds["gyro"] - 10); changed = True
elif k in ('9', 'num9'): thresholds["audio"] = thresholds["audio"] + 500; changed = True
elif k in ('3', 'num3'): thresholds["audio"] = max(200, thresholds["audio"] - 500); changed = True
elif k in ('6', 'num6'): min_sensors = min(3, min_sensors + 1); print(f"MinSensors → {min_sensors}"); changed = True
elif k in ('4', 'num4'): min_sensors = max(1, min_sensors - 1); print(f"MinSensors → {min_sensors}"); changed = True
elif k in ('0', 'num0'):
debug_mode = (debug_mode + 1) % 4 # cycle OFF→RAW→TRIGGERS→FULL
print(f"Debug XIAO → {DEBUG_MODE_NAMES[debug_mode]}")
changed = True # déclenche l'envoi du nouveau mode au XIAO
elif k in ('5', 'num5'):
audio_max_global = 1000
for b in (accel_buf, gyro_buf, audio_buf, shot_buf,
accel_trig, gyro_trig, audio_trig):
b.clear()
b.extend([0] * WINDOW_SIZE)
print("Courbes reinitialisees")
elif k == 'escape':
ble_running = False
plt.close('all')
# Envoi de la config (+ debug mode) au XIAO si connecté
if changed:
config_pending = True
# Affichage console
if k in ('7','1','num7','num1'): print(f"Accel seuil → {thresholds['accel']:.1f} G")
elif k in ('8','2','num8','num2'): print(f"Gyro seuil → {thresholds['gyro']:.0f} °/s")
elif k in ('9','3','num9','num3'): print(f"Audio seuil → {thresholds['audio']}")
def on_close(event):
global ble_running
ble_running = False
fig.canvas.mpl_connect('key_press_event', on_key)
fig.canvas.mpl_connect('close_event', on_close)
def main():
print("╔══════════════════════════════════════════════╗")
print("║ XIAO Airsoft Calibration Tool v3 ║")
print("║ Rendu optimisé — aucune latence ║")
print("╚══════════════════════════════════════════════╝")
print(f"\n🎯 Cible : '{DEVICE_NAME}'")
print("⚠️ Tapez 'D' dans le Moniteur Série Arduino")
print(" jusqu'à voir 'Debug : FULL'\n")
ble_thread = threading.Thread(target=run_ble, daemon=True)
ble_thread.start()
anim = FuncAnimation(
fig, update,
interval=80, # ~12 fps, largement suffisant
blit=False, # blit=True cause des bugs de spans
cache_frame_data=False
)
plt.show()
global ble_running
ble_running = False
if __name__ == "__main__":
main()