
Streaming API Implementation Guide

Overview

The streaming APIs allow the frontend to receive real-time updates as agents process queries via Server-Sent Events (SSE). Instead of waiting for a complete response, the frontend receives events as they occur:

  • Token-by-token streaming from the LLM (real-time typing effect)
  • Tool execution notifications (show "searching...", "processing...", etc.)
  • Agent thinking/processing status
  • Final response with full context
  • Error events for graceful handling

Architecture:

  • SSE via API routes: Real-time streaming during generation
  • Socket.io: Continues to emit completed messages (existing behavior preserved)

This enables a responsive UI where users see the agent "thinking" and generating responses in real-time, while your existing Socket.io notification system stays intact.


Quick Start

Basic Text Streaming

const streamChat = async (message: string, chatInstanceId: string, chatModelId: string) => {
  const response = await fetch('/api/chat?stream=true', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, chatInstanceId, chatModelId }),
  });

  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Buffer chunks: an SSE event can be split across reads
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n\n');
    buffer = lines.pop() ?? ''; // keep the trailing partial event

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const event = JSON.parse(line.slice(6));
        handleStreamEvent(event);
      }
    }
  }
};
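
The snippet above delegates to a `handleStreamEvent` callback it does not define. A minimal sketch is below; the `renderContent` and `renderError` callbacks are placeholders invented here, not part of the API:

```typescript
// Minimal handler sketch. Event shapes follow the "Stream Events" section;
// renderContent / renderError are placeholder callbacks (assumptions).
type MinimalEvent = { type: string; [key: string]: any };

function handleStreamEvent(
  event: MinimalEvent,
  renderContent: (text: string) => void,
  renderError: (msg: string) => void,
): void {
  switch (event.type) {
    case 'token':
      // Prefer cumulativeContent: a dropped update self-heals on the next token
      renderContent(event.cumulativeContent);
      break;
    case 'agent_response':
      renderContent(event.content); // authoritative final text
      break;
    case 'error':
      renderError(event.message);
      break;
    default:
      // stream_start, tool_*, message_saved, stream_end: handle as needed
      break;
  }
}
```
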

API Endpoints

1. Text Chat Streaming

Endpoint: POST /api/chat?stream=true

Request Body:

{
  message: string;         // User message
  chatInstanceId: string;  // Chat instance ID
  chatModelId: string;     // Chat model ID
  stream?: boolean;        // Alternative to the `stream` query parameter
  includeTokens?: boolean; // Include token events (default: true)
}

Query Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| stream | "true" | - | Enable streaming mode |
| includeTokens | "true" / "false" | "true" | Include token-level events |

Example:

// Full example with all options
const response = await fetch('/api/chat?stream=true&includeTokens=true', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN' // If auth required
},
body: JSON.stringify({
message: 'Hello, how can you help me?',
chatInstanceId: 'ci_xxxxx',
chatModelId: 'cm_xxxxx',
}),
});

2. Voice Mode Streaming (Text Response)

Endpoint: POST /api/chat/voice (with stream: true in form data)

This endpoint transcribes audio and streams the text response (no TTS audio generation).

Request (multipart/form-data):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| audio | File | Yes | Audio file (webm, mp3, wav, etc.) |
| chatModelId | string | Yes | Chat model ID |
| chatInstanceId | string | Yes | Chat instance ID |
| stream | "true" | No | Enable streaming mode |

Example:

const formData = new FormData();
formData.append('audio', audioBlob, 'recording.webm');
formData.append('chatModelId', 'cm_xxxxx');
formData.append('chatInstanceId', 'ci_xxxxx');
formData.append('stream', 'true');

const response = await fetch('/api/chat/voice', {
  method: 'POST',
  body: formData,
});

// Parse SSE events
const reader = response.body?.getReader();
if (!reader) throw new Error('No response body');
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Buffer chunks: an SSE event can be split across reads
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n\n');
  buffer = lines.pop() ?? '';

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const event = JSON.parse(line.slice(6));
      // Handle: transcription, stream_start, token, agent_response, message_saved, stream_end
      handleStreamEvent(event);
    }
  }
}

Non-streaming fallback: Omit the stream field to receive the standard JSON response.


3. Voice Response Streaming (with TTS Audio)

Endpoint: POST /api/chat/voice-response (with stream: true in form data)

Request (multipart/form-data):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| audio | File | Yes | Audio file (webm, mp3, wav, etc.) |
| chatModelId | string | Yes | Chat model ID |
| chatInstanceId | string | Yes | Chat instance ID |
| stream | "true" | No | Enable streaming mode |
| voice | string | No | TTS voice preference (default: "nova") |
| ttsEnabled | "true" / "false" | No | Generate TTS audio (default: true) |
| voiceConversation | "true" / "false" | No | Voice conversation mode |

Example:

const formData = new FormData();
formData.append('audio', audioBlob, 'recording.webm');
formData.append('chatModelId', 'cm_xxxxx');
formData.append('chatInstanceId', 'ci_xxxxx');
formData.append('stream', 'true');
formData.append('voice', 'nova');

const response = await fetch('/api/chat/voice-response', {
  method: 'POST',
  body: formData,
});

// Parse SSE events the same way as for text streaming

Stream Events

All events follow a base structure:

interface BaseStreamEvent {
  type: StreamEventType;  // Event type identifier
  timestamp: Date;        // When the event occurred
  conversationId: string; // Chat instance ID
  agentId: string;        // Agent that emitted this event
}

Event Types Reference

| Event Type | Description | When Emitted |
| --- | --- | --- |
| stream_start | Stream has begun | First event |
| token | New token from LLM | During generation |
| tool_start | Tool execution started | When agent calls a tool |
| tool_end | Tool execution completed | When tool returns |
| agent_thinking | Agent is processing | During reasoning |
| agent_response | Final agent response | When complete |
| message_saved | Message saved to DB | After agent responds |
| stream_end | Stream ended | Last event |
| error | Error occurred | On error |

Voice-Specific Events

| Event Type | Description |
| --- | --- |
| transcription | STT transcription result |
| tts_start | TTS generation started |
| tts_complete | TTS audio ready |
| tts_error | TTS generation failed |

Event Type Definitions

stream_start

interface StreamStartEvent {
  type: 'stream_start';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  query: string; // Original user query
}

token

interface TokenEvent {
  type: 'token';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  content: string;           // The new token
  cumulativeContent: string; // All tokens so far
}

tool_start

interface ToolStartEvent {
  type: 'tool_start';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  toolName: string;   // Name of the tool
  toolInput: any;     // Input passed to the tool
  toolCallId: string; // Unique ID for this call
}

tool_end

interface ToolEndEvent {
  type: 'tool_end';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  toolName: string;
  toolInput: any;
  toolOutput: any;    // Tool result
  toolCallId: string;
  durationMs: number; // Execution time
}

agent_thinking

interface AgentThinkingEvent {
  type: 'agent_thinking';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  description?: string; // What the agent is doing
}

agent_response

interface AgentResponseEvent {
  type: 'agent_response';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  content: string; // Complete response content
  response?: any;  // Full response object
}

message_saved

interface MessageSavedEvent {
  type: 'message_saved';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  messageId: string; // Saved message ID
  message: {
    id: string;
    text: string;
    created_at: Date;
    updated_at: Date;
    isSent: boolean;
    chatInstanceId: string;
    user: {
      id: string;
      email: string;
      name: string;
      image: string | null;
    };
  };
}

stream_end

interface StreamEndEvent {
  type: 'stream_end';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  success: boolean;
  totalDurationMs?: number;
  reason?: string; // Present when success is false
}

error

interface ErrorEvent {
  type: 'error';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  message: string; // Error message
  code?: string;   // Error code
}

transcription (Voice only)

interface TranscriptionEvent {
  type: 'transcription';
  timestamp: Date;
  conversationId: string;
  agentId: 'stt';
  transcription: {
    text: string;
    language?: string;
    duration?: number;
    confidence?: number;
  };
  performance: {
    transcriptionMs: number;
  };
}

tts_complete (Voice only)

interface TTSCompleteEvent {
  type: 'tts_complete';
  timestamp: Date;
  conversationId: string;
  agentId: 'tts';
  audio: {
    data: string;   // Base64-encoded audio
    format: string; // 'mp3', 'wav', etc.
    size: number;   // Bytes
    voice: string;  // Voice used
  };
  performance: {
    ttsMs: number;
  };
}
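
The interfaces above can be collapsed into a discriminated union so TypeScript narrows the payload on the `type` field. A sketch with abbreviated shapes (field lists are shortened here for brevity; use the full interfaces in real code):

```typescript
// Discriminated-union sketch; names assumed to match the interfaces above,
// with only a few fields per variant shown.
type StreamEvent =
  | { type: 'stream_start'; query: string }
  | { type: 'token'; content: string; cumulativeContent: string }
  | { type: 'tool_start'; toolName: string; toolCallId: string }
  | { type: 'tool_end'; toolName: string; durationMs: number }
  | { type: 'agent_thinking'; description?: string }
  | { type: 'agent_response'; content: string }
  | { type: 'message_saved'; messageId: string }
  | { type: 'stream_end'; success: boolean; reason?: string }
  | { type: 'error'; message: string; code?: string };

// Narrowing works on the `type` discriminant:
function summarize(e: StreamEvent): string {
  switch (e.type) {
    case 'token': return `+${e.content}`;
    case 'stream_end': return e.success ? 'done' : `failed: ${e.reason ?? 'unknown'}`;
    case 'error': return `error: ${e.message}`;
    default: return e.type;
  }
}
```
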

React Implementation

Hook: useStreamingChat

// hooks/useStreamingChat.ts
import { useState, useCallback, useRef } from 'react';

interface StreamState {
  isStreaming: boolean;
  currentContent: string;
  currentTool: string | null;
  isThinking: boolean;
  error: string | null;
}

interface UseStreamingChatOptions {
  chatInstanceId: string;
  chatModelId: string;
  onToken?: (token: string, cumulative: string) => void;
  onToolStart?: (toolName: string, input: any) => void;
  onToolEnd?: (toolName: string, output: any, durationMs: number) => void;
  onComplete?: (content: string, messageId: string) => void;
  onError?: (error: string) => void;
}

export function useStreamingChat({
  chatInstanceId,
  chatModelId,
  onToken,
  onToolStart,
  onToolEnd,
  onComplete,
  onError,
}: UseStreamingChatOptions) {
  const [state, setState] = useState<StreamState>({
    isStreaming: false,
    currentContent: '',
    currentTool: null,
    isThinking: false,
    error: null,
  });

  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (message: string) => {
    // Cancel any existing stream
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }

    abortControllerRef.current = new AbortController();

    setState({
      isStreaming: true,
      currentContent: '',
      currentTool: null,
      isThinking: false,
      error: null,
    });

    try {
      const response = await fetch('/api/chat?stream=true', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, chatInstanceId, chatModelId }),
        signal: abortControllerRef.current.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No response body');
      }

      let finalContent = '';
      let savedMessageId = '';
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer chunks: an SSE event can be split across reads
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n\n');
        buffer = lines.pop() ?? '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          try {
            const event = JSON.parse(line.slice(6));

            switch (event.type) {
              case 'token':
                setState(s => ({ ...s, currentContent: event.cumulativeContent }));
                onToken?.(event.content, event.cumulativeContent);
                break;

              case 'tool_start':
                setState(s => ({ ...s, currentTool: event.toolName }));
                onToolStart?.(event.toolName, event.toolInput);
                break;

              case 'tool_end':
                setState(s => ({ ...s, currentTool: null }));
                onToolEnd?.(event.toolName, event.toolOutput, event.durationMs);
                break;

              case 'agent_thinking':
                setState(s => ({ ...s, isThinking: true }));
                break;

              case 'agent_response':
                finalContent = event.content;
                setState(s => ({ ...s, currentContent: event.content, isThinking: false }));
                break;

              case 'message_saved':
                savedMessageId = event.messageId;
                break;

              case 'stream_end':
                if (event.success) {
                  onComplete?.(finalContent, savedMessageId);
                }
                break;

              case 'error':
                setState(s => ({ ...s, error: event.message }));
                onError?.(event.message);
                break;
            }
          } catch (e) {
            console.error('Failed to parse SSE event:', e);
          }
        }
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        const errorMessage = error.message || 'Stream failed';
        setState(s => ({ ...s, error: errorMessage }));
        onError?.(errorMessage);
      }
    } finally {
      setState(s => ({ ...s, isStreaming: false }));
    }
  }, [chatInstanceId, chatModelId, onToken, onToolStart, onToolEnd, onComplete, onError]);

  const cancel = useCallback(() => {
    abortControllerRef.current?.abort();
    setState(s => ({ ...s, isStreaming: false }));
  }, []);

  return {
    ...state,
    sendMessage,
    cancel,
  };
}

Usage Example

// components/Chat.tsx
import { useStreamingChat } from '@/hooks/useStreamingChat';
import { useState } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

export function Chat({ chatInstanceId, chatModelId }: { chatInstanceId: string; chatModelId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');

  const {
    isStreaming,
    currentContent,
    currentTool,
    isThinking,
    error,
    sendMessage,
    cancel,
  } = useStreamingChat({
    chatInstanceId,
    chatModelId,
    onComplete: (content, messageId) => {
      setMessages(prev => [
        ...prev,
        { id: messageId, role: 'assistant', content },
      ]);
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim() || isStreaming) return;

    // Add the user message optimistically
    setMessages(prev => [
      ...prev,
      { id: Date.now().toString(), role: 'user', content: input },
    ]);

    sendMessage(input);
    setInput('');
  };

  return (
    <div className="chat-container">
      {/* Messages */}
      <div className="messages">
        {messages.map(msg => (
          <div key={msg.id} className={`message ${msg.role}`}>
            {msg.content}
          </div>
        ))}

        {/* Streaming response */}
        {isStreaming && (
          <div className="message assistant streaming">
            {isThinking && <span className="thinking">Thinking...</span>}
            {currentTool && <span className="tool">Using {currentTool}...</span>}
            {currentContent && <span>{currentContent}</span>}
            {!currentContent && !isThinking && !currentTool && (
              <span className="typing-indicator">...</span>
            )}
          </div>
        )}

        {error && <div className="error">{error}</div>}
      </div>

      {/* Input */}
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          placeholder="Type a message..."
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button type="button" onClick={cancel}>Cancel</button>
        ) : (
          <button type="submit">Send</button>
        )}
      </form>
    </div>
  );
}

Voice Streaming Implementation

// hooks/useVoiceStreaming.ts
import { useState, useCallback, useRef } from 'react';

interface VoiceStreamState {
  isProcessing: boolean;
  transcription: string | null;
  currentContent: string;
  audioData: string | null; // Base64 audio
  error: string | null;
}

export function useVoiceStreaming({
  chatInstanceId,
  chatModelId,
  voice = 'nova',
  onTranscription,
  onToken,
  onAudioReady,
  onComplete,
  onError,
}: {
  chatInstanceId: string;
  chatModelId: string;
  voice?: string;
  onTranscription?: (text: string) => void;
  onToken?: (token: string, cumulative: string) => void;
  onAudioReady?: (audioBase64: string, format: string) => void;
  onComplete?: (response: string) => void;
  onError?: (error: string) => void;
}) {
  const [state, setState] = useState<VoiceStreamState>({
    isProcessing: false,
    transcription: null,
    currentContent: '',
    audioData: null,
    error: null,
  });

  const abortRef = useRef<AbortController | null>(null);

  const sendAudio = useCallback(async (audioBlob: Blob) => {
    if (abortRef.current) {
      abortRef.current.abort();
    }
    abortRef.current = new AbortController();

    setState({
      isProcessing: true,
      transcription: null,
      currentContent: '',
      audioData: null,
      error: null,
    });

    try {
      const formData = new FormData();
      formData.append('audio', audioBlob, 'recording.webm');
      formData.append('chatModelId', chatModelId);
      formData.append('chatInstanceId', chatInstanceId);
      formData.append('stream', 'true');
      formData.append('voice', voice);

      const response = await fetch('/api/chat/voice-response', {
        method: 'POST',
        body: formData,
        signal: abortRef.current.signal,
      });

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) throw new Error('No response body');

      let finalContent = '';
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer chunks: an SSE event can be split across reads
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n\n');
        buffer = lines.pop() ?? '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const event = JSON.parse(line.slice(6));

          switch (event.type) {
            case 'transcription':
              setState(s => ({ ...s, transcription: event.transcription.text }));
              onTranscription?.(event.transcription.text);
              break;

            case 'token':
              setState(s => ({ ...s, currentContent: event.cumulativeContent }));
              onToken?.(event.content, event.cumulativeContent);
              break;

            case 'agent_response':
              finalContent = event.content;
              setState(s => ({ ...s, currentContent: event.content }));
              break;

            case 'tts_complete':
              setState(s => ({ ...s, audioData: event.audio.data }));
              onAudioReady?.(event.audio.data, event.audio.format);
              break;

            case 'stream_end':
              if (event.success) {
                onComplete?.(finalContent);
              }
              break;

            case 'error':
            case 'tts_error':
              setState(s => ({ ...s, error: event.message }));
              onError?.(event.message);
              break;
          }
        }
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        setState(s => ({ ...s, error: error.message }));
        onError?.(error.message);
      }
    } finally {
      setState(s => ({ ...s, isProcessing: false }));
    }
  }, [chatInstanceId, chatModelId, voice, onTranscription, onToken, onAudioReady, onComplete, onError]);

  const cancel = useCallback(() => {
    abortRef.current?.abort();
    setState(s => ({ ...s, isProcessing: false }));
  }, []);

  // Helper to play an audio response delivered as base64
  const playAudio = useCallback((base64Audio: string, format = 'mp3') => {
    const audio = new Audio(`data:audio/${format};base64,${base64Audio}`);
    audio.play();
    return audio;
  }, []);

  return {
    ...state,
    sendAudio,
    cancel,
    playAudio,
  };
}
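
The playAudio helper above embeds the whole payload in a data: URI. For long responses it can be lighter to decode the base64 once into a Blob and play it through an object URL. A sketch (base64ToBlob is a name invented here, not part of the API):

```typescript
// Decode a base64 audio payload (as delivered in tts_complete events)
// into a Blob suitable for URL.createObjectURL(...).
function base64ToBlob(base64: string, mimeType: string): Blob {
  const binary = atob(base64); // base64 -> binary string
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return new Blob([bytes], { type: mimeType });
}

// Browser usage (sketch):
// const blob = base64ToBlob(event.audio.data, `audio/${event.audio.format}`);
// const url = URL.createObjectURL(blob);
// new Audio(url).play();
// Call URL.revokeObjectURL(url) after playback ends to free memory.
```
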

Error Handling

// Comprehensive error handling example
const handleStreamEvent = (event: StreamEvent) => {
switch (event.type) {
case 'error':
console.error('Stream error:', event.message);

// Show user-friendly error
if (event.message.includes('rate limit')) {
showToast('Too many requests. Please wait a moment.');
} else if (event.message.includes('unauthorized')) {
showToast('Session expired. Please refresh.');
} else {
showToast('Something went wrong. Please try again.');
}
break;

case 'stream_end':
if (!event.success) {
if (event.reason === 'usage_limit_reached') {
showToast('Usage limit reached. Please upgrade.');
} else if (event.reason === 'handled_by_agent') {
showToast('A human agent will respond shortly.');
}
}
break;
}
};

Fallback to Non-Streaming

If streaming is not supported or fails, fall back to the standard API:

const sendMessage = async (message: string) => {
  try {
    // Try streaming first
    await streamMessage(message);
  } catch (streamError) {
    console.warn('Streaming failed, falling back to standard API', streamError);

    // Fall back to non-streaming
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message, chatInstanceId, chatModelId }),
    });

    const data = await response.json();
    handleStandardResponse(data);
  }
};

Performance Considerations

  1. Token Buffering: Debounce token updates (e.g., 50 ms) to reduce re-renders
  2. Connection Timeouts: Set appropriate timeouts for slow networks
  3. Abort Handling: Always abort the previous stream before starting a new one
  4. Memory: Clear cumulative content after the stream ends to avoid leaks

// Token debouncing example (debounce from lodash or an equivalent helper)
const debouncedSetContent = useMemo(
  () => debounce((content: string) => setCurrentContent(content), 50),
  []
);

// Use in the token handler
case 'token':
  debouncedSetContent(event.cumulativeContent);
  break;

Socket.io Integration

The streaming API works alongside Socket.io. After streaming completes:

  1. SSE delivers real-time tokens and events during generation
  2. Socket.io receives the final message:complete event for other clients

Your existing Socket.io listeners for message events will continue to work. You don't need to modify them.

// Existing Socket.io listener (unchanged)
socket.on('message', (message) => {
  // Still works: receives completed messages
  addMessageToChat(message);
});

// Streaming gives you real-time updates during generation;
// Socket.io notifies other connected clients when complete

Smartflow (Workflow) Streaming

When a CHAT_SERVICE workflow (Smartflow) is configured, streaming works seamlessly:

How It Works

  1. Workflow with AI_REQUEST + returnResult=true: Full token-level streaming
  2. Workflow without AI_REQUEST: Executes normally, result yielded as agent_response
  3. No matching workflow: Falls back to standard agent streaming

Workflow Streaming Flow

User Message → Check CHAT_SERVICE Workflows
                      │
              [Workflow Found?]
             Yes ↓         ↓ No
     Execute with          Use Agent
     Streaming             Streaming
          │
[Has AI_REQUEST with returnResult=true?]
     Yes ↓                 ↓ No
Stream AI Response         Execute normally,
(token by token)           yield result

Events from Workflow Streaming

// Workflow with AI_REQUEST (full streaming)
{ type: 'stream_start', agentId: 'workflow', workflowName: 'MyWorkflow' }
{ type: 'token', content: 'Hello', cumulativeContent: 'Hello' }
{ type: 'token', content: ' world', cumulativeContent: 'Hello world' }
{ type: 'agent_response', content: 'Hello world!', agentId: 'assistant', source: 'workflow_ai_request' }
{ type: 'message_saved', messageId: '...', agentId: 'workflow' }
{ type: 'stream_end', success: true, source: 'workflow' }

Detecting Workflow vs Direct Agent

Check the source or agentId field:

switch (event.agentId) {
  case 'workflow':
    // Response came from a Smartflow
    break;
  case 'assistant':
    // Response came from the AI agent (possibly within a workflow)
    break;
}

// Or check the event source
if (event.source === 'workflow_ai_request') {
  console.log('This token came from an AI_REQUEST in a workflow');
}

Frontend Example with Workflow Support

const handleStreamEvent = (event: StreamEvent) => {
  switch (event.type) {
    case 'stream_start':
      if (event.workflowName) {
        setStatus(`Executing workflow: ${event.workflowName}`);
      } else {
        setStatus('Processing...');
      }
      break;

    case 'token':
      // Same handling regardless of source
      setCurrentContent(event.cumulativeContent);
      break;

    case 'agent_response':
      setCurrentContent(event.content);
      break;

    case 'stream_end':
      if (event.source === 'workflow') {
        console.log('Response generated by workflow');
      }
      setStatus('Complete');
      break;
  }
};

Browser Support

SSE is supported in all modern browsers:

  • Chrome 6+
  • Firefox 6+
  • Safari 5+
  • Edge 79+
  • Mobile browsers

No polyfills required. Note that these examples read the stream with fetch rather than the EventSource API, since EventSource supports only GET requests.
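
Even so, a cheap runtime check before choosing the streaming path keeps the fallback decision explicit. A defensive sketch, meant to pair with the non-streaming fallback shown earlier:

```typescript
// Returns true when the environment can consume a streamed fetch body.
function supportsStreaming(): boolean {
  return (
    typeof ReadableStream !== 'undefined' &&
    typeof TextDecoder !== 'undefined' &&
    typeof Response !== 'undefined' &&
    'body' in Response.prototype
  );
}

// Usage sketch: pick the transport once, up front
// const send = supportsStreaming() ? streamMessage : sendMessageNonStreaming;
```
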
