- Update defaults from test-driven optimization: BufferTime=200ms, DiscardTime=30ms, Sensitivity=3.0, DeadZone=0.95, MinSpeed=0.0, Damping=5.0 - Add ShotFired column to CSV recording for contamination analysis - Rewrite Python optimizer with 6-parameter search (sensitivity, dead zone, min speed, damping, buffer time, discard time) - Fix velocity weighting order bug in Python simulation - Add dead zone, min speed threshold, and damping to Python sim - Add shot contamination analysis (analyze_shots.py) to measure exact IMU perturbation duration per shot - Support multi-file optimization with mean/worst_case strategies - Add jitter and overshoot scoring metrics Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
780 lines
29 KiB
Python
780 lines
29 KiB
Python
"""
|
|
Anti-Recoil Parameter Optimizer
|
|
================================
|
|
Reads CSV files recorded by the EBBarrel CSV recording feature and finds
|
|
optimal parameters for the Adaptive Extrapolation mode.
|
|
|
|
Usage:
|
|
python analyze_antirecoil.py <csv_file> [csv_file2 ...] [options]
|
|
|
|
Options:
|
|
--plot Generate comparison plots (requires matplotlib)
|
|
--grid Use grid search instead of differential evolution
|
|
--strategy <s> Multi-file aggregation: mean (default), worst_case
|
|
--max-iter <n> Max optimizer iterations (default: 200)
|
|
|
|
The script:
|
|
1. Loads per-frame data (real position/aim vs predicted position/aim)
|
|
2. Simulates adaptive extrapolation offline (matching C++ exactly)
|
|
3. Optimizes all 4 parameters: Sensitivity, DeadZone, MinSpeed, Damping
|
|
4. Reports recommended parameters with per-file breakdown
|
|
"""
|
|
|
|
import csv
|
|
import sys
|
|
import math
|
|
import os
|
|
import argparse
|
|
from dataclasses import dataclass
|
|
from typing import List, Tuple, Optional
|
|
|
|
|
|
@dataclass
|
|
class Frame:
|
|
timestamp: float
|
|
real_pos: Tuple[float, float, float]
|
|
real_aim: Tuple[float, float, float]
|
|
pred_pos: Tuple[float, float, float]
|
|
pred_aim: Tuple[float, float, float]
|
|
safe_count: int
|
|
buffer_count: int
|
|
extrap_time: float
|
|
shot_fired: bool = False
|
|
|
|
|
|
@dataclass
|
|
class AdaptiveParams:
|
|
sensitivity: float = 3.0
|
|
dead_zone: float = 0.95
|
|
min_speed: float = 0.0
|
|
damping: float = 5.0
|
|
buffer_time_ms: float = 200.0
|
|
discard_time_ms: float = 30.0
|
|
|
|
|
|
@dataclass
|
|
class ScoreResult:
|
|
pos_mean: float
|
|
pos_p95: float
|
|
aim_mean: float
|
|
aim_p95: float
|
|
jitter: float
|
|
overshoot: float
|
|
score: float
|
|
|
|
|
|
def load_csv(path: str) -> List[Frame]:
|
|
frames = []
|
|
with open(path, 'r') as f:
|
|
reader = csv.DictReader(f)
|
|
has_shot_col = False
|
|
for row in reader:
|
|
if not has_shot_col and 'ShotFired' in row:
|
|
has_shot_col = True
|
|
frames.append(Frame(
|
|
timestamp=float(row['Timestamp']),
|
|
real_pos=(float(row['RealPosX']), float(row['RealPosY']), float(row['RealPosZ'])),
|
|
real_aim=(float(row['RealAimX']), float(row['RealAimY']), float(row['RealAimZ'])),
|
|
pred_pos=(float(row['PredPosX']), float(row['PredPosY']), float(row['PredPosZ'])),
|
|
pred_aim=(float(row['PredAimX']), float(row['PredAimY']), float(row['PredAimZ'])),
|
|
safe_count=int(row['SafeCount']),
|
|
buffer_count=int(row['BufferCount']),
|
|
extrap_time=float(row['ExtrapolationTime']),
|
|
shot_fired=int(row.get('ShotFired', 0)) == 1,
|
|
))
|
|
return frames
|
|
|
|
|
|
# --- Vector math helpers ---
|
|
|
|
def vec_dist(a, b):
|
|
return math.sqrt(sum((ai - bi) ** 2 for ai, bi in zip(a, b)))
|
|
|
|
|
|
def vec_sub(a, b):
|
|
return tuple(ai - bi for ai, bi in zip(a, b))
|
|
|
|
|
|
def vec_add(a, b):
|
|
return tuple(ai + bi for ai, bi in zip(a, b))
|
|
|
|
|
|
def vec_scale(a, s):
|
|
return tuple(ai * s for ai in a)
|
|
|
|
|
|
def vec_len(a):
|
|
return math.sqrt(sum(ai * ai for ai in a))
|
|
|
|
|
|
def vec_normalize(a):
|
|
l = vec_len(a)
|
|
if l < 1e-10:
|
|
return (0, 0, 0)
|
|
return tuple(ai / l for ai in a)
|
|
|
|
|
|
def angle_between(a, b):
|
|
"""Angle in degrees between two direction vectors."""
|
|
dot = sum(ai * bi for ai, bi in zip(a, b))
|
|
dot = max(-1.0, min(1.0, dot))
|
|
return math.degrees(math.acos(dot))
|
|
|
|
|
|
# --- Prediction error from recorded data ---
|
|
|
|
def compute_prediction_error(frames: List[Frame]) -> dict:
|
|
"""Compute error between predicted and actual (real) positions/aims."""
|
|
pos_errors = []
|
|
aim_errors = []
|
|
|
|
for f in frames:
|
|
pos_err = vec_dist(f.pred_pos, f.real_pos)
|
|
pos_errors.append(pos_err)
|
|
|
|
aim_a = vec_normalize(f.pred_aim)
|
|
aim_b = vec_normalize(f.real_aim)
|
|
if vec_len(aim_a) > 0.5 and vec_len(aim_b) > 0.5:
|
|
aim_err = angle_between(aim_a, aim_b)
|
|
aim_errors.append(aim_err)
|
|
|
|
if not pos_errors:
|
|
return {'pos_mean': 0, 'pos_p95': 0, 'pos_max': 0, 'aim_mean': 0, 'aim_p95': 0, 'aim_max': 0}
|
|
|
|
pos_errors.sort()
|
|
aim_errors.sort()
|
|
|
|
p95_idx_pos = int(len(pos_errors) * 0.95)
|
|
p95_idx_aim = int(len(aim_errors) * 0.95) if aim_errors else 0
|
|
|
|
return {
|
|
'pos_mean': sum(pos_errors) / len(pos_errors),
|
|
'pos_p95': pos_errors[min(p95_idx_pos, len(pos_errors) - 1)],
|
|
'pos_max': pos_errors[-1],
|
|
'aim_mean': sum(aim_errors) / len(aim_errors) if aim_errors else 0,
|
|
'aim_p95': aim_errors[min(p95_idx_aim, len(aim_errors) - 1)] if aim_errors else 0,
|
|
'aim_max': aim_errors[-1] if aim_errors else 0,
|
|
}
|
|
|
|
|
|
# --- Shot contamination analysis ---
|
|
|
|
def analyze_shot_contamination(frames: List[Frame], analysis_window_ms: float = 200.0):
|
|
"""
|
|
Analyze how shots contaminate the tracking data.
|
|
For each shot, measure the velocity/acceleration spike and how long it takes
|
|
to return to baseline. This tells us the minimum discard_time needed.
|
|
|
|
Returns a dict with analysis results, or None if no shots found.
|
|
"""
|
|
shot_indices = [i for i, f in enumerate(frames) if f.shot_fired]
|
|
if not shot_indices:
|
|
return None
|
|
|
|
analysis_window_s = analysis_window_ms / 1000.0
|
|
|
|
# Compute per-frame speeds
|
|
speeds = [0.0]
|
|
for i in range(1, len(frames)):
|
|
dt = frames[i].timestamp - frames[i - 1].timestamp
|
|
if dt > 1e-6:
|
|
d = vec_dist(frames[i].real_pos, frames[i - 1].real_pos)
|
|
speeds.append(d / dt)
|
|
else:
|
|
speeds.append(speeds[-1] if speeds else 0.0)
|
|
|
|
# For each shot, measure the speed profile before and after
|
|
contamination_durations = []
|
|
speed_spikes = []
|
|
|
|
for si in shot_indices:
|
|
# Baseline speed: average speed in 100ms BEFORE the shot
|
|
baseline_speeds = []
|
|
for j in range(si - 1, -1, -1):
|
|
if frames[si].timestamp - frames[j].timestamp > 0.1:
|
|
break
|
|
baseline_speeds.append(speeds[j])
|
|
|
|
if not baseline_speeds:
|
|
continue
|
|
|
|
baseline_mean = sum(baseline_speeds) / len(baseline_speeds)
|
|
baseline_std = math.sqrt(sum((s - baseline_mean) ** 2 for s in baseline_speeds) / len(baseline_speeds)) if len(baseline_speeds) > 1 else baseline_mean * 0.1
|
|
|
|
# Threshold: speed is "contaminated" if it deviates by more than 3 sigma from baseline
|
|
threshold = baseline_mean + max(3.0 * baseline_std, 10.0) # at least 10 cm/s spike
|
|
|
|
# Find how long after the shot the speed stays above threshold
|
|
max_speed = 0.0
|
|
last_contaminated_time = 0.0
|
|
for j in range(si, len(frames)):
|
|
dt_from_shot = frames[j].timestamp - frames[si].timestamp
|
|
if dt_from_shot > analysis_window_s:
|
|
break
|
|
if speeds[j] > threshold:
|
|
last_contaminated_time = dt_from_shot
|
|
if speeds[j] > max_speed:
|
|
max_speed = speeds[j]
|
|
|
|
contamination_durations.append(last_contaminated_time * 1000.0) # in ms
|
|
speed_spikes.append(max_speed - baseline_mean)
|
|
|
|
if not contamination_durations:
|
|
return None
|
|
|
|
contamination_durations.sort()
|
|
return {
|
|
'num_shots': len(shot_indices),
|
|
'contamination_mean_ms': sum(contamination_durations) / len(contamination_durations),
|
|
'contamination_p95_ms': contamination_durations[int(len(contamination_durations) * 0.95)],
|
|
'contamination_max_ms': contamination_durations[-1],
|
|
'speed_spike_mean': sum(speed_spikes) / len(speed_spikes) if speed_spikes else 0,
|
|
'speed_spike_max': max(speed_spikes) if speed_spikes else 0,
|
|
'recommended_discard_ms': math.ceil(contamination_durations[int(len(contamination_durations) * 0.95)] / 5.0) * 5.0, # round up to 5ms
|
|
}
|
|
|
|
|
|
# --- Offline adaptive extrapolation simulation (matches C++ exactly) ---
|
|
|
|
def simulate_adaptive(frames: List[Frame], params: AdaptiveParams) -> Tuple[List[float], List[float]]:
|
|
"""
|
|
Simulate the adaptive extrapolation offline with given parameters.
|
|
Matches the C++ PredictAdaptiveExtrapolation algorithm exactly.
|
|
Optimized for speed: pre-extracts arrays, inlines math, avoids allocations.
|
|
"""
|
|
pos_errors = []
|
|
aim_errors = []
|
|
|
|
n_frames = len(frames)
|
|
if n_frames < 4:
|
|
return pos_errors, aim_errors
|
|
|
|
# Pre-extract into flat arrays for speed
|
|
ts = [f.timestamp for f in frames]
|
|
px = [f.real_pos[0] for f in frames]
|
|
py = [f.real_pos[1] for f in frames]
|
|
pz = [f.real_pos[2] for f in frames]
|
|
ax = [f.real_aim[0] for f in frames]
|
|
ay = [f.real_aim[1] for f in frames]
|
|
az = [f.real_aim[2] for f in frames]
|
|
|
|
buffer_s = params.buffer_time_ms / 1000.0
|
|
discard_s = params.discard_time_ms / 1000.0
|
|
sensitivity = params.sensitivity
|
|
dead_zone = params.dead_zone
|
|
min_speed = params.min_speed
|
|
damping = params.damping
|
|
SMALL = 1e-10
|
|
_sqrt = math.sqrt
|
|
_exp = math.exp
|
|
_acos = math.acos
|
|
_degrees = math.degrees
|
|
_pow = pow
|
|
|
|
for i in range(2, n_frames - 1):
|
|
ct = ts[i]
|
|
safe_cutoff = ct - discard_s
|
|
oldest_allowed = ct - buffer_s
|
|
|
|
# Collect safe sample indices (backward scan, then reverse)
|
|
safe = []
|
|
for j in range(i, -1, -1):
|
|
t = ts[j]
|
|
if t < oldest_allowed:
|
|
break
|
|
if t <= safe_cutoff:
|
|
safe.append(j)
|
|
safe.reverse()
|
|
|
|
ns = len(safe)
|
|
if ns < 2:
|
|
continue
|
|
|
|
# Build velocity pairs inline
|
|
vpx = []; vpy = []; vpz = []
|
|
vax = []; vay = []; vaz = []
|
|
for k in range(1, ns):
|
|
p, c = safe[k - 1], safe[k]
|
|
dt = ts[c] - ts[p]
|
|
if dt > 1e-6:
|
|
inv_dt = 1.0 / dt
|
|
vpx.append((px[c] - px[p]) * inv_dt)
|
|
vpy.append((py[c] - py[p]) * inv_dt)
|
|
vpz.append((pz[c] - pz[p]) * inv_dt)
|
|
vax.append((ax[c] - ax[p]) * inv_dt)
|
|
vay.append((ay[c] - ay[p]) * inv_dt)
|
|
vaz.append((az[c] - az[p]) * inv_dt)
|
|
|
|
nv = len(vpx)
|
|
if nv < 2:
|
|
continue
|
|
|
|
# Weighted average velocity (quadratic weights, oldest=index 0)
|
|
tw = 0.0
|
|
apx = apy = apz = 0.0
|
|
aax = aay = aaz = 0.0
|
|
for k in range(nv):
|
|
w = (k + 1) * (k + 1)
|
|
apx += vpx[k] * w; apy += vpy[k] * w; apz += vpz[k] * w
|
|
aax += vax[k] * w; aay += vay[k] * w; aaz += vaz[k] * w
|
|
tw += w
|
|
inv_tw = 1.0 / tw
|
|
apx *= inv_tw; apy *= inv_tw; apz *= inv_tw
|
|
aax *= inv_tw; aay *= inv_tw; aaz *= inv_tw
|
|
|
|
# Recent velocity (last 25%, unweighted)
|
|
rs = max(0, nv - max(1, nv // 4))
|
|
rc = nv - rs
|
|
rpx = rpy = rpz = 0.0
|
|
rax = ray = raz = 0.0
|
|
for k in range(rs, nv):
|
|
rpx += vpx[k]; rpy += vpy[k]; rpz += vpz[k]
|
|
rax += vax[k]; ray += vay[k]; raz += vaz[k]
|
|
inv_rc = 1.0 / rc
|
|
rpx *= inv_rc; rpy *= inv_rc; rpz *= inv_rc
|
|
rax *= inv_rc; ray *= inv_rc; raz *= inv_rc
|
|
|
|
avg_ps = _sqrt(apx*apx + apy*apy + apz*apz)
|
|
avg_as = _sqrt(aax*aax + aay*aay + aaz*aaz)
|
|
rec_ps = _sqrt(rpx*rpx + rpy*rpy + rpz*rpz)
|
|
rec_as = _sqrt(rax*rax + ray*ray + raz*raz)
|
|
|
|
# Position confidence
|
|
pc = 1.0
|
|
if avg_ps > min_speed:
|
|
ratio = rec_ps / avg_ps
|
|
if ratio > 1.0: ratio = 1.0
|
|
if ratio < dead_zone:
|
|
rm = ratio / dead_zone if dead_zone > SMALL else 0.0
|
|
if rm > 1.0: rm = 1.0
|
|
pc = _pow(rm, sensitivity)
|
|
|
|
# Aim confidence
|
|
ac = 1.0
|
|
if avg_as > min_speed:
|
|
ratio = rec_as / avg_as
|
|
if ratio > 1.0: ratio = 1.0
|
|
if ratio < dead_zone:
|
|
rm = ratio / dead_zone if dead_zone > SMALL else 0.0
|
|
if rm > 1.0: rm = 1.0
|
|
ac = _pow(rm, sensitivity)
|
|
|
|
# Extrapolation time
|
|
lsi = safe[-1]
|
|
edt = ct - ts[lsi]
|
|
if edt <= 0: edt = 0.011
|
|
|
|
# Damping
|
|
ds = _exp(-damping * edt) if damping > 0.0 else 1.0
|
|
|
|
# Predict
|
|
m = edt * pc * ds
|
|
ppx = px[lsi] + apx * m
|
|
ppy = py[lsi] + apy * m
|
|
ppz = pz[lsi] + apz * m
|
|
|
|
ma = edt * ac * ds
|
|
pax_r = ax[lsi] + aax * ma
|
|
pay_r = ay[lsi] + aay * ma
|
|
paz_r = az[lsi] + aaz * ma
|
|
pa_len = _sqrt(pax_r*pax_r + pay_r*pay_r + paz_r*paz_r)
|
|
|
|
# Position error
|
|
dx = ppx - px[i]; dy = ppy - py[i]; dz = ppz - pz[i]
|
|
pos_errors.append(_sqrt(dx*dx + dy*dy + dz*dz))
|
|
|
|
# Aim error
|
|
if pa_len > 0.5:
|
|
inv_pa = 1.0 / pa_len
|
|
pax_n = pax_r * inv_pa; pay_n = pay_r * inv_pa; paz_n = paz_r * inv_pa
|
|
ra_len = _sqrt(ax[i]*ax[i] + ay[i]*ay[i] + az[i]*az[i])
|
|
if ra_len > 0.5:
|
|
inv_ra = 1.0 / ra_len
|
|
dot = pax_n * ax[i] * inv_ra + pay_n * ay[i] * inv_ra + paz_n * az[i] * inv_ra
|
|
if dot > 1.0: dot = 1.0
|
|
if dot < -1.0: dot = -1.0
|
|
aim_errors.append(_degrees(_acos(dot)))
|
|
|
|
return pos_errors, aim_errors
|
|
|
|
|
|
# --- Scoring ---
|
|
|
|
def compute_score(pos_errors: List[float], aim_errors: List[float]) -> ScoreResult:
|
|
"""Compute a combined score from position and aim errors, including stability metrics."""
|
|
if not pos_errors:
|
|
return ScoreResult(0, 0, 0, 0, 0, 0, float('inf'))
|
|
|
|
pos_sorted = sorted(pos_errors)
|
|
aim_sorted = sorted(aim_errors) if aim_errors else [0]
|
|
|
|
pos_mean = sum(pos_errors) / len(pos_errors)
|
|
pos_p95 = pos_sorted[int(len(pos_sorted) * 0.95)]
|
|
aim_mean = sum(aim_errors) / len(aim_errors) if aim_errors else 0
|
|
aim_p95 = aim_sorted[int(len(aim_sorted) * 0.95)] if aim_errors else 0
|
|
|
|
# Jitter: standard deviation of frame-to-frame error change
|
|
jitter = 0.0
|
|
if len(pos_errors) > 1:
|
|
deltas = [abs(pos_errors[i] - pos_errors[i - 1]) for i in range(1, len(pos_errors))]
|
|
delta_mean = sum(deltas) / len(deltas)
|
|
jitter = math.sqrt(sum((d - delta_mean) ** 2 for d in deltas) / len(deltas))
|
|
|
|
# Overshoot: percentage of frames where error spikes above 2x mean
|
|
overshoot = 0.0
|
|
if pos_mean > 0:
|
|
overshoot_count = sum(1 for e in pos_errors if e > 2.0 * pos_mean)
|
|
overshoot = overshoot_count / len(pos_errors)
|
|
|
|
# Combined score
|
|
score = (pos_mean * 0.25 + pos_p95 * 0.15 +
|
|
aim_mean * 0.25 + aim_p95 * 0.15 +
|
|
jitter * 0.10 + overshoot * 0.10)
|
|
|
|
return ScoreResult(pos_mean, pos_p95, aim_mean, aim_p95, jitter, overshoot, score)
|
|
|
|
|
|
def aggregate_scores(per_file_scores: List[Tuple[str, ScoreResult]],
|
|
strategy: str = "mean") -> float:
|
|
"""Aggregate scores across multiple files."""
|
|
scores = [s.score for _, s in per_file_scores]
|
|
if not scores:
|
|
return float('inf')
|
|
if strategy == "worst_case":
|
|
return max(scores)
|
|
else: # mean
|
|
return sum(scores) / len(scores)
|
|
|
|
|
|
# --- Optimizer ---
|
|
|
|
def objective(x, all_frames, strategy):
|
|
"""Objective function for the optimizer."""
|
|
params = AdaptiveParams(
|
|
sensitivity=x[0],
|
|
dead_zone=x[1],
|
|
min_speed=x[2],
|
|
damping=x[3],
|
|
buffer_time_ms=x[4],
|
|
discard_time_ms=x[5]
|
|
)
|
|
per_file_scores = []
|
|
for name, frames in all_frames:
|
|
pos_errors, aim_errors = simulate_adaptive(frames, params)
|
|
score_result = compute_score(pos_errors, aim_errors)
|
|
per_file_scores.append((name, score_result))
|
|
return aggregate_scores(per_file_scores, strategy)
|
|
|
|
|
|
def optimize_differential_evolution(all_frames, strategy="mean", max_iter=200, min_discard_ms=10.0):
|
|
"""Find optimal parameters using scipy differential evolution."""
|
|
try:
|
|
from scipy.optimize import differential_evolution
|
|
except ImportError:
|
|
print("ERROR: scipy is required for optimization.")
|
|
print("Install with: pip install scipy")
|
|
sys.exit(1)
|
|
|
|
bounds = [
|
|
(0.1, 5.0), # sensitivity
|
|
(0.0, 0.95), # dead_zone
|
|
(0.0, 200.0), # min_speed
|
|
(0.0, 50.0), # damping
|
|
(100.0, 500.0), # buffer_time_ms
|
|
(max(10.0, min_discard_ms), 100.0), # discard_time_ms (floor from contamination analysis)
|
|
]
|
|
|
|
print(f"\nRunning differential evolution (maxiter={max_iter}, popsize=25, min_discard={min_discard_ms:.0f}ms)...")
|
|
print("This may take a few minutes...\n")
|
|
|
|
result = differential_evolution(
|
|
objective,
|
|
bounds,
|
|
args=(all_frames, strategy),
|
|
maxiter=max_iter,
|
|
seed=42,
|
|
tol=1e-4,
|
|
popsize=25,
|
|
disp=True,
|
|
workers=1
|
|
)
|
|
|
|
best_params = AdaptiveParams(
|
|
sensitivity=round(result.x[0], 2),
|
|
dead_zone=round(result.x[1], 3),
|
|
min_speed=round(result.x[2], 1),
|
|
damping=round(result.x[3], 1),
|
|
buffer_time_ms=round(result.x[4], 0),
|
|
discard_time_ms=round(result.x[5], 0)
|
|
)
|
|
return best_params, result.fun
|
|
|
|
|
|
def optimize_grid_search(all_frames, strategy="mean", min_discard_ms=10.0):
|
|
"""Find optimal parameters using grid search (slower but no scipy needed)."""
|
|
print(f"\nRunning grid search over 6 parameters (min_discard={min_discard_ms:.0f}ms)...")
|
|
|
|
sensitivities = [1.0, 2.0, 3.0, 4.0]
|
|
dead_zones = [0.7, 0.8, 0.9]
|
|
min_speeds = [0.0, 30.0]
|
|
dampings = [5.0, 10.0, 15.0]
|
|
buffer_times = [300.0, 400.0, 500.0, 600.0, 800.0]
|
|
discard_times = [d for d in [20.0, 40.0, 60.0, 100.0, 150.0, 200.0] if d >= min_discard_ms]
|
|
if not discard_times:
|
|
discard_times = [min_discard_ms]
|
|
|
|
total = (len(sensitivities) * len(dead_zones) * len(min_speeds) *
|
|
len(dampings) * len(buffer_times) * len(discard_times))
|
|
print(f"Total combinations: {total}")
|
|
|
|
best_score = float('inf')
|
|
best_params = AdaptiveParams()
|
|
count = 0
|
|
|
|
for sens in sensitivities:
|
|
for dz in dead_zones:
|
|
for ms in min_speeds:
|
|
for damp in dampings:
|
|
for bt in buffer_times:
|
|
for dt in discard_times:
|
|
count += 1
|
|
if count % 500 == 0:
|
|
print(f" Progress: {count}/{total} ({100 * count / total:.0f}%) best={best_score:.4f}")
|
|
|
|
params = AdaptiveParams(sens, dz, ms, damp, bt, dt)
|
|
per_file_scores = []
|
|
for name, frames in all_frames:
|
|
pos_errors, aim_errors = simulate_adaptive(frames, params)
|
|
score_result = compute_score(pos_errors, aim_errors)
|
|
per_file_scores.append((name, score_result))
|
|
|
|
score = aggregate_scores(per_file_scores, strategy)
|
|
if score < best_score:
|
|
best_score = score
|
|
best_params = params
|
|
|
|
return best_params, best_score
|
|
|
|
|
|
# --- Main ---
|
|
|
|
def print_file_stats(name: str, frames: List[Frame]):
|
|
"""Print basic stats for a CSV file."""
|
|
duration = frames[-1].timestamp - frames[0].timestamp
|
|
avg_fps = len(frames) / duration if duration > 0 else 0
|
|
avg_safe = sum(f.safe_count for f in frames) / len(frames)
|
|
avg_buffer = sum(f.buffer_count for f in frames) / len(frames)
|
|
avg_extrap = sum(f.extrap_time for f in frames) / len(frames) * 1000
|
|
num_shots = sum(1 for f in frames if f.shot_fired)
|
|
print(f" {os.path.basename(name)}: {len(frames)} frames, {avg_fps:.0f}fps, "
|
|
f"{duration:.1f}s, safe={avg_safe:.1f}, extrap={avg_extrap:.1f}ms, shots={num_shots}")
|
|
|
|
|
|
def print_score_detail(name: str, score: ScoreResult):
|
|
"""Print detailed score for a file."""
|
|
print(f" {os.path.basename(name):30s} Pos: mean={score.pos_mean:.3f}cm p95={score.pos_p95:.3f}cm | "
|
|
f"Aim: mean={score.aim_mean:.3f}deg p95={score.aim_p95:.3f}deg | "
|
|
f"jitter={score.jitter:.3f} overshoot={score.overshoot:.1%} | "
|
|
f"score={score.score:.4f}")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Anti-Recoil Parameter Optimizer - finds optimal AdaptiveExtrapolation parameters"
|
|
)
|
|
parser.add_argument("csv_files", nargs="+", help="One or more CSV recording files")
|
|
parser.add_argument("--plot", action="store_true", help="Generate comparison plots (requires matplotlib)")
|
|
parser.add_argument("--grid", action="store_true", help="Use grid search instead of differential evolution")
|
|
parser.add_argument("--strategy", choices=["mean", "worst_case"], default="mean",
|
|
help="Multi-file score aggregation strategy (default: mean)")
|
|
parser.add_argument("--max-iter", type=int, default=200, help="Max optimizer iterations (default: 200)")
|
|
args = parser.parse_args()
|
|
|
|
# Load all CSV files
|
|
all_frames = []
|
|
for csv_path in args.csv_files:
|
|
if not os.path.exists(csv_path):
|
|
print(f"Error: File not found: {csv_path}")
|
|
sys.exit(1)
|
|
frames = load_csv(csv_path)
|
|
if len(frames) < 50:
|
|
print(f"Warning: {csv_path} has only {len(frames)} frames (need at least 50 for good results)")
|
|
all_frames.append((csv_path, frames))
|
|
|
|
print(f"\nLoaded {len(all_frames)} file(s)")
|
|
print("=" * 70)
|
|
|
|
# Per-file stats
|
|
print("\n=== FILE STATISTICS ===")
|
|
for name, frames in all_frames:
|
|
print_file_stats(name, frames)
|
|
|
|
# Shot contamination analysis
|
|
has_shots = any(any(f.shot_fired for f in frames) for _, frames in all_frames)
|
|
if has_shots:
|
|
print("\n=== SHOT CONTAMINATION ANALYSIS ===")
|
|
max_recommended_discard = 0.0
|
|
for name, frames in all_frames:
|
|
result = analyze_shot_contamination(frames)
|
|
if result:
|
|
print(f" {os.path.basename(name)}:")
|
|
print(f" Shots detected: {result['num_shots']}")
|
|
print(f" Speed spike: mean={result['speed_spike_mean']:.1f} cm/s, max={result['speed_spike_max']:.1f} cm/s")
|
|
print(f" Contamination duration: mean={result['contamination_mean_ms']:.1f}ms, "
|
|
f"p95={result['contamination_p95_ms']:.1f}ms, max={result['contamination_max_ms']:.1f}ms")
|
|
print(f" Recommended discard_time: >= {result['recommended_discard_ms']:.0f}ms")
|
|
max_recommended_discard = max(max_recommended_discard, result['recommended_discard_ms'])
|
|
else:
|
|
print(f" {os.path.basename(name)}: no shots detected")
|
|
|
|
if max_recommended_discard > 0:
|
|
print(f"\n >>> MINIMUM SAFE DiscardTime across all files: {max_recommended_discard:.0f}ms <<<")
|
|
else:
|
|
print("\n (No ShotFired data in CSV - record with updated plugin to get contamination analysis)")
|
|
|
|
# Baseline: current default parameters
|
|
default_params = AdaptiveParams()
|
|
print(f"\n=== BASELINE (defaults: sens={default_params.sensitivity}, dz={default_params.dead_zone}, "
|
|
f"minspd={default_params.min_speed}, damp={default_params.damping}, "
|
|
f"buf={default_params.buffer_time_ms}ms, disc={default_params.discard_time_ms}ms) ===")
|
|
|
|
baseline_scores = []
|
|
for name, frames in all_frames:
|
|
pos_errors, aim_errors = simulate_adaptive(frames, default_params)
|
|
score = compute_score(pos_errors, aim_errors)
|
|
baseline_scores.append((name, score))
|
|
print_score_detail(name, score)
|
|
|
|
baseline_agg = aggregate_scores(baseline_scores, args.strategy)
|
|
print(f"\n Aggregate score ({args.strategy}): {baseline_agg:.4f}")
|
|
|
|
# Also show recorded prediction error (as-is from the engine)
|
|
print(f"\n=== RECORDED PREDICTION ERROR (as captured in-engine) ===")
|
|
for name, frames in all_frames:
|
|
err = compute_prediction_error(frames)
|
|
print(f" {os.path.basename(name):30s} Pos: mean={err['pos_mean']:.3f}cm p95={err['pos_p95']:.3f}cm | "
|
|
f"Aim: mean={err['aim_mean']:.3f}deg p95={err['aim_p95']:.3f}deg")
|
|
|
|
# Compute minimum safe discard time from shot contamination analysis
|
|
min_discard_ms = 10.0 # absolute minimum
|
|
if has_shots:
|
|
for name, frames in all_frames:
|
|
result = analyze_shot_contamination(frames)
|
|
if result and result['recommended_discard_ms'] > min_discard_ms:
|
|
min_discard_ms = result['recommended_discard_ms']
|
|
|
|
# Optimize
|
|
print(f"\n=== OPTIMIZATION ({args.strategy}) ===")
|
|
|
|
if args.grid:
|
|
best_params, best_score = optimize_grid_search(all_frames, args.strategy, min_discard_ms)
|
|
else:
|
|
best_params, best_score = optimize_differential_evolution(all_frames, args.strategy, args.max_iter, min_discard_ms)
|
|
|
|
# Results
|
|
print(f"\n{'=' * 70}")
|
|
print(f" BEST PARAMETERS FOUND:")
|
|
print(f" AdaptiveSensitivity = {best_params.sensitivity}")
|
|
print(f" AdaptiveDeadZone = {best_params.dead_zone}")
|
|
print(f" AdaptiveMinSpeed = {best_params.min_speed}")
|
|
print(f" ExtrapolationDamping = {best_params.damping}")
|
|
print(f" AntiRecoilBufferTimeMs = {best_params.buffer_time_ms}")
|
|
print(f" AntiRecoilDiscardTimeMs= {best_params.discard_time_ms}")
|
|
print(f"{'=' * 70}")
|
|
|
|
# Per-file breakdown with optimized params
|
|
print(f"\n=== OPTIMIZED RESULTS ===")
|
|
opt_scores = []
|
|
for name, frames in all_frames:
|
|
pos_errors, aim_errors = simulate_adaptive(frames, best_params)
|
|
score = compute_score(pos_errors, aim_errors)
|
|
opt_scores.append((name, score))
|
|
print_score_detail(name, score)
|
|
|
|
opt_agg = aggregate_scores(opt_scores, args.strategy)
|
|
print(f"\n Aggregate score ({args.strategy}): {opt_agg:.4f}")
|
|
|
|
# Improvement
|
|
print(f"\n=== IMPROVEMENT vs BASELINE ===")
|
|
for (name, baseline), (_, optimized) in zip(baseline_scores, opt_scores):
|
|
pos_pct = ((baseline.pos_mean - optimized.pos_mean) / baseline.pos_mean * 100) if baseline.pos_mean > 0 else 0
|
|
aim_pct = ((baseline.aim_mean - optimized.aim_mean) / baseline.aim_mean * 100) if baseline.aim_mean > 0 else 0
|
|
score_pct = ((baseline.score - optimized.score) / baseline.score * 100) if baseline.score > 0 else 0
|
|
print(f" {os.path.basename(name):30s} Pos: {pos_pct:+.1f}% | Aim: {aim_pct:+.1f}% | Score: {score_pct:+.1f}%")
|
|
|
|
total_pct = ((baseline_agg - opt_agg) / baseline_agg * 100) if baseline_agg > 0 else 0
|
|
print(f" {'TOTAL':30s} Score: {total_pct:+.1f}%")
|
|
|
|
# Plotting
|
|
if args.plot:
|
|
try:
|
|
import matplotlib.pyplot as plt
|
|
|
|
n_files = len(all_frames)
|
|
fig, axes = plt.subplots(n_files, 3, figsize=(18, 5 * n_files), squeeze=False)
|
|
|
|
for row, (name, frames) in enumerate(all_frames):
|
|
timestamps = [f.timestamp - frames[0].timestamp for f in frames]
|
|
short_name = os.path.basename(name)
|
|
|
|
# Baseline errors
|
|
bl_pos, bl_aim = simulate_adaptive(frames, default_params)
|
|
# Optimized errors
|
|
op_pos, op_aim = simulate_adaptive(frames, best_params)
|
|
|
|
# Time axis for simulated errors (offset by window_size)
|
|
t_start = window_size = 12
|
|
sim_timestamps = [frames[i].timestamp - frames[0].timestamp
|
|
for i in range(t_start + 1, t_start + 1 + len(bl_pos))]
|
|
|
|
# Position error
|
|
ax = axes[row][0]
|
|
if len(sim_timestamps) == len(bl_pos):
|
|
ax.plot(sim_timestamps, bl_pos, 'r-', alpha=0.4, linewidth=0.5, label='Baseline')
|
|
ax.plot(sim_timestamps, op_pos, 'g-', alpha=0.4, linewidth=0.5, label='Optimized')
|
|
ax.set_ylabel('Position Error (cm)')
|
|
ax.set_title(f'{short_name} - Position Error')
|
|
ax.legend()
|
|
|
|
# Aim error
|
|
ax = axes[row][1]
|
|
if len(sim_timestamps) >= len(bl_aim):
|
|
t_aim = sim_timestamps[:len(bl_aim)]
|
|
ax.plot(t_aim, bl_aim, 'r-', alpha=0.4, linewidth=0.5, label='Baseline')
|
|
if len(sim_timestamps) >= len(op_aim):
|
|
t_aim = sim_timestamps[:len(op_aim)]
|
|
ax.plot(t_aim, op_aim, 'g-', alpha=0.4, linewidth=0.5, label='Optimized')
|
|
ax.set_ylabel('Aim Error (deg)')
|
|
ax.set_title(f'{short_name} - Aim Error')
|
|
ax.legend()
|
|
|
|
# Speed profile
|
|
ax = axes[row][2]
|
|
speeds = [0]
|
|
for i in range(1, len(frames)):
|
|
dt = frames[i].timestamp - frames[i - 1].timestamp
|
|
if dt > 1e-6:
|
|
d = vec_dist(frames[i].real_pos, frames[i - 1].real_pos)
|
|
speeds.append(d / dt)
|
|
else:
|
|
speeds.append(speeds[-1])
|
|
ax.plot(timestamps, speeds, 'b-', alpha=0.7, linewidth=0.5)
|
|
ax.set_ylabel('Speed (cm/s)')
|
|
ax.set_xlabel('Time (s)')
|
|
ax.set_title(f'{short_name} - Speed Profile')
|
|
|
|
plt.tight_layout()
|
|
plot_path = args.csv_files[0].replace('.csv', '_optimizer.png')
|
|
plt.savefig(plot_path, dpi=150)
|
|
print(f"\nPlot saved: {plot_path}")
|
|
plt.show()
|
|
|
|
except ImportError:
|
|
print("\nmatplotlib not installed. Install with: pip install matplotlib")
|
|
|
|
print("\nDone.")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|