""" XIAO Airsoft Calibration Tool v3 - Rendu optimisé : set_data() sans redraw complet - Fenêtre glissante fixe (pas de grossissement des buffers) - Aucune latence même après des heures - 4ème courbe : timeline des tirs détectés """ import asyncio import struct import threading from collections import deque from bleak import BleakClient, BleakScanner DEVICE_NAME = "XIAO Airsoft Pro" DEVICE_ADDRESS = "" # Laisser vide pour scan automatique par nom, ou mettre l'adresse MAC pour connexion directe DEBUG_CHAR_UUID = "6E400005-B5A3-F393-E0A9-E50E24DCCA9E" SHOT_CHAR_UUID = "6E400004-B5A3-F393-E0A9-E50E24DCCA9E" CONFIG_CHAR_UUID = "6E400006-B5A3-F393-E0A9-E50E24DCCA9E" # Fenêtre fixe : WINDOW_SIZE points affichés, jamais plus WINDOW_SIZE = 200 # ~10s à 20Hz — ajustez si besoin # Buffers circulaires de taille fixe accel_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) gyro_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) audio_buf = deque([0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) accel_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE) gyro_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE) audio_trig = deque([False] * WINDOW_SIZE, maxlen=WINDOW_SIZE) shot_buf = deque([0.0] * WINDOW_SIZE, maxlen=WINDOW_SIZE) # 1.0 au moment d'un tir thresholds = {"accel": 2.5, "gyro": 200.0, "audio": 3000} # PDM : 0-32767 min_sensors = 3 # Nb de capteurs requis simultanément (1-3) — NUM6=+1 NUM4=-1 shot_count = 0 shot_pending = False # Flag : un tir reçu, à enregistrer dans shot_buf au prochain debug tick ble_status = "🔍 Connexion..." ble_running = True audio_max_global = 1000 # Tracks le max absolu jamais vu ble_client = None # Référence au client BLE actif config_pending = False # Flag : config à envoyer au prochain cycle debug_mode = 3 # 0=OFF, 1=RAW, 2=TRIGGERS, 3=FULL (actif par défaut) import numpy as np X = np.arange(WINDOW_SIZE) # axe X fixe, ne change jamais # ─── BLE ──────────────────────────────────────────────── def debug_callback(sender, data): global audio_max_global, shot_pending if len(data) < 14: return accel_buf.append( struct.unpack(' audio_max_global: audio_max_global = val accel_trig.append(bool(data[11])) gyro_trig.append( bool(data[12])) audio_trig.append(bool(data[13])) # Enregistrer le tir dans le buffer synchronisé (1.0 si tir reçu depuis le dernier tick) shot_buf.append(1.0 if shot_pending else 0.0) shot_pending = False def shot_callback(sender, data): global shot_count, shot_pending if data[0] == 1: shot_count += 1 shot_pending = True async def find_device(): """Scan par nom (toutes les 0.5s pendant 30s max). Si DEVICE_ADDRESS est renseignée, connexion directe sans scan. """ if DEVICE_ADDRESS: return DEVICE_ADDRESS # connexion directe — BleakClient accepte une string found = None def cb(device, adv): nonlocal found if device.name and DEVICE_NAME.lower() in device.name.lower(): found = device scanner = BleakScanner(cb) await scanner.start() for _ in range(60): # 30s max (60 × 0.5s) if found: break await asyncio.sleep(0.5) await scanner.stop() return found DEBUG_BYTE_OFFSET = 28 # Position fixe dans le payload 32 bytes — après toute la struct def build_config_payload(include_debug=False): """Construit le payload ShotConfig (32 bytes) à envoyer au XIAO. Le byte debug_mode est placé à l'offset fixe DEBUG_BYTE_OFFSET=28, bien après la fin de ShotConfig quelle que soit la taille réelle en C++. 0xFF = ne pas changer le mode debug actuel du XIAO. """ # struct ShotConfig : float, float, uint16, uint16, uint16, uint16, uint8, uint16, bool, bool, bool payload = struct.pack('DEBUG_RATE 50ms 60, # gyroWindow (uint16) ms — >DEBUG_RATE 50ms 60, # audioWindow (uint16) ms — >DEBUG_RATE 50ms min_sensors, # minSensors (uint8) — 1, 2 ou 3 80, # shotCooldown (uint16) ms 1, # useAccel (bool) 1, # useGyro (bool) 1, # useAudio (bool) ) # Padder à 32 bytes avec 0xFF (= "ne pas changer") buf = bytearray(payload + b'\xff' * (32 - len(payload))) # Placer le debug_mode à l'offset fixe 28 if include_debug: buf[DEBUG_BYTE_OFFSET] = debug_mode & 0xFF return bytes(buf) DEBUG_MODE_NAMES = ["OFF", "RAW", "TRIGGERS", "FULL"] async def ble_loop(): global ble_status, ble_running, ble_client, config_pending while ble_running: try: ble_status = f"🔍 Recherche '{DEVICE_NAME}'..." device = await find_device() if not device: ble_status = "⚠️ XIAO non trouvé — réessai..." await asyncio.sleep(5) continue # device peut être une string (adresse directe) ou un BLEDevice if isinstance(device, str): addr = device display_name = DEVICE_NAME else: addr = device.address display_name = device.name or DEVICE_NAME ble_status = f"📡 Connexion {display_name}..." async with BleakClient(addr, timeout=15.0) as client: ble_client = client await client.start_notify(DEBUG_CHAR_UUID, debug_callback) await client.start_notify(SHOT_CHAR_UUID, shot_callback) ble_status = f"✅ {display_name} ({addr})" # À la connexion : envoyer config + activer debug FULL automatiquement try: payload = build_config_payload(include_debug=True) await client.write_gatt_char(CONFIG_CHAR_UUID, payload, response=True) print(f"📤 Config initiale + Debug={DEBUG_MODE_NAMES[debug_mode]} envoyés") except Exception as ex: print(f"⚠️ Erreur envoi config initiale: {ex}") while client.is_connected and ble_running: if config_pending: try: payload = build_config_payload(include_debug=True) await client.write_gatt_char(CONFIG_CHAR_UUID, payload, response=True) print(f"📤 Config → Accel:{thresholds['accel']:.1f}G Gyro:{thresholds['gyro']:.0f}°/s Audio:{thresholds['audio']} Debug:{DEBUG_MODE_NAMES[debug_mode]}") except Exception as ex: print(f"⚠️ Erreur envoi config: {ex}") finally: config_pending = False await asyncio.sleep(0.1) ble_client = None ble_status = "❌ Déconnecté — reconnexion..." except Exception as e: ble_client = None ble_status = f"❌ {str(e)[:50]}" await asyncio.sleep(5) def run_ble(): asyncio.run(ble_loop()) # ─── MATPLOTLIB optimisé ──────────────────────────────── import matplotlib matplotlib.use('TkAgg') import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec from matplotlib.animation import FuncAnimation BG = '#1a1a2e' PANEL = '#16213e' TEXT = '#e0e0e0' CA = '#4fc3f7' # accel CG = '#81c784' # gyro CM = '#ce93d8' # audio CT = '#ef5350' # trigger / seuil CS = '#ff9800' # shot (orange vif) GRID = '#2a2a4a' fig = plt.figure(figsize=(13, 10), facecolor=BG) # 4 lignes : 3 capteurs (hauteur 3) + 1 timeline tirs (hauteur 1) gs = gridspec.GridSpec(4, 1, hspace=0.5, top=0.92, bottom=0.05, height_ratios=[3, 3, 3, 1]) axes = [fig.add_subplot(gs[i]) for i in range(4)] for ax in axes: ax.set_facecolor(PANEL) ax.tick_params(colors=TEXT, labelsize=8) for sp in ax.spines.values(): sp.set_edgecolor('#444466') ax.grid(True, alpha=0.2, color=GRID) ax.set_xlim(0, WINDOW_SIZE - 1) axes[0].set_ylabel("G", color=TEXT, fontsize=9) axes[1].set_ylabel("°/s", color=TEXT, fontsize=9) axes[2].set_ylabel("Niveau",color=TEXT, fontsize=9) axes[3].set_ylabel("Tirs", color=CS, fontsize=9) axes[3].set_xlabel("Echantillons (fenetre glissante)", color=TEXT, fontsize=8) axes[3].set_ylim(-0.1, 1.5) axes[3].set_yticks([]) # pas de graduations Y sur la timeline # Créer les artistes UNE SEULE FOIS — on ne les recrée jamais line_a, = axes[0].plot(X, list(accel_buf), color=CA, lw=1.5) line_g, = axes[1].plot(X, list(gyro_buf), color=CG, lw=1.5) line_m, = axes[2].plot(X, list(audio_buf), color=CM, lw=1.5) line_s, = axes[3].plot(X, list(shot_buf), color=CS, lw=0, marker='|', markersize=20, markeredgewidth=2.5) # barres verticales thr_a = axes[0].axhline(thresholds["accel"], color=CT, ls='--', lw=1.5) thr_g = axes[1].axhline(thresholds["gyro"], color=CT, ls='--', lw=1.5) thr_m = axes[2].axhline(thresholds["audio"], color=CT, ls='--', lw=1.5) from matplotlib.patches import Rectangle from matplotlib.collections import PatchCollection # Titres dynamiques titles = [ axes[0].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5), axes[1].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5), axes[2].set_title("", color=TEXT, fontsize=10, fontweight='bold', pad=5), axes[3].set_title("", color=CS, fontsize=10, fontweight='bold', pad=5), ] status_txt = fig.text(0.01, 0.97, "", color=TEXT, fontsize=9, va='top') debug_txt = fig.text(0.01, 0.945, "", color='#ffcc44', fontsize=8, va='top') help_txt = fig.text(0.99, 0.97, "NUM7/1 Accel+-0.1G NUM8/2 Gyro+-10dps NUM9/3 Audio+-500 NUM6/4 MinSensors+-1 NUM0 Debug NUM5 Reset ESC Quitter", color='#8888aa', fontsize=8, va='top', ha='right') # Fonds de trigger (spans) — créés une fois, rendus invisibles par défaut trig_spans_a = [axes[0].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)] trig_spans_g = [axes[1].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)] trig_spans_m = [axes[2].axvspan(i, i+1, alpha=0, color=CT) for i in range(0, WINDOW_SIZE, 1)] # Spans de tir sur tous les graphiques (ligne verticale orange traversant tout) shot_spans = [axes[3].axvspan(i, i+1, alpha=0, color=CS) for i in range(0, WINDOW_SIZE, 1)] def update_spans(spans, trig_list): """Met à jour l'alpha des spans sans en créer de nouveaux""" tl = list(trig_list) for i, span in enumerate(spans): span.set_alpha(0.25 if i < len(tl) and tl[i] else 0) def update(frame): # Snapshot des buffers (rapide) a = np.array(accel_buf) g = np.array(gyro_buf) m = np.array(audio_buf) s = np.array(shot_buf) # Mise à jour des données des lignes line_a.set_ydata(a) line_g.set_ydata(g) line_m.set_ydata(m) line_s.set_ydata(s) # Mise à jour des seuils thr_a.set_ydata([thresholds["accel"], thresholds["accel"]]) thr_g.set_ydata([thresholds["gyro"], thresholds["gyro"]]) thr_m.set_ydata([thresholds["audio"], thresholds["audio"]]) # Ylim adaptatif axes[0].set_ylim(0, max(thresholds["accel"] * 1.8, a.max() * 1.2, 2.0)) axes[1].set_ylim(0, max(thresholds["gyro"] * 1.8, g.max() * 1.2, 100)) axes[2].set_ylim(0, max(audio_max_global * 1.2, thresholds["audio"] * 2.0, 1000)) # Triggers capteurs update_spans(trig_spans_a, accel_trig) update_spans(trig_spans_g, gyro_trig) update_spans(trig_spans_m, audio_trig) # Spans tirs (orange plein sur la timeline) sl = list(shot_buf) for i, span in enumerate(shot_spans): span.set_alpha(0.85 if i < len(sl) and sl[i] > 0 else 0) # Titres avec valeurs courantes titles[0].set_text( f"Accelerometre " f"val: {a[-1]:.2f} G " f"seuil: {thresholds['accel']:.1f} G " f"[NUM7=+0.1 NUM1=-0.1]") titles[1].set_text( f"Gyroscope " f"val: {g[-1]:.0f} dps " f"seuil: {thresholds['gyro']:.0f} dps " f"[NUM8=+10 NUM2=-10]") titles[2].set_text( f"Microphone PDM " f"val: {m[-1]:.0f} " f"seuil: {thresholds['audio']} " f"[NUM9=+500 NUM3=-500]") titles[3].set_text(f"Tirs detectes : {shot_count} (fenetre glissante)") status_txt.set_text( f"{ble_status} | Tirs total : {shot_count}") debug_txt.set_text( f"Debug XIAO : {DEBUG_MODE_NAMES[debug_mode]} [NUM0 = cycle OFF->RAW->TRIGGERS->FULL] | " f"MinSensors : {min_sensors}/3 [NUM6=+1 NUM4=-1]") return (line_a, line_g, line_m, line_s, thr_a, thr_g, thr_m, *titles, status_txt, debug_txt) def on_key(event): global audio_max_global, ble_running, config_pending, debug_mode, min_sensors k = event.key changed = False # Pavé numérique (Num Lock ON : '7','1'... / Num Lock OFF : 'num7','num1'...) if k in ('7', 'num7'): thresholds["accel"] = round(thresholds["accel"] + 0.1, 1); changed = True elif k in ('1', 'num1'): thresholds["accel"] = round(max(0.3, thresholds["accel"] - 0.1), 1); changed = True elif k in ('8', 'num8'): thresholds["gyro"] = thresholds["gyro"] + 10; changed = True elif k in ('2', 'num2'): thresholds["gyro"] = max(20, thresholds["gyro"] - 10); changed = True elif k in ('9', 'num9'): thresholds["audio"] = thresholds["audio"] + 500; changed = True elif k in ('3', 'num3'): thresholds["audio"] = max(200, thresholds["audio"] - 500); changed = True elif k in ('6', 'num6'): min_sensors = min(3, min_sensors + 1); print(f"MinSensors → {min_sensors}"); changed = True elif k in ('4', 'num4'): min_sensors = max(1, min_sensors - 1); print(f"MinSensors → {min_sensors}"); changed = True elif k in ('0', 'num0'): debug_mode = (debug_mode + 1) % 4 # cycle OFF→RAW→TRIGGERS→FULL print(f"Debug XIAO → {DEBUG_MODE_NAMES[debug_mode]}") changed = True # déclenche l'envoi du nouveau mode au XIAO elif k in ('5', 'num5'): audio_max_global = 1000 for b in (accel_buf, gyro_buf, audio_buf, shot_buf, accel_trig, gyro_trig, audio_trig): b.clear() b.extend([0] * WINDOW_SIZE) print("Courbes reinitialisees") elif k == 'escape': ble_running = False plt.close('all') # Envoi de la config (+ debug mode) au XIAO si connecté if changed: config_pending = True # Affichage console if k in ('7','1','num7','num1'): print(f"Accel seuil → {thresholds['accel']:.1f} G") elif k in ('8','2','num8','num2'): print(f"Gyro seuil → {thresholds['gyro']:.0f} °/s") elif k in ('9','3','num9','num3'): print(f"Audio seuil → {thresholds['audio']}") def on_close(event): global ble_running ble_running = False fig.canvas.mpl_connect('key_press_event', on_key) fig.canvas.mpl_connect('close_event', on_close) def main(): print("╔══════════════════════════════════════════════╗") print("║ XIAO Airsoft Calibration Tool v3 ║") print("║ Rendu optimisé — aucune latence ║") print("╚══════════════════════════════════════════════╝") print(f"\n🎯 Cible : '{DEVICE_NAME}'") print("⚠️ Tapez 'D' dans le Moniteur Série Arduino") print(" jusqu'à voir 'Debug : FULL'\n") ble_thread = threading.Thread(target=run_ble, daemon=True) ble_thread.start() anim = FuncAnimation( fig, update, interval=80, # ~12 fps, largement suffisant blit=False, # blit=True cause des bugs de spans cache_frame_data=False ) plt.show() global ble_running ble_running = False if __name__ == "__main__": main()