Compare commits

...

5 Commits

Author SHA1 Message Date
d60f8d8484 Add response latency metrics to ElevenLabs debug HUD
Track 4 latencies per conversation turn (computed only when bDebug is active):
- STT→Gen: user stops talking → server starts generating
- Gen→Audio: server generating → first audio chunk received
- Total: user stops talking → first audio chunk (end-to-end)
- End-to-Ear: user stops talking → audio playback starts (includes pre-buffer)

New timestamps: GenerationStartTime (HandleAgentResponseStarted),
PlaybackStartTime (3 OnAudioPlaybackStarted sites). Values persist on
HUD between turns, reset when new turn starts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 17:51:25 +01:00
6d4ef21269 Merge feature/multi-player-shared-agent into main 2026-03-05 17:39:05 +01:00
c922fd304c Fix multi-player regressions: audio/text drops, thread safety
1. StartConversationWithSelectedAgent: remove early return when WebSocket
   is already connected (persistent mode). Always call ServerJoinConversation
   so the pawn is added to NetConnectedPawns and bNetIsConversing is set.

2. ServerSendMicAudioFromPlayer: bypass speaker arbitration in standalone
   mode (<=1 connected pawn). Send audio directly to avoid silent drops
   caused by pawn not being in NetConnectedPawns array. Add warning logs
   for multi-player drops to aid debugging.

3. OnMicrophoneDataCaptured: restore direct WebSocketProxy->SendAudioChunk
   on the server path. This callback runs on the WASAPI audio thread —
   accessing game-thread state (NetConnectedPawns, LastSpeakTime) was
   causing undefined behavior. Internal mic is always the local player,
   no speaker arbitration needed.

4. StopListening flush: send directly to WebSocket (active speaker already
   established, no arbitration needed for the tail of the current turn).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 15:33:43 +01:00
ca10689bb6 Fix thread-safety crash in LipSync anim node: TMap race condition
GetCurrentBlendshapes() was copying CurrentBlendshapes on the anim worker
thread while the game thread mutated it (TSet::UnhashElements crash).
Use a snapshot pattern: game thread copies to ThreadSafeBlendshapes under
FCriticalSection at end of TickComponent, anim node reads the snapshot.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 15:16:10 +01:00
8d4065944c Multi-player shared agent: multiple players can converse with the same agent simultaneously
Replace exclusive single-player agent lock with shared multi-player model:
- NetConversatingPawn/Player → NetConnectedPawns array + NetActiveSpeakerPawn
- Server-side speaker arbitration with hysteresis (0.3s) prevents gaze ping-pong
- Speaker idle timeout (3.0s) clears active speaker after silence
- Agent gaze follows the active speaker via replicated OnRep_ActiveSpeaker
- New ServerJoinConversation/ServerLeaveConversation RPCs (idempotent join/leave)
- Backward-compatible: old ServerRequest/Release delegate to new Join/Leave
- InteractionComponent no longer skips occupied agents
- DrawDebugHUD shows connected player count and active speaker
- All mic audio paths (FeedExternalAudio, OnMicCapture, StopListening flush) route
  through speaker arbitration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 15:01:26 +01:00
6 changed files with 616 additions and 235 deletions

View File

@ -159,22 +159,54 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::TickComponent(float DeltaTime, ELevel
{ {
AudioPlaybackComponent->Play(); AudioPlaybackComponent->Play();
} }
PlaybackStartTime = FPlatformTime::Seconds();
if (bDebug && TurnEndTime > 0.0)
{
LastLatencies.EndToEarMs = static_cast<float>((PlaybackStartTime - TurnEndTime) * 1000.0);
}
OnAudioPlaybackStarted.Broadcast(); OnAudioPlaybackStarted.Broadcast();
} }
} }
} }
// Network: detect if the conversating player disconnected (server only). // Network: detect if connected players disconnected (server only).
if (GetOwnerRole() == ROLE_Authority && bNetIsConversing && NetConversatingPlayer) if (GetOwnerRole() == ROLE_Authority && bNetIsConversing && NetConnectedPawns.Num() > 0)
{ {
if (!IsValid(NetConversatingPlayer) || !NetConversatingPlayer->GetPawn()) for (int32 i = NetConnectedPawns.Num() - 1; i >= 0; --i)
{
APawn* Pawn = NetConnectedPawns[i];
if (!IsValid(Pawn))
{
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning,
TEXT("Connected player pawn [%d] disconnected — removing."), i);
NetConnectedPawns.RemoveAt(i, 1, EAllowShrinking::No);
LastSpeakTime.Remove(Pawn);
if (NetActiveSpeakerPawn == Pawn)
{
SetActiveSpeaker(NetConnectedPawns.Num() > 0 ? NetConnectedPawns[0] : nullptr);
}
}
}
if (NetConnectedPawns.Num() == 0)
{ {
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning,
TEXT("Conversating player disconnected — releasing NPC.")); TEXT("All connected players disconnected — releasing NPC."));
ServerReleaseConversation_Implementation(); ServerReleaseConversation_Implementation();
} }
} }
// Speaker idle timeout: clear active speaker after silence.
if (GetOwnerRole() == ROLE_Authority && NetActiveSpeakerPawn && SpeakerIdleTimeout > 0.0f)
{
if (const double* LastActive = LastSpeakTime.Find(NetActiveSpeakerPawn))
{
if (FPlatformTime::Seconds() - *LastActive > SpeakerIdleTimeout)
{
SetActiveSpeaker(nullptr);
}
}
}
// ── Reconnection ──────────────────────────────────────────────────────── // ── Reconnection ────────────────────────────────────────────────────────
if (bWantsReconnect && FPlatformTime::Seconds() >= NextReconnectTime) if (bWantsReconnect && FPlatformTime::Seconds() >= NextReconnectTime)
{ {
@ -186,9 +218,10 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::TickComponent(float DeltaTime, ELevel
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Error, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Error,
TEXT("Reconnection failed after %d attempts — giving up."), MaxReconnectAttempts); TEXT("Reconnection failed after %d attempts — giving up."), MaxReconnectAttempts);
bNetIsConversing = false; bNetIsConversing = false;
NetConnectedPawns.Empty();
NetActiveSpeakerPawn = nullptr;
LastSpeakTime.Empty();
ApplyConversationGaze(); ApplyConversationGaze();
NetConversatingPlayer = nullptr;
NetConversatingPawn = nullptr;
OnAgentDisconnected.Broadcast(1006, TEXT("Reconnection failed")); OnAgentDisconnected.Broadcast(1006, TEXT("Reconnection failed"));
} }
else else
@ -284,30 +317,11 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::StartConversation()
{ {
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
// Set conversation state (used by ApplyConversationGaze, gaze, LOD, etc.). // Standalone / listen-server: join via the local player controller.
// In standalone these aren't replicated but are still needed as local state flags.
APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr; APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr;
bNetIsConversing = true; if (PC)
NetConversatingPlayer = PC;
NetConversatingPawn = PC ? PC->GetPawn() : nullptr;
// In persistent mode the WebSocket was already opened by the first StartConversation.
// Reuse the existing connection — only set up conversation state.
if (bPersistentSession && IsConnected())
{ {
// WebSocket already alive — just set up conversation state (gaze, etc.). ServerJoinConversation_Implementation(PC);
ApplyConversationGaze();
OnAgentConnected.Broadcast(WebSocketProxy->GetConversationInfo());
// Auto-start listening if configured (same as HandleConnected).
if (bAutoStartListening && TurnMode == EPS_AI_ConvAgent_TurnMode_ElevenLabs::Server)
{
StartListening();
}
}
else
{
StartConversation_Internal();
} }
} }
else else
@ -316,13 +330,13 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::StartConversation()
// Server RPCs on NPC actors — no owning connection). // Server RPCs on NPC actors — no owning connection).
if (auto* Relay = FindLocalRelayComponent()) if (auto* Relay = FindLocalRelayComponent())
{ {
Relay->ServerRelayStartConversation(GetOwner()); Relay->ServerRelayJoinConversation(GetOwner());
} }
else else
{ {
// Fallback: try direct RPC (will fail with "No owning connection" warning). // Fallback: try direct RPC (will fail with "No owning connection" warning).
APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr; APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr;
if (PC) ServerRequestConversation(PC); if (PC) ServerJoinConversation(PC);
} }
} }
} }
@ -370,44 +384,12 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::EndConversation()
{ {
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
// Cancel any pending reconnection (ephemeral mode only — persistent keeps reconnecting). // Standalone / listen-server: leave via the local player controller.
if (!bPersistentSession) APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr;
if (PC)
{ {
bWantsReconnect = false; ServerLeaveConversation_Implementation(PC);
ReconnectAttemptCount = 0;
} }
StopListening();
// ISSUE-4: StopListening() may set bWaitingForAgentResponse=true (normal turn end path).
// Cancel it immediately — there is no response coming because we are ending the session.
// Without this, TickComponent could fire OnAgentResponseTimeout after EndConversation().
bWaitingForAgentResponse = false;
StopAgentAudio();
// In persistent mode, keep the WebSocket open — only manage conversation state.
if (!bPersistentSession)
{
if (WebSocketProxy)
{
bIntentionalDisconnect = true;
WebSocketProxy->Disconnect();
// OnClosed callback will fire OnAgentDisconnected.
WebSocketProxy = nullptr;
}
}
else
{
// Persistent mode: WebSocket stays alive but the interaction is over.
// Broadcast OnAgentDisconnected so expression components deactivate
// (body, facial, etc.). The WebSocket OnClosed never fires here.
OnAgentDisconnected.Broadcast(1000, TEXT("EndConversation (persistent)"));
}
// Reset replicated state so other players can talk to this NPC.
bNetIsConversing = false;
ApplyConversationGaze();
NetConversatingPlayer = nullptr;
NetConversatingPawn = nullptr;
} }
else else
{ {
@ -415,7 +397,7 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::EndConversation()
// Server RPCs on NPC actors — no owning connection). // Server RPCs on NPC actors — no owning connection).
if (auto* Relay = FindLocalRelayComponent()) if (auto* Relay = FindLocalRelayComponent())
{ {
Relay->ServerRelayEndConversation(GetOwner()); Relay->ServerRelayLeaveConversation(GetOwner());
} }
else else
{ {
@ -472,6 +454,8 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::StartListening()
} }
} }
bWaitingForAgentResponse = false; // New user turn — cancel any pending response timeout. bWaitingForAgentResponse = false; // New user turn — cancel any pending response timeout.
GenerationStartTime = 0.0;
PlaybackStartTime = 0.0;
++TurnIndex; ++TurnIndex;
bIsListening = true; bIsListening = true;
TurnStartTime = FPlatformTime::Seconds(); TurnStartTime = FPlatformTime::Seconds();
@ -566,6 +550,9 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::StopListening()
{ {
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
// Flush the final chunk directly to WebSocket.
// This is the tail of the current turn — the active speaker
// is already established, no arbitration needed.
if (WebSocketProxy && IsConnected()) if (WebSocketProxy && IsConnected())
{ {
WebSocketProxy->SendAudioChunk(MicAccumulationBuffer); WebSocketProxy->SendAudioChunk(MicAccumulationBuffer);
@ -723,7 +710,17 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::FeedExternalAudio(const TArray<float>
{ {
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
if (WebSocketProxy) WebSocketProxy->SendAudioChunk(MicAccumulationBuffer); // On the server (listen server / standalone), route through speaker
// arbitration so the correct player's audio is forwarded to ElevenLabs.
APawn* LocalPawn = nullptr;
if (UWorld* World = GetWorld())
{
if (APlayerController* PC = World->GetFirstPlayerController())
{
LocalPawn = PC->GetPawn();
}
}
ServerSendMicAudioFromPlayer(LocalPawn, MicAccumulationBuffer);
} }
else else
{ {
@ -821,14 +818,18 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::HandleConnected(const FPS_AI_ConvAgen
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log, TEXT("[T+0.00s] Agent connected. ConversationID=%s"), *Info.ConversationID); UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log, TEXT("[T+0.00s] Agent connected. ConversationID=%s"), *Info.ConversationID);
OnAgentConnected.Broadcast(Info); OnAgentConnected.Broadcast(Info);
// Network: notify the requesting remote client that conversation started. // Network: notify all connected remote clients that conversation started.
// Client RPCs on NPC actors have no owning connection — route through the // Client RPCs on NPC actors have no owning connection — route through the
// player pawn's InteractionComponent which IS owned by the client. // player pawn's InteractionComponent which IS owned by the client.
if (GetOwnerRole() == ROLE_Authority && NetConversatingPawn) if (GetOwnerRole() == ROLE_Authority)
{ {
if (auto* Relay = NetConversatingPawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>()) for (APawn* Pawn : NetConnectedPawns)
{ {
Relay->ClientRelayConversationStarted(GetOwner(), Info); if (!Pawn) continue;
if (auto* Relay = Pawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>())
{
Relay->ClientRelayConversationStarted(GetOwner(), Info);
}
} }
} }
@ -892,7 +893,7 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::HandleDisconnected(int32 StatusCode,
TEXT("Unexpected disconnect — will attempt reconnection in %.0fs (max %d attempts)."), TEXT("Unexpected disconnect — will attempt reconnection in %.0fs (max %d attempts)."),
Delay, MaxReconnectAttempts); Delay, MaxReconnectAttempts);
OnAgentError.Broadcast(TEXT("Connection lost — reconnecting...")); OnAgentError.Broadcast(TEXT("Connection lost — reconnecting..."));
// Keep bNetIsConversing / NetConversatingPawn so the NPC stays occupied. // Keep bNetIsConversing / NetConnectedPawns so the NPC stays occupied.
return; return;
} }
@ -901,9 +902,10 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::HandleDisconnected(int32 StatusCode,
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
bNetIsConversing = false; bNetIsConversing = false;
NetConnectedPawns.Empty();
NetActiveSpeakerPawn = nullptr;
LastSpeakTime.Empty();
ApplyConversationGaze(); ApplyConversationGaze();
NetConversatingPlayer = nullptr;
NetConversatingPawn = nullptr;
} }
OnAgentDisconnected.Broadcast(StatusCode, Reason); OnAgentDisconnected.Broadcast(StatusCode, Reason);
@ -1055,6 +1057,12 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::HandleAgentResponseStarted()
} }
const double Now = FPlatformTime::Seconds(); const double Now = FPlatformTime::Seconds();
GenerationStartTime = Now;
if (bDebug && TurnEndTime > 0.0)
{
LastLatencies.STTToGenMs = static_cast<float>((Now - TurnEndTime) * 1000.0);
}
const double T = Now - SessionStartTime; const double T = Now - SessionStartTime;
const double LatencyFromTurnEnd = TurnEndTime > 0.0 ? Now - TurnEndTime : 0.0; const double LatencyFromTurnEnd = TurnEndTime > 0.0 ? Now - TurnEndTime : 0.0;
if (bIsListening) if (bIsListening)
@ -1340,6 +1348,12 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::EnqueueAgentAudio(const TArray<uint8>
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[T+%.2fs] [Turn %d] Agent speaking — first audio chunk. (%.2fs after turn end)"), TEXT("[T+%.2fs] [Turn %d] Agent speaking — first audio chunk. (%.2fs after turn end)"),
T, LastClosedTurnIndex, LatencyFromTurnEnd); T, LastClosedTurnIndex, LatencyFromTurnEnd);
// Update latency snapshot for HUD display.
if (TurnEndTime > 0.0)
LastLatencies.TotalMs = static_cast<float>((AgentSpeakStart - TurnEndTime) * 1000.0);
if (GenerationStartTime > 0.0)
LastLatencies.GenToAudioMs = static_cast<float>((AgentSpeakStart - GenerationStartTime) * 1000.0);
} }
OnAgentStartedSpeaking.Broadcast(); OnAgentStartedSpeaking.Broadcast();
@ -1371,6 +1385,11 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::EnqueueAgentAudio(const TArray<uint8>
{ {
AudioPlaybackComponent->Play(); AudioPlaybackComponent->Play();
} }
PlaybackStartTime = FPlatformTime::Seconds();
if (bDebug && TurnEndTime > 0.0)
{
LastLatencies.EndToEarMs = static_cast<float>((PlaybackStartTime - TurnEndTime) * 1000.0);
}
OnAudioPlaybackStarted.Broadcast(); OnAudioPlaybackStarted.Broadcast();
} }
} }
@ -1397,6 +1416,11 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::EnqueueAgentAudio(const TArray<uint8>
{ {
AudioPlaybackComponent->Play(); AudioPlaybackComponent->Play();
} }
PlaybackStartTime = FPlatformTime::Seconds();
if (bDebug && TurnEndTime > 0.0)
{
LastLatencies.EndToEarMs = static_cast<float>((PlaybackStartTime - TurnEndTime) * 1000.0);
}
OnAudioPlaybackStarted.Broadcast(); OnAudioPlaybackStarted.Broadcast();
} }
SilentTickCount = 0; SilentTickCount = 0;
@ -1527,7 +1551,14 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::OnMicrophoneDataCaptured(const TArray
{ {
if (GetOwnerRole() == ROLE_Authority) if (GetOwnerRole() == ROLE_Authority)
{ {
if (WebSocketProxy) WebSocketProxy->SendAudioChunk(MicAccumulationBuffer); // Internal mic = local player on the server. Send directly to WebSocket.
// This callback runs on the WASAPI audio thread — accessing game-thread
// state (NetConnectedPawns, LastSpeakTime, etc.) is NOT safe here.
// Speaker arbitration is only needed for multi-player external mic.
if (WebSocketProxy && WebSocketProxy->IsConnected())
{
WebSocketProxy->SendAudioChunk(MicAccumulationBuffer);
}
} }
else else
{ {
@ -1589,8 +1620,8 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::GetLifetimeReplicatedProps(
{ {
Super::GetLifetimeReplicatedProps(OutLifetimeProps); Super::GetLifetimeReplicatedProps(OutLifetimeProps);
DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, bNetIsConversing); DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, bNetIsConversing);
DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, NetConversatingPlayer); DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, NetConnectedPawns);
DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, NetConversatingPawn); DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, NetActiveSpeakerPawn);
DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, CurrentEmotion); DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, CurrentEmotion);
DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, CurrentEmotionIntensity); DOREPLIFETIME(UPS_AI_ConvAgent_ElevenLabsComponent, CurrentEmotionIntensity);
} }
@ -1600,31 +1631,30 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_ConversationState()
AActor* Owner = GetOwner(); AActor* Owner = GetOwner();
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[NET-REP] OnRep_ConversationState — bNetIsConversing=%s NetConversatingPawn=%s NetConversatingPlayer=%s Owner=%s Role=%d"), TEXT("[NET-REP] OnRep_ConversationState — bNetIsConversing=%s ConnectedPawns=%d ActiveSpeaker=%s Owner=%s Role=%d"),
bNetIsConversing ? TEXT("true") : TEXT("false"), bNetIsConversing ? TEXT("true") : TEXT("false"),
NetConversatingPawn ? *NetConversatingPawn->GetName() : TEXT("NULL"), NetConnectedPawns.Num(),
NetConversatingPlayer ? *NetConversatingPlayer->GetName() : TEXT("NULL"), NetActiveSpeakerPawn ? *NetActiveSpeakerPawn->GetName() : TEXT("NULL"),
Owner ? *Owner->GetName() : TEXT("NULL"), Owner ? *Owner->GetName() : TEXT("NULL"),
static_cast<int32>(GetOwnerRole())); static_cast<int32>(GetOwnerRole()));
if (Owner) if (Owner)
{ {
// Update gaze target on all clients so the NPC head/eyes track the // Update gaze target on all clients so the NPC head/eyes track the
// conversating player. TargetActor is normally set by InteractionComponent // active speaker. TargetActor is normally set by InteractionComponent
// on the local pawn, but remote clients never run that code path. // on the local pawn, but remote clients never run that code path.
if (UPS_AI_ConvAgent_GazeComponent* Gaze = Owner->FindComponentByClass<UPS_AI_ConvAgent_GazeComponent>()) if (UPS_AI_ConvAgent_GazeComponent* Gaze = Owner->FindComponentByClass<UPS_AI_ConvAgent_GazeComponent>())
{ {
// Use NetConversatingPawn (replicated to ALL clients) instead of if (bNetIsConversing && NetConnectedPawns.Num() > 0)
// NetConversatingPlayer->GetPawn() — PlayerControllers are only
// replicated to their owning client (bOnlyRelevantToOwner=true).
if (bNetIsConversing && NetConversatingPawn)
{ {
Gaze->bActive = true; Gaze->bActive = true;
Gaze->TargetActor = NetConversatingPawn; // Use active speaker if set, otherwise first connected pawn.
APawn* GazeTarget = NetActiveSpeakerPawn ? NetActiveSpeakerPawn : NetConnectedPawns.Last();
Gaze->TargetActor = GazeTarget;
Gaze->ResetBodyTarget(); Gaze->ResetBodyTarget();
Gaze->bEnableBodyTracking = true; Gaze->bEnableBodyTracking = true;
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[NET-REP] Gaze ACTIVATED, TargetActor set to %s"), *NetConversatingPawn->GetName()); TEXT("[NET-REP] Gaze ACTIVATED, TargetActor set to %s"), *GazeTarget->GetName());
} }
else else
{ {
@ -1632,9 +1662,9 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_ConversationState()
Gaze->TargetActor = nullptr; Gaze->TargetActor = nullptr;
Gaze->bEnableBodyTracking = false; Gaze->bEnableBodyTracking = false;
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning, UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning,
TEXT("[NET-REP] Gaze TargetActor cleared — bNetIsConversing=%s Pawn=%s"), TEXT("[NET-REP] Gaze TargetActor cleared — bNetIsConversing=%s ConnectedPawns=%d"),
bNetIsConversing ? TEXT("true") : TEXT("false"), bNetIsConversing ? TEXT("true") : TEXT("false"),
NetConversatingPawn ? TEXT("valid") : TEXT("NULL")); NetConnectedPawns.Num());
} }
} }
else else
@ -1668,6 +1698,27 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_ConversationState()
} }
} }
void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_ActiveSpeaker()
{
// Update gaze on clients when the speaking player changes.
AActor* Owner = GetOwner();
if (!Owner) return;
if (auto* Gaze = Owner->FindComponentByClass<UPS_AI_ConvAgent_GazeComponent>())
{
if (NetActiveSpeakerPawn)
{
Gaze->TargetActor = NetActiveSpeakerPawn;
Gaze->ResetBodyTarget();
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[NET-REP] ActiveSpeaker changed to %s"), *NetActiveSpeakerPawn->GetName());
}
// Don't clear gaze when null — keep looking at last speaker.
}
OnActiveSpeakerChanged.Broadcast(NetActiveSpeakerPawn, nullptr);
}
void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_Emotion() void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_Emotion()
{ {
// Fire the existing delegate so FacialExpressionComponent picks it up on clients. // Fire the existing delegate so FacialExpressionComponent picks it up on clients.
@ -1677,98 +1728,240 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::OnRep_Emotion()
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
// Network: Server RPCs // Network: Server RPCs
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerRequestConversation_Implementation( void UPS_AI_ConvAgent_ElevenLabsComponent::ServerJoinConversation_Implementation(
APlayerController* RequestingPlayer) APlayerController* RequestingPlayer)
{ {
if (bNetIsConversing) APawn* Pawn = RequestingPlayer ? RequestingPlayer->GetPawn() : nullptr;
if (!Pawn) return;
// Already connected? No-op (idempotent).
if (NetConnectedPawns.Contains(Pawn)) return;
// Add to connected set.
NetConnectedPawns.Add(Pawn);
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[NET] Player %s JOINED conversation (now %d players)."),
*Pawn->GetName(), NetConnectedPawns.Num());
// First player joining: open WebSocket.
if (!bNetIsConversing)
{ {
// Route failure notification through the player's InteractionComponent relay. bNetIsConversing = true;
// Client RPCs on NPC actors have no owning connection.
if (RequestingPlayer) if (bPersistentSession && IsConnected())
{ {
if (APawn* Pawn = RequestingPlayer->GetPawn()) // WebSocket already alive — set up conversation state.
{ ApplyConversationGaze();
if (auto* Relay = Pawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>()) OnAgentConnected.Broadcast(WebSocketProxy->GetConversationInfo());
{
Relay->ClientRelayConversationFailed(TEXT("NPC is already in conversation with another player."));
}
}
}
return;
}
bNetIsConversing = true; // Notify the joining player.
NetConversatingPlayer = RequestingPlayer; if (auto* Relay = Pawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>())
NetConversatingPawn = RequestingPlayer ? RequestingPlayer->GetPawn() : nullptr;
// Update NPC gaze on the server (OnRep never fires on Authority).
ApplyConversationGaze();
// In persistent mode the WebSocket is already open — skip reconnection.
if (bPersistentSession && IsConnected())
{
// Notify the requesting client that conversation started (normally done in HandleConnected).
if (NetConversatingPawn)
{
if (auto* Relay = NetConversatingPawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>())
{ {
Relay->ClientRelayConversationStarted(GetOwner(), WebSocketProxy->GetConversationInfo()); Relay->ClientRelayConversationStarted(GetOwner(), WebSocketProxy->GetConversationInfo());
} }
}
// Auto-start listening if configured. if (bAutoStartListening && TurnMode == EPS_AI_ConvAgent_TurnMode_ElevenLabs::Server)
if (bAutoStartListening && TurnMode == EPS_AI_ConvAgent_TurnMode_ElevenLabs::Server) {
StartListening();
}
}
else
{ {
StartListening(); StartConversation_Internal();
} }
} }
else else
{ {
StartConversation_Internal(); // Agent already in conversation — notify the joining player if WebSocket is connected.
if (IsConnected())
{
if (auto* Relay = Pawn->FindComponentByClass<UPS_AI_ConvAgent_InteractionComponent>())
{
Relay->ClientRelayConversationStarted(GetOwner(), WebSocketProxy->GetConversationInfo());
}
}
ApplyConversationGaze();
}
// If no active speaker yet, set gaze to the new arrival.
if (!NetActiveSpeakerPawn)
{
SetActiveSpeaker(Pawn);
} }
} }
// Backward compat: delegate to ServerJoinConversation.
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerRequestConversation_Implementation(
APlayerController* RequestingPlayer)
{
ServerJoinConversation_Implementation(RequestingPlayer);
}
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerLeaveConversation_Implementation(
APlayerController* LeavingPlayer)
{
APawn* Pawn = LeavingPlayer ? LeavingPlayer->GetPawn() : nullptr;
if (!Pawn) return;
if (!NetConnectedPawns.Contains(Pawn)) return;
NetConnectedPawns.Remove(Pawn);
LastSpeakTime.Remove(Pawn);
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
TEXT("[NET] Player %s LEFT conversation (now %d players)."),
*Pawn->GetName(), NetConnectedPawns.Num());
// If the leaving player was the active speaker, switch to another or null.
if (NetActiveSpeakerPawn == Pawn)
{
SetActiveSpeaker(NetConnectedPawns.Num() > 0 ? NetConnectedPawns[0] : nullptr);
}
// If no players left, fully end the conversation.
if (NetConnectedPawns.Num() == 0)
{
// Cancel any pending reconnection (ephemeral mode only).
if (!bPersistentSession)
{
bWantsReconnect = false;
ReconnectAttemptCount = 0;
}
StopListening();
bWaitingForAgentResponse = false;
StopAgentAudio();
// In persistent mode, keep the WebSocket open.
if (!bPersistentSession)
{
if (WebSocketProxy)
{
bIntentionalDisconnect = true;
WebSocketProxy->Disconnect();
WebSocketProxy = nullptr;
}
}
else
{
// Persistent mode: WebSocket stays alive but the interaction is over.
OnAgentDisconnected.Broadcast(1000, TEXT("EndConversation (persistent)"));
}
bNetIsConversing = false;
NetActiveSpeakerPawn = nullptr;
LastSpeakTime.Empty();
ApplyConversationGaze();
}
else
{
// Other players still connected — update gaze.
ApplyConversationGaze();
}
}
// Backward compat: ServerReleaseConversation has no player parameter.
// Try to resolve the caller from the first connected player.
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerReleaseConversation_Implementation() void UPS_AI_ConvAgent_ElevenLabsComponent::ServerReleaseConversation_Implementation()
{ {
// Cancel any pending reconnection (ephemeral mode only). // Legacy path: find the first local player controller and leave.
if (!bPersistentSession) APlayerController* PC = GetWorld() ? GetWorld()->GetFirstPlayerController() : nullptr;
if (PC)
{ {
bWantsReconnect = false; ServerLeaveConversation_Implementation(PC);
ReconnectAttemptCount = 0;
} }
else if (NetConnectedPawns.Num() > 0)
StopListening();
bWaitingForAgentResponse = false;
StopAgentAudio();
// In persistent mode, keep the WebSocket open.
if (!bPersistentSession)
{ {
if (WebSocketProxy) // Fallback: remove all connected pawns (full release).
{ NetConnectedPawns.Empty();
bIntentionalDisconnect = true; NetActiveSpeakerPawn = nullptr;
WebSocketProxy->Disconnect(); LastSpeakTime.Empty();
WebSocketProxy = nullptr; bNetIsConversing = false;
} StopListening();
bWaitingForAgentResponse = false;
StopAgentAudio();
ApplyConversationGaze();
} }
// Clear gaze before nullifying the pawn pointer (ApplyConversationGaze
// uses NetConversatingPawn to guard against clearing someone else's target).
bNetIsConversing = false;
ApplyConversationGaze();
NetConversatingPlayer = nullptr;
NetConversatingPawn = nullptr;
} }
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerSendMicAudio_Implementation( void UPS_AI_ConvAgent_ElevenLabsComponent::ServerSendMicAudio_Implementation(
const TArray<uint8>& PCMBytes) const TArray<uint8>& PCMBytes)
{ {
// Legacy single-player path: forward directly (no speaker arbitration).
if (WebSocketProxy && WebSocketProxy->IsConnected()) if (WebSocketProxy && WebSocketProxy->IsConnected())
{ {
WebSocketProxy->SendAudioChunk(PCMBytes); WebSocketProxy->SendAudioChunk(PCMBytes);
} }
} }
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerSendMicAudioFromPlayer(
APawn* SpeakerPawn, const TArray<uint8>& PCMBytes)
{
if (!WebSocketProxy || !WebSocketProxy->IsConnected()) return;
// Standalone / single-player: bypass speaker arbitration entirely.
// There's only one player — no need for Contains check or speaker switching.
if (NetConnectedPawns.Num() <= 1)
{
WebSocketProxy->SendAudioChunk(PCMBytes);
// Keep speaker state in sync for debug display.
if (SpeakerPawn && NetActiveSpeakerPawn != SpeakerPawn)
{
SetActiveSpeaker(SpeakerPawn);
}
return;
}
// Multi-player path: full speaker arbitration.
if (!SpeakerPawn)
{
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning,
TEXT("ServerSendMicAudioFromPlayer: null SpeakerPawn — audio dropped."));
return;
}
if (!NetConnectedPawns.Contains(SpeakerPawn))
{
UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Warning,
TEXT("ServerSendMicAudioFromPlayer: pawn %s not in NetConnectedPawns (%d players) — audio dropped."),
*SpeakerPawn->GetName(), NetConnectedPawns.Num());
return;
}
const double Now = FPlatformTime::Seconds();
LastSpeakTime.FindOrAdd(SpeakerPawn) = Now;
// If this player IS the active speaker, forward immediately.
if (NetActiveSpeakerPawn == SpeakerPawn)
{
WebSocketProxy->SendAudioChunk(PCMBytes);
return;
}
// Speaker switch: only switch if current speaker has been silent
// for at least SpeakerSwitchHysteresis seconds.
bool bCanSwitch = true;
if (NetActiveSpeakerPawn)
{
if (const double* LastActive = LastSpeakTime.Find(NetActiveSpeakerPawn))
{
if (Now - *LastActive < SpeakerSwitchHysteresis)
{
bCanSwitch = false; // Current speaker still active recently.
}
}
}
if (bCanSwitch)
{
SetActiveSpeaker(SpeakerPawn);
WebSocketProxy->SendAudioChunk(PCMBytes);
}
// else: drop this audio — current speaker still has the floor.
}
void UPS_AI_ConvAgent_ElevenLabsComponent::ServerSendTextMessage_Implementation( void UPS_AI_ConvAgent_ElevenLabsComponent::ServerSendTextMessage_Implementation(
const FString& Text) const FString& Text)
{ {
@ -1971,7 +2164,10 @@ bool UPS_AI_ConvAgent_ElevenLabsComponent::IsLocalPlayerConversating() const
{ {
if (APlayerController* PC = World->GetFirstPlayerController()) if (APlayerController* PC = World->GetFirstPlayerController())
{ {
return NetConversatingPlayer == PC; if (APawn* Pawn = PC->GetPawn())
{
return NetConnectedPawns.Contains(Pawn);
}
} }
} }
return false; return false;
@ -1997,8 +2193,12 @@ bool UPS_AI_ConvAgent_ElevenLabsComponent::ShouldUseExternalMic() const
// Network client: audio arrives via relay RPCs from InteractionComponent // Network client: audio arrives via relay RPCs from InteractionComponent
if (GetOwnerRole() != ROLE_Authority) return true; if (GetOwnerRole() != ROLE_Authority) return true;
// Authority with a remote player: audio arrives via ServerSendMicAudio RPC // Authority with remote players: audio arrives via ServerSendMicAudio RPC
if (NetConversatingPlayer && !NetConversatingPlayer->IsLocalController()) return true; // Check if any connected pawn is NOT locally controlled.
for (APawn* Pawn : NetConnectedPawns)
{
if (Pawn && !Pawn->IsLocallyControlled()) return true;
}
// InteractionComponent on local player's pawn: it manages mic + routes audio // InteractionComponent on local player's pawn: it manages mic + routes audio
if (UWorld* World = GetWorld()) if (UWorld* World = GetWorld())
@ -2026,18 +2226,36 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::ApplyConversationGaze()
auto* Gaze = Owner->FindComponentByClass<UPS_AI_ConvAgent_GazeComponent>(); auto* Gaze = Owner->FindComponentByClass<UPS_AI_ConvAgent_GazeComponent>();
if (!Gaze) return; if (!Gaze) return;
if (bNetIsConversing && NetConversatingPawn) if (bNetIsConversing && NetConnectedPawns.Num() > 0)
{ {
Gaze->bActive = true; Gaze->bActive = true;
Gaze->TargetActor = NetConversatingPawn; // Look at active speaker if set, otherwise last connected pawn.
Gaze->ResetBodyTarget(); AActor* GazeTarget = NetActiveSpeakerPawn
? static_cast<AActor*>(NetActiveSpeakerPawn)
: static_cast<AActor*>(NetConnectedPawns.Last());
if (Gaze->TargetActor != GazeTarget)
{
Gaze->TargetActor = GazeTarget;
Gaze->ResetBodyTarget();
}
Gaze->bEnableBodyTracking = true; Gaze->bEnableBodyTracking = true;
} }
else else
{ {
// Only clear if the gaze is still pointing at the departing player. // Only clear if the gaze is not pointing at an unrelated target.
// Another InteractionComponent may have already set a new TargetActor. // An InteractionComponent may have already set a new TargetActor.
if (!Gaze->TargetActor || Gaze->TargetActor == NetConversatingPawn) bool bShouldClear = !Gaze->TargetActor;
if (!bShouldClear)
{
// Clear if the target is one of our (ex-)connected pawns or null.
for (APawn* Pawn : NetConnectedPawns)
{
if (Gaze->TargetActor == Pawn) { bShouldClear = true; break; }
}
// Also clear if the target was the active speaker.
if (Gaze->TargetActor == NetActiveSpeakerPawn) bShouldClear = true;
}
if (bShouldClear)
{ {
Gaze->bActive = false; Gaze->bActive = false;
Gaze->TargetActor = nullptr; Gaze->TargetActor = nullptr;
@ -2046,6 +2264,28 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::ApplyConversationGaze()
} }
} }
// Server-only: swap the replicated active speaker, refresh the NPC's gaze, and
// notify local (server-side) listeners. No-op when the speaker is unchanged.
// NewSpeaker may be null (no one speaking).
void UPS_AI_ConvAgent_ElevenLabsComponent::SetActiveSpeaker(APawn* NewSpeaker)
{
	if (NewSpeaker == NetActiveSpeakerPawn)
	{
		return; // Nothing changed.
	}

	APawn* const OldSpeaker = NetActiveSpeakerPawn;
	NetActiveSpeakerPawn = NewSpeaker;

	// Update gaze on server (OnRep_ActiveSpeaker fires on clients).
	ApplyConversationGaze();

	// Broadcast locally (server).
	OnActiveSpeakerChanged.Broadcast(NewSpeaker, OldSpeaker);

	if (bDebug)
	{
		const FString FromName = OldSpeaker ? OldSpeaker->GetName() : FString(TEXT("(none)"));
		const FString ToName = NewSpeaker ? NewSpeaker->GetName() : FString(TEXT("(none)"));
		UE_LOG(LogPS_AI_ConvAgent_ElevenLabs, Log,
			TEXT("[NET] ActiveSpeaker changed: %s → %s"),
			*FromName, *ToName);
	}
}
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
// On-screen debug display // On-screen debug display
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
@ -2112,8 +2352,27 @@ void UPS_AI_ConvAgent_ElevenLabsComponent::DrawDebugHUD() const
FString::Printf(TEXT(" Timing: session=%.1fs turn=%.1fs"), FString::Printf(TEXT(" Timing: session=%.1fs turn=%.1fs"),
SessionSec, TurnSec)); SessionSec, TurnSec));
// Multi-player
{
FString SpeakerName = NetActiveSpeakerPawn
? NetActiveSpeakerPawn->GetName()
: TEXT("none");
GEngine->AddOnScreenDebugMessage(BaseKey + 7, DisplayTime, MainColor,
FString::Printf(TEXT(" Players: %d Speaker: %s"),
NetConnectedPawns.Num(), *SpeakerName));
}
// Latencies (from last completed turn)
if (LastLatencies.TotalMs > 0.0f)
{
GEngine->AddOnScreenDebugMessage(BaseKey + 8, DisplayTime, MainColor,
FString::Printf(TEXT(" Latency: total=%.0fms (stt>gen=%.0fms gen>audio=%.0fms) ear=%.0fms"),
LastLatencies.TotalMs, LastLatencies.STTToGenMs,
LastLatencies.GenToAudioMs, LastLatencies.EndToEarMs));
}
// Reconnection // Reconnection
GEngine->AddOnScreenDebugMessage(BaseKey + 7, DisplayTime, GEngine->AddOnScreenDebugMessage(BaseKey + 9, DisplayTime,
bWantsReconnect ? FColor::Red : MainColor, bWantsReconnect ? FColor::Red : MainColor,
FString::Printf(TEXT(" Reconnect: %d/%d attempts%s"), FString::Printf(TEXT(" Reconnect: %d/%d attempts%s"),
ReconnectAttemptCount, MaxReconnectAttempts, ReconnectAttemptCount, MaxReconnectAttempts,

View File

@ -209,28 +209,13 @@ UPS_AI_ConvAgent_ElevenLabsComponent* UPS_AI_ConvAgent_InteractionComponent::Eva
} }
} }
// Get local player's pawn for occupied-NPC check.
// Use pawn (replicated to ALL clients) instead of PlayerController
// (only replicated to owning client due to bOnlyRelevantToOwner=true).
APlayerController* LocalPC = World->GetFirstPlayerController();
APawn* LocalPawn = LocalPC ? LocalPC->GetPawn() : nullptr;
for (UPS_AI_ConvAgent_ElevenLabsComponent* Agent : Agents) for (UPS_AI_ConvAgent_ElevenLabsComponent* Agent : Agents)
{ {
AActor* AgentActor = Agent->GetOwner(); AActor* AgentActor = Agent->GetOwner();
if (!AgentActor) continue; if (!AgentActor) continue;
// Network: skip agents that are in conversation with a different player. // Multi-player: do NOT skip occupied agents. Any agent in range can be
// Use NetConversatingPawn (replicated to all) instead of NetConversatingPlayer // selected — the player will join the shared conversation via ServerJoinConversation.
// (NULL on remote clients because APlayerController has bOnlyRelevantToOwner=true).
// Null-check NetConversatingPawn: it may not have replicated yet when
// bNetIsConversing arrives first (OnRep ordering is not guaranteed).
if (Agent->bNetIsConversing
&& Agent->NetConversatingPawn
&& Agent->NetConversatingPawn != LocalPawn)
{
continue;
}
const FVector AgentLocation = AgentActor->GetActorLocation() + FVector(0.0f, 0.0f, AgentEyeLevelOffset); const FVector AgentLocation = AgentActor->GetActorLocation() + FVector(0.0f, 0.0f, AgentEyeLevelOffset);
const FVector ToAgent = AgentLocation - ViewLocation; const FVector ToAgent = AgentLocation - ViewLocation;
@ -293,13 +278,26 @@ void UPS_AI_ConvAgent_InteractionComponent::SetSelectedAgent(UPS_AI_ConvAgent_El
OldAgent->GetOwner() ? *OldAgent->GetOwner()->GetName() : TEXT("(null)")); OldAgent->GetOwner() ? *OldAgent->GetOwner()->GetName() : TEXT("(null)"));
} }
// ── Conversation: end if auto-started ──────────────────────────── // ── Conversation: leave shared conversation if auto-started ─────
// If we auto-started the conversation on selection, end it now so the // Use Leave instead of End so other players can keep talking to the agent.
// NPC becomes available for other players. EndConversation() also calls
// StopListening() internally, so we skip the separate StopListening below.
if (bAutoStartConversation && (OldAgent->IsConnected() || OldAgent->bNetIsConversing)) if (bAutoStartConversation && (OldAgent->IsConnected() || OldAgent->bNetIsConversing))
{ {
OldAgent->EndConversation(); if (GetOwnerRole() == ROLE_Authority || (GetWorld() && GetWorld()->GetNetMode() == NM_Standalone))
{
APlayerController* PC = nullptr;
if (APawn* Pawn = Cast<APawn>(GetOwner()))
{
PC = Cast<APlayerController>(Pawn->GetController());
}
if (PC)
{
OldAgent->ServerLeaveConversation_Implementation(PC);
}
}
else
{
ServerRelayLeaveConversation(OldAgent->GetOwner());
}
} }
else if (bAutoManageListening) else if (bAutoManageListening)
{ {
@ -355,24 +353,32 @@ void UPS_AI_ConvAgent_InteractionComponent::SetSelectedAgent(UPS_AI_ConvAgent_El
NewAgent->GetOwner() ? *NewAgent->GetOwner()->GetName() : TEXT("(null)")); NewAgent->GetOwner() ? *NewAgent->GetOwner()->GetName() : TEXT("(null)"));
} }
// Network: auto-start conversation if no active conversation. // Multi-player: join the shared conversation (idempotent — no-op if already joined).
// In persistent session mode, the WebSocket stays connected but // In persistent session mode, the WebSocket stays connected but
// bNetIsConversing is false between interactions — we still need // bNetIsConversing is false between interactions — we still need
// to call StartConversation() to re-activate gaze and mic. // to call Join to re-activate gaze and mic.
// Only when bAutoStartConversation is true — otherwise the user must // Only when bAutoStartConversation is true — otherwise the user must
// call StartConversationWithSelectedAgent() explicitly (e.g. on key press). // call StartConversationWithSelectedAgent() explicitly (e.g. on key press).
if (bAutoStartConversation && !NewAgent->bNetIsConversing) if (bAutoStartConversation)
{ {
// On the server, call StartConversation() directly. // On the server, call directly.
// On clients, route through our relay RPC (clients can't call // On clients, route through our relay RPC (clients can't call
// Server RPCs on NPC actors — no owning connection). // Server RPCs on NPC actors — no owning connection).
if (GetOwnerRole() == ROLE_Authority || (GetWorld() && GetWorld()->GetNetMode() == NM_Standalone)) if (GetOwnerRole() == ROLE_Authority || (GetWorld() && GetWorld()->GetNetMode() == NM_Standalone))
{ {
NewAgent->StartConversation(); APlayerController* PC = nullptr;
if (APawn* Pawn = Cast<APawn>(GetOwner()))
{
PC = Cast<APlayerController>(Pawn->GetController());
}
if (PC)
{
NewAgent->ServerJoinConversation_Implementation(PC);
}
} }
else else
{ {
ServerRelayStartConversation(NewAgent->GetOwner()); ServerRelayJoinConversation(NewAgent->GetOwner());
} }
// Ensure mic is capturing so we can route audio to the new agent. // Ensure mic is capturing so we can route audio to the new agent.
@ -492,29 +498,32 @@ void UPS_AI_ConvAgent_InteractionComponent::StartConversationWithSelectedAgent()
return; return;
} }
if (Agent->IsConnected() || Agent->bNetIsConversing)
{
if (bDebug)
{
UE_LOG(LogPS_AI_ConvAgent_Select, Log, TEXT("StartConversationWithSelectedAgent: agent already connected/conversing."));
}
return;
}
if (bDebug) if (bDebug)
{ {
UE_LOG(LogPS_AI_ConvAgent_Select, Log, TEXT("StartConversationWithSelectedAgent: starting conversation with %s"), UE_LOG(LogPS_AI_ConvAgent_Select, Log, TEXT("StartConversationWithSelectedAgent: %s (connected=%s conversing=%s)"),
Agent->GetOwner() ? *Agent->GetOwner()->GetName() : TEXT("(null)")); Agent->GetOwner() ? *Agent->GetOwner()->GetName() : TEXT("(null)"),
Agent->IsConnected() ? TEXT("true") : TEXT("false"),
Agent->bNetIsConversing ? TEXT("true") : TEXT("false"));
} }
// Route through relay on clients (can't call Server RPCs on NPC actors). // Always call Join (idempotent) — even if the WebSocket is already connected
// (persistent session mode), we need to add the pawn to NetConnectedPawns
// and set bNetIsConversing to true.
if (GetOwnerRole() == ROLE_Authority || (GetWorld() && GetWorld()->GetNetMode() == NM_Standalone)) if (GetOwnerRole() == ROLE_Authority || (GetWorld() && GetWorld()->GetNetMode() == NM_Standalone))
{ {
Agent->StartConversation(); APlayerController* PC = nullptr;
if (APawn* Pawn = Cast<APawn>(GetOwner()))
{
PC = Cast<APlayerController>(Pawn->GetController());
}
if (PC)
{
Agent->ServerJoinConversation_Implementation(PC);
}
} }
else else
{ {
ServerRelayStartConversation(Agent->GetOwner()); ServerRelayJoinConversation(Agent->GetOwner());
} }
// Ensure mic is capturing so we can route audio to the agent. // Ensure mic is capturing so we can route audio to the agent.
@ -726,7 +735,7 @@ void UPS_AI_ConvAgent_InteractionComponent::GetLifetimeReplicatedProps(
// the player's pawn (owned by the client) and forward to the NPC on the server. // the player's pawn (owned by the client) and forward to the NPC on the server.
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
void UPS_AI_ConvAgent_InteractionComponent::ServerRelayStartConversation_Implementation( void UPS_AI_ConvAgent_InteractionComponent::ServerRelayJoinConversation_Implementation(
AActor* AgentActor) AActor* AgentActor)
{ {
if (!AgentActor) return; if (!AgentActor) return;
@ -739,21 +748,39 @@ void UPS_AI_ConvAgent_InteractionComponent::ServerRelayStartConversation_Impleme
{ {
PC = Cast<APlayerController>(Pawn->GetController()); PC = Cast<APlayerController>(Pawn->GetController());
} }
if (!PC) return; if (!PC) return;
// Forward to the NPC's implementation directly (we're already on the server). Agent->ServerJoinConversation_Implementation(PC);
Agent->ServerRequestConversation_Implementation(PC);
} }
void UPS_AI_ConvAgent_InteractionComponent::ServerRelayEndConversation_Implementation( void UPS_AI_ConvAgent_InteractionComponent::ServerRelayLeaveConversation_Implementation(
AActor* AgentActor) AActor* AgentActor)
{ {
if (!AgentActor) return; if (!AgentActor) return;
auto* Agent = AgentActor->FindComponentByClass<UPS_AI_ConvAgent_ElevenLabsComponent>(); auto* Agent = AgentActor->FindComponentByClass<UPS_AI_ConvAgent_ElevenLabsComponent>();
if (!Agent) return; if (!Agent) return;
Agent->ServerReleaseConversation_Implementation(); APlayerController* PC = nullptr;
if (APawn* Pawn = Cast<APawn>(GetOwner()))
{
PC = Cast<APlayerController>(Pawn->GetController());
}
if (!PC) return;
Agent->ServerLeaveConversation_Implementation(PC);
}
// Backward compat wrappers.
void UPS_AI_ConvAgent_InteractionComponent::ServerRelayStartConversation_Implementation(
AActor* AgentActor)
{
ServerRelayJoinConversation_Implementation(AgentActor);
}
void UPS_AI_ConvAgent_InteractionComponent::ServerRelayEndConversation_Implementation(
AActor* AgentActor)
{
ServerRelayLeaveConversation_Implementation(AgentActor);
} }
void UPS_AI_ConvAgent_InteractionComponent::ServerRelayMicAudio_Implementation( void UPS_AI_ConvAgent_InteractionComponent::ServerRelayMicAudio_Implementation(
@ -767,15 +794,11 @@ void UPS_AI_ConvAgent_InteractionComponent::ServerRelayMicAudio_Implementation(
// instead of 3200 bytes per 100ms chunk). Decode back to raw PCM here // instead of 3200 bytes per 100ms chunk). Decode back to raw PCM here
// before forwarding to the WebSocket which expects uncompressed int16. // before forwarding to the WebSocket which expects uncompressed int16.
TArray<uint8> DecodedPCM; TArray<uint8> DecodedPCM;
if (Agent->DecompressMicAudio(AudioBytes, DecodedPCM)) const TArray<uint8>& PCMToSend = Agent->DecompressMicAudio(AudioBytes, DecodedPCM) ? DecodedPCM : AudioBytes;
{
Agent->ServerSendMicAudio_Implementation(DecodedPCM); // Pass the sender pawn for multi-player speaker arbitration.
} APawn* SenderPawn = Cast<APawn>(GetOwner());
else Agent->ServerSendMicAudioFromPlayer(SenderPawn, PCMToSend);
{
// Raw PCM fallback (no Opus or data is already uncompressed).
Agent->ServerSendMicAudio_Implementation(AudioBytes);
}
} }
void UPS_AI_ConvAgent_InteractionComponent::ServerRelaySendText_Implementation( void UPS_AI_ConvAgent_InteractionComponent::ServerRelaySendText_Implementation(

View File

@ -1109,6 +1109,12 @@ void UPS_AI_ConvAgent_LipSyncComponent::TickComponent(float DeltaTime, ELevelTic
} }
} }
// Snapshot for thread-safe anim node read (GetCurrentBlendshapes)
{
FScopeLock Lock(&BlendshapeLock);
ThreadSafeBlendshapes = CurrentBlendshapes;
}
// Auto-apply morph targets if a target mesh is set // Auto-apply morph targets if a target mesh is set
if (TargetMesh) if (TargetMesh)
{ {
@ -1201,6 +1207,10 @@ void UPS_AI_ConvAgent_LipSyncComponent::ResetToNeutral()
// Clear blendshapes so the mouth returns to fully neutral // Clear blendshapes so the mouth returns to fully neutral
CurrentBlendshapes.Reset(); CurrentBlendshapes.Reset();
{
FScopeLock Lock(&BlendshapeLock);
ThreadSafeBlendshapes.Reset();
}
PreviousBlendshapes.Reset(); PreviousBlendshapes.Reset();
LastConsumedVisemes.Reset(); LastConsumedVisemes.Reset();
} }

View File

@ -91,6 +91,13 @@ DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(FOnAgentEmotionChanged,
DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnAgentClientToolCall, DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FOnAgentClientToolCall,
const FPS_AI_ConvAgent_ClientToolCall_ElevenLabs&, ToolCall); const FPS_AI_ConvAgent_ClientToolCall_ElevenLabs&, ToolCall);
/**
* Fired when the active speaker changes in a multi-player shared conversation.
* Use this for UI indicators showing who is talking, or to drive camera focus.
*/
DECLARE_DYNAMIC_MULTICAST_DELEGATE_TwoParams(FOnActiveSpeakerChanged,
APawn*, NewSpeaker, APawn*, PreviousSpeaker);
// Non-dynamic delegate for raw agent audio (high-frequency, C++ consumers only). // Non-dynamic delegate for raw agent audio (high-frequency, C++ consumers only).
// Delivers PCM chunks as int16, 16kHz mono, little-endian. // Delivers PCM chunks as int16, 16kHz mono, little-endian.
DECLARE_MULTICAST_DELEGATE_OneParam(FOnAgentAudioData, const TArray<uint8>& /*PCMData*/); DECLARE_MULTICAST_DELEGATE_OneParam(FOnAgentAudioData, const TArray<uint8>& /*PCMData*/);
@ -298,6 +305,12 @@ public:
meta = (ToolTip = "Fires for custom client tool calls (not set_emotion).\nYou must respond via GetWebSocketProxy()->SendClientToolResult().")) meta = (ToolTip = "Fires for custom client tool calls (not set_emotion).\nYou must respond via GetWebSocketProxy()->SendClientToolResult()."))
FOnAgentClientToolCall OnAgentClientToolCall; FOnAgentClientToolCall OnAgentClientToolCall;
/** Fired when the active speaker changes in a multi-player shared conversation.
* NewSpeaker is null when no one is speaking (idle timeout). */
UPROPERTY(BlueprintAssignable, Category = "PS AI ConvAgent|ElevenLabs|Events",
meta = (ToolTip = "Fires when the speaking player changes.\nNewSpeaker is null when no one is speaking."))
FOnActiveSpeakerChanged OnActiveSpeakerChanged;
/** The current emotion of the agent, as set by the "set_emotion" client tool. Defaults to Neutral. */ /** The current emotion of the agent, as set by the "set_emotion" client tool. Defaults to Neutral. */
UPROPERTY(ReplicatedUsing = OnRep_Emotion, BlueprintReadOnly, Category = "PS AI ConvAgent|ElevenLabs") UPROPERTY(ReplicatedUsing = OnRep_Emotion, BlueprintReadOnly, Category = "PS AI ConvAgent|ElevenLabs")
EPS_AI_ConvAgent_Emotion CurrentEmotion = EPS_AI_ConvAgent_Emotion::Neutral; EPS_AI_ConvAgent_Emotion CurrentEmotion = EPS_AI_ConvAgent_Emotion::Neutral;
@ -313,20 +326,39 @@ public:
// ── Network state (replicated) ─────────────────────────────────────────── // ── Network state (replicated) ───────────────────────────────────────────
/** True when a player is currently in conversation with this NPC. /** True when one or more players are in conversation with this NPC.
* Replicated to all clients so InteractionComponents can skip occupied NPCs. */ * Replicated to all clients for UI feedback, gaze, LOD, etc. */
UPROPERTY(ReplicatedUsing = OnRep_ConversationState, BlueprintReadOnly, Category = "PS AI ConvAgent|Network") UPROPERTY(ReplicatedUsing = OnRep_ConversationState, BlueprintReadOnly, Category = "PS AI ConvAgent|Network")
bool bNetIsConversing = false; bool bNetIsConversing = false;
/** The player controller currently in conversation with this NPC (null if free). /** All player pawns currently in conversation with this NPC.
* Only valid on server and owning client (PlayerControllers are not replicated to other clients). */ * Multiple players can share the same agent — audio is routed via speaker arbitration.
* Replicated to ALL clients for gaze target and LOD distance checks. */
UPROPERTY(ReplicatedUsing = OnRep_ConversationState, BlueprintReadOnly, Category = "PS AI ConvAgent|Network") UPROPERTY(ReplicatedUsing = OnRep_ConversationState, BlueprintReadOnly, Category = "PS AI ConvAgent|Network")
TObjectPtr<APlayerController> NetConversatingPlayer = nullptr; TArray<TObjectPtr<APawn>> NetConnectedPawns;
/** The pawn of the conversating player. Replicated to ALL clients (unlike PlayerController). /** The player currently speaking (active audio sender). Null if no one is speaking.
* Used by remote clients for gaze target (head/eye tracking) and LOD distance checks. */ * Used by GazeComponent for target switching — the NPC looks at whoever is talking.
UPROPERTY(ReplicatedUsing = OnRep_ConversationState, BlueprintReadOnly, Category = "PS AI ConvAgent|Network") * Replicated to ALL clients so gaze updates everywhere. */
TObjectPtr<APawn> NetConversatingPawn = nullptr; UPROPERTY(ReplicatedUsing = OnRep_ActiveSpeaker, BlueprintReadOnly, Category = "PS AI ConvAgent|Network")
TObjectPtr<APawn> NetActiveSpeakerPawn = nullptr;
// ── Multi-player speaker arbitration (server only) ──────────────────────
/** Minimum seconds of silence from the current speaker before allowing a speaker switch.
* Prevents rapid ping-pong switching when both players make brief sounds. */
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "PS AI ConvAgent|Network|MultiPlayer",
meta = (ClampMin = "0.0", ClampMax = "5.0",
ToolTip = "Minimum silence from current speaker before switching to another.\nPrevents rapid gaze flip-flop."))
float SpeakerSwitchHysteresis = 0.3f;
/** Seconds after last speech before the active speaker is cleared.
* When cleared, gaze returns to the last speaker position (or closest connected player).
* Set to 0 to never clear (last speaker stays active indefinitely). */
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "PS AI ConvAgent|Network|MultiPlayer",
meta = (ClampMin = "0.0",
ToolTip = "Seconds of silence before clearing the active speaker.\n0 = last speaker stays active indefinitely."))
float SpeakerIdleTimeout = 3.0f;
// ── Network LOD ────────────────────────────────────────────────────────── // ── Network LOD ──────────────────────────────────────────────────────────
@ -345,12 +377,21 @@ public:
// ── Network RPCs ───────────────────────────────────────────────────────── // ── Network RPCs ─────────────────────────────────────────────────────────
/** Request exclusive conversation with this NPC. Called by clients; the server /** Join a shared conversation with this NPC. Multiple players can join.
* checks availability and opens the WebSocket connection if the NPC is free. */ * If this is the first player, the WebSocket connection is opened.
* If the agent is already in conversation, the player simply joins. */
UFUNCTION(Server, Reliable)
void ServerJoinConversation(APlayerController* RequestingPlayer);
/** Leave the shared conversation. If this is the last player, the conversation ends. */
UFUNCTION(Server, Reliable)
void ServerLeaveConversation(APlayerController* LeavingPlayer);
/** [Backward compat] Delegates to ServerJoinConversation. */
UFUNCTION(Server, Reliable) UFUNCTION(Server, Reliable)
void ServerRequestConversation(APlayerController* RequestingPlayer); void ServerRequestConversation(APlayerController* RequestingPlayer);
/** Release this NPC so other players can talk to it. */ /** [Backward compat] Delegates to ServerLeaveConversation. */
UFUNCTION(Server, Reliable) UFUNCTION(Server, Reliable)
void ServerReleaseConversation(); void ServerReleaseConversation();
@ -452,6 +493,10 @@ public:
UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|ElevenLabs") UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|ElevenLabs")
void FeedExternalAudio(const TArray<float>& FloatPCM); void FeedExternalAudio(const TArray<float>& FloatPCM);
/** Receive mic audio from a specific player with speaker arbitration.
* Server decides whether to forward to ElevenLabs based on who is currently speaking. */
void ServerSendMicAudioFromPlayer(APawn* SpeakerPawn, const TArray<uint8>& PCMBytes);
// ── State queries ───────────────────────────────────────────────────────── // ── State queries ─────────────────────────────────────────────────────────
UFUNCTION(BlueprintPure, Category = "PS AI ConvAgent|ElevenLabs") UFUNCTION(BlueprintPure, Category = "PS AI ConvAgent|ElevenLabs")
@ -499,6 +544,9 @@ private:
UFUNCTION() UFUNCTION()
void OnRep_ConversationState(); void OnRep_ConversationState();
UFUNCTION()
void OnRep_ActiveSpeaker();
UFUNCTION() UFUNCTION()
void OnRep_Emotion(); void OnRep_Emotion();
@ -584,6 +632,19 @@ private:
double TurnStartTime = 0.0; // Set in StartListening — when mic opens. double TurnStartTime = 0.0; // Set in StartListening — when mic opens.
double TurnEndTime = 0.0; // Set in StopListening — when mic closes. double TurnEndTime = 0.0; // Set in StopListening — when mic closes.
double AgentSpeakStart = 0.0; // Set in EnqueueAgentAudio (first chunk) — when audio begins. double AgentSpeakStart = 0.0; // Set in EnqueueAgentAudio (first chunk) — when audio begins.
double GenerationStartTime = 0.0; // Set in HandleAgentResponseStarted — server starts generating.
double PlaybackStartTime = 0.0; // Set when audio playback actually starts (post pre-buffer).
// Last-turn latency snapshot (ms) — updated per turn, displayed on debug HUD.
// Persists between turns so the HUD always shows the most recent measurement.
// Per-turn response-latency snapshot, in milliseconds, shown on the debug HUD.
// Computed from the timestamp members above (TurnEndTime, GenerationStartTime,
// PlaybackStartTime); all fields default to 0 until the first turn completes.
struct FDebugLatencies
{
float STTToGenMs = 0.0f; // TurnEnd → server starts generating
float GenToAudioMs = 0.0f; // Server generating → first audio chunk
float TotalMs = 0.0f; // TurnEnd → first audio chunk
float EndToEarMs = 0.0f; // TurnEnd → audio playback starts (user-perceived)
};
FDebugLatencies LastLatencies;
// Accumulates incoming PCM bytes until the audio component needs data. // Accumulates incoming PCM bytes until the audio component needs data.
// Uses a read offset instead of RemoveAt(0,N) to avoid O(n) memmove every // Uses a read offset instead of RemoveAt(0,N) to avoid O(n) memmove every
@ -659,7 +720,7 @@ private:
// ── Network helpers ────────────────────────────────────────────────────── // ── Network helpers ──────────────────────────────────────────────────────
/** Distance from this NPC to the local player's pawn. Returns MAX_FLT if unavailable. */ /** Distance from this NPC to the local player's pawn. Returns MAX_FLT if unavailable. */
float GetDistanceToLocalPlayer() const; float GetDistanceToLocalPlayer() const;
/** True if the local player controller is the one currently in conversation. */ /** True if the local player controller is one of the connected players. */
bool IsLocalPlayerConversating() const; bool IsLocalPlayerConversating() const;
/** Internal: performs the actual WebSocket setup (called by both local and RPC paths). */ /** Internal: performs the actual WebSocket setup (called by both local and RPC paths). */
void StartConversation_Internal(); void StartConversation_Internal();
@ -672,10 +733,18 @@ private:
bool ShouldUseExternalMic() const; bool ShouldUseExternalMic() const;
/** Update the NPC's GazeComponent from the current conversation state. /** Update the NPC's GazeComponent from the current conversation state.
* Called on the server when bNetIsConversing / NetConversatingPawn change, * Uses NetActiveSpeakerPawn if set, otherwise the first connected pawn.
* because OnRep_ConversationState never fires on the Authority. */ * Called on the server (OnRep never fires on the Authority). */
void ApplyConversationGaze(); void ApplyConversationGaze();
// ── Multi-player speaker arbitration (server only) ──────────────────────
/** Set the active speaker and update gaze. Skips if same as current. */
void SetActiveSpeaker(APawn* NewSpeaker);
/** Last non-silent audio timestamp per connected player (server only). */
TMap<TObjectPtr<APawn>, double> LastSpeakTime;
/** Draw on-screen debug info (called from TickComponent when bDebug). */ /** Draw on-screen debug info (called from TickComponent when bDebug). */
void DrawDebugHUD() const; void DrawDebugHUD() const;
}; };

View File

@ -201,11 +201,21 @@ public:
// These relay RPCs live on the player's pawn (which IS owned by the client), // These relay RPCs live on the player's pawn (which IS owned by the client),
// forwarding commands to the NPC's ElevenLabsComponent on the server. // forwarding commands to the NPC's ElevenLabsComponent on the server.
/** Relay: request exclusive conversation with an NPC agent. */ /** Relay: join a shared conversation with an NPC agent. */
UFUNCTION(Server, Reliable)
void ServerRelayJoinConversation(AActor* AgentActor);
/** Relay: leave a shared conversation with an NPC agent. */
UFUNCTION(Server, Reliable)
void ServerRelayLeaveConversation(AActor* AgentActor);
/** Relay: [backward compat] request conversation with an NPC agent.
* Delegates to ServerRelayJoinConversation. */
UFUNCTION(Server, Reliable) UFUNCTION(Server, Reliable)
void ServerRelayStartConversation(AActor* AgentActor); void ServerRelayStartConversation(AActor* AgentActor);
/** Relay: release conversation with an NPC agent. */ /** Relay: [backward compat] release conversation with an NPC agent.
* Delegates to ServerRelayLeaveConversation. */
UFUNCTION(Server, Reliable) UFUNCTION(Server, Reliable)
void ServerRelayEndConversation(AActor* AgentActor); void ServerRelayEndConversation(AActor* AgentActor);

View File

@ -170,9 +170,14 @@ public:
UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|LipSync") UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|LipSync")
TMap<FName, float> GetCurrentVisemes() const { return SmoothedVisemes; } TMap<FName, float> GetCurrentVisemes() const { return SmoothedVisemes; }
/** Get current ARKit blendshape weights (MetaHuman compatible: jawOpen, mouthFunnel, mouthClose, etc.). */ /** Get current ARKit blendshape weights (MetaHuman compatible: jawOpen, mouthFunnel, mouthClose, etc.).
* Thread-safe: returns a snapshot updated each tick. Safe to call from anim worker threads. */
UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|LipSync") UFUNCTION(BlueprintCallable, Category = "PS AI ConvAgent|LipSync")
TMap<FName, float> GetCurrentBlendshapes() const { return CurrentBlendshapes; } TMap<FName, float> GetCurrentBlendshapes() const
{
FScopeLock Lock(&BlendshapeLock);
return ThreadSafeBlendshapes;
}
/** True when the agent is currently producing speech audio. /** True when the agent is currently producing speech audio.
* When false, lip sync releases mouth curves to let emotion curves through. */ * When false, lip sync releases mouth curves to let emotion curves through. */
@ -268,9 +273,14 @@ private:
// Smoothed viseme weights (interpolated each tick, exposed via GetCurrentVisemes) // Smoothed viseme weights (interpolated each tick, exposed via GetCurrentVisemes)
TMap<FName, float> SmoothedVisemes; TMap<FName, float> SmoothedVisemes;
// ARKit blendshape weights derived from SmoothedVisemes (exposed via GetCurrentBlendshapes) // ARKit blendshape weights derived from SmoothedVisemes (game-thread working copy)
TMap<FName, float> CurrentBlendshapes; TMap<FName, float> CurrentBlendshapes;
// Thread-safe snapshot of CurrentBlendshapes, updated each tick under BlendshapeLock.
// Read by the anim worker thread via GetCurrentBlendshapes().
TMap<FName, float> ThreadSafeBlendshapes;
mutable FCriticalSection BlendshapeLock;
// Previous frame's blendshape values for additional output smoothing // Previous frame's blendshape values for additional output smoothing
TMap<FName, float> PreviousBlendshapes; TMap<FName, float> PreviousBlendshapes;