PS_Ballistics/Tools/analyze_antirecoil.py
j.foucher af723c944b Rework adaptive extrapolation: deceleration detection + dead zone + debug HUD
Replace variance-based confidence (caused constant lag) with targeted
deceleration detection: compares recent speed (last 25% of safe window)
to average speed. During steady movement ratio≈1 → zero lag.
Only reduces extrapolation when actual braking is detected.

- AdaptiveSensitivity: now a power exponent (0.1-5.0, default 1.0)
- AdaptiveDeadZone: new parameter (default 0.8) to ignore normal
  speed fluctuations and only react to real deceleration
- DebugAntiRecoilHUD: real-time display of ratio, confidence, speeds, errors
- EndPlay: auto-close CSV file when stopping play (no more locked files)
- Python script updated to match new deceleration-based algorithm

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 18:25:54 +01:00

362 lines
13 KiB
Python

"""
Anti-Recoil Prediction Analyzer
================================
Reads CSV files recorded by the EBBarrel CSV recording feature and finds
optimal AdaptiveSensitivity and scaling factor for the Adaptive Extrapolation mode.
Usage:
python analyze_antirecoil.py <path_to_csv>
python analyze_antirecoil.py <path_to_csv> --plot (requires matplotlib)
The script:
1. Loads per-frame data (real position/aim vs predicted position/aim)
2. Computes prediction error at each frame
3. Simulates different AdaptiveSensitivity values offline
4. Finds the value that minimizes total prediction error
5. Reports recommended parameters
"""
import csv
import sys
import math
import os
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class Frame:
    """One recorded CSV row: real vs. predicted position/aim for a single frame."""
    timestamp: float                       # CSV 'Timestamp' column (seconds)
    real_pos: Tuple[float, float, float]   # actual position (reported in cm by the analyzer output)
    real_aim: Tuple[float, float, float]   # actual aim direction vector
    pred_pos: Tuple[float, float, float]   # predicted position
    pred_aim: Tuple[float, float, float]   # predicted aim direction vector
    safe_count: int                        # CSV 'SafeCount' column — samples in the safe window
    buffer_count: int                      # CSV 'BufferCount' column — total buffered samples
    extrap_time: float                     # CSV 'ExtrapolationTime' column (seconds)
def load_csv(path: str) -> List[Frame]:
    """Parse an anti-recoil recording CSV into a list of Frame records.

    The file is opened with newline='' as the csv module requires, so
    platform line endings and quoted newlines are handled correctly.

    Raises:
        KeyError: if an expected column header is missing.
        ValueError: if a field cannot be converted to float/int.
    """
    frames: List[Frame] = []
    with open(path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            frames.append(Frame(
                timestamp=float(row['Timestamp']),
                real_pos=(float(row['RealPosX']), float(row['RealPosY']), float(row['RealPosZ'])),
                real_aim=(float(row['RealAimX']), float(row['RealAimY']), float(row['RealAimZ'])),
                pred_pos=(float(row['PredPosX']), float(row['PredPosY']), float(row['PredPosZ'])),
                pred_aim=(float(row['PredAimX']), float(row['PredAimY']), float(row['PredAimZ'])),
                safe_count=int(row['SafeCount']),
                buffer_count=int(row['BufferCount']),
                extrap_time=float(row['ExtrapolationTime']),
            ))
    return frames
def vec_dist(a, b):
    """Euclidean distance between two same-length vectors."""
    squared = sum((x - y) ** 2 for x, y in zip(a, b))
    return math.sqrt(squared)
def vec_sub(a, b):
    """Component-wise difference a - b, returned as a tuple."""
    return tuple(x - y for x, y in zip(a, b))
def vec_add(a, b):
    """Component-wise sum a + b, returned as a tuple."""
    return tuple(x + y for x, y in zip(a, b))
def vec_scale(a, s):
    """Multiply every component of vector a by scalar s."""
    return tuple(component * s for component in a)
def vec_len(a):
    """Euclidean (L2) norm of vector a."""
    total = 0.0
    for component in a:
        total += component * component
    return math.sqrt(total)
def vec_normalize(a):
    """Return a unit-length copy of a, or the zero tuple for a (near-)zero vector."""
    length = math.sqrt(sum(c * c for c in a))
    if length < 1e-10:
        return (0, 0, 0)
    return tuple(c / length for c in a)
def angle_between(a, b):
    """Angle in degrees between two direction vectors (assumed unit length)."""
    cos_theta = sum(x * y for x, y in zip(a, b))
    # Clamp into acos' domain to absorb floating-point drift.
    cos_theta = min(1.0, max(-1.0, cos_theta))
    return math.degrees(math.acos(cos_theta))
def compute_prediction_error(frames: List[Frame]) -> dict:
    """Summarize error between predicted and actual (real) positions/aims.

    Returns a dict with mean / 95th percentile / max for both position
    error (distance units) and aim error (degrees).
    """
    pos_errors = []
    aim_errors = []
    for frame in frames:
        pos_errors.append(vec_dist(frame.pred_pos, frame.real_pos))
        pred_dir = vec_normalize(frame.pred_aim)
        real_dir = vec_normalize(frame.real_aim)
        # Skip frames where either aim vector was degenerate (normalized to zero).
        if vec_len(pred_dir) > 0.5 and vec_len(real_dir) > 0.5:
            aim_errors.append(angle_between(pred_dir, real_dir))
    if not pos_errors:
        return {'pos_mean': 0, 'pos_p95': 0, 'pos_max': 0, 'aim_mean': 0, 'aim_p95': 0, 'aim_max': 0}
    pos_errors.sort()
    aim_errors.sort()

    def _p95(sorted_vals):
        # Clamp the 95th-percentile index so short lists stay in range.
        return sorted_vals[min(int(len(sorted_vals) * 0.95), len(sorted_vals) - 1)]

    return {
        'pos_mean': sum(pos_errors) / len(pos_errors),
        'pos_p95': _p95(pos_errors),
        'pos_max': pos_errors[-1],
        'aim_mean': sum(aim_errors) / len(aim_errors) if aim_errors else 0,
        'aim_p95': _p95(aim_errors) if aim_errors else 0,
        'aim_max': aim_errors[-1] if aim_errors else 0,
    }
def simulate_adaptive(frames: List[Frame], sensitivity: float) -> Tuple[List[float], List[float]]:
    """
    Simulate the adaptive extrapolation offline with the given sensitivity.

    For each frame, per-sample velocities are estimated over a sliding window
    of real positions/aims, a deceleration "confidence" in [0, 1] is derived
    by comparing recent speed (last ~25% of the window) against the weighted
    average speed, and the resulting prediction is scored against the actual
    next frame.

    Returns:
        (position_errors, aim_errors): two parallel lists of per-frame errors
        (position in CSV distance units, aim in degrees).
    """
    pos_errors = []
    aim_errors = []
    # Sliding window for velocity estimation (matches C++ safe window ~18 samples)
    window_size = 12
    SMALL = 1e-10
    for i in range(window_size + 1, len(frames) - 1):
        # Build velocity samples ordered oldest -> newest, so later indices are
        # the most recent. (The previous most-recent-first ordering made the
        # quadratic weights and the "last 25%" slice emphasize the OLDEST data,
        # contradicting the deceleration-detection design.)
        pos_vels = []
        aim_vels = []
        for j in range(min(window_size, i) - 1, 0, -1):
            dt = frames[i - j].timestamp - frames[i - j - 1].timestamp
            if dt > 1e-6:
                inv_dt = 1.0 / dt
                pos_vels.append(vec_scale(vec_sub(frames[i - j].real_pos, frames[i - j - 1].real_pos), inv_dt))
                aim_vels.append(vec_scale(vec_sub(frames[i - j].real_aim, frames[i - j - 1].real_aim), inv_dt))
        if len(pos_vels) < 4:
            continue  # not enough valid samples for a stable estimate
        n = len(pos_vels)
        # Weighted average velocity: quadratic weights favor recent samples.
        total_w = 0.0
        avg_pos_vel = (0.0, 0.0, 0.0)
        avg_aim_vel = (0.0, 0.0, 0.0)
        for k in range(n):
            w = (k + 1) ** 2
            avg_pos_vel = vec_add(avg_pos_vel, vec_scale(pos_vels[k], w))
            avg_aim_vel = vec_add(avg_aim_vel, vec_scale(aim_vels[k], w))
            total_w += w
        avg_pos_vel = vec_scale(avg_pos_vel, 1.0 / total_w)
        avg_aim_vel = vec_scale(avg_aim_vel, 1.0 / total_w)
        # Deceleration detection: mean velocity of the most recent ~25% of samples.
        recent_start = max(0, n - max(1, n // 4))
        recent_count = n - recent_start
        recent_pos_vel = (0.0, 0.0, 0.0)
        recent_aim_vel = (0.0, 0.0, 0.0)
        for k in range(recent_start, n):
            recent_pos_vel = vec_add(recent_pos_vel, pos_vels[k])
            recent_aim_vel = vec_add(recent_aim_vel, aim_vels[k])
        recent_pos_vel = vec_scale(recent_pos_vel, 1.0 / recent_count)
        recent_aim_vel = vec_scale(recent_aim_vel, 1.0 / recent_count)
        avg_pos_speed = vec_len(avg_pos_vel)
        avg_aim_speed = vec_len(avg_aim_vel)
        recent_pos_speed = vec_len(recent_pos_vel)
        recent_aim_speed = vec_len(recent_aim_vel)
        # Confidence = (recentSpeed / avgSpeed) ^ sensitivity, ratio clamped to 1:
        # steady movement gives ratio ~1 (no lag); braking gives ratio < 1.
        pos_confidence = 1.0
        if avg_pos_speed > SMALL:
            pos_confidence = min(recent_pos_speed / avg_pos_speed, 1.0) ** sensitivity
        aim_confidence = 1.0
        if avg_aim_speed > SMALL:
            aim_confidence = min(recent_aim_speed / avg_aim_speed, 1.0) ** sensitivity
        # Extrapolation horizon; fall back to ~11 ms when none was recorded.
        extrap_dt = frames[i].extrap_time if frames[i].extrap_time > 0 else 0.011
        # Predict forward from the last real sample, damped by confidence.
        pred_pos = vec_add(frames[i - 1].real_pos, vec_scale(avg_pos_vel, extrap_dt * pos_confidence))
        pred_aim = vec_normalize(vec_add(frames[i - 1].real_aim, vec_scale(avg_aim_vel, extrap_dt * aim_confidence)))
        # Score the prediction against the actual frame i.
        pos_errors.append(vec_dist(pred_pos, frames[i].real_pos))
        real_aim_n = vec_normalize(frames[i].real_aim)
        if vec_len(pred_aim) > 0.5 and vec_len(real_aim_n) > 0.5:
            aim_errors.append(angle_between(pred_aim, real_aim_n))
    return pos_errors, aim_errors
def find_optimal_parameters(frames: List[Frame]) -> dict:
    """Grid-search AdaptiveSensitivity (power-curve exponent for deceleration detection).

    Prints a ranked table of candidates and returns the best value and its score.
    """
    print("\nSearching for optimal AdaptiveSensitivity (power exponent)...")
    print("-" * 60)
    # Candidate exponents spanning 0.1 .. 5.0.
    candidates = [0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0]
    best_score = float('inf')
    best_sensitivity = 1.0
    results = []
    for exponent in candidates:
        pos_errs, aim_errs = simulate_adaptive(frames, exponent)
        if not pos_errs:
            continue
        sorted_pos = sorted(pos_errs)
        sorted_aim = sorted(aim_errs) if aim_errs else [0]
        pos_mean = sum(pos_errs) / len(pos_errs)
        pos_p95 = sorted_pos[int(len(sorted_pos) * 0.95)]
        aim_mean = sum(aim_errs) / len(aim_errs) if aim_errs else 0
        aim_p95 = sorted_aim[int(len(sorted_aim) * 0.95)] if aim_errs else 0
        # Combined score: means weighted 0.3, p95 tails weighted 0.2 each
        # (aim kept prominent since aim error is the main issue).
        score = pos_mean * 0.3 + pos_p95 * 0.2 + aim_mean * 0.3 + aim_p95 * 0.2
        results.append((exponent, pos_mean, pos_p95, aim_mean, aim_p95, score))
        if score < best_score:
            best_score = score
            best_sensitivity = exponent
    # Ranked report, best first.
    results.sort(key=lambda entry: entry[5])
    print(f"\n{'Sensitivity':>12} {'Pos Mean':>10} {'Pos P95':>10} {'Aim Mean':>10} {'Aim P95':>10} {'Score':>10}")
    print("-" * 65)
    for exponent, pm, pp, am, ap, score in results[:10]:
        marker = " <-- BEST" if exponent == best_sensitivity else ""
        print(f"{exponent:>12.2f} {pm:>10.3f} {pp:>10.3f} {am:>10.3f} {ap:>10.3f} {score:>10.3f}{marker}")
    return {
        'sensitivity': best_sensitivity,
        'score': best_score,
    }
def _plot_analysis(frames: List[Frame], current_err: dict, csv_path: str) -> None:
    """Render position-error / aim-error / speed plots and save a PNG next to the CSV.

    Prints an install hint instead of raising when matplotlib is unavailable.
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError:
        print("\nmatplotlib not installed. Install with: pip install matplotlib")
        return
    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
    timestamps = [f.timestamp - frames[0].timestamp for f in frames]
    # Position error over time
    pos_errors = [vec_dist(f.pred_pos, f.real_pos) for f in frames]
    axes[0].plot(timestamps, pos_errors, 'r-', alpha=0.5, linewidth=0.5)
    axes[0].set_ylabel('Position Error (cm)')
    axes[0].set_title('Prediction Error Over Time')
    axes[0].axhline(y=current_err['pos_mean'], color='r', linestyle='--', alpha=0.5, label=f"Mean: {current_err['pos_mean']:.2f}cm")
    axes[0].legend()
    # Aim error over time (0 where either aim vector is degenerate)
    aim_errors = []
    for f in frames:
        a = vec_normalize(f.pred_aim)
        b = vec_normalize(f.real_aim)
        if vec_len(a) > 0.5 and vec_len(b) > 0.5:
            aim_errors.append(angle_between(a, b))
        else:
            aim_errors.append(0)
    axes[1].plot(timestamps, aim_errors, 'b-', alpha=0.5, linewidth=0.5)
    axes[1].set_ylabel('Aim Error (degrees)')
    axes[1].axhline(y=current_err['aim_mean'], color='b', linestyle='--', alpha=0.5, label=f"Mean: {current_err['aim_mean']:.2f}deg")
    axes[1].legend()
    # Speed derived from consecutive real positions
    speeds = [0]
    for i in range(1, len(frames)):
        dt = frames[i].timestamp - frames[i - 1].timestamp
        if dt > 1e-6:
            speeds.append(vec_dist(frames[i].real_pos, frames[i - 1].real_pos) / dt)
        else:
            # Repeat previous sample when timestamps are (nearly) identical.
            speeds.append(speeds[-1])
    axes[2].plot(timestamps, speeds, 'g-', alpha=0.7, linewidth=0.5)
    axes[2].set_ylabel('Speed (cm/s)')
    axes[2].set_xlabel('Time (s)')
    plt.tight_layout()
    plot_path = csv_path.replace('.csv', '_analysis.png')
    plt.savefig(plot_path, dpi=150)
    print(f"\nPlot saved: {plot_path}")
    plt.show()


def main():
    """CLI entry point: load a recorded CSV, report current error, search for
    the optimal AdaptiveSensitivity, and optionally plot the analysis."""
    if len(sys.argv) < 2:
        print("Usage: python analyze_antirecoil.py <csv_file> [--plot]")
        print("\nCSV files are saved by the EBBarrel component in:")
        print(" <Project>/Saved/Logs/AntiRecoil_*.csv")
        sys.exit(1)
    csv_path = sys.argv[1]
    do_plot = '--plot' in sys.argv
    if not os.path.exists(csv_path):
        print(f"Error: File not found: {csv_path}")
        sys.exit(1)
    print(f"Loading: {csv_path}")
    frames = load_csv(csv_path)
    print(f"Loaded {len(frames)} frames")
    # Guard: an empty recording would crash the stats below (frames[-1],
    # division by len(frames)).
    if not frames:
        print("Error: CSV contains no data rows.")
        sys.exit(1)
    if len(frames) < 50:
        print("Warning: very few frames. Record at least a few seconds of movement for good results.")
    # Basic stats
    duration = frames[-1].timestamp - frames[0].timestamp
    avg_fps = len(frames) / duration if duration > 0 else 0
    print(f"Duration: {duration:.1f}s | Avg FPS: {avg_fps:.1f}")
    print(f"Avg safe samples: {sum(f.safe_count for f in frames) / len(frames):.1f}")
    print(f"Avg buffer samples: {sum(f.buffer_count for f in frames) / len(frames):.1f}")
    print(f"Avg extrapolation time: {sum(f.extrap_time for f in frames) / len(frames) * 1000:.1f}ms")
    # Current prediction error (as recorded)
    print("\n=== CURRENT PREDICTION ERROR (as recorded) ===")
    current_err = compute_prediction_error(frames)
    print(f"Position error - Mean: {current_err['pos_mean']:.3f}cm | P95: {current_err['pos_p95']:.3f}cm | Max: {current_err['pos_max']:.3f}cm")
    print(f"Aim error - Mean: {current_err['aim_mean']:.3f}deg | P95: {current_err['aim_p95']:.3f}deg | Max: {current_err['aim_max']:.3f}deg")
    # Find optimal parameters
    print("\n=== PARAMETER OPTIMIZATION ===")
    optimal = find_optimal_parameters(frames)
    print(f"\n{'=' * 50}")
    print(f" RECOMMENDED SETTINGS:")
    print(f" AdaptiveSensitivity = {optimal['sensitivity']:.2f}")
    print(f"{'=' * 50}")
    if do_plot:
        _plot_analysis(frames, current_err, csv_path)
    print("\nDone.")


if __name__ == '__main__':
    main()