v1.7.1: Fix mic not heard + latency optimizations + thread safety

Fix regression from v1.7.0 where agent couldn't hear user speech:
- Restore AsyncTask game-thread dispatch for delegate broadcast (AddUObject
  weak pointer checks are not thread-safe from WASAPI thread)
- Keep early echo suppression in WASAPI callback (before resampling)
- Keep MicChunkMinBytes at 3200 (100ms) for lower latency
- Add thread safety: std::atomic<bool> for bIsListening/bAgentSpeaking/bCapturing,
  FCriticalSection for MicSendLock and WebSocketSendLock
- Add EchoSuppressFlag pointer from agent to mic component

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
j.foucher 2026-02-22 08:46:15 +01:00
parent f23acc8c1c
commit 152fc6196d
7 changed files with 75 additions and 29 deletions

View File

@ -247,6 +247,9 @@ void UElevenLabsConversationalAgentComponent::StartListening()
Mic->OnAudioCaptured.RemoveAll(this); Mic->OnAudioCaptured.RemoveAll(this);
Mic->OnAudioCaptured.AddUObject(this, Mic->OnAudioCaptured.AddUObject(this,
&UElevenLabsConversationalAgentComponent::OnMicrophoneDataCaptured); &UElevenLabsConversationalAgentComponent::OnMicrophoneDataCaptured);
// Echo suppression: point the mic at our atomic bAgentSpeaking flag so it skips
// capture entirely (before resampling) while the agent is speaking.
Mic->EchoSuppressFlag = &bAgentSpeaking;
Mic->StartCapture(); Mic->StartCapture();
const double T = TurnStartTime - SessionStartTime; const double T = TurnStartTime - SessionStartTime;
@ -275,20 +278,23 @@ void UElevenLabsConversationalAgentComponent::StopListening()
// "user speaking" state and stall waiting for more audio that never arrives, // "user speaking" state and stall waiting for more audio that never arrives,
// leaving both sides stuck — no audio for the collision response and no response // leaving both sides stuck — no audio for the collision response and no response
// for subsequent turns. // for subsequent turns.
if (bAgentGenerating)
{ {
if (MicAccumulationBuffer.Num() > 0) FScopeLock Lock(&MicSendLock);
if (bAgentGenerating)
{ {
UE_LOG(LogElevenLabsAgent, Log, if (MicAccumulationBuffer.Num() > 0)
TEXT("StopListening: discarding %d bytes of accumulated mic audio (collision — server is mid-generation)."), {
MicAccumulationBuffer.Num()); UE_LOG(LogElevenLabsAgent, Log,
TEXT("StopListening: discarding %d bytes of accumulated mic audio (collision — server is mid-generation)."),
MicAccumulationBuffer.Num());
}
} }
else if (MicAccumulationBuffer.Num() > 0 && WebSocketProxy && IsConnected())
{
WebSocketProxy->SendAudioChunk(MicAccumulationBuffer);
}
MicAccumulationBuffer.Reset();
} }
else if (MicAccumulationBuffer.Num() > 0 && WebSocketProxy && IsConnected())
{
WebSocketProxy->SendAudioChunk(MicAccumulationBuffer);
}
MicAccumulationBuffer.Reset();
if (WebSocketProxy && TurnMode == EElevenLabsTurnMode::Client) if (WebSocketProxy && TurnMode == EElevenLabsTurnMode::Client)
{ {
@ -394,7 +400,10 @@ void UElevenLabsConversationalAgentComponent::HandleDisconnected(int32 StatusCod
GeneratingTickCount = 0; GeneratingTickCount = 0;
TurnIndex = 0; TurnIndex = 0;
LastClosedTurnIndex = 0; LastClosedTurnIndex = 0;
MicAccumulationBuffer.Reset(); {
FScopeLock Lock(&MicSendLock);
MicAccumulationBuffer.Reset();
}
OnAgentDisconnected.Broadcast(StatusCode, Reason); OnAgentDisconnected.Broadcast(StatusCode, Reason);
} }
@ -611,10 +620,14 @@ void UElevenLabsConversationalAgentComponent::OnMicrophoneDataCaptured(const TAr
if (bAgentSpeaking) return; if (bAgentSpeaking) return;
// Convert this callback's samples to int16 bytes and accumulate. // Convert this callback's samples to int16 bytes and accumulate.
// WASAPI fires every ~5ms (158 bytes at 16kHz). ElevenLabs needs ≥250ms // WASAPI fires every ~5ms (158 bytes at 16kHz). ElevenLabs needs ≥100ms
// (8000 bytes) per chunk for reliable VAD and STT. We hold bytes here // (3200 bytes) per chunk for reliable VAD and STT. We hold bytes here
// until we have enough, then send the whole batch in one WebSocket frame. // until we have enough, then send the whole batch in one WebSocket frame.
TArray<uint8> PCMBytes = FloatPCMToInt16Bytes(FloatPCM); TArray<uint8> PCMBytes = FloatPCMToInt16Bytes(FloatPCM);
// Lock: MicAccumulationBuffer is accessed from WASAPI thread (here) and
// game thread (StopListening flush). WebSocket send is also serialized.
FScopeLock Lock(&MicSendLock);
MicAccumulationBuffer.Append(PCMBytes); MicAccumulationBuffer.Append(PCMBytes);
if (MicAccumulationBuffer.Num() >= MicChunkMinBytes) if (MicAccumulationBuffer.Num() >= MicChunkMinBytes)

View File

@ -89,6 +89,12 @@ void UElevenLabsMicrophoneCaptureComponent::OnAudioGenerate(
UE_LOG(LogElevenLabsMic, Verbose, TEXT("Audio capture buffer overflow.")); UE_LOG(LogElevenLabsMic, Verbose, TEXT("Audio capture buffer overflow."));
} }
// Echo suppression: skip resampling + broadcasting entirely when agent is speaking.
if (EchoSuppressFlag && EchoSuppressFlag->load(std::memory_order_relaxed))
{
return;
}
// Device sends float32 interleaved samples; cast from the void* API. // Device sends float32 interleaved samples; cast from the void* API.
const float* FloatAudio = static_cast<const float*>(InAudio); const float* FloatAudio = static_cast<const float*>(InAudio);
@ -104,15 +110,20 @@ void UElevenLabsMicrophoneCaptureComponent::OnAudioGenerate(
} }
} }
// Fire the delegate on the game thread so subscribers don't need to be // Dispatch to game thread for delegate broadcast.
// thread-safe (WebSocket Send is not thread-safe in UE's implementation). // UE's FMulticastDelegate with AddUObject uses weak object pointer checks that
AsyncTask(ENamedThreads::GameThread, [this, Data = MoveTemp(Resampled)]() // are not thread-safe — broadcasting from the WASAPI thread causes the invocation
// to be silently skipped. The game thread dispatch adds ~8ms latency but is required.
if (bCapturing)
{ {
if (bCapturing) AsyncTask(ENamedThreads::GameThread, [this, Captured = MoveTemp(Resampled)]()
{ {
OnAudioCaptured.Broadcast(Data); if (bCapturing)
} {
}); OnAudioCaptured.Broadcast(Captured);
}
});
}
} }
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────

View File

@ -120,9 +120,12 @@ void UElevenLabsWebSocketProxy::SendAudioChunk(const TArray<uint8>& PCMData)
// Per-chunk log at Verbose only — Log level is too spammy (10+ lines per second). // Per-chunk log at Verbose only — Log level is too spammy (10+ lines per second).
UE_LOG(LogElevenLabsWS, Verbose, TEXT("SendAudioChunk: %d bytes"), PCMData.Num()); UE_LOG(LogElevenLabsWS, Verbose, TEXT("SendAudioChunk: %d bytes"), PCMData.Num());
if (WebSocket.IsValid() && WebSocket->IsConnected())
{ {
WebSocket->Send(AudioJson); FScopeLock Lock(&WebSocketSendLock);
if (WebSocket.IsValid() && WebSocket->IsConnected())
{
WebSocket->Send(AudioJson);
}
} }
} }
@ -658,7 +661,10 @@ void UElevenLabsWebSocketProxy::SendJsonMessage(const TSharedPtr<FJsonObject>& J
UE_LOG(LogElevenLabsWS, Verbose, TEXT("<< %s"), *Out); UE_LOG(LogElevenLabsWS, Verbose, TEXT("<< %s"), *Out);
} }
WebSocket->Send(Out); {
FScopeLock Lock(&WebSocketSendLock);
WebSocket->Send(Out);
}
} }
FString UElevenLabsWebSocketProxy::BuildWebSocketURL(const FString& AgentIDOverride, const FString& APIKeyOverride) const FString UElevenLabsWebSocketProxy::BuildWebSocketURL(const FString& AgentIDOverride, const FString& APIKeyOverride) const

View File

@ -7,6 +7,7 @@
#include "ElevenLabsDefinitions.h" #include "ElevenLabsDefinitions.h"
#include "ElevenLabsWebSocketProxy.h" #include "ElevenLabsWebSocketProxy.h"
#include "Sound/SoundWaveProcedural.h" #include "Sound/SoundWaveProcedural.h"
#include <atomic>
#include "ElevenLabsConversationalAgentComponent.generated.h" #include "ElevenLabsConversationalAgentComponent.generated.h"
class UAudioComponent; class UAudioComponent;
@ -337,8 +338,9 @@ private:
USoundWaveProcedural* ProceduralSoundWave = nullptr; USoundWaveProcedural* ProceduralSoundWave = nullptr;
// ── State ───────────────────────────────────────────────────────────────── // ── State ─────────────────────────────────────────────────────────────────
bool bIsListening = false; // Atomic: read from WASAPI background thread (OnMicrophoneDataCaptured), written from game thread.
bool bAgentSpeaking = false; std::atomic<bool> bIsListening{false};
std::atomic<bool> bAgentSpeaking{false};
// True from the first agent_chat_response_part until the first audio chunk arrives. // True from the first agent_chat_response_part until the first audio chunk arrives.
// Used to block StartListening() while the server is processing the previous turn. // Used to block StartListening() while the server is processing the previous turn.
bool bAgentGenerating = false; bool bAgentGenerating = false;
@ -399,6 +401,9 @@ private:
// WASAPI fires callbacks every ~5ms (158 bytes at 16kHz 16-bit mono). // WASAPI fires callbacks every ~5ms (158 bytes at 16kHz 16-bit mono).
// ElevenLabs needs at least ~100ms (3200 bytes) per chunk for reliable VAD/STT. // ElevenLabs needs at least ~100ms (3200 bytes) per chunk for reliable VAD/STT.
// We accumulate here and only call SendAudioChunk once enough bytes are ready. // We accumulate here and only call SendAudioChunk once enough bytes are ready.
// MicSendLock protects MicAccumulationBuffer + WebSocket send (accessed from WASAPI thread
// in OnMicrophoneDataCaptured and from game thread in StopListening flush).
TArray<uint8> MicAccumulationBuffer; TArray<uint8> MicAccumulationBuffer;
static constexpr int32 MicChunkMinBytes = 8000; // 250ms @ 16kHz 16-bit mono (4000 samples, matches ElevenLabs SDK recommendation) FCriticalSection MicSendLock;
static constexpr int32 MicChunkMinBytes = 3200; // 100ms @ 16kHz 16-bit mono (1600 samples)
}; };

View File

@ -5,6 +5,7 @@
#include "CoreMinimal.h" #include "CoreMinimal.h"
#include "Components/ActorComponent.h" #include "Components/ActorComponent.h"
#include "AudioCapture.h" #include "AudioCapture.h"
#include <atomic>
#include "ElevenLabsMicrophoneCaptureComponent.generated.h" #include "ElevenLabsMicrophoneCaptureComponent.generated.h"
// Delivers captured float PCM samples (16000 Hz mono, resampled from device rate). // Delivers captured float PCM samples (16000 Hz mono, resampled from device rate).
@ -33,11 +34,17 @@ public:
float VolumeMultiplier = 1.0f; float VolumeMultiplier = 1.0f;
/** /**
* Delegate fired on the game thread each time a new chunk of PCM audio * Delegate fired on the game thread each time a new chunk of PCM audio is
* is captured. Samples are float32, resampled to 16000 Hz mono. * captured. Samples are float32, resampled to 16000 Hz mono.
* Audio is captured on a WASAPI background thread, resampled there (with
* echo suppression), then dispatched to the game thread for this broadcast.
*/ */
FOnElevenLabsAudioCaptured OnAudioCaptured; FOnElevenLabsAudioCaptured OnAudioCaptured;
/** Optional pointer to an atomic bool that suppresses capture when true.
* Set by the agent component for echo suppression (skip mic while agent speaks). */
std::atomic<bool>* EchoSuppressFlag = nullptr;
/** Open the default capture device and begin streaming audio. */ /** Open the default capture device and begin streaming audio. */
UFUNCTION(BlueprintCallable, Category = "ElevenLabs") UFUNCTION(BlueprintCallable, Category = "ElevenLabs")
void StartCapture(); void StartCapture();
@ -65,7 +72,7 @@ private:
Audio::FAudioCapture AudioCapture; Audio::FAudioCapture AudioCapture;
Audio::FAudioCaptureDeviceParams DeviceParams; Audio::FAudioCaptureDeviceParams DeviceParams;
bool bCapturing = false; std::atomic<bool> bCapturing{false};
// Device sample rate discovered on StartCapture // Device sample rate discovered on StartCapture
int32 DeviceSampleRate = 44100; int32 DeviceSampleRate = 44100;

View File

@ -205,6 +205,10 @@ private:
EElevenLabsConnectionState ConnectionState = EElevenLabsConnectionState::Disconnected; EElevenLabsConnectionState ConnectionState = EElevenLabsConnectionState::Disconnected;
FElevenLabsConversationInfo ConversationInfo; FElevenLabsConversationInfo ConversationInfo;
// Serializes WebSocket->Send() calls — needed because SendAudioChunk can now be
// called from the WASAPI background thread while SendJsonMessage runs on game thread.
FCriticalSection WebSocketSendLock;
// Accumulation buffer for multi-fragment binary WebSocket frames. // Accumulation buffer for multi-fragment binary WebSocket frames.
// ElevenLabs sends JSON as binary frames; large messages arrive in fragments. // ElevenLabs sends JSON as binary frames; large messages arrive in fragments.
TArray<uint8> BinaryFrameBuffer; TArray<uint8> BinaryFrameBuffer;