Streaming API Implementation Guide
Overview
The streaming APIs allow the frontend to receive real-time updates as agents process queries via Server-Sent Events (SSE). Instead of waiting for a complete response, the frontend receives events as they occur:
- Token-by-token streaming from the LLM (real-time typing effect)
- Tool execution notifications (show "searching...", "processing...", etc.)
- Agent thinking/processing status
- Final response with full context
- Error events for graceful handling
Architecture:
- SSE via API routes: Real-time streaming during generation
- Socket.io: Continues to emit completed messages (existing behavior preserved)
This enables a responsive UI where users see the agent "thinking" and generating responses in real time, while your existing Socket.io notification system stays intact.
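On the wire, each SSE message from these endpoints is a single `data:` line carrying a JSON event, terminated by a blank line. A minimal illustration of the frame shape (the `formatSSE` helper is ours, for illustration only, not part of the server code):

```typescript
// Illustrative only: shows the SSE frame shape the client-side parsers expect.
function formatSSE(event: { type: string; [key: string]: unknown }): string {
  // One "data:" line with the JSON payload, then a blank line to end the frame.
  return `data: ${JSON.stringify(event)}\n\n`;
}

const frame = formatSSE({ type: 'token', content: 'Hi' });
// frame === 'data: {"type":"token","content":"Hi"}\n\n'
```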
Quick Start
Basic Text Streaming
const streamChat = async (message: string, chatInstanceId: string, chatModelId: string) => {
  const response = await fetch('/api/chat?stream=true', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      message,
      chatInstanceId,
      chatModelId,
    }),
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');
  const decoder = new TextDecoder();

  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Buffer across reads: an SSE event can be split between chunks
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split('\n\n');
    buffer = frames.pop() ?? ''; // keep any trailing partial frame
    for (const frame of frames) {
      if (frame.startsWith('data: ')) {
        const event = JSON.parse(frame.slice(6));
        handleStreamEvent(event);
      }
    }
  }
};
API Endpoints
1. Text Chat Streaming
Endpoint: POST /api/chat?stream=true
Request Body:
{
  message: string;         // User message
  chatInstanceId: string;  // Chat instance ID
  chatModelId: string;     // Chat model ID
  stream?: boolean;        // Alternative to the ?stream=true query parameter
  includeTokens?: boolean; // Include token events (default: true)
}
Query Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| stream | "true" | - | Enable streaming mode |
| includeTokens | "true" / "false" | "true" | Include token-level events |
Example:
// Full example with all options
const response = await fetch('/api/chat?stream=true&includeTokens=true', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_TOKEN', // If auth required
  },
  body: JSON.stringify({
    message: 'Hello, how can you help me?',
    chatInstanceId: 'ci_xxxxx',
    chatModelId: 'cm_xxxxx',
  }),
});
2. Voice Mode Streaming (Text Response)
Endpoint: POST /api/chat/voice (with stream: true in form data)
This endpoint transcribes audio and streams the text response (no TTS audio generation).
Request (multipart/form-data):
| Field | Type | Required | Description |
|---|---|---|---|
| audio | File | Yes | Audio file (webm, mp3, wav, etc.) |
| chatModelId | string | Yes | Chat model ID |
| chatInstanceId | string | Yes | Chat instance ID |
| stream | "true" | No | Enable streaming mode |
Example:
const formData = new FormData();
formData.append('audio', audioBlob, 'recording.webm');
formData.append('chatModelId', 'cm_xxxxx');
formData.append('chatInstanceId', 'ci_xxxxx');
formData.append('stream', 'true');

const response = await fetch('/api/chat/voice', {
  method: 'POST',
  body: formData,
});

// Parse SSE events
const reader = response.body?.getReader();
if (!reader) throw new Error('No response body');
const decoder = new TextDecoder();

let buffer = '';
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Buffer across reads: an SSE event can be split between chunks
  buffer += decoder.decode(value, { stream: true });
  const frames = buffer.split('\n\n');
  buffer = frames.pop() ?? ''; // keep any trailing partial frame
  for (const frame of frames) {
    if (frame.startsWith('data: ')) {
      const event = JSON.parse(frame.slice(6));
      // Handle: transcription, stream_start, token, agent_response, message_saved, stream_end
      handleStreamEvent(event);
    }
  }
}
Non-streaming fallback: omit the stream field to receive the standard JSON response.
3. Voice Response Streaming (with TTS Audio)
Endpoint: POST /api/chat/voice-response (with stream: true in form data)
Request (multipart/form-data):
| Field | Type | Required | Description |
|---|---|---|---|
| audio | File | Yes | Audio file (webm, mp3, wav, etc.) |
| chatModelId | string | Yes | Chat model ID |
| chatInstanceId | string | Yes | Chat instance ID |
| stream | "true" | No | Enable streaming mode |
| voice | string | No | TTS voice preference (default: "nova") |
| ttsEnabled | "true" / "false" | No | Generate TTS audio (default: true) |
| voiceConversation | "true" / "false" | No | Voice conversation mode |
Example:
const formData = new FormData();
formData.append('audio', audioBlob, 'recording.webm');
formData.append('chatModelId', 'cm_xxxxx');
formData.append('chatInstanceId', 'ci_xxxxx');
formData.append('stream', 'true');
formData.append('voice', 'nova');
const response = await fetch('/api/chat/voice-response', {
  method: 'POST',
  body: formData,
});
// Parse SSE events same as text streaming
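To play the audio delivered in a `tts_complete` event, the base64 payload can be wrapped in a data URL. A small helper sketch (`audioDataUrl` is our name for illustration, not part of the API):

```typescript
// Build a playable data URL from a tts_complete event's audio payload.
// audioDataUrl is a hypothetical helper, not part of the API surface.
function audioDataUrl(audio: { data: string; format: string }): string {
  return `data:audio/${audio.format};base64,${audio.data}`;
}

// In the browser (not runnable server-side):
//   new Audio(audioDataUrl(event.audio)).play().catch(console.error);
```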
Stream Events
All events follow a base structure:
interface BaseStreamEvent {
  type: StreamEventType;  // Event type identifier
  timestamp: Date;        // When the event occurred (arrives as an ISO string once JSON-serialized over SSE)
  conversationId: string; // Chat instance ID
  agentId: string;        // Agent that emitted this event
}
Event Types Reference
| Event Type | Description | When Emitted |
|---|---|---|
stream_start | Stream has begun | First event |
token | New token from LLM | During generation |
tool_start | Tool execution started | When agent calls a tool |
tool_end | Tool execution completed | When tool returns |
agent_thinking | Agent is processing | During reasoning |
agent_response | Final agent response | When complete |
message_saved | Message saved to DB | After agent responds |
stream_end | Stream ended | Last event |
error | Error occurred | On error |
Voice-Specific Events
| Event Type | Description |
|---|---|
transcription | STT transcription result |
tts_start | TTS generation started |
tts_complete | TTS audio ready |
tts_error | TTS generation failed |
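Taken together, the tables above suggest a `StreamEventType` union like the following (reconstructed here for illustration; the canonical definition lives server-side and may differ):

```typescript
// Reconstructed from the event tables above; the server's actual type may differ.
type StreamEventType =
  | 'stream_start' | 'token' | 'tool_start' | 'tool_end'
  | 'agent_thinking' | 'agent_response' | 'message_saved'
  | 'stream_end' | 'error'
  // Voice-specific
  | 'transcription' | 'tts_start' | 'tts_complete' | 'tts_error';

// Hypothetical helper: true for events only emitted by the voice endpoints.
function isVoiceEvent(type: StreamEventType): boolean {
  return type === 'transcription' || type.startsWith('tts_');
}
```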
Event Type Definitions
stream_start
interface StreamStartEvent {
  type: 'stream_start';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  query: string; // Original user query
}
token
interface TokenEvent {
  type: 'token';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  content: string;           // The new token
  cumulativeContent: string; // All tokens so far
}
tool_start
interface ToolStartEvent {
  type: 'tool_start';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  toolName: string;   // Name of the tool
  toolInput: any;     // Input passed to tool
  toolCallId: string; // Unique ID for this call
}
tool_end
interface ToolEndEvent {
  type: 'tool_end';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  toolName: string;
  toolInput: any;
  toolOutput: any;    // Tool result
  toolCallId: string;
  durationMs: number; // Execution time
}
agent_thinking
interface AgentThinkingEvent {
  type: 'agent_thinking';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  description?: string; // What the agent is doing
}
agent_response
interface AgentResponseEvent {
  type: 'agent_response';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  content: string; // Complete response content
  response?: any;  // Full response object
}
message_saved
interface MessageSavedEvent {
  type: 'message_saved';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  messageId: string; // Saved message ID
  message: {
    id: string;
    text: string;
    created_at: Date;
    updated_at: Date;
    isSent: boolean;
    chatInstanceId: string;
    user: {
      id: string;
      email: string;
      name: string;
      image: string | null;
    };
  };
}
stream_end
interface StreamEndEvent {
  type: 'stream_end';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  success: boolean;
  totalDurationMs?: number;
  reason?: string; // If success is false
}
error
interface ErrorEvent {
  type: 'error';
  timestamp: Date;
  conversationId: string;
  agentId: string;
  message: string; // Error message
  code?: string;   // Error code
}
transcription (Voice only)
interface TranscriptionEvent {
  type: 'transcription';
  timestamp: Date;
  conversationId: string;
  agentId: 'stt';
  transcription: {
    text: string;
    language?: string;
    duration?: number;
    confidence?: number;
  };
  performance: {
    transcriptionMs: number;
  };
}
tts_complete (Voice only)
interface TTSCompleteEvent {
  type: 'tts_complete';
  timestamp: Date;
  conversationId: string;
  agentId: 'tts';
  audio: {
    data: string;   // Base64 encoded audio
    format: string; // 'mp3', 'wav', etc.
    size: number;   // Bytes
    voice: string;  // Voice used
  };
  performance: {
    ttsMs: number;
  };
}
React Implementation
Hook: useStreamingChat
// hooks/useStreamingChat.ts
import { useState, useCallback, useRef } from 'react';

interface StreamState {
  isStreaming: boolean;
  currentContent: string;
  currentTool: string | null;
  isThinking: boolean;
  error: string | null;
}

interface UseStreamingChatOptions {
  chatInstanceId: string;
  chatModelId: string;
  onToken?: (token: string, cumulative: string) => void;
  onToolStart?: (toolName: string, input: any) => void;
  onToolEnd?: (toolName: string, output: any, durationMs: number) => void;
  onComplete?: (content: string, messageId: string) => void;
  onError?: (error: string) => void;
}

export function useStreamingChat({
  chatInstanceId,
  chatModelId,
  onToken,
  onToolStart,
  onToolEnd,
  onComplete,
  onError,
}: UseStreamingChatOptions) {
  const [state, setState] = useState<StreamState>({
    isStreaming: false,
    currentContent: '',
    currentTool: null,
    isThinking: false,
    error: null,
  });
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (message: string) => {
    // Cancel any existing stream
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    abortControllerRef.current = new AbortController();

    setState({
      isStreaming: true,
      currentContent: '',
      currentTool: null,
      isThinking: false,
      error: null,
    });

    try {
      const response = await fetch('/api/chat?stream=true', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, chatInstanceId, chatModelId }),
        signal: abortControllerRef.current.signal,
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();
      if (!reader) {
        throw new Error('No response body');
      }

      let finalContent = '';
      let savedMessageId = '';
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer across reads: an SSE event can be split between chunks
        buffer += decoder.decode(value, { stream: true });
        const frames = buffer.split('\n\n');
        buffer = frames.pop() ?? '';

        for (const frame of frames) {
          if (!frame.startsWith('data: ')) continue;
          try {
            const event = JSON.parse(frame.slice(6));
            switch (event.type) {
              case 'token':
                setState(s => ({ ...s, currentContent: event.cumulativeContent }));
                onToken?.(event.content, event.cumulativeContent);
                break;
              case 'tool_start':
                setState(s => ({ ...s, currentTool: event.toolName }));
                onToolStart?.(event.toolName, event.toolInput);
                break;
              case 'tool_end':
                setState(s => ({ ...s, currentTool: null }));
                onToolEnd?.(event.toolName, event.toolOutput, event.durationMs);
                break;
              case 'agent_thinking':
                setState(s => ({ ...s, isThinking: true }));
                break;
              case 'agent_response':
                finalContent = event.content;
                setState(s => ({ ...s, currentContent: event.content, isThinking: false }));
                break;
              case 'message_saved':
                savedMessageId = event.messageId;
                break;
              case 'stream_end':
                if (event.success) {
                  onComplete?.(finalContent, savedMessageId);
                }
                break;
              case 'error':
                setState(s => ({ ...s, error: event.message }));
                onError?.(event.message);
                break;
            }
          } catch (e) {
            console.error('Failed to parse SSE event:', e);
          }
        }
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        const errorMessage = error.message || 'Stream failed';
        setState(s => ({ ...s, error: errorMessage }));
        onError?.(errorMessage);
      }
    } finally {
      setState(s => ({ ...s, isStreaming: false }));
    }
  }, [chatInstanceId, chatModelId, onToken, onToolStart, onToolEnd, onComplete, onError]);

  const cancel = useCallback(() => {
    abortControllerRef.current?.abort();
    setState(s => ({ ...s, isStreaming: false }));
  }, []);

  return {
    ...state,
    sendMessage,
    cancel,
  };
}
Usage Example
// components/Chat.tsx
import React, { useState } from 'react';
import { useStreamingChat } from '@/hooks/useStreamingChat';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

interface ChatProps {
  chatInstanceId: string;
  chatModelId: string;
}

export function Chat({ chatInstanceId, chatModelId }: ChatProps) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');

  const {
    isStreaming,
    currentContent,
    currentTool,
    isThinking,
    error,
    sendMessage,
    cancel,
  } = useStreamingChat({
    chatInstanceId,
    chatModelId,
    onComplete: (content, messageId) => {
      setMessages(prev => [
        ...prev,
        { id: messageId, role: 'assistant', content },
      ]);
    },
  });

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim() || isStreaming) return;
    // Add user message
    setMessages(prev => [
      ...prev,
      { id: Date.now().toString(), role: 'user', content: input },
    ]);
    sendMessage(input);
    setInput('');
  };

  return (
    <div className="chat-container">
      {/* Messages */}
      <div className="messages">
        {messages.map(msg => (
          <div key={msg.id} className={`message ${msg.role}`}>
            {msg.content}
          </div>
        ))}

        {/* Streaming response */}
        {isStreaming && (
          <div className="message assistant streaming">
            {isThinking && <span className="thinking">Thinking...</span>}
            {currentTool && <span className="tool">Using {currentTool}...</span>}
            {currentContent && <span>{currentContent}</span>}
            {!currentContent && !isThinking && !currentTool && (
              <span className="typing-indicator">...</span>
            )}
          </div>
        )}

        {error && <div className="error">{error}</div>}
      </div>

      {/* Input */}
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          placeholder="Type a message..."
          disabled={isStreaming}
        />
        {isStreaming ? (
          <button type="button" onClick={cancel}>Cancel</button>
        ) : (
          <button type="submit">Send</button>
        )}
      </form>
    </div>
  );
}
Voice Streaming Implementation
// hooks/useVoiceStreaming.ts
import { useState, useCallback, useRef } from 'react';

interface VoiceStreamState {
  isProcessing: boolean;
  transcription: string | null;
  currentContent: string;
  audioData: string | null; // Base64 audio
  error: string | null;
}

export function useVoiceStreaming({
  chatInstanceId,
  chatModelId,
  voice = 'nova',
  onTranscription,
  onToken,
  onAudioReady,
  onComplete,
  onError,
}: {
  chatInstanceId: string;
  chatModelId: string;
  voice?: string;
  onTranscription?: (text: string) => void;
  onToken?: (token: string, cumulative: string) => void;
  onAudioReady?: (audioBase64: string, format: string) => void;
  onComplete?: (response: string) => void;
  onError?: (error: string) => void;
}) {
  const [state, setState] = useState<VoiceStreamState>({
    isProcessing: false,
    transcription: null,
    currentContent: '',
    audioData: null,
    error: null,
  });
  const abortRef = useRef<AbortController | null>(null);

  const sendAudio = useCallback(async (audioBlob: Blob) => {
    if (abortRef.current) {
      abortRef.current.abort();
    }
    abortRef.current = new AbortController();

    setState({
      isProcessing: true,
      transcription: null,
      currentContent: '',
      audioData: null,
      error: null,
    });

    try {
      const formData = new FormData();
      formData.append('audio', audioBlob, 'recording.webm');
      formData.append('chatModelId', chatModelId);
      formData.append('chatInstanceId', chatInstanceId);
      formData.append('stream', 'true');
      formData.append('voice', voice);

      const response = await fetch('/api/chat/voice-response', {
        method: 'POST',
        body: formData,
        signal: abortRef.current.signal,
      });
      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();
      if (!reader) throw new Error('No response body');

      let finalContent = '';
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Buffer across reads: an SSE event can be split between chunks
        buffer += decoder.decode(value, { stream: true });
        const frames = buffer.split('\n\n');
        buffer = frames.pop() ?? '';

        for (const frame of frames) {
          if (!frame.startsWith('data: ')) continue;
          let event: any;
          try {
            event = JSON.parse(frame.slice(6));
          } catch (e) {
            console.error('Failed to parse SSE event:', e);
            continue;
          }
          switch (event.type) {
            case 'transcription':
              setState(s => ({ ...s, transcription: event.transcription.text }));
              onTranscription?.(event.transcription.text);
              break;
            case 'token':
              setState(s => ({ ...s, currentContent: event.cumulativeContent }));
              onToken?.(event.content, event.cumulativeContent);
              break;
            case 'agent_response':
              finalContent = event.content;
              setState(s => ({ ...s, currentContent: event.content }));
              break;
            case 'tts_complete':
              setState(s => ({ ...s, audioData: event.audio.data }));
              onAudioReady?.(event.audio.data, event.audio.format);
              break;
            case 'stream_end':
              if (event.success) {
                onComplete?.(finalContent);
              }
              break;
            case 'error':
            case 'tts_error':
              setState(s => ({ ...s, error: event.message }));
              onError?.(event.message);
              break;
          }
        }
      }
    } catch (error: any) {
      if (error.name !== 'AbortError') {
        setState(s => ({ ...s, error: error.message }));
        onError?.(error.message);
      }
    } finally {
      setState(s => ({ ...s, isProcessing: false }));
    }
  }, [chatInstanceId, chatModelId, voice, onTranscription, onToken, onAudioReady, onComplete, onError]);

  const cancel = useCallback(() => {
    abortRef.current?.abort();
    setState(s => ({ ...s, isProcessing: false }));
  }, []);

  // Helper to play the audio response
  const playAudio = useCallback((base64Audio: string, format = 'mp3') => {
    const audio = new Audio(`data:audio/${format};base64,${base64Audio}`);
    // play() returns a promise that can reject under autoplay restrictions
    audio.play().catch(console.error);
    return audio;
  }, []);

  return {
    ...state,
    sendAudio,
    cancel,
    playAudio,
  };
}
Error Handling
// Comprehensive error handling example
const handleStreamEvent = (event: StreamEvent) => {
  switch (event.type) {
    case 'error':
      console.error('Stream error:', event.message);
      // Show a user-friendly error
      if (event.message.includes('rate limit')) {
        showToast('Too many requests. Please wait a moment.');
      } else if (event.message.includes('unauthorized')) {
        showToast('Session expired. Please refresh.');
      } else {
        showToast('Something went wrong. Please try again.');
      }
      break;
    case 'stream_end':
      if (!event.success) {
        if (event.reason === 'usage_limit_reached') {
          showToast('Usage limit reached. Please upgrade.');
        } else if (event.reason === 'handled_by_agent') {
          showToast('A human agent will respond shortly.');
        }
      }
      break;
  }
};
Fallback to Non-Streaming
If streaming is not supported or fails, fall back to the standard API:
const sendMessage = async (message: string) => {
  try {
    // Try streaming first
    await streamMessage(message);
  } catch (streamError) {
    console.warn('Streaming failed, falling back to standard API', streamError);
    // Fallback to non-streaming
    const response = await fetch('/api/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message, chatInstanceId, chatModelId }),
    });
    const data = await response.json();
    handleStandardResponse(data);
  }
};
Performance Considerations
- Token Buffering: Consider debouncing token updates (e.g., 50ms) to reduce re-renders
- Connection Timeouts: Set appropriate timeouts for slow networks
- Abort Handling: Always abort previous streams before starting new ones
- Memory: Clear cumulative content after stream ends to prevent memory leaks
// Token debouncing example
const debouncedSetContent = useMemo(
  () => debounce((content: string) => setCurrentContent(content), 50),
  []
);

// Use in token handler
case 'token':
  debouncedSetContent(event.cumulativeContent);
  break;
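When parsing SSE manually, note that a single `read()` is not guaranteed to end on an event boundary, so an event can arrive split across two chunks. A small reusable stateful parser (our addition, not part of the API) that buffers partial frames:

```typescript
// Buffers partial SSE frames across read() chunks so no event is dropped.
class SSEParser {
  private buffer = '';

  // Feed a decoded chunk; returns the complete events it contained.
  push(chunk: string): any[] {
    this.buffer += chunk;
    const events: any[] = [];
    let sep: number;
    while ((sep = this.buffer.indexOf('\n\n')) !== -1) {
      const frame = this.buffer.slice(0, sep);
      this.buffer = this.buffer.slice(sep + 2);
      if (frame.startsWith('data: ')) {
        try {
          events.push(JSON.parse(frame.slice(6)));
        } catch {
          // Ignore malformed frames rather than killing the stream
        }
      }
    }
    return events;
  }
}
```

Inside the read loop, call `parser.push(decoder.decode(value, { stream: true }))` and iterate the returned events instead of splitting each chunk yourself.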
Socket.io Integration
The streaming API works alongside Socket.io. After streaming completes:
- SSE delivers real-time tokens and events during generation
- Socket.io receives the final message:complete event for other clients
Your existing Socket.io listeners for message events will continue to work. You don't need to modify them.
// Existing Socket.io listener (unchanged)
socket.on('message', (message) => {
  // This still works - receives completed messages
  addMessageToChat(message);
});
// Streaming gives you real-time updates during generation
// Socket.io notifies other connected clients when complete
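Because a streamed message is also broadcast over Socket.io, the client that initiated the stream may see the same message twice. One possible dedup strategy (a sketch, not prescribed by the API) is to remember the messageId from the message_saved event and skip the matching Socket.io delivery:

```typescript
// Track ids already rendered from the SSE stream so the Socket.io
// broadcast of the same message isn't rendered twice.
const streamedMessageIds = new Set<string>();

// Call this from your 'message_saved' handler.
function onMessageSaved(event: { messageId: string }): void {
  streamedMessageIds.add(event.messageId);
}

// Returns true if the Socket.io message still needs rendering.
function shouldRenderSocketMessage(message: { id: string }): boolean {
  if (streamedMessageIds.has(message.id)) {
    streamedMessageIds.delete(message.id); // seen once; free the memory
    return false;
  }
  return true;
}
```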
Smartflow (Workflow) Streaming
When a CHAT_SERVICE workflow (Smartflow) is configured, streaming works seamlessly:
How It Works
- Workflow with AI_REQUEST + returnResult=true: Full token-level streaming
- Workflow without AI_REQUEST: Executes normally; result yielded as agent_response
- No matching workflow: Falls back to standard agent streaming
Workflow Streaming Flow
User Message → Check CHAT_SERVICE Workflows
                        ↓
                [Workflow Found?]
             Yes ↓           ↓ No
  Execute with Streaming   Use Agent Streaming
                 ↓
    [Has AI_REQUEST with
     returnResult=true?]
             Yes ↓           ↓ No
    Stream AI Response     Execute normally,
    (token by token)       yield result
Events from Workflow Streaming
// Workflow with AI_REQUEST (full streaming)
{ type: 'stream_start', agentId: 'workflow', workflowName: 'MyWorkflow' }
{ type: 'token', content: 'Hello', cumulativeContent: 'Hello' }
{ type: 'token', content: ' world', cumulativeContent: 'Hello world' }
{ type: 'agent_response', content: 'Hello world!', agentId: 'assistant', source: 'workflow_ai_request' }
{ type: 'message_saved', messageId: '...', agentId: 'workflow' }
{ type: 'stream_end', success: true, source: 'workflow' }
Detecting Workflow vs Direct Agent
Check the source or agentId field:
switch (event.agentId) {
  case 'workflow':
    // Response came from a Smartflow
    break;
  case 'assistant':
    // Response came from the AI agent (might be within a workflow or direct)
    break;
}

// Or check the event source
if (event.source === 'workflow_ai_request') {
  console.log('This token came from an AI_REQUEST in a workflow');
}
Frontend Example with Workflow Support
const handleStreamEvent = (event: StreamEvent) => {
  switch (event.type) {
    case 'stream_start':
      if (event.workflowName) {
        setStatus(`Executing workflow: ${event.workflowName}`);
      } else {
        setStatus('Processing...');
      }
      break;
    case 'token':
      // Same handling regardless of source
      setCurrentContent(event.cumulativeContent);
      break;
    case 'agent_response':
      setCurrentContent(event.content);
      break;
    case 'stream_end':
      if (event.source === 'workflow') {
        console.log('Response generated by workflow');
      }
      setStatus('Complete');
      break;
  }
};
Browser Support
Native EventSource support for SSE dates back to Chrome 6, Firefox 6, and Safari 5, and is available in Edge 79+ and mobile browsers. Note, however, that the examples in this guide do not use EventSource (which is limited to GET requests); they parse SSE frames from a fetch response stream via response.body.getReader(), which works in all modern evergreen browsers.
No polyfills required.