"""
Anti-Recoil Parameter Optimizer
================================
Reads CSV files recorded by the EBBarrel CSV recording feature and finds
optimal parameters for the Adaptive Extrapolation mode.

Usage:
    python analyze_antirecoil.py <csv_file1> [csv_file2 ...] [options]

Options:
    --plot          Generate comparison plots (requires matplotlib)
    --grid          Use grid search instead of differential evolution
    --strategy      Multi-file aggregation: mean (default), worst_case
    --max-iter      Max optimizer iterations (default: 200)

The script:
    1. Loads per-frame data (real position/aim vs predicted position/aim)
    2. Simulates adaptive extrapolation offline (matching C++ exactly)
    3. Optimizes six parameters: Sensitivity, DeadZone, MinSpeed, Damping,
       BufferTime and DiscardTime
    4. Reports recommended parameters with per-file breakdown
"""

import argparse
import csv
import itertools
import math
import os
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Frame:
    # One recorded CSV row. Timestamps are treated as seconds throughout the
    # script (millisecond parameters are divided by 1000 before comparison).
    timestamp: float
    real_pos: Tuple[float, float, float]
    real_aim: Tuple[float, float, float]
    pred_pos: Tuple[float, float, float]
    pred_aim: Tuple[float, float, float]
    safe_count: int
    buffer_count: int
    extrap_time: float
    shot_fired: bool = False


@dataclass
class AdaptiveParams:
    # Tunable inputs of the adaptive extrapolation simulation.
    sensitivity: float = 3.0
    dead_zone: float = 0.95
    min_speed: float = 0.0
    damping: float = 5.0
    buffer_time_ms: float = 200.0
    discard_time_ms: float = 30.0


@dataclass
class ScoreResult:
    # Error statistics for one file plus the weighted combined score
    # (lower is better; inf means "no errors could be computed").
    pos_mean: float
    pos_p95: float
    aim_mean: float
    aim_p95: float
    jitter: float
    overshoot: float
    score: float


def _parse_shot_flag(raw) -> bool:
    """Interpret the optional ``ShotFired`` cell; missing/empty means False."""
    if raw is None:
        return False
    if isinstance(raw, str):
        raw = raw.strip()
        if not raw:
            return False
    return int(raw) == 1


def load_csv(path: str) -> List[Frame]:
    """Load a recording CSV into a list of :class:`Frame` objects.

    The ``ShotFired`` column is optional; when it is absent (or a cell is
    empty) the frame is treated as "no shot fired".

    Raises:
        KeyError: a required column is missing.
        ValueError: a cell cannot be parsed as a number.
    """
    frames = []
    # newline='' is what the csv module expects; utf-8-sig transparently
    # drops a BOM so the first header is still read as 'Timestamp'.
    with open(path, 'r', newline='', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            frames.append(Frame(
                timestamp=float(row['Timestamp']),
                real_pos=(float(row['RealPosX']), float(row['RealPosY']), float(row['RealPosZ'])),
                real_aim=(float(row['RealAimX']), float(row['RealAimY']), float(row['RealAimZ'])),
                pred_pos=(float(row['PredPosX']), float(row['PredPosY']), float(row['PredPosZ'])),
                pred_aim=(float(row['PredAimX']), float(row['PredAimY']), float(row['PredAimZ'])),
                safe_count=int(row['SafeCount']),
                buffer_count=int(row['BufferCount']),
                extrap_time=float(row['ExtrapolationTime']),
                shot_fired=_parse_shot_flag(row.get('ShotFired')),
            ))
    return frames


# --- Vector math helpers ---

def vec_dist(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((ai - bi) ** 2 for ai, bi in zip(a, b)))


def vec_sub(a, b):
    """Component-wise a - b."""
    return tuple(ai - bi for ai, bi in zip(a, b))


def vec_add(a, b):
    """Component-wise a + b."""
    return tuple(ai + bi for ai, bi in zip(a, b))


def vec_scale(a, s):
    """Vector a multiplied by scalar s."""
    return tuple(ai * s for ai in a)


def vec_len(a):
    """Euclidean length of a vector."""
    return math.sqrt(sum(ai * ai for ai in a))


def vec_normalize(a):
    """Unit-length copy of a, or (0, 0, 0) when a is (near) zero length."""
    l = vec_len(a)
    if l < 1e-10:
        return (0, 0, 0)
    return tuple(ai / l for ai in a)


def angle_between(a, b):
    """Angle in degrees between two direction vectors.

    The inputs are not normalized here; the dot product is only clamped to
    [-1, 1] before acos, so callers should pass unit vectors.
    """
    dot = sum(ai * bi for ai, bi in zip(a, b))
    dot = max(-1.0, min(1.0, dot))
    return math.degrees(math.acos(dot))


def _p95(sorted_values):
    """95th-percentile element of an ascending list (0 for an empty list)."""
    if not sorted_values:
        return 0
    idx = min(int(len(sorted_values) * 0.95), len(sorted_values) - 1)
    return sorted_values[idx]


def _frame_speeds(frames: List[Frame]) -> List[float]:
    """Per-frame speed of ``real_pos`` (distance / dt), one value per frame.

    The first frame gets 0.0; a frame whose dt is not above 1e-6 repeats the
    previous speed instead of dividing by (almost) zero.
    """
    if not frames:
        return []
    speeds = [0.0]
    for prev, cur in zip(frames, frames[1:]):
        dt = cur.timestamp - prev.timestamp
        if dt > 1e-6:
            speeds.append(vec_dist(cur.real_pos, prev.real_pos) / dt)
        else:
            speeds.append(speeds[-1])
    return speeds


# --- Prediction error from recorded data ---

def compute_prediction_error(frames: List[Frame]) -> dict:
    """Compute error between predicted and actual (real) positions/aims.

    Returns a dict with keys pos_mean/pos_p95/pos_max and
    aim_mean/aim_p95/aim_max. Aim errors are in degrees and skip frames where
    either aim vector is (near) zero length. All values are 0 for no frames.
    """
    pos_errors = []
    aim_errors = []

    for f in frames:
        pos_errors.append(vec_dist(f.pred_pos, f.real_pos))

        aim_a = vec_normalize(f.pred_aim)
        aim_b = vec_normalize(f.real_aim)
        # vec_normalize yields a zero vector for degenerate input.
        if vec_len(aim_a) > 0.5 and vec_len(aim_b) > 0.5:
            aim_errors.append(angle_between(aim_a, aim_b))

    if not pos_errors:
        return {'pos_mean': 0, 'pos_p95': 0, 'pos_max': 0,
                'aim_mean': 0, 'aim_p95': 0, 'aim_max': 0}

    pos_errors.sort()
    aim_errors.sort()

    return {
        'pos_mean': sum(pos_errors) / len(pos_errors),
        'pos_p95': _p95(pos_errors),
        'pos_max': pos_errors[-1],
        'aim_mean': sum(aim_errors) / len(aim_errors) if aim_errors else 0,
        'aim_p95': _p95(aim_errors),
        'aim_max': aim_errors[-1] if aim_errors else 0,
    }


# --- Shot contamination analysis ---

def analyze_shot_contamination(frames: List[Frame], analysis_window_ms: float = 200.0):
    """
    Analyze how shots contaminate the tracking data.

    For each shot, measure the speed spike relative to the 100 ms before the
    shot and how long after the shot the speed stays above the threshold
    (within ``analysis_window_ms``). This tells us the minimum discard_time
    needed.

    Returns a dict with analysis results, or None if no shots found (or no
    shot had any preceding frames to build a baseline from).
    """
    shot_indices = [i for i, f in enumerate(frames) if f.shot_fired]
    if not shot_indices:
        return None

    analysis_window_s = analysis_window_ms / 1000.0
    speeds = _frame_speeds(frames)

    contamination_durations = []
    speed_spikes = []

    for si in shot_indices:
        shot_time = frames[si].timestamp

        # Baseline speed: frames within 100ms BEFORE the shot
        baseline_speeds = []
        for j in range(si - 1, -1, -1):
            if shot_time - frames[j].timestamp > 0.1:
                break
            baseline_speeds.append(speeds[j])

        if not baseline_speeds:
            continue

        baseline_mean = sum(baseline_speeds) / len(baseline_speeds)
        if len(baseline_speeds) > 1:
            baseline_std = math.sqrt(
                sum((s - baseline_mean) ** 2 for s in baseline_speeds) / len(baseline_speeds))
        else:
            # A single sample has no spread; assume 10% of its value.
            baseline_std = baseline_mean * 0.1

        # "Contaminated" = more than 3 sigma above baseline, at least 10 cm/s
        threshold = baseline_mean + max(3.0 * baseline_std, 10.0)

        # Find how long after the shot the speed stays above threshold
        max_speed = 0.0
        last_contaminated_time = 0.0
        for j in range(si, len(frames)):
            dt_from_shot = frames[j].timestamp - shot_time
            if dt_from_shot > analysis_window_s:
                break
            if speeds[j] > threshold:
                last_contaminated_time = dt_from_shot
            if speeds[j] > max_speed:
                max_speed = speeds[j]

        contamination_durations.append(last_contaminated_time * 1000.0)  # in ms
        speed_spikes.append(max_speed - baseline_mean)

    if not contamination_durations:
        return None

    contamination_durations.sort()
    p95_ms = _p95(contamination_durations)
    return {
        'num_shots': len(shot_indices),
        'contamination_mean_ms': sum(contamination_durations) / len(contamination_durations),
        'contamination_p95_ms': p95_ms,
        'contamination_max_ms': contamination_durations[-1],
        'speed_spike_mean': sum(speed_spikes) / len(speed_spikes) if speed_spikes else 0,
        'speed_spike_max': max(speed_spikes) if speed_spikes else 0,
        'recommended_discard_ms': math.ceil(p95_ms / 5.0) * 5.0,  # round up to 5ms
    }


# --- Offline adaptive extrapolation simulation (matches C++ exactly) ---

def simulate_adaptive_indexed(
        frames: List[Frame],
        params: AdaptiveParams) -> Tuple[List[float], List[float], List[int], List[int]]:
    """
    Simulate the adaptive extrapolation and report which frame each error
    belongs to.

    Returns ``(pos_errors, aim_errors, pos_frame_idx, aim_frame_idx)`` where
    ``pos_frame_idx[k]`` is the index into ``frames`` that produced
    ``pos_errors[k]`` (likewise for aim). The simulation skips frames that
    have too few safe samples, so these indices are the only reliable way to
    line errors up with timestamps.

    Optimized for speed: pre-extracts arrays, inlines math, avoids allocations.
    """
    pos_errors = []
    aim_errors = []
    pos_frame_idx = []
    aim_frame_idx = []
    n_frames = len(frames)
    if n_frames < 4:
        return pos_errors, aim_errors, pos_frame_idx, aim_frame_idx

    # Pre-extract into flat arrays for speed
    ts = [f.timestamp for f in frames]
    px = [f.real_pos[0] for f in frames]
    py = [f.real_pos[1] for f in frames]
    pz = [f.real_pos[2] for f in frames]
    ax = [f.real_aim[0] for f in frames]
    ay = [f.real_aim[1] for f in frames]
    az = [f.real_aim[2] for f in frames]

    buffer_s = params.buffer_time_ms / 1000.0
    discard_s = params.discard_time_ms / 1000.0
    sensitivity = params.sensitivity
    dead_zone = params.dead_zone
    min_speed = params.min_speed
    damping = params.damping
    SMALL = 1e-10

    _sqrt = math.sqrt
    _exp = math.exp
    _acos = math.acos
    _degrees = math.degrees
    _pow = pow

    for i in range(2, n_frames - 1):
        ct = ts[i]
        safe_cutoff = ct - discard_s
        oldest_allowed = ct - buffer_s

        # Collect safe sample indices (backward scan, then reverse):
        # inside the buffer window but older than the discard window.
        safe = []
        for j in range(i, -1, -1):
            t = ts[j]
            if t < oldest_allowed:
                break
            if t <= safe_cutoff:
                safe.append(j)
        safe.reverse()

        ns = len(safe)
        if ns < 2:
            continue

        # Build velocity pairs inline
        vpx = []
        vpy = []
        vpz = []
        vax = []
        vay = []
        vaz = []
        for k in range(1, ns):
            p, c = safe[k - 1], safe[k]
            dt = ts[c] - ts[p]
            if dt > 1e-6:
                inv_dt = 1.0 / dt
                vpx.append((px[c] - px[p]) * inv_dt)
                vpy.append((py[c] - py[p]) * inv_dt)
                vpz.append((pz[c] - pz[p]) * inv_dt)
                vax.append((ax[c] - ax[p]) * inv_dt)
                vay.append((ay[c] - ay[p]) * inv_dt)
                vaz.append((az[c] - az[p]) * inv_dt)

        nv = len(vpx)
        if nv < 2:
            continue

        # Weighted average velocity (quadratic weights, oldest=index 0)
        tw = 0.0
        apx = apy = apz = 0.0
        aax = aay = aaz = 0.0
        for k in range(nv):
            w = (k + 1) * (k + 1)
            apx += vpx[k] * w
            apy += vpy[k] * w
            apz += vpz[k] * w
            aax += vax[k] * w
            aay += vay[k] * w
            aaz += vaz[k] * w
            tw += w
        inv_tw = 1.0 / tw
        apx *= inv_tw
        apy *= inv_tw
        apz *= inv_tw
        aax *= inv_tw
        aay *= inv_tw
        aaz *= inv_tw

        # Recent velocity (last 25%, unweighted)
        rs = max(0, nv - max(1, nv // 4))
        rc = nv - rs
        rpx = rpy = rpz = 0.0
        rax = ray = raz = 0.0
        for k in range(rs, nv):
            rpx += vpx[k]
            rpy += vpy[k]
            rpz += vpz[k]
            rax += vax[k]
            ray += vay[k]
            raz += vaz[k]
        inv_rc = 1.0 / rc
        rpx *= inv_rc
        rpy *= inv_rc
        rpz *= inv_rc
        rax *= inv_rc
        ray *= inv_rc
        raz *= inv_rc

        avg_ps = _sqrt(apx*apx + apy*apy + apz*apz)
        avg_as = _sqrt(aax*aax + aay*aay + aaz*aaz)
        rec_ps = _sqrt(rpx*rpx + rpy*rpy + rpz*rpz)
        rec_as = _sqrt(rax*rax + ray*ray + raz*raz)

        # Position confidence: drops below 1 when recent speed falls under
        # dead_zone * average speed (i.e. the motion is decelerating).
        pc = 1.0
        if avg_ps > min_speed:
            ratio = rec_ps / avg_ps
            if ratio > 1.0:
                ratio = 1.0
            if ratio < dead_zone:
                rm = ratio / dead_zone if dead_zone > SMALL else 0.0
                if rm > 1.0:
                    rm = 1.0
                pc = _pow(rm, sensitivity)

        # Aim confidence (same rule applied to the aim vector's rate of change)
        ac = 1.0
        if avg_as > min_speed:
            ratio = rec_as / avg_as
            if ratio > 1.0:
                ratio = 1.0
            if ratio < dead_zone:
                rm = ratio / dead_zone if dead_zone > SMALL else 0.0
                if rm > 1.0:
                    rm = 1.0
                ac = _pow(rm, sensitivity)

        # Extrapolation time: from the newest safe sample to the current frame
        lsi = safe[-1]
        edt = ct - ts[lsi]
        if edt <= 0:
            edt = 0.011

        # Damping
        ds = _exp(-damping * edt) if damping > 0.0 else 1.0

        # Predict
        m = edt * pc * ds
        ppx = px[lsi] + apx * m
        ppy = py[lsi] + apy * m
        ppz = pz[lsi] + apz * m

        ma = edt * ac * ds
        pax_r = ax[lsi] + aax * ma
        pay_r = ay[lsi] + aay * ma
        paz_r = az[lsi] + aaz * ma
        pa_len = _sqrt(pax_r*pax_r + pay_r*pay_r + paz_r*paz_r)

        # Position error
        dx = ppx - px[i]
        dy = ppy - py[i]
        dz = ppz - pz[i]
        pos_errors.append(_sqrt(dx*dx + dy*dy + dz*dz))
        pos_frame_idx.append(i)

        # Aim error (skipped when either aim vector is degenerate)
        if pa_len > 0.5:
            inv_pa = 1.0 / pa_len
            pax_n = pax_r * inv_pa
            pay_n = pay_r * inv_pa
            paz_n = paz_r * inv_pa
            ra_len = _sqrt(ax[i]*ax[i] + ay[i]*ay[i] + az[i]*az[i])
            if ra_len > 0.5:
                inv_ra = 1.0 / ra_len
                dot = pax_n * ax[i] * inv_ra + pay_n * ay[i] * inv_ra + paz_n * az[i] * inv_ra
                if dot > 1.0:
                    dot = 1.0
                if dot < -1.0:
                    dot = -1.0
                aim_errors.append(_degrees(_acos(dot)))
                aim_frame_idx.append(i)

    return pos_errors, aim_errors, pos_frame_idx, aim_frame_idx


def simulate_adaptive(frames: List[Frame], params: AdaptiveParams) -> Tuple[List[float], List[float]]:
    """
    Simulate the adaptive extrapolation offline with given parameters.
    Matches the C++ PredictAdaptiveExtrapolation algorithm exactly.

    Returns ``(pos_errors, aim_errors)``; both are empty for fewer than 4
    frames. Aim errors are in degrees.
    """
    pos_errors, aim_errors, _, _ = simulate_adaptive_indexed(frames, params)
    return pos_errors, aim_errors


# --- Scoring ---

def compute_score(pos_errors: List[float], aim_errors: List[float]) -> ScoreResult:
    """Compute a combined score from position and aim errors, including stability metrics.

    With no position errors the score is ``inf`` so an optimizer never
    prefers a parameter set that produced no predictions at all.
    """
    if not pos_errors:
        return ScoreResult(0, 0, 0, 0, 0, 0, float('inf'))

    pos_mean = sum(pos_errors) / len(pos_errors)
    pos_p95 = _p95(sorted(pos_errors))
    aim_mean = sum(aim_errors) / len(aim_errors) if aim_errors else 0
    aim_p95 = _p95(sorted(aim_errors))

    # Jitter: standard deviation of frame-to-frame error change
    jitter = 0.0
    if len(pos_errors) > 1:
        deltas = [abs(cur - prev) for prev, cur in zip(pos_errors, pos_errors[1:])]
        delta_mean = sum(deltas) / len(deltas)
        jitter = math.sqrt(sum((d - delta_mean) ** 2 for d in deltas) / len(deltas))

    # Overshoot: fraction of frames where error spikes above 2x mean
    overshoot = 0.0
    if pos_mean > 0:
        overshoot_count = sum(1 for e in pos_errors if e > 2.0 * pos_mean)
        overshoot = overshoot_count / len(pos_errors)

    # Combined score
    score = (pos_mean * 0.25 + pos_p95 * 0.15 +
             aim_mean * 0.25 + aim_p95 * 0.15 +
             jitter * 0.10 + overshoot * 0.10)

    return ScoreResult(pos_mean, pos_p95, aim_mean, aim_p95, jitter, overshoot, score)


def aggregate_scores(per_file_scores: List[Tuple[str, ScoreResult]],
                     strategy: str = "mean") -> float:
    """Aggregate scores across multiple files.

    ``worst_case`` returns the largest per-file score; any other strategy
    value returns the mean. An empty list yields ``inf``.
    """
    scores = [s.score for _, s in per_file_scores]
    if not scores:
        return float('inf')
    if strategy == "worst_case":
        return max(scores)
    return sum(scores) / len(scores)


def _score_files(all_frames, params: AdaptiveParams) -> List[Tuple[str, ScoreResult]]:
    """Simulate and score every (name, frames) pair with ``params``."""
    return [(name, compute_score(*simulate_adaptive(frames, params)))
            for name, frames in all_frames]


# --- Optimizer ---

def objective(x, all_frames, strategy):
    """Objective function for the optimizer.

    ``x`` holds, in order: sensitivity, dead_zone, min_speed, damping,
    buffer_time_ms, discard_time_ms. Returns the aggregated score.
    """
    params = AdaptiveParams(
        sensitivity=x[0],
        dead_zone=x[1],
        min_speed=x[2],
        damping=x[3],
        buffer_time_ms=x[4],
        discard_time_ms=x[5]
    )
    return aggregate_scores(_score_files(all_frames, params), strategy)


def _discard_bounds(min_discard_ms: float) -> Tuple[float, float]:
    """Search interval for discard_time_ms that never inverts.

    The default interval tops out at 100 ms, but the contamination analysis
    can recommend a floor at or above that; in that case the upper bound is
    moved 50 ms above the floor so the interval stays valid and non-empty.
    """
    lower = max(10.0, min_discard_ms)
    upper = 100.0 if lower < 100.0 else lower + 50.0
    return lower, upper


def optimize_differential_evolution(all_frames, strategy="mean", max_iter=200, min_discard_ms=10.0):
    """Find optimal parameters using scipy differential evolution.

    Returns ``(best_params, best_score)``. The returned parameters are
    rounded for readability; ``best_score`` is the optimizer's value for the
    unrounded vector. Exits the process if scipy is not installed.
    """
    try:
        from scipy.optimize import differential_evolution
    except ImportError:
        print("ERROR: scipy is required for optimization.")
        print("Install with: pip install scipy")
        sys.exit(1)

    bounds = [
        (0.1, 5.0),       # sensitivity
        (0.0, 0.95),      # dead_zone
        (0.0, 200.0),     # min_speed
        (0.0, 50.0),      # damping
        (100.0, 500.0),   # buffer_time_ms
        _discard_bounds(min_discard_ms),  # discard_time_ms (floor from contamination analysis)
    ]

    print(f"\nRunning differential evolution (maxiter={max_iter}, popsize=25, min_discard={min_discard_ms:.0f}ms)...")
    print("This may take a few minutes...\n")

    result = differential_evolution(
        objective,
        bounds,
        args=(all_frames, strategy),
        maxiter=max_iter,
        seed=42,
        tol=1e-4,
        popsize=25,
        disp=True,
        workers=1
    )

    best_params = AdaptiveParams(
        sensitivity=round(result.x[0], 2),
        dead_zone=round(result.x[1], 3),
        min_speed=round(result.x[2], 1),
        damping=round(result.x[3], 1),
        buffer_time_ms=round(result.x[4], 0),
        discard_time_ms=round(result.x[5], 0)
    )
    return best_params, result.fun


def optimize_grid_search(all_frames, strategy="mean", min_discard_ms=10.0):
    """Find optimal parameters using grid search (slower but no scipy needed).

    Returns ``(best_params, best_score)``; ties keep the first combination
    encountered.
    """
    print(f"\nRunning grid search over 6 parameters (min_discard={min_discard_ms:.0f}ms)...")

    sensitivities = [1.0, 2.0, 3.0, 4.0]
    dead_zones = [0.7, 0.8, 0.9]
    min_speeds = [0.0, 30.0]
    dampings = [5.0, 10.0, 15.0]
    buffer_times = [300.0, 400.0, 500.0, 600.0, 800.0]
    discard_times = [d for d in [20.0, 40.0, 60.0, 100.0, 150.0, 200.0] if d >= min_discard_ms]
    if not discard_times:
        discard_times = [min_discard_ms]

    total = (len(sensitivities) * len(dead_zones) * len(min_speeds) *
             len(dampings) * len(buffer_times) * len(discard_times))
    print(f"Total combinations: {total}")

    best_score = float('inf')
    best_params = AdaptiveParams()

    # product() iterates with the last axis varying fastest, i.e. the same
    # order as six nested loops.
    grid = itertools.product(sensitivities, dead_zones, min_speeds,
                             dampings, buffer_times, discard_times)
    for count, combo in enumerate(grid, start=1):
        if count % 500 == 0:
            print(f" Progress: {count}/{total} ({100 * count / total:.0f}%) best={best_score:.4f}")

        params = AdaptiveParams(*combo)
        score = aggregate_scores(_score_files(all_frames, params), strategy)
        if score < best_score:
            best_score = score
            best_params = params

    return best_params, best_score


# --- Main ---

def print_file_stats(name: str, frames: List[Frame]):
    """Print basic stats for a CSV file (safe to call with no frames)."""
    if not frames:
        print(f" {os.path.basename(name)}: 0 frames")
        return

    duration = frames[-1].timestamp - frames[0].timestamp
    avg_fps = len(frames) / duration if duration > 0 else 0
    avg_safe = sum(f.safe_count for f in frames) / len(frames)
    avg_extrap = sum(f.extrap_time for f in frames) / len(frames) * 1000
    num_shots = sum(1 for f in frames if f.shot_fired)
    print(f" {os.path.basename(name)}: {len(frames)} frames, {avg_fps:.0f}fps, "
          f"{duration:.1f}s, safe={avg_safe:.1f}, extrap={avg_extrap:.1f}ms, shots={num_shots}")


def print_score_detail(name: str, score: ScoreResult):
    """Print detailed score for a file."""
    print(f" {os.path.basename(name):30s} Pos: mean={score.pos_mean:.3f}cm p95={score.pos_p95:.3f}cm | "
          f"Aim: mean={score.aim_mean:.3f}deg p95={score.aim_p95:.3f}deg | "
          f"jitter={score.jitter:.3f} overshoot={score.overshoot:.1%} | "
          f"score={score.score:.4f}")


def _pct_improvement(baseline: float, optimized: float) -> float:
    """Relative improvement in percent; 0 when the baseline is unusable."""
    if baseline > 0 and math.isfinite(baseline):
        return (baseline - optimized) / baseline * 100
    return 0


def _plot_output_path(csv_path: str) -> str:
    """PNG path next to ``csv_path``; never equal to the input path."""
    root, _ext = os.path.splitext(csv_path)
    return root + '_optimizer.png'


def _plot_results(all_frames, default_params: AdaptiveParams,
                  best_params: AdaptiveParams, plot_path: str):
    """Plot baseline vs optimized errors and the speed profile for each file."""
    try:
        import matplotlib.pyplot as plt
    except ImportError:
        print("\nmatplotlib not installed. Install with: pip install matplotlib")
        return

    n_files = len(all_frames)
    _fig, axes = plt.subplots(n_files, 3, figsize=(18, 5 * n_files), squeeze=False)

    for row, (name, frames) in enumerate(all_frames):
        t0 = frames[0].timestamp
        timestamps = [f.timestamp - t0 for f in frames]
        short_name = os.path.basename(name)

        # Each error series carries its own frame indices because the
        # simulation skips frames and the two parameter sets may skip
        # different ones.
        bl_pos, bl_aim, bl_pos_idx, bl_aim_idx = simulate_adaptive_indexed(frames, default_params)
        op_pos, op_aim, op_pos_idx, op_aim_idx = simulate_adaptive_indexed(frames, best_params)

        # Position error
        ax = axes[row][0]
        ax.plot([timestamps[i] for i in bl_pos_idx], bl_pos,
                'r-', alpha=0.4, linewidth=0.5, label='Baseline')
        ax.plot([timestamps[i] for i in op_pos_idx], op_pos,
                'g-', alpha=0.4, linewidth=0.5, label='Optimized')
        ax.set_ylabel('Position Error (cm)')
        ax.set_title(f'{short_name} - Position Error')
        ax.legend()

        # Aim error
        ax = axes[row][1]
        ax.plot([timestamps[i] for i in bl_aim_idx], bl_aim,
                'r-', alpha=0.4, linewidth=0.5, label='Baseline')
        ax.plot([timestamps[i] for i in op_aim_idx], op_aim,
                'g-', alpha=0.4, linewidth=0.5, label='Optimized')
        ax.set_ylabel('Aim Error (deg)')
        ax.set_title(f'{short_name} - Aim Error')
        ax.legend()

        # Speed profile
        ax = axes[row][2]
        ax.plot(timestamps, _frame_speeds(frames), 'b-', alpha=0.7, linewidth=0.5)
        ax.set_ylabel('Speed (cm/s)')
        ax.set_xlabel('Time (s)')
        ax.set_title(f'{short_name} - Speed Profile')

    plt.tight_layout()
    plt.savefig(plot_path, dpi=150)
    print(f"\nPlot saved: {plot_path}")
    plt.show()


def main():
    """Command-line entry point: load files, report, optimize, optionally plot."""
    parser = argparse.ArgumentParser(
        description="Anti-Recoil Parameter Optimizer - finds optimal AdaptiveExtrapolation parameters"
    )
    parser.add_argument("csv_files", nargs="+", help="One or more CSV recording files")
    parser.add_argument("--plot", action="store_true", help="Generate comparison plots (requires matplotlib)")
    parser.add_argument("--grid", action="store_true", help="Use grid search instead of differential evolution")
    parser.add_argument("--strategy", choices=["mean", "worst_case"], default="mean",
                        help="Multi-file score aggregation strategy (default: mean)")
    parser.add_argument("--max-iter", type=int, default=200, help="Max optimizer iterations (default: 200)")
    args = parser.parse_args()

    # Load all CSV files
    all_frames = []
    for csv_path in args.csv_files:
        if not os.path.exists(csv_path):
            print(f"Error: File not found: {csv_path}")
            sys.exit(1)
        frames = load_csv(csv_path)
        if not frames:
            # Nothing to simulate or report; later stages index frames[0].
            print(f"Warning: {csv_path} contains no data rows - skipping")
            continue
        if len(frames) < 50:
            print(f"Warning: {csv_path} has only {len(frames)} frames (need at least 50 for good results)")
        all_frames.append((csv_path, frames))

    if not all_frames:
        print("Error: none of the input files contain any frames")
        sys.exit(1)

    print(f"\nLoaded {len(all_frames)} file(s)")
    print("=" * 70)

    # Per-file stats
    print("\n=== FILE STATISTICS ===")
    for name, frames in all_frames:
        print_file_stats(name, frames)

    # Shot contamination analysis (run once; also yields the discard floor)
    min_discard_ms = 10.0  # absolute minimum
    has_shots = any(any(f.shot_fired for f in frames) for _, frames in all_frames)
    if has_shots:
        print("\n=== SHOT CONTAMINATION ANALYSIS ===")
        max_recommended_discard = 0.0
        for name, frames in all_frames:
            result = analyze_shot_contamination(frames)
            if result:
                print(f" {os.path.basename(name)}:")
                print(f" Shots detected: {result['num_shots']}")
                print(f" Speed spike: mean={result['speed_spike_mean']:.1f} cm/s, max={result['speed_spike_max']:.1f} cm/s")
                print(f" Contamination duration: mean={result['contamination_mean_ms']:.1f}ms, "
                      f"p95={result['contamination_p95_ms']:.1f}ms, max={result['contamination_max_ms']:.1f}ms")
                print(f" Recommended discard_time: >= {result['recommended_discard_ms']:.0f}ms")
                max_recommended_discard = max(max_recommended_discard, result['recommended_discard_ms'])
            else:
                print(f" {os.path.basename(name)}: no shots detected")
        if max_recommended_discard > 0:
            print(f"\n >>> MINIMUM SAFE DiscardTime across all files: {max_recommended_discard:.0f}ms <<<")
        min_discard_ms = max(min_discard_ms, max_recommended_discard)
    else:
        print("\n (No ShotFired data in CSV - record with updated plugin to get contamination analysis)")

    # Baseline: current default parameters
    default_params = AdaptiveParams()
    print(f"\n=== BASELINE (defaults: sens={default_params.sensitivity}, dz={default_params.dead_zone}, "
          f"minspd={default_params.min_speed}, damp={default_params.damping}, "
          f"buf={default_params.buffer_time_ms}ms, disc={default_params.discard_time_ms}ms) ===")

    baseline_scores = _score_files(all_frames, default_params)
    for name, score in baseline_scores:
        print_score_detail(name, score)

    baseline_agg = aggregate_scores(baseline_scores, args.strategy)
    print(f"\n Aggregate score ({args.strategy}): {baseline_agg:.4f}")

    # Also show recorded prediction error (as-is from the engine)
    print(f"\n=== RECORDED PREDICTION ERROR (as captured in-engine) ===")
    for name, frames in all_frames:
        err = compute_prediction_error(frames)
        print(f" {os.path.basename(name):30s} Pos: mean={err['pos_mean']:.3f}cm p95={err['pos_p95']:.3f}cm | "
              f"Aim: mean={err['aim_mean']:.3f}deg p95={err['aim_p95']:.3f}deg")

    # Optimize
    print(f"\n=== OPTIMIZATION ({args.strategy}) ===")

    if args.grid:
        best_params, best_score = optimize_grid_search(all_frames, args.strategy, min_discard_ms)
    else:
        best_params, best_score = optimize_differential_evolution(
            all_frames, args.strategy, args.max_iter, min_discard_ms)

    # Results
    print(f"\n{'=' * 70}")
    print(f" BEST PARAMETERS FOUND:")
    print(f" AdaptiveSensitivity = {best_params.sensitivity}")
    print(f" AdaptiveDeadZone = {best_params.dead_zone}")
    print(f" AdaptiveMinSpeed = {best_params.min_speed}")
    print(f" ExtrapolationDamping = {best_params.damping}")
    print(f" AntiRecoilBufferTimeMs = {best_params.buffer_time_ms}")
    print(f" AntiRecoilDiscardTimeMs= {best_params.discard_time_ms}")
    print(f"{'=' * 70}")

    # Per-file breakdown with optimized params
    print(f"\n=== OPTIMIZED RESULTS ===")
    opt_scores = _score_files(all_frames, best_params)
    for name, score in opt_scores:
        print_score_detail(name, score)

    opt_agg = aggregate_scores(opt_scores, args.strategy)
    print(f"\n Aggregate score ({args.strategy}): {opt_agg:.4f}")

    # Improvement
    print(f"\n=== IMPROVEMENT vs BASELINE ===")
    for (name, baseline), (_, optimized) in zip(baseline_scores, opt_scores):
        pos_pct = _pct_improvement(baseline.pos_mean, optimized.pos_mean)
        aim_pct = _pct_improvement(baseline.aim_mean, optimized.aim_mean)
        score_pct = _pct_improvement(baseline.score, optimized.score)
        print(f" {os.path.basename(name):30s} Pos: {pos_pct:+.1f}% | Aim: {aim_pct:+.1f}% | Score: {score_pct:+.1f}%")

    total_pct = _pct_improvement(baseline_agg, opt_agg)
    print(f" {'TOTAL':30s} Score: {total_pct:+.1f}%")

    # Plotting
    if args.plot:
        _plot_results(all_frames, default_params, best_params,
                      _plot_output_path(args.csv_files[0]))

    print("\nDone.")


if __name__ == '__main__':
    main()