Fix collision stuck state + add streaming partial text event

- Skip mic buffer flush during collision avoidance (bAgentGenerating guard
  in StopListening) to prevent sending audio to a mid-generation server
  which caused both sides to stall permanently
- Add OnAgentPartialResponse event: streams LLM text fragments from
  agent_chat_response_part in real-time (opt-in via bEnableAgentPartialResponse),
  separate from the existing OnAgentTextResponse (full text at end)
- French agent server drop after 3 turns is a server-side issue, not client

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
j.foucher 2026-02-20 20:24:50 +01:00
parent d63c1776b5
commit 1b883f532f
5 changed files with 102 additions and 13 deletions

View File

@ -158,6 +158,8 @@ void UElevenLabsConversationalAgentComponent::StartConversation()
&UElevenLabsConversationalAgentComponent::HandleInterrupted); &UElevenLabsConversationalAgentComponent::HandleInterrupted);
WebSocketProxy->OnAgentResponseStarted.AddDynamic(this, WebSocketProxy->OnAgentResponseStarted.AddDynamic(this,
&UElevenLabsConversationalAgentComponent::HandleAgentResponseStarted); &UElevenLabsConversationalAgentComponent::HandleAgentResponseStarted);
WebSocketProxy->OnAgentResponsePart.AddDynamic(this,
&UElevenLabsConversationalAgentComponent::HandleAgentResponsePart);
} }
// Pass configuration to the proxy before connecting. // Pass configuration to the proxy before connecting.
@ -266,7 +268,23 @@ void UElevenLabsConversationalAgentComponent::StopListening()
// Flush any partially-accumulated mic audio before signalling end-of-turn. // Flush any partially-accumulated mic audio before signalling end-of-turn.
// This ensures the final words aren't discarded just because the last callback // This ensures the final words aren't discarded just because the last callback
// didn't push the buffer over the MicChunkMinBytes threshold. // didn't push the buffer over the MicChunkMinBytes threshold.
if (MicAccumulationBuffer.Num() > 0 && WebSocketProxy && IsConnected()) //
// EXCEPT during collision avoidance: bAgentGenerating is already true when
// HandleAgentResponseStarted calls StopListening (it sets the flag before calling us).
// Flushing audio to a server that is mid-generation can cause it to re-enter
// "user speaking" state and stall waiting for more audio that never arrives,
// leaving both sides stuck — no audio for the collision response and no response
// for subsequent turns.
if (bAgentGenerating)
{
if (MicAccumulationBuffer.Num() > 0)
{
UE_LOG(LogElevenLabsAgent, Log,
TEXT("StopListening: discarding %d bytes of accumulated mic audio (collision — server is mid-generation)."),
MicAccumulationBuffer.Num());
}
}
else if (MicAccumulationBuffer.Num() > 0 && WebSocketProxy && IsConnected())
{ {
WebSocketProxy->SendAudioChunk(MicAccumulationBuffer); WebSocketProxy->SendAudioChunk(MicAccumulationBuffer);
} }
@ -423,7 +441,8 @@ void UElevenLabsConversationalAgentComponent::HandleAgentResponseStarted()
{ {
// The server has started generating a response (first agent_chat_response_part). // The server has started generating a response (first agent_chat_response_part).
// Set bAgentGenerating BEFORE StopListening so that any StartListening call // Set bAgentGenerating BEFORE StopListening so that any StartListening call
// triggered by the Blueprint's OnAgentStartedGenerating handler is blocked. // triggered by the Blueprint's OnAgentStartedGenerating handler is blocked,
// and so that StopListening knows to skip the mic buffer flush (collision path).
bAgentGenerating = true; bAgentGenerating = true;
bWaitingForAgentResponse = false; // Server is generating — response timeout cancelled. bWaitingForAgentResponse = false; // Server is generating — response timeout cancelled.
@ -433,21 +452,39 @@ void UElevenLabsConversationalAgentComponent::HandleAgentResponseStarted()
if (bIsListening) if (bIsListening)
{ {
// Collision: server started generating Turn N's response while Turn M (M>N) mic was open. // Collision: server started generating Turn N's response while Turn M (M>N) mic was open.
// Log both turn indices so the timeline is unambiguous. // The server's VAD detected a pause in the user's speech and started generating
// prematurely — the user hasn't finished speaking yet.
//
// Stop the mic WITHOUT flushing the accumulated audio buffer (see StopListening's
// bAgentGenerating guard). Flushing would send audio to a server that is mid-generation,
// causing it to re-enter "user speaking" state and stall — both sides stuck.
//
// Do NOT send an interrupt here: the ElevenLabs server does not always send the
// interruption ack, which would leave bIgnoreIncomingContent=true and silently
// discard all subsequent content. Instead, let the server's response play out:
// - If audio arrives → EnqueueAgentAudio sets bAgentSpeaking, response plays normally.
// - If audio never arrives → generating timeout (10s) clears bAgentGenerating.
// Either way the state machine recovers and Blueprint can reopen the mic.
UE_LOG(LogElevenLabsAgent, Log, UE_LOG(LogElevenLabsAgent, Log,
TEXT("[T+%.2fs] [Turn %d → Turn %d collision] Agent generating Turn %d response — mic (Turn %d) was open, stopping. (%.2fs after turn end)"), TEXT("[T+%.2fs] [Turn %d → Turn %d collision] Agent generating Turn %d response — mic (Turn %d) was open, stopping. (%.2fs after turn end)"),
T, LastClosedTurnIndex, TurnIndex, LastClosedTurnIndex, TurnIndex, LatencyFromTurnEnd); T, LastClosedTurnIndex, TurnIndex, LastClosedTurnIndex, TurnIndex, LatencyFromTurnEnd);
StopListening(); StopListening();
} }
else
{
UE_LOG(LogElevenLabsAgent, Log, UE_LOG(LogElevenLabsAgent, Log,
TEXT("[T+%.2fs] [Turn %d] Agent generating. (%.2fs after turn end)"), TEXT("[T+%.2fs] [Turn %d] Agent generating. (%.2fs after turn end)"),
T, LastClosedTurnIndex, LatencyFromTurnEnd); T, LastClosedTurnIndex, LatencyFromTurnEnd);
}
OnAgentStartedGenerating.Broadcast(); OnAgentStartedGenerating.Broadcast();
} }
void UElevenLabsConversationalAgentComponent::HandleAgentResponsePart(const FString& PartialText)
{
	// Streaming LLM text fragment received from the proxy (agent_chat_response_part).
	// Forwarding is opt-in: bail out early unless the user enabled partial responses.
	if (!bEnableAgentPartialResponse)
	{
		return;
	}

	OnAgentPartialResponse.Broadcast(PartialText);
}
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
// Audio playback // Audio playback
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────

View File

@ -400,7 +400,7 @@ void UElevenLabsWebSocketProxy::OnWsMessage(const FString& Message)
} }
else if (MsgType == ElevenLabsMessageType::AgentChatResponsePart) else if (MsgType == ElevenLabsMessageType::AgentChatResponsePart)
{ {
HandleAgentChatResponsePart(); HandleAgentChatResponsePart(Root);
} }
else if (MsgType == ElevenLabsMessageType::AgentResponseCorrection) else if (MsgType == ElevenLabsMessageType::AgentResponseCorrection)
{ {
@ -602,7 +602,7 @@ void UElevenLabsWebSocketProxy::HandleAgentResponse(const TSharedPtr<FJsonObject
OnAgentResponse.Broadcast(ResponseText); OnAgentResponse.Broadcast(ResponseText);
} }
void UElevenLabsWebSocketProxy::HandleAgentChatResponsePart() void UElevenLabsWebSocketProxy::HandleAgentChatResponsePart(const TSharedPtr<FJsonObject>& Root)
{ {
// Ignore response parts that belong to a generation we have already interrupted. // Ignore response parts that belong to a generation we have already interrupted.
// Without this guard, old parts arriving after SendInterrupt() would re-trigger // Without this guard, old parts arriving after SendInterrupt() would re-trigger
@ -628,7 +628,21 @@ void UElevenLabsWebSocketProxy::HandleAgentChatResponsePart()
T, LatencyFromTurnEnd); T, LatencyFromTurnEnd);
OnAgentResponseStarted.Broadcast(); OnAgentResponseStarted.Broadcast();
} }
// Subsequent parts logged at Verbose only (can be dozens per response).
// Extract the streaming text fragment and broadcast it.
// API structure:
// { "type": "agent_chat_response_part",
// "agent_chat_response_part_event": { "agent_response_part": "partial text" }
// }
const TSharedPtr<FJsonObject>* PartEvent = nullptr;
if (Root->TryGetObjectField(TEXT("agent_chat_response_part_event"), PartEvent) && PartEvent)
{
FString PartText;
if ((*PartEvent)->TryGetStringField(TEXT("agent_response_part"), PartText) && !PartText.IsEmpty())
{
OnAgentResponsePart.Broadcast(PartText);
}
}
} }
void UElevenLabsWebSocketProxy::HandleInterruption(const TSharedPtr<FJsonObject>& Root) void UElevenLabsWebSocketProxy::HandleInterruption(const TSharedPtr<FJsonObject>& Root)

View File

@ -43,6 +43,15 @@ DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnAgentInterrupted);
*/ */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnAgentStartedGenerating); DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnAgentStartedGenerating);
/**
* Fired for every agent_chat_response_part — streams the agent's text as the LLM
* generates it, token by token. Use this for real-time subtitles / text display.
* Each call provides the text fragment from that individual part (NOT accumulated).
* The final complete text is still available via OnAgentTextResponse (agent_response).
*/
DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnAgentPartialResponse,
const FString&, PartialText);
/** /**
* Fired when the server has not started generating a response within ResponseTimeoutSeconds * Fired when the server has not started generating a response within ResponseTimeoutSeconds
* after the user stopped speaking (StopListening was called). * after the user stopped speaking (StopListening was called).
@ -138,6 +147,15 @@ public:
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "ElevenLabs|Events") UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "ElevenLabs|Events")
bool bEnableAgentTextResponse = true; bool bEnableAgentTextResponse = true;
/**
* Forward streaming text parts (agent_chat_response_part events) to the
* OnAgentPartialResponse delegate. Each part is a text fragment as the LLM
* generates it — use this for real-time subtitles that appear while the agent
* speaks, instead of waiting for the full text (OnAgentTextResponse).
*/
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "ElevenLabs|Events")
bool bEnableAgentPartialResponse = false;
/** /**
* How many seconds to wait for the server to start generating a response * How many seconds to wait for the server to start generating a response
* after the user stops speaking (StopListening) before firing OnAgentResponseTimeout. * after the user stops speaking (StopListening) before firing OnAgentResponseTimeout.
@ -168,6 +186,14 @@ public:
UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events") UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events")
FOnAgentTextResponse OnAgentTextResponse; FOnAgentTextResponse OnAgentTextResponse;
/**
* Streaming text fragments as the LLM generates them.
* Fires for every agent_chat_response_part — each call gives one text chunk.
* Enable with bEnableAgentPartialResponse.
*/
UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events")
FOnAgentPartialResponse OnAgentPartialResponse;
UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events") UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events")
FOnAgentStartedSpeaking OnAgentStartedSpeaking; FOnAgentStartedSpeaking OnAgentStartedSpeaking;
@ -285,6 +311,9 @@ private:
UFUNCTION() UFUNCTION()
void HandleAgentResponseStarted(); void HandleAgentResponseStarted();
UFUNCTION()
void HandleAgentResponsePart(const FString& PartialText);
// ── Audio playback ──────────────────────────────────────────────────────── // ── Audio playback ────────────────────────────────────────────────────────
void InitAudioPlayback(); void InitAudioPlayback();
void EnqueueAgentAudio(const TArray<uint8>& PCMData); void EnqueueAgentAudio(const TArray<uint8>& PCMData);

View File

@ -43,6 +43,11 @@ DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnElevenLabsInterrupted);
*/ */
DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnElevenLabsAgentResponseStarted); DECLARE_DYNAMIC_MULTICAST_DELEGATE(FOnElevenLabsAgentResponseStarted);
/** Fired for every agent_chat_response_part — streams the LLM text as it is generated.
* PartialText is the text fragment from this individual part (NOT accumulated). */
DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnElevenLabsAgentResponsePart,
const FString&, PartialText);
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
// WebSocket Proxy // WebSocket Proxy
@ -94,6 +99,10 @@ public:
UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events") UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events")
FOnElevenLabsAgentResponseStarted OnAgentResponseStarted; FOnElevenLabsAgentResponseStarted OnAgentResponseStarted;
/** Fired for every agent_chat_response_part with the streaming text fragment. */
UPROPERTY(BlueprintAssignable, Category = "ElevenLabs|Events")
FOnElevenLabsAgentResponsePart OnAgentResponsePart;
// ── Lifecycle ───────────────────────────────────────────────────────────── // ── Lifecycle ─────────────────────────────────────────────────────────────
/** /**
@ -182,7 +191,7 @@ private:
void HandleAudioResponse(const TSharedPtr<FJsonObject>& Payload); void HandleAudioResponse(const TSharedPtr<FJsonObject>& Payload);
void HandleTranscript(const TSharedPtr<FJsonObject>& Payload); void HandleTranscript(const TSharedPtr<FJsonObject>& Payload);
void HandleAgentResponse(const TSharedPtr<FJsonObject>& Payload); void HandleAgentResponse(const TSharedPtr<FJsonObject>& Payload);
void HandleAgentChatResponsePart(); void HandleAgentChatResponsePart(const TSharedPtr<FJsonObject>& Payload);
void HandleInterruption(const TSharedPtr<FJsonObject>& Payload); void HandleInterruption(const TSharedPtr<FJsonObject>& Payload);
void HandlePing(const TSharedPtr<FJsonObject>& Payload); void HandlePing(const TSharedPtr<FJsonObject>& Payload);