Anti-recoil prediction improvements: - New ARM_AdaptiveExtrapolation mode: velocity variance-based confidence with separate pos/aim tracking and low-speed protection - New ARM_WeightedLinearRegression mode: preserves original simple linear fit - ARM_WeightedRegression upgraded to quadratic (y=a+bt+ct²) with linear/quadratic blend and velocity-reversal clamping - ExtrapolationDamping parameter (all modes): exp decay on extrapolated velocity - CSV recording (RecordPredictionCSV) for offline parameter tuning - Python analysis tool (Tools/analyze_antirecoil.py) to find optimal AdaptiveSensitivity from recorded data Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
366 lines
14 KiB
Python
366 lines
14 KiB
Python
"""
|
|
Anti-Recoil Prediction Analyzer
|
|
================================
|
|
Reads CSV files recorded by the EBBarrel CSV recording feature and finds
|
|
optimal AdaptiveSensitivity and scaling factor for the Adaptive Extrapolation mode.
|
|
|
|
Usage:
|
|
python analyze_antirecoil.py <path_to_csv>
|
|
python analyze_antirecoil.py <path_to_csv> --plot (requires matplotlib)
|
|
|
|
The script:
|
|
1. Loads per-frame data (real position/aim vs predicted position/aim)
|
|
2. Computes prediction error at each frame
|
|
3. Simulates different AdaptiveSensitivity values offline
|
|
4. Finds the value that minimizes total prediction error
|
|
5. Reports recommended parameters
|
|
"""
|
|
|
|
import csv
|
|
import sys
|
|
import math
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import List, Tuple
|
|
|
|
|
|
@dataclass
class Frame:
    """One recorded frame: ground truth vs. prediction plus recorder state."""
    timestamp: float                       # recording time in seconds
    real_pos: Tuple[float, float, float]   # actual world position (cm, per the report output)
    real_aim: Tuple[float, float, float]   # actual aim direction vector
    pred_pos: Tuple[float, float, float]   # predicted world position (cm)
    pred_aim: Tuple[float, float, float]   # predicted aim direction vector
    safe_count: int                        # "safe" sample count reported by the recorder
    buffer_count: int                      # prediction buffer sample count
    extrap_time: float                     # extrapolation horizon this frame (seconds)
|
|
|
|
|
|
def load_csv(path: str) -> List[Frame]:
    """Load a recorded anti-recoil CSV into a list of Frame records.

    Expects the column layout written by the EBBarrel CSV recorder:
    Timestamp, Real/Pred Pos/Aim X/Y/Z triplets, SafeCount, BufferCount,
    ExtrapolationTime.  Raises KeyError/ValueError on malformed files.
    """
    def _vec3(row, prefix):
        # Parse a float triplet stored as <prefix>X / <prefix>Y / <prefix>Z.
        return (float(row[prefix + 'X']), float(row[prefix + 'Y']), float(row[prefix + 'Z']))

    frames = []
    # newline='' is required by the csv module docs; without it, newline
    # translation can corrupt rows (e.g. blank records on Windows).
    with open(path, 'r', newline='') as f:
        for row in csv.DictReader(f):
            frames.append(Frame(
                timestamp=float(row['Timestamp']),
                real_pos=_vec3(row, 'RealPos'),
                real_aim=_vec3(row, 'RealAim'),
                pred_pos=_vec3(row, 'PredPos'),
                pred_aim=_vec3(row, 'PredAim'),
                safe_count=int(row['SafeCount']),
                buffer_count=int(row['BufferCount']),
                extrap_time=float(row['ExtrapolationTime']),
            ))
    return frames
|
|
|
|
|
|
def vec_dist(a, b):
    """Euclidean distance between two equal-length vectors."""
    total = 0.0
    for ai, bi in zip(a, b):
        diff = ai - bi
        total += diff * diff
    return math.sqrt(total)
|
|
|
|
|
|
def vec_sub(a, b):
    """Component-wise difference a - b, returned as a tuple."""
    return tuple(x - y for x, y in zip(a, b))
|
|
|
|
|
|
def vec_add(a, b):
    """Component-wise sum a + b, returned as a tuple."""
    return tuple(x + y for x, y in zip(a, b))
|
|
|
|
|
|
def vec_scale(a, s):
    """Multiply every component of vector *a* by scalar *s*."""
    return tuple(s * component for component in a)
|
|
|
|
|
|
def vec_len(a):
    """Euclidean (L2) norm of vector *a*."""
    squared = 0.0
    for component in a:
        squared += component * component
    return math.sqrt(squared)
|
|
|
|
|
|
def vec_normalize(a):
    """Return *a* scaled to unit length; the zero tuple for near-zero input."""
    length = math.sqrt(sum(c * c for c in a))
    if length < 1e-10:
        # Degenerate vector: no meaningful direction, return the zero vector.
        return (0, 0, 0)
    return tuple(c / length for c in a)
|
|
|
|
|
|
def angle_between(a, b):
    """Angle in degrees between two direction vectors.

    Assumes both inputs are (approximately) unit length; the dot product is
    clamped to [-1, 1] so rounding drift cannot push acos out of domain.
    """
    dot = sum(x * y for x, y in zip(a, b))
    if dot > 1.0:
        dot = 1.0
    elif dot < -1.0:
        dot = -1.0
    return math.degrees(math.acos(dot))
|
|
|
|
|
|
def compute_prediction_error(frames: List[Frame]) -> dict:
    """Summarize the recorded prediction error versus ground truth.

    Returns a dict with mean / 95th-percentile / max of the position error
    (cm) and of the angular aim error (degrees) over all frames.  All-zero
    stats are returned when *frames* is empty.
    """

    def _summary(errors):
        # mean, p95, max for a non-empty, ascending-sorted error list.
        p95_index = min(int(len(errors) * 0.95), len(errors) - 1)
        return sum(errors) / len(errors), errors[p95_index], errors[-1]

    pos_errors = []
    aim_errors = []
    for frame in frames:
        pos_errors.append(vec_dist(frame.pred_pos, frame.real_pos))
        pred_dir = vec_normalize(frame.pred_aim)
        real_dir = vec_normalize(frame.real_aim)
        # Ignore frames whose aim vectors collapsed to zero length.
        if vec_len(pred_dir) > 0.5 and vec_len(real_dir) > 0.5:
            aim_errors.append(angle_between(pred_dir, real_dir))

    if not pos_errors:
        return {'pos_mean': 0, 'pos_p95': 0, 'pos_max': 0, 'aim_mean': 0, 'aim_p95': 0, 'aim_max': 0}

    pos_errors.sort()
    aim_errors.sort()

    pos_mean, pos_p95, pos_max = _summary(pos_errors)
    if aim_errors:
        aim_mean, aim_p95, aim_max = _summary(aim_errors)
    else:
        aim_mean = aim_p95 = aim_max = 0

    return {
        'pos_mean': pos_mean,
        'pos_p95': pos_p95,
        'pos_max': pos_max,
        'aim_mean': aim_mean,
        'aim_p95': aim_p95,
        'aim_max': aim_max,
    }
|
|
|
|
|
|
def simulate_adaptive(frames: List[Frame], sensitivity: float) -> Tuple[List[float], List[float]]:
    """
    Simulate the adaptive extrapolation offline with the given sensitivity.
    Uses separate pos/aim confidences with relative variance (matching the
    C++ implementation).

    Returns (pos_errors, aim_errors): two per-frame lists — position error
    (cm) and aim error (degrees).  Note: the previous annotation claimed a
    single list of pairs, which did not match the actual return value.
    """
    pos_errors = []
    aim_errors = []

    # Sliding window for velocity estimation
    window_size = 12

    SMALL = 1e-10

    for i in range(window_size + 1, len(frames) - 1):
        # Build velocity samples from recent real positions and aims
        pos_velocities = []
        aim_velocities = []
        weights = []
        for j in range(1, min(window_size, i)):
            dt = frames[i - j].timestamp - frames[i - j - 1].timestamp
            if dt > 1e-6:
                pos_vel = vec_scale(vec_sub(frames[i - j].real_pos, frames[i - j - 1].real_pos), 1.0 / dt)
                aim_vel = vec_scale(vec_sub(frames[i - j].real_aim, frames[i - j - 1].real_aim), 1.0 / dt)
                pos_velocities.append(pos_vel)
                aim_velocities.append(aim_vel)
                # Quadratic recency weighting: newer samples dominate.
                w = (window_size - j) ** 2
                weights.append(w)

        # Need at least two velocity samples for a meaningful variance.
        if len(pos_velocities) < 2:
            continue

        total_w = sum(weights)

        # Weighted average velocity (position and aim)
        avg_pos_vel = (0.0, 0.0, 0.0)
        avg_aim_vel = (0.0, 0.0, 0.0)
        for pv, av, w in zip(pos_velocities, aim_velocities, weights):
            avg_pos_vel = vec_add(avg_pos_vel, vec_scale(pv, w / total_w))
            avg_aim_vel = vec_add(avg_aim_vel, vec_scale(av, w / total_w))

        # Weighted variance (position and aim separately)
        pos_var_sum = 0.0
        aim_var_sum = 0.0
        for pv, av, w in zip(pos_velocities, aim_velocities, weights):
            pd = vec_sub(pv, avg_pos_vel)
            ad = vec_sub(av, avg_aim_vel)
            pos_var_sum += (pd[0]**2 + pd[1]**2 + pd[2]**2) * w
            aim_var_sum += (ad[0]**2 + ad[1]**2 + ad[2]**2) * w
        pos_variance = pos_var_sum / total_w
        aim_variance = aim_var_sum / total_w

        # Separate confidences with RELATIVE variance (matching C++ code)
        pos_speed2 = sum(v**2 for v in avg_pos_vel)
        aim_speed2 = sum(v**2 for v in avg_aim_vel)

        # Extrapolation time; fall back to 11 ms when the recorded value
        # is non-positive.
        extrap_dt = frames[i].extrap_time if frames[i].extrap_time > 0 else 0.011

        pos_confidence = 1.0
        if pos_speed2 > SMALL:
            pos_rel_var = pos_variance / pos_speed2
            pos_confidence = 1.0 / (1.0 + pos_rel_var * sensitivity)
            # Low-speed protection: if extrapolation offset < 1cm, blend confidence toward 1
            pos_extrap_offset = math.sqrt(pos_speed2) * extrap_dt
            pos_offset_significance = max(0.0, min(pos_extrap_offset / 1.0, 1.0))  # 1cm threshold
            pos_confidence = 1.0 + (pos_confidence - 1.0) * pos_offset_significance

        aim_confidence = 1.0
        if aim_speed2 > SMALL:
            aim_rel_var = aim_variance / aim_speed2
            aim_confidence = 1.0 / (1.0 + aim_rel_var * sensitivity)
            # Low-speed protection: if aim extrapolation offset is tiny, blend toward 1
            aim_extrap_offset = math.sqrt(aim_speed2) * extrap_dt
            aim_offset_significance = max(0.0, min(aim_extrap_offset / 0.01, 1.0))  # 0.01 unit threshold
            aim_confidence = 1.0 + (aim_confidence - 1.0) * aim_offset_significance

        # Predict frame i from frame i-1 using confidence-damped velocity
        pred_pos = vec_add(frames[i - 1].real_pos, vec_scale(avg_pos_vel, extrap_dt * pos_confidence))
        pred_aim_raw = vec_add(frames[i - 1].real_aim, vec_scale(avg_aim_vel, extrap_dt * aim_confidence))
        pred_aim = vec_normalize(pred_aim_raw)

        # Errors against the actually-observed frame i
        pos_errors.append(vec_dist(pred_pos, frames[i].real_pos))
        real_aim_n = vec_normalize(frames[i].real_aim)
        if vec_len(pred_aim) > 0.5 and vec_len(real_aim_n) > 0.5:
            aim_errors.append(angle_between(pred_aim, real_aim_n))

    return pos_errors, aim_errors
|
|
|
|
|
|
def find_optimal_parameters(frames: List[Frame]) -> dict:
    """Grid-search AdaptiveSensitivity and report the best-scoring value.

    Runs the offline simulator for each candidate, scores it by a blend of
    mean and 95th-percentile errors, prints the top-10 table, and returns
    {'sensitivity': best_value, 'score': best_score}.
    """
    print("\nSearching for optimal AdaptiveSensitivity...")
    print("-" * 60)

    # Search grid — only sensitivity, no more scaling factor
    sensitivities = [0.1, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 50.0, 75.0, 100.0, 150.0, 200.0]

    results = []
    for candidate in sensitivities:
        pos_errs, aim_errs = simulate_adaptive(frames, candidate)
        if not pos_errs:
            continue
        sorted_pos = sorted(pos_errs)
        sorted_aim = sorted(aim_errs) if aim_errs else [0]

        pos_mean = sum(pos_errs) / len(pos_errs)
        pos_p95 = sorted_pos[int(len(sorted_pos) * 0.95)]
        aim_mean = sum(aim_errs) / len(aim_errs) if aim_errs else 0
        aim_p95 = sorted_aim[int(len(sorted_aim) * 0.95)] if aim_errs else 0

        # Score blends central tendency and tail for both error kinds.
        score = pos_mean * 0.3 + pos_p95 * 0.2 + aim_mean * 0.3 + aim_p95 * 0.2
        results.append((candidate, pos_mean, pos_p95, aim_mean, aim_p95, score))

    # Track the first strictly-best entry; defaults survive an empty grid.
    best_score = float('inf')
    best_sensitivity = 1.0
    for entry in results:
        if entry[5] < best_score:
            best_score = entry[5]
            best_sensitivity = entry[0]

    # Print the ten best candidates, lowest score first.
    results.sort(key=lambda x: x[5])
    print(f"\n{'Sensitivity':>12} {'Pos Mean':>10} {'Pos P95':>10} {'Aim Mean':>10} {'Aim P95':>10} {'Score':>10}")
    print("-" * 65)
    for sens, pm, pp, am, ap, score in results[:10]:
        marker = " <-- BEST" if sens == best_sensitivity else ""
        print(f"{sens:>12.2f} {pm:>10.3f} {pp:>10.3f} {am:>10.3f} {ap:>10.3f} {score:>10.3f}{marker}")

    return {
        'sensitivity': best_sensitivity,
        'score': best_score,
    }
|
|
|
|
|
|
def main():
    """CLI entry point.

    Loads a recorded anti-recoil CSV, prints session stats and the recorded
    prediction error, searches for the best AdaptiveSensitivity, and (with
    --plot) renders error/speed graphs via matplotlib.
    """
    if len(sys.argv) < 2:
        print("Usage: python analyze_antirecoil.py <csv_file> [--plot]")
        print("\nCSV files are saved by the EBBarrel component in:")
        print(" <Project>/Saved/Logs/AntiRecoil_*.csv")
        sys.exit(1)

    csv_path = sys.argv[1]
    do_plot = '--plot' in sys.argv

    if not os.path.exists(csv_path):
        print(f"Error: File not found: {csv_path}")
        sys.exit(1)

    print(f"Loading: {csv_path}")
    frames = load_csv(csv_path)
    print(f"Loaded {len(frames)} frames")

    if len(frames) < 50:
        print("Warning: very few frames. Record at least a few seconds of movement for good results.")

    # Basic stats
    duration = frames[-1].timestamp - frames[0].timestamp
    avg_fps = len(frames) / duration if duration > 0 else 0
    print(f"Duration: {duration:.1f}s | Avg FPS: {avg_fps:.1f}")
    print(f"Avg safe samples: {sum(f.safe_count for f in frames) / len(frames):.1f}")
    print(f"Avg buffer samples: {sum(f.buffer_count for f in frames) / len(frames):.1f}")
    print(f"Avg extrapolation time: {sum(f.extrap_time for f in frames) / len(frames) * 1000:.1f}ms")

    # Current prediction error (as recorded)
    print("\n=== CURRENT PREDICTION ERROR (as recorded) ===")
    current_err = compute_prediction_error(frames)
    print(f"Position error - Mean: {current_err['pos_mean']:.3f}cm | P95: {current_err['pos_p95']:.3f}cm | Max: {current_err['pos_max']:.3f}cm")
    print(f"Aim error - Mean: {current_err['aim_mean']:.3f}deg | P95: {current_err['aim_p95']:.3f}deg | Max: {current_err['aim_max']:.3f}deg")

    # Find optimal parameters
    print("\n=== PARAMETER OPTIMIZATION ===")
    optimal = find_optimal_parameters(frames)

    print(f"\n{'=' * 50}")
    print(f" RECOMMENDED SETTINGS:")
    print(f" AdaptiveSensitivity = {optimal['sensitivity']:.2f}")
    print(f"{'=' * 50}")

    # Plotting (matplotlib is imported lazily so the analysis itself
    # works without it installed)
    if do_plot:
        try:
            import matplotlib.pyplot as plt

            fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

            timestamps = [f.timestamp - frames[0].timestamp for f in frames]

            # Position error
            pos_errors = [vec_dist(f.pred_pos, f.real_pos) for f in frames]
            axes[0].plot(timestamps, pos_errors, 'r-', alpha=0.5, linewidth=0.5)
            axes[0].set_ylabel('Position Error (cm)')
            axes[0].set_title('Prediction Error Over Time')
            axes[0].axhline(y=current_err['pos_mean'], color='r', linestyle='--', alpha=0.5, label=f"Mean: {current_err['pos_mean']:.2f}cm")
            axes[0].legend()

            # Aim error (0 is plotted for frames with degenerate aim vectors)
            aim_errors = []
            for f in frames:
                a = vec_normalize(f.pred_aim)
                b = vec_normalize(f.real_aim)
                if vec_len(a) > 0.5 and vec_len(b) > 0.5:
                    aim_errors.append(angle_between(a, b))
                else:
                    aim_errors.append(0)
            axes[1].plot(timestamps, aim_errors, 'b-', alpha=0.5, linewidth=0.5)
            axes[1].set_ylabel('Aim Error (degrees)')
            axes[1].axhline(y=current_err['aim_mean'], color='b', linestyle='--', alpha=0.5, label=f"Mean: {current_err['aim_mean']:.2f}deg")
            axes[1].legend()

            # Speed (from real positions); repeats the last speed when dt ~ 0
            speeds = [0]
            for i in range(1, len(frames)):
                dt = frames[i].timestamp - frames[i-1].timestamp
                if dt > 1e-6:
                    d = vec_dist(frames[i].real_pos, frames[i-1].real_pos)
                    speeds.append(d / dt)
                else:
                    speeds.append(speeds[-1])
            axes[2].plot(timestamps, speeds, 'g-', alpha=0.7, linewidth=0.5)
            axes[2].set_ylabel('Speed (cm/s)')
            axes[2].set_xlabel('Time (s)')

            plt.tight_layout()
            plot_path = csv_path.replace('.csv', '_analysis.png')
            plt.savefig(plot_path, dpi=150)
            print(f"\nPlot saved: {plot_path}")
            plt.show()

        except ImportError:
            print("\nmatplotlib not installed. Install with: pip install matplotlib")

    print("\nDone.")
|
|
|
|
|
|
# Run the CLI only when executed as a script (not on import).
if __name__ == '__main__':
    main()
|