PS_Ballistics/Tools/analyze_antirecoil.py
j.foucher cd097e4e55 Optimize adaptive extrapolation defaults from real-world test data
- Update defaults from test-driven optimization:
  BufferTime=200ms, DiscardTime=30ms, Sensitivity=3.0,
  DeadZone=0.95, MinSpeed=0.0, Damping=5.0
- Add ShotFired column to CSV recording for contamination analysis
- Rewrite Python optimizer with 6-parameter search (sensitivity,
  dead zone, min speed, damping, buffer time, discard time)
- Fix velocity weighting order bug in Python simulation
- Add dead zone, min speed threshold, and damping to Python sim
- Add shot contamination analysis (analyze_shots.py) to measure
  exact IMU perturbation duration per shot
- Support multi-file optimization with mean/worst_case strategies
- Add jitter and overshoot scoring metrics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 18:33:14 +01:00

780 lines
29 KiB
Python

"""
Anti-Recoil Parameter Optimizer
================================
Reads CSV files recorded by the EBBarrel CSV recording feature and finds
optimal parameters for the Adaptive Extrapolation mode.
Usage:
python analyze_antirecoil.py <csv_file> [csv_file2 ...] [options]
Options:
--plot Generate comparison plots (requires matplotlib)
--grid Use grid search instead of differential evolution
--strategy <s> Multi-file aggregation: mean (default), worst_case
--max-iter <n> Max optimizer iterations (default: 200)
The script:
1. Loads per-frame data (real position/aim vs predicted position/aim)
2. Simulates adaptive extrapolation offline (matching C++ exactly)
3. Optimizes all 6 parameters: Sensitivity, DeadZone, MinSpeed, Damping, BufferTime, DiscardTime
4. Reports recommended parameters with per-file breakdown
"""
import csv
import sys
import math
import os
import argparse
from dataclasses import dataclass
from typing import List, Tuple, Optional
@dataclass
class Frame:
    """One recorded CSV row: the real transform vs the in-engine prediction."""
    timestamp: float                       # sample time in seconds
    real_pos: Tuple[float, float, float]   # actual position (cm, per the report/plot labels)
    real_aim: Tuple[float, float, float]   # actual aim direction vector
    pred_pos: Tuple[float, float, float]   # position predicted by the in-engine extrapolator
    pred_aim: Tuple[float, float, float]   # aim direction predicted in-engine
    safe_count: int                        # engine-reported count of "safe" samples
    buffer_count: int                      # engine-reported size of the history buffer
    extrap_time: float                     # engine-reported extrapolation horizon (seconds)
    shot_fired: bool = False               # True when a shot fired on this frame (optional CSV column)
@dataclass
class AdaptiveParams:
    """The six tunable parameters of the Adaptive Extrapolation mode.

    Defaults match the current (test-optimized) in-engine defaults.
    """
    sensitivity: float = 3.0       # exponent applied to the confidence ratio (see simulate_adaptive)
    dead_zone: float = 0.95        # recent/average speed ratio below which confidence decays
    min_speed: float = 0.0         # average speed must exceed this for confidence shaping to apply
    damping: float = 5.0           # exponential damping rate on the extrapolation time
    buffer_time_ms: float = 200.0  # age limit of samples kept in the history window (ms)
    discard_time_ms: float = 30.0  # newest samples discarded as shot-contaminated (ms)
@dataclass
class ScoreResult:
    """Error and stability metrics for one parameter set on one file; lower score is better."""
    pos_mean: float   # mean position error (cm)
    pos_p95: float    # 95th-percentile position error (cm)
    aim_mean: float   # mean aim error (degrees)
    aim_p95: float    # 95th-percentile aim error (degrees)
    jitter: float     # std-dev of frame-to-frame position-error change
    overshoot: float  # fraction of frames with error > 2x the mean
    score: float      # weighted combination of the above
def load_csv(path: str) -> List[Frame]:
    """Load one recording CSV into a list of Frame objects.

    The 'ShotFired' column is optional (older recordings lack it); when the
    column is missing, shot_fired defaults to False.

    Args:
        path: Path to a CSV file produced by the EBBarrel recording feature.

    Returns:
        Frames in file order.
    """
    # The original kept a `has_shot_col` flag here that was never read;
    # row.get('ShotFired', 0) already handles the optional column.
    frames: List[Frame] = []
    with open(path, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            frames.append(Frame(
                timestamp=float(row['Timestamp']),
                real_pos=(float(row['RealPosX']), float(row['RealPosY']), float(row['RealPosZ'])),
                real_aim=(float(row['RealAimX']), float(row['RealAimY']), float(row['RealAimZ'])),
                pred_pos=(float(row['PredPosX']), float(row['PredPosY']), float(row['PredPosZ'])),
                pred_aim=(float(row['PredAimX']), float(row['PredAimY']), float(row['PredAimZ'])),
                safe_count=int(row['SafeCount']),
                buffer_count=int(row['BufferCount']),
                extrap_time=float(row['ExtrapolationTime']),
                shot_fired=int(row.get('ShotFired', 0)) == 1,
            ))
    return frames
# --- Vector math helpers ---
def vec_dist(a, b):
    """Euclidean distance between two same-length vectors."""
    total = 0.0
    for x, y in zip(a, b):
        diff = x - y
        total += diff * diff
    return math.sqrt(total)
def vec_sub(a, b):
    """Component-wise difference a - b, returned as a tuple."""
    out = []
    for x, y in zip(a, b):
        out.append(x - y)
    return tuple(out)
def vec_add(a, b):
    """Component-wise sum a + b, returned as a tuple."""
    out = []
    for x, y in zip(a, b):
        out.append(x + y)
    return tuple(out)
def vec_scale(a, s):
    """Vector a scaled by scalar s, returned as a tuple."""
    return tuple(component * s for component in a)
def vec_len(a):
    """Euclidean length of vector a."""
    total = 0.0
    for component in a:
        total += component * component
    return math.sqrt(total)
def vec_normalize(a):
    """Return a unit-length copy of a, or (0, 0, 0) when a is near zero."""
    length = math.sqrt(sum(component * component for component in a))
    if length < 1e-10:
        return (0, 0, 0)
    return tuple(component / length for component in a)
def angle_between(a, b):
    """Angle in degrees between two direction vectors (assumed unit length)."""
    cosine = sum(x * y for x, y in zip(a, b))
    # Clamp to the valid acos domain against floating-point drift.
    cosine = min(1.0, max(-1.0, cosine))
    return math.degrees(math.acos(cosine))
# --- Prediction error from recorded data ---
def compute_prediction_error(frames: List[Frame]) -> dict:
    """Summarize the in-engine prediction error (predicted vs real) per frame.

    Returns a dict with mean/p95/max position error and mean/p95/max aim error.
    """
    pos_errors = []
    aim_errors = []
    for frame in frames:
        pos_errors.append(vec_dist(frame.pred_pos, frame.real_pos))
        unit_pred = vec_normalize(frame.pred_aim)
        unit_real = vec_normalize(frame.real_aim)
        # Skip degenerate aim vectors (vec_normalize returns the zero vector).
        if vec_len(unit_pred) > 0.5 and vec_len(unit_real) > 0.5:
            aim_errors.append(angle_between(unit_pred, unit_real))
    if not pos_errors:
        return {'pos_mean': 0, 'pos_p95': 0, 'pos_max': 0, 'aim_mean': 0, 'aim_p95': 0, 'aim_max': 0}
    # Means are computed before sorting so summation order matches input order.
    pos_mean = sum(pos_errors) / len(pos_errors)
    aim_mean = sum(aim_errors) / len(aim_errors) if aim_errors else 0
    pos_errors.sort()
    aim_errors.sort()
    pos_idx = min(int(len(pos_errors) * 0.95), len(pos_errors) - 1)
    if aim_errors:
        aim_idx = min(int(len(aim_errors) * 0.95), len(aim_errors) - 1)
        aim_p95 = aim_errors[aim_idx]
        aim_max = aim_errors[-1]
    else:
        aim_p95 = 0
        aim_max = 0
    return {
        'pos_mean': pos_mean,
        'pos_p95': pos_errors[pos_idx],
        'pos_max': pos_errors[-1],
        'aim_mean': aim_mean,
        'aim_p95': aim_p95,
        'aim_max': aim_max,
    }
# --- Shot contamination analysis ---
def analyze_shot_contamination(frames: List[Frame], analysis_window_ms: float = 200.0):
    """
    Analyze how shots contaminate the tracking data.

    For each shot, measure the velocity/acceleration spike and how long it takes
    to return to baseline. This tells us the minimum discard_time needed.

    Args:
        frames: Loaded frames (uses real_pos, timestamp, shot_fired).
        analysis_window_ms: How long after each shot to inspect the speed profile.

    Returns:
        A dict with analysis results, or None if no shots were found (or no
        shot had any pre-shot baseline to compare against).
    """
    shot_indices = [i for i, f in enumerate(frames) if f.shot_fired]
    if not shot_indices:
        return None
    analysis_window_s = analysis_window_ms / 1000.0
    # Compute per-frame speeds (finite difference of the real position).
    speeds = [0.0]
    for i in range(1, len(frames)):
        dt = frames[i].timestamp - frames[i - 1].timestamp
        if dt > 1e-6:
            d = vec_dist(frames[i].real_pos, frames[i - 1].real_pos)
            speeds.append(d / dt)
        else:
            # Duplicate/degenerate timestamp: carry the previous speed forward.
            speeds.append(speeds[-1] if speeds else 0.0)
    # For each shot, measure the speed profile before and after.
    contamination_durations = []
    speed_spikes = []
    for si in shot_indices:
        # Baseline speed: average speed in 100ms BEFORE the shot.
        baseline_speeds = []
        for j in range(si - 1, -1, -1):
            if frames[si].timestamp - frames[j].timestamp > 0.1:
                break
            baseline_speeds.append(speeds[j])
        if not baseline_speeds:
            # Shot at the very start of the recording: no baseline, skip it.
            continue
        baseline_mean = sum(baseline_speeds) / len(baseline_speeds)
        # Population std-dev; with a single sample fall back to 10% of the mean.
        baseline_std = math.sqrt(sum((s - baseline_mean) ** 2 for s in baseline_speeds) / len(baseline_speeds)) if len(baseline_speeds) > 1 else baseline_mean * 0.1
        # Threshold: speed is "contaminated" if it deviates by more than 3 sigma from baseline.
        threshold = baseline_mean + max(3.0 * baseline_std, 10.0)  # at least 10 cm/s spike
        # Find how long after the shot the speed stays above threshold.
        max_speed = 0.0
        last_contaminated_time = 0.0
        for j in range(si, len(frames)):
            dt_from_shot = frames[j].timestamp - frames[si].timestamp
            if dt_from_shot > analysis_window_s:
                break
            if speeds[j] > threshold:
                last_contaminated_time = dt_from_shot
            if speeds[j] > max_speed:
                max_speed = speeds[j]
        contamination_durations.append(last_contaminated_time * 1000.0)  # in ms
        speed_spikes.append(max_speed - baseline_mean)
    if not contamination_durations:
        return None
    contamination_durations.sort()
    return {
        'num_shots': len(shot_indices),
        'contamination_mean_ms': sum(contamination_durations) / len(contamination_durations),
        'contamination_p95_ms': contamination_durations[int(len(contamination_durations) * 0.95)],
        'contamination_max_ms': contamination_durations[-1],
        'speed_spike_mean': sum(speed_spikes) / len(speed_spikes) if speed_spikes else 0,
        'speed_spike_max': max(speed_spikes) if speed_spikes else 0,
        # Conservative recommendation: p95 contamination duration, rounded UP to 5ms.
        'recommended_discard_ms': math.ceil(contamination_durations[int(len(contamination_durations) * 0.95)] / 5.0) * 5.0,  # round up to 5ms
    }
# --- Offline adaptive extrapolation simulation (matches C++ exactly) ---
def simulate_adaptive(frames: List[Frame], params: AdaptiveParams) -> Tuple[List[float], List[float]]:
    """
    Simulate the adaptive extrapolation offline with given parameters.
    Matches the C++ PredictAdaptiveExtrapolation algorithm exactly.
    Optimized for speed: pre-extracts arrays, inlines math, avoids allocations.

    Returns:
        (pos_errors, aim_errors): per-evaluated-frame position error and aim
        error (degrees) between the simulated prediction and the recorded
        ground truth. Frames without enough safe history are skipped.
    """
    pos_errors = []
    aim_errors = []
    n_frames = len(frames)
    if n_frames < 4:
        # Too little history to form at least two velocity pairs anywhere.
        return pos_errors, aim_errors
    # Pre-extract into flat arrays for speed
    ts = [f.timestamp for f in frames]
    px = [f.real_pos[0] for f in frames]
    py = [f.real_pos[1] for f in frames]
    pz = [f.real_pos[2] for f in frames]
    ax = [f.real_aim[0] for f in frames]
    ay = [f.real_aim[1] for f in frames]
    az = [f.real_aim[2] for f in frames]
    buffer_s = params.buffer_time_ms / 1000.0
    discard_s = params.discard_time_ms / 1000.0
    sensitivity = params.sensitivity
    dead_zone = params.dead_zone
    min_speed = params.min_speed
    damping = params.damping
    SMALL = 1e-10
    # Bind hot-path functions to locals (LOAD_FAST inside the loop).
    _sqrt = math.sqrt
    _exp = math.exp
    _acos = math.acos
    _degrees = math.degrees
    _pow = pow
    for i in range(2, n_frames - 1):
        ct = ts[i]
        safe_cutoff = ct - discard_s       # newer samples than this are "contaminated"
        oldest_allowed = ct - buffer_s     # older samples than this have aged out
        # Collect safe sample indices (backward scan, then reverse)
        safe = []
        for j in range(i, -1, -1):
            t = ts[j]
            if t < oldest_allowed:
                break
            if t <= safe_cutoff:
                safe.append(j)
        safe.reverse()
        ns = len(safe)
        if ns < 2:
            continue
        # Build velocity pairs inline
        vpx = []; vpy = []; vpz = []
        vax = []; vay = []; vaz = []
        for k in range(1, ns):
            p, c = safe[k - 1], safe[k]
            dt = ts[c] - ts[p]
            if dt > 1e-6:
                inv_dt = 1.0 / dt
                vpx.append((px[c] - px[p]) * inv_dt)
                vpy.append((py[c] - py[p]) * inv_dt)
                vpz.append((pz[c] - pz[p]) * inv_dt)
                vax.append((ax[c] - ax[p]) * inv_dt)
                vay.append((ay[c] - ay[p]) * inv_dt)
                vaz.append((az[c] - az[p]) * inv_dt)
        nv = len(vpx)
        if nv < 2:
            continue
        # Weighted average velocity (quadratic weights, oldest=index 0)
        tw = 0.0
        apx = apy = apz = 0.0
        aax = aay = aaz = 0.0
        for k in range(nv):
            w = (k + 1) * (k + 1)  # newer samples receive larger weight
            apx += vpx[k] * w; apy += vpy[k] * w; apz += vpz[k] * w
            aax += vax[k] * w; aay += vay[k] * w; aaz += vaz[k] * w
            tw += w
        inv_tw = 1.0 / tw
        apx *= inv_tw; apy *= inv_tw; apz *= inv_tw
        aax *= inv_tw; aay *= inv_tw; aaz *= inv_tw
        # Recent velocity (last 25%, unweighted)
        rs = max(0, nv - max(1, nv // 4))
        rc = nv - rs
        rpx = rpy = rpz = 0.0
        rax = ray = raz = 0.0
        for k in range(rs, nv):
            rpx += vpx[k]; rpy += vpy[k]; rpz += vpz[k]
            rax += vax[k]; ray += vay[k]; raz += vaz[k]
        inv_rc = 1.0 / rc
        rpx *= inv_rc; rpy *= inv_rc; rpz *= inv_rc
        rax *= inv_rc; ray *= inv_rc; raz *= inv_rc
        avg_ps = _sqrt(apx*apx + apy*apy + apz*apz)
        avg_as = _sqrt(aax*aax + aay*aay + aaz*aaz)
        rec_ps = _sqrt(rpx*rpx + rpy*rpy + rpz*rpz)
        rec_as = _sqrt(rax*rax + ray*ray + raz*raz)
        # Position confidence: drops toward 0 when recent speed falls below
        # dead_zone * average speed; the falloff is shaped by 'sensitivity'.
        pc = 1.0
        if avg_ps > min_speed:
            ratio = rec_ps / avg_ps
            if ratio > 1.0: ratio = 1.0
            if ratio < dead_zone:
                rm = ratio / dead_zone if dead_zone > SMALL else 0.0
                if rm > 1.0: rm = 1.0
                pc = _pow(rm, sensitivity)
        # Aim confidence (same shaping as position confidence)
        ac = 1.0
        if avg_as > min_speed:
            ratio = rec_as / avg_as
            if ratio > 1.0: ratio = 1.0
            if ratio < dead_zone:
                rm = ratio / dead_zone if dead_zone > SMALL else 0.0
                if rm > 1.0: rm = 1.0
                ac = _pow(rm, sensitivity)
        # Extrapolation time: gap from the newest safe sample to "now"
        lsi = safe[-1]
        edt = ct - ts[lsi]
        if edt <= 0: edt = 0.011  # fallback; presumably ~one frame at 90Hz — TODO confirm vs C++
        # Damping
        ds = _exp(-damping * edt) if damping > 0.0 else 1.0
        # Predict: last safe sample advanced by confidence- and damping-scaled velocity
        m = edt * pc * ds
        ppx = px[lsi] + apx * m
        ppy = py[lsi] + apy * m
        ppz = pz[lsi] + apz * m
        ma = edt * ac * ds
        pax_r = ax[lsi] + aax * ma
        pay_r = ay[lsi] + aay * ma
        paz_r = az[lsi] + aaz * ma
        pa_len = _sqrt(pax_r*pax_r + pay_r*pay_r + paz_r*paz_r)
        # Position error
        dx = ppx - px[i]; dy = ppy - py[i]; dz = ppz - pz[i]
        pos_errors.append(_sqrt(dx*dx + dy*dy + dz*dz))
        # Aim error: angle between normalized predicted and real aim vectors
        if pa_len > 0.5:
            inv_pa = 1.0 / pa_len
            pax_n = pax_r * inv_pa; pay_n = pay_r * inv_pa; paz_n = paz_r * inv_pa
            ra_len = _sqrt(ax[i]*ax[i] + ay[i]*ay[i] + az[i]*az[i])
            if ra_len > 0.5:
                inv_ra = 1.0 / ra_len
                dot = pax_n * ax[i] * inv_ra + pay_n * ay[i] * inv_ra + paz_n * az[i] * inv_ra
                if dot > 1.0: dot = 1.0
                if dot < -1.0: dot = -1.0
                aim_errors.append(_degrees(_acos(dot)))
    return pos_errors, aim_errors
# --- Scoring ---
def compute_score(pos_errors: List[float], aim_errors: List[float]) -> ScoreResult:
    """Combine position and aim error lists into one weighted ScoreResult.

    Lower scores are better. Besides mean/p95 error, two stability metrics
    are folded in: jitter (volatility of the error signal) and overshoot
    (fraction of frames spiking above twice the mean error).
    """
    if not pos_errors:
        # No simulated frames at all: return the worst possible score.
        return ScoreResult(0, 0, 0, 0, 0, 0, float('inf'))
    n_pos = len(pos_errors)
    pos_mean = sum(pos_errors) / n_pos
    pos_p95 = sorted(pos_errors)[int(n_pos * 0.95)]
    if aim_errors:
        n_aim = len(aim_errors)
        aim_mean = sum(aim_errors) / n_aim
        aim_p95 = sorted(aim_errors)[int(n_aim * 0.95)]
    else:
        aim_mean = 0
        aim_p95 = 0
    # Jitter: population std-dev of |error delta| between consecutive frames.
    jitter = 0.0
    if n_pos > 1:
        deltas = [abs(curr - prev) for prev, curr in zip(pos_errors, pos_errors[1:])]
        mean_delta = sum(deltas) / len(deltas)
        jitter = math.sqrt(sum((d - mean_delta) ** 2 for d in deltas) / len(deltas))
    # Overshoot: fraction of frames whose error exceeds twice the mean.
    overshoot = 0.0
    if pos_mean > 0:
        spike_count = sum(1 for err in pos_errors if err > 2.0 * pos_mean)
        overshoot = spike_count / n_pos
    # Combined score: fixed weights summing to 1.0.
    combined = (pos_mean * 0.25 + pos_p95 * 0.15 +
                aim_mean * 0.25 + aim_p95 * 0.15 +
                jitter * 0.10 + overshoot * 0.10)
    return ScoreResult(pos_mean, pos_p95, aim_mean, aim_p95, jitter, overshoot, combined)
def aggregate_scores(per_file_scores: List[Tuple[str, ScoreResult]],
strategy: str = "mean") -> float:
    """Collapse per-file scores into a single number.

    "worst_case" takes the maximum score (pessimistic); any other strategy
    averages. An empty input yields +inf so the optimizer rejects it.
    """
    values = [result.score for _, result in per_file_scores]
    if not values:
        return float('inf')
    if strategy == "worst_case":
        return max(values)
    return sum(values) / len(values)
# --- Optimizer ---
def objective(x, all_frames, strategy):
    """Optimizer objective: score parameter vector x across all files.

    x layout: [sensitivity, dead_zone, min_speed, damping,
               buffer_time_ms, discard_time_ms].
    """
    candidate = AdaptiveParams(
        sensitivity=x[0],
        dead_zone=x[1],
        min_speed=x[2],
        damping=x[3],
        buffer_time_ms=x[4],
        discard_time_ms=x[5]
    )
    scored = []
    for name, frames in all_frames:
        p_err, a_err = simulate_adaptive(frames, candidate)
        scored.append((name, compute_score(p_err, a_err)))
    return aggregate_scores(scored, strategy)
def optimize_differential_evolution(all_frames, strategy="mean", max_iter=200, min_discard_ms=10.0):
    """Find optimal parameters using scipy differential evolution.

    Args:
        all_frames: List of (name, frames) tuples.
        strategy: Aggregation strategy forwarded to aggregate_scores.
        max_iter: Maximum DE iterations.
        min_discard_ms: Floor for discard_time_ms (from contamination analysis).

    Returns:
        (best_params, best_score) tuple.
    """
    # scipy is imported lazily so the rest of the script works without it.
    try:
        from scipy.optimize import differential_evolution
    except ImportError:
        print("ERROR: scipy is required for optimization.")
        print("Install with: pip install scipy")
        sys.exit(1)
    # Bounds order must match the x-vector layout consumed by objective().
    bounds = [
        (0.1, 5.0),  # sensitivity
        (0.0, 0.95),  # dead_zone
        (0.0, 200.0),  # min_speed
        (0.0, 50.0),  # damping
        (100.0, 500.0),  # buffer_time_ms
        (max(10.0, min_discard_ms), 100.0),  # discard_time_ms (floor from contamination analysis)
    ]
    print(f"\nRunning differential evolution (maxiter={max_iter}, popsize=25, min_discard={min_discard_ms:.0f}ms)...")
    print("This may take a few minutes...\n")
    # seed=42 makes runs reproducible; workers=1 keeps evaluation single-process
    # (presumably to avoid pickling the frame data — TODO confirm intent).
    result = differential_evolution(
        objective,
        bounds,
        args=(all_frames, strategy),
        maxiter=max_iter,
        seed=42,
        tol=1e-4,
        popsize=25,
        disp=True,
        workers=1
    )
    # Round to practical precision for pasting into the plugin config.
    best_params = AdaptiveParams(
        sensitivity=round(result.x[0], 2),
        dead_zone=round(result.x[1], 3),
        min_speed=round(result.x[2], 1),
        damping=round(result.x[3], 1),
        buffer_time_ms=round(result.x[4], 0),
        discard_time_ms=round(result.x[5], 0)
    )
    return best_params, result.fun
def optimize_grid_search(all_frames, strategy="mean", min_discard_ms=10.0):
    """Find optimal parameters using grid search (slower but no scipy needed).

    Returns:
        (best_params, best_score) tuple.
    """
    print(f"\nRunning grid search over 6 parameters (min_discard={min_discard_ms:.0f}ms)...")
    # Candidate values per parameter; the Cartesian product is the search space.
    # NOTE(review): buffer_times here reach 800ms while the DE bounds cap at
    # 500ms — confirm which range is intended.
    sensitivities = [1.0, 2.0, 3.0, 4.0]
    dead_zones = [0.7, 0.8, 0.9]
    min_speeds = [0.0, 30.0]
    dampings = [5.0, 10.0, 15.0]
    buffer_times = [300.0, 400.0, 500.0, 600.0, 800.0]
    # Respect the contamination-analysis floor for discard time.
    discard_times = [d for d in [20.0, 40.0, 60.0, 100.0, 150.0, 200.0] if d >= min_discard_ms]
    if not discard_times:
        discard_times = [min_discard_ms]
    total = (len(sensitivities) * len(dead_zones) * len(min_speeds) *
             len(dampings) * len(buffer_times) * len(discard_times))
    print(f"Total combinations: {total}")
    best_score = float('inf')
    best_params = AdaptiveParams()
    count = 0
    for sens in sensitivities:
        for dz in dead_zones:
            for ms in min_speeds:
                for damp in dampings:
                    for bt in buffer_times:
                        for dt in discard_times:
                            count += 1
                            if count % 500 == 0:
                                print(f" Progress: {count}/{total} ({100 * count / total:.0f}%) best={best_score:.4f}")
                            # Positional args match the AdaptiveParams field order.
                            params = AdaptiveParams(sens, dz, ms, damp, bt, dt)
                            per_file_scores = []
                            for name, frames in all_frames:
                                pos_errors, aim_errors = simulate_adaptive(frames, params)
                                score_result = compute_score(pos_errors, aim_errors)
                                per_file_scores.append((name, score_result))
                            score = aggregate_scores(per_file_scores, strategy)
                            if score < best_score:
                                best_score = score
                                best_params = params
    return best_params, best_score
# --- Main ---
def print_file_stats(name: str, frames: List[Frame]):
    """Print a one-line summary (frames, fps, duration, shots) for a CSV file.

    Args:
        name: CSV path (only the basename is printed).
        frames: Loaded frames. An empty list is reported instead of crashing
            (the original raised IndexError/ZeroDivisionError on empty input).
    """
    if not frames:
        print(f" {os.path.basename(name)}: 0 frames (empty recording)")
        return
    duration = frames[-1].timestamp - frames[0].timestamp
    avg_fps = len(frames) / duration if duration > 0 else 0
    avg_safe = sum(f.safe_count for f in frames) / len(frames)
    avg_extrap = sum(f.extrap_time for f in frames) / len(frames) * 1000
    num_shots = sum(1 for f in frames if f.shot_fired)
    # (The original also computed an average buffer_count that was never printed.)
    print(f" {os.path.basename(name)}: {len(frames)} frames, {avg_fps:.0f}fps, "
          f"{duration:.1f}s, safe={avg_safe:.1f}, extrap={avg_extrap:.1f}ms, shots={num_shots}")
def print_score_detail(name: str, score: ScoreResult):
    """Print one formatted line of per-file score metrics."""
    label = os.path.basename(name)
    pos_part = f"Pos: mean={score.pos_mean:.3f}cm p95={score.pos_p95:.3f}cm"
    aim_part = f"Aim: mean={score.aim_mean:.3f}deg p95={score.aim_p95:.3f}deg"
    stability_part = f"jitter={score.jitter:.3f} overshoot={score.overshoot:.1%}"
    print(f" {label:30s} {pos_part} | {aim_part} | {stability_part} | score={score.score:.4f}")
def main():
    """CLI entry point: load recordings, analyze shots, optimize, and report."""
    parser = argparse.ArgumentParser(
        description="Anti-Recoil Parameter Optimizer - finds optimal AdaptiveExtrapolation parameters"
    )
    parser.add_argument("csv_files", nargs="+", help="One or more CSV recording files")
    parser.add_argument("--plot", action="store_true", help="Generate comparison plots (requires matplotlib)")
    parser.add_argument("--grid", action="store_true", help="Use grid search instead of differential evolution")
    parser.add_argument("--strategy", choices=["mean", "worst_case"], default="mean",
                        help="Multi-file score aggregation strategy (default: mean)")
    parser.add_argument("--max-iter", type=int, default=200, help="Max optimizer iterations (default: 200)")
    args = parser.parse_args()
    # Load all CSV files
    all_frames = []
    for csv_path in args.csv_files:
        if not os.path.exists(csv_path):
            print(f"Error: File not found: {csv_path}")
            sys.exit(1)
        frames = load_csv(csv_path)
        if len(frames) < 50:
            # Short recordings are allowed but produce unreliable optima.
            print(f"Warning: {csv_path} has only {len(frames)} frames (need at least 50 for good results)")
        all_frames.append((csv_path, frames))
    print(f"\nLoaded {len(all_frames)} file(s)")
    print("=" * 70)
    # Per-file stats
    print("\n=== FILE STATISTICS ===")
    for name, frames in all_frames:
        print_file_stats(name, frames)
    # Shot contamination analysis (only meaningful when the CSV has ShotFired data)
    has_shots = any(any(f.shot_fired for f in frames) for _, frames in all_frames)
    if has_shots:
        print("\n=== SHOT CONTAMINATION ANALYSIS ===")
        max_recommended_discard = 0.0
        for name, frames in all_frames:
            result = analyze_shot_contamination(frames)
            if result:
                print(f" {os.path.basename(name)}:")
                print(f" Shots detected: {result['num_shots']}")
                print(f" Speed spike: mean={result['speed_spike_mean']:.1f} cm/s, max={result['speed_spike_max']:.1f} cm/s")
                print(f" Contamination duration: mean={result['contamination_mean_ms']:.1f}ms, "
                      f"p95={result['contamination_p95_ms']:.1f}ms, max={result['contamination_max_ms']:.1f}ms")
                print(f" Recommended discard_time: >= {result['recommended_discard_ms']:.0f}ms")
                max_recommended_discard = max(max_recommended_discard, result['recommended_discard_ms'])
            else:
                print(f" {os.path.basename(name)}: no shots detected")
        if max_recommended_discard > 0:
            print(f"\n >>> MINIMUM SAFE DiscardTime across all files: {max_recommended_discard:.0f}ms <<<")
    else:
        print("\n (No ShotFired data in CSV - record with updated plugin to get contamination analysis)")
    # Baseline: current default parameters
    default_params = AdaptiveParams()
    print(f"\n=== BASELINE (defaults: sens={default_params.sensitivity}, dz={default_params.dead_zone}, "
          f"minspd={default_params.min_speed}, damp={default_params.damping}, "
          f"buf={default_params.buffer_time_ms}ms, disc={default_params.discard_time_ms}ms) ===")
    baseline_scores = []
    for name, frames in all_frames:
        pos_errors, aim_errors = simulate_adaptive(frames, default_params)
        score = compute_score(pos_errors, aim_errors)
        baseline_scores.append((name, score))
        print_score_detail(name, score)
    baseline_agg = aggregate_scores(baseline_scores, args.strategy)
    print(f"\n Aggregate score ({args.strategy}): {baseline_agg:.4f}")
    # Also show recorded prediction error (as-is from the engine)
    print(f"\n=== RECORDED PREDICTION ERROR (as captured in-engine) ===")
    for name, frames in all_frames:
        err = compute_prediction_error(frames)
        print(f" {os.path.basename(name):30s} Pos: mean={err['pos_mean']:.3f}cm p95={err['pos_p95']:.3f}cm | "
              f"Aim: mean={err['aim_mean']:.3f}deg p95={err['aim_p95']:.3f}deg")
    # Compute minimum safe discard time from shot contamination analysis
    min_discard_ms = 10.0  # absolute minimum
    if has_shots:
        for name, frames in all_frames:
            result = analyze_shot_contamination(frames)
            if result and result['recommended_discard_ms'] > min_discard_ms:
                min_discard_ms = result['recommended_discard_ms']
    # Optimize
    print(f"\n=== OPTIMIZATION ({args.strategy}) ===")
    if args.grid:
        best_params, best_score = optimize_grid_search(all_frames, args.strategy, min_discard_ms)
    else:
        best_params, best_score = optimize_differential_evolution(all_frames, args.strategy, args.max_iter, min_discard_ms)
    # Results
    print(f"\n{'=' * 70}")
    print(f" BEST PARAMETERS FOUND:")
    print(f" AdaptiveSensitivity = {best_params.sensitivity}")
    print(f" AdaptiveDeadZone = {best_params.dead_zone}")
    print(f" AdaptiveMinSpeed = {best_params.min_speed}")
    print(f" ExtrapolationDamping = {best_params.damping}")
    print(f" AntiRecoilBufferTimeMs = {best_params.buffer_time_ms}")
    print(f" AntiRecoilDiscardTimeMs= {best_params.discard_time_ms}")
    print(f"{'=' * 70}")
    # Per-file breakdown with optimized params
    print(f"\n=== OPTIMIZED RESULTS ===")
    opt_scores = []
    for name, frames in all_frames:
        pos_errors, aim_errors = simulate_adaptive(frames, best_params)
        score = compute_score(pos_errors, aim_errors)
        opt_scores.append((name, score))
        print_score_detail(name, score)
    opt_agg = aggregate_scores(opt_scores, args.strategy)
    print(f"\n Aggregate score ({args.strategy}): {opt_agg:.4f}")
    # Improvement
    print(f"\n=== IMPROVEMENT vs BASELINE ===")
    for (name, baseline), (_, optimized) in zip(baseline_scores, opt_scores):
        # Positive percentages mean the optimized parameters reduced the metric.
        pos_pct = ((baseline.pos_mean - optimized.pos_mean) / baseline.pos_mean * 100) if baseline.pos_mean > 0 else 0
        aim_pct = ((baseline.aim_mean - optimized.aim_mean) / baseline.aim_mean * 100) if baseline.aim_mean > 0 else 0
        score_pct = ((baseline.score - optimized.score) / baseline.score * 100) if baseline.score > 0 else 0
        print(f" {os.path.basename(name):30s} Pos: {pos_pct:+.1f}% | Aim: {aim_pct:+.1f}% | Score: {score_pct:+.1f}%")
    total_pct = ((baseline_agg - opt_agg) / baseline_agg * 100) if baseline_agg > 0 else 0
    print(f" {'TOTAL':30s} Score: {total_pct:+.1f}%")
    # Plotting
    if args.plot:
        try:
            import matplotlib.pyplot as plt
            n_files = len(all_frames)
            fig, axes = plt.subplots(n_files, 3, figsize=(18, 5 * n_files), squeeze=False)
            for row, (name, frames) in enumerate(all_frames):
                timestamps = [f.timestamp - frames[0].timestamp for f in frames]
                short_name = os.path.basename(name)
                # Baseline errors
                bl_pos, bl_aim = simulate_adaptive(frames, default_params)
                # Optimized errors
                op_pos, op_aim = simulate_adaptive(frames, best_params)
                # Time axis for simulated errors (offset by window_size)
                # NOTE(review): simulate_adaptive starts emitting errors at frame
                # index 2, so this fixed offset of 12 looks stale (possibly from an
                # older windowed implementation) - confirm the plot time alignment.
                t_start = window_size = 12
                sim_timestamps = [frames[i].timestamp - frames[0].timestamp
                                  for i in range(t_start + 1, t_start + 1 + len(bl_pos))]
                # Position error
                ax = axes[row][0]
                if len(sim_timestamps) == len(bl_pos):
                    ax.plot(sim_timestamps, bl_pos, 'r-', alpha=0.4, linewidth=0.5, label='Baseline')
                    ax.plot(sim_timestamps, op_pos, 'g-', alpha=0.4, linewidth=0.5, label='Optimized')
                ax.set_ylabel('Position Error (cm)')
                ax.set_title(f'{short_name} - Position Error')
                ax.legend()
                # Aim error
                ax = axes[row][1]
                if len(sim_timestamps) >= len(bl_aim):
                    t_aim = sim_timestamps[:len(bl_aim)]
                    ax.plot(t_aim, bl_aim, 'r-', alpha=0.4, linewidth=0.5, label='Baseline')
                if len(sim_timestamps) >= len(op_aim):
                    t_aim = sim_timestamps[:len(op_aim)]
                    ax.plot(t_aim, op_aim, 'g-', alpha=0.4, linewidth=0.5, label='Optimized')
                ax.set_ylabel('Aim Error (deg)')
                ax.set_title(f'{short_name} - Aim Error')
                ax.legend()
                # Speed profile
                ax = axes[row][2]
                speeds = [0]
                for i in range(1, len(frames)):
                    dt = frames[i].timestamp - frames[i - 1].timestamp
                    if dt > 1e-6:
                        d = vec_dist(frames[i].real_pos, frames[i - 1].real_pos)
                        speeds.append(d / dt)
                    else:
                        speeds.append(speeds[-1])
                ax.plot(timestamps, speeds, 'b-', alpha=0.7, linewidth=0.5)
                ax.set_ylabel('Speed (cm/s)')
                ax.set_xlabel('Time (s)')
                ax.set_title(f'{short_name} - Speed Profile')
            plt.tight_layout()
            plot_path = args.csv_files[0].replace('.csv', '_optimizer.png')
            plt.savefig(plot_path, dpi=150)
            print(f"\nPlot saved: {plot_path}")
            plt.show()
        except ImportError:
            print("\nmatplotlib not installed. Install with: pip install matplotlib")
    print("\nDone.")
# Script entry point.
if __name__ == '__main__':
    main()