
Imagine you're building a customer support chatbot that needs to handle complex technical queries. A user asks: "Can you walk me through setting up distributed logging across our microservices architecture with proper correlation IDs and error aggregation?" With traditional request-response patterns, your user sits staring at a blank screen for 15-20 seconds while the LLM processes this complex query. They start wondering if the system froze. They might even refresh the page or give up entirely.
Now imagine the same scenario with streaming responses. Within 500 milliseconds, words start flowing onto the screen: "I'll help you set up distributed logging for your microservices. Let's start with..." The user immediately knows the system is working and can begin reading and processing the response as it arrives. This isn't just a better user experience—it's a fundamentally different interaction paradigm that makes AI feel more conversational and responsive.
Streaming responses represent a critical shift from traditional batch processing to real-time, interactive AI interfaces. But implementing them properly requires understanding not just the API calls, but the entire architecture stack: WebSocket connections, backpressure handling, client-side rendering patterns, error recovery, and the subtle but crucial details that separate professional implementations from demos that break under load.
What you'll learn: how to stream LLM output over Server-Sent Events and WebSockets, render partial responses smoothly on the client, manage backpressure when clients fall behind, recover from connection failures, and scale streaming to thousands of concurrent users.
Prerequisites: you should have solid experience with asynchronous JavaScript/Python programming, REST APIs, and basic WebSocket concepts. Familiarity with React or a similar reactive framework is helpful but not required. You should understand HTTP streaming concepts and have worked with LLM APIs such as OpenAI's GPT models.
Before diving into implementation, let's understand what happens when an LLM generates a streaming response. Traditional API calls are atomic—you send a request and receive a complete response. Streaming breaks this into a continuous flow of partial responses called "chunks" or "tokens."
import openai  # assumes the pre-1.0 OpenAI Python SDK interface used throughout this article

# Traditional approach - blocks until complete
response = openai.ChatCompletion.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)
complete_text = response.choices[0].message.content
print(complete_text)  # All at once after 10+ seconds

# Streaming approach - yields partial responses
response = openai.ChatCompletion.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True
)

for chunk in response:
    if chunk.choices[0].delta.content:
        partial_text = chunk.choices[0].delta.content
        print(partial_text, end="", flush=True)  # Appears incrementally
The magic happens in that stream=True parameter, but the real complexity lies in everything that comes after: how do you transport these chunks to your frontend, render them smoothly, handle connection failures, and maintain state consistency?
SSE provides a simple, standards-based approach for streaming data from server to client. Unlike WebSockets, SSE is unidirectional and automatically handles connection recovery, making it ideal for LLM streaming where the client primarily consumes data.
from flask import Flask, Response, request
import json
import time
import openai
from typing import Iterator

app = Flask(__name__)

def generate_streaming_response(messages: list) -> Iterator[str]:
    """Generate SSE-formatted streaming response from OpenAI."""
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            stream=True,
            temperature=0.7
        )
        for chunk in response:
            # Handle different chunk types
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                # Format as SSE event
                data = json.dumps({
                    "type": "content",
                    "content": content,
                    "timestamp": time.time()
                })
                yield f"data: {data}\n\n"
            elif chunk.choices[0].finish_reason:
                # Signal completion
                data = json.dumps({
                    "type": "complete",
                    "finish_reason": chunk.choices[0].finish_reason
                })
                yield f"data: {data}\n\n"
    except Exception as e:
        # Always send errors through the stream
        error_data = json.dumps({
            "type": "error",
            "message": str(e),
            "error_code": getattr(e, 'code', 'unknown')
        })
        yield f"data: {error_data}\n\n"

@app.route('/api/chat/stream', methods=['POST'])
def stream_chat():
    messages = request.json.get('messages', [])
    return Response(
        generate_streaming_response(messages),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Cache-Control'
        }
    )
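To eyeball the raw SSE frames this endpoint emits, a few lines of Python are enough. This is a sketch: it assumes the Flask app above is running locally on port 5000 and that the requests library is installed.

import json
import requests

payload = {"messages": [{"role": "user", "content": "Explain quantum computing"}]}
with requests.post("http://localhost:5000/api/chat/stream", json=payload, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # Each event arrives as a "data: {json}" line followed by a blank line
        if line and line.startswith("data: "):
            event = json.loads(line[len("data: "):])
            print(event["type"], event.get("content", ""))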
The Flask implementation above handles several critical aspects: every event uses the SSE data: {json}\n\n wire format, errors are reported through the stream itself rather than as a failed HTTP response, completion is signaled explicitly with a "complete" event carrying the finish reason, and the response headers disable caching and keep the connection open.
For bidirectional communication or when you need more control over the connection lifecycle, WebSockets provide greater flexibility:
import asyncio
import time
import uuid
import openai
import websockets
import json
from typing import Dict, Set
import logging
class StreamingChatManager:
def __init__(self):
self.active_connections: Dict[str, websockets.WebSocketServerProtocol] = {}
self.connection_metadata: Dict[str, dict] = {}
async def register_connection(self, websocket: websockets.WebSocketServerProtocol,
connection_id: str, user_id: str):
"""Register and track WebSocket connections."""
self.active_connections[connection_id] = websocket
self.connection_metadata[connection_id] = {
"user_id": user_id,
"connected_at": time.time(),
"messages_sent": 0
}
logging.info(f"Connection {connection_id} registered for user {user_id}")
async def unregister_connection(self, connection_id: str):
"""Clean up connection tracking."""
self.active_connections.pop(connection_id, None)
self.connection_metadata.pop(connection_id, None)
logging.info(f"Connection {connection_id} unregistered")
async def stream_llm_response(self, connection_id: str, messages: list,
stream_config: dict = None):
"""Stream LLM response through WebSocket with advanced features."""
websocket = self.active_connections.get(connection_id)
if not websocket:
logging.error(f"No active connection for {connection_id}")
return
config = stream_config or {}
chunk_buffer = []
buffer_size = config.get('buffer_size', 1) # Tokens per chunk
try:
# Send initial status
await websocket.send(json.dumps({
"type": "stream_start",
"request_id": connection_id,
"model": config.get('model', 'gpt-4')
}))
response = await openai.ChatCompletion.acreate(
model=config.get('model', 'gpt-4'),
messages=messages,
stream=True,
temperature=config.get('temperature', 0.7),
max_tokens=config.get('max_tokens', 2000)
)
async for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
chunk_buffer.append(content)
# Send buffered chunks to smooth out delivery
if len(chunk_buffer) >= buffer_size:
combined_content = ''.join(chunk_buffer)
await websocket.send(json.dumps({
"type": "content_chunk",
"content": combined_content,
"chunk_index": self.connection_metadata[connection_id]["messages_sent"]
}))
chunk_buffer = []
self.connection_metadata[connection_id]["messages_sent"] += 1
# Add small delay to prevent overwhelming client
await asyncio.sleep(0.01)
# Send any remaining buffered content
if chunk_buffer:
combined_content = ''.join(chunk_buffer)
await websocket.send(json.dumps({
"type": "content_chunk",
"content": combined_content,
"chunk_index": self.connection_metadata[connection_id]["messages_sent"]
}))
# Signal completion
await websocket.send(json.dumps({
"type": "stream_complete",
"total_chunks": self.connection_metadata[connection_id]["messages_sent"] + 1,
"completion_time": time.time()
}))
except websockets.exceptions.ConnectionClosed:
logging.info(f"Connection {connection_id} closed during streaming")
await self.unregister_connection(connection_id)
except Exception as e:
await websocket.send(json.dumps({
"type": "error",
"error": str(e),
"error_type": type(e).__name__
}))
logging.error(f"Streaming error for {connection_id}: {e}")
# WebSocket handler
chat_manager = StreamingChatManager()
async def handle_websocket(websocket, path):
connection_id = str(uuid.uuid4())
try:
# Handle connection authentication/setup
auth_message = await websocket.recv()
auth_data = json.loads(auth_message)
user_id = auth_data.get('user_id')
await chat_manager.register_connection(websocket, connection_id, user_id)
await websocket.send(json.dumps({
"type": "connection_established",
"connection_id": connection_id
}))
async for message in websocket:
data = json.loads(message)
if data['type'] == 'chat_request':
await chat_manager.stream_llm_response(
connection_id,
data['messages'],
data.get('config', {})
)
except websockets.exceptions.ConnectionClosed:
pass
finally:
await chat_manager.unregister_connection(connection_id)
# Start WebSocket server
start_server = websockets.serve(handle_websocket, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
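To exercise the protocol end to end, a throwaway Python client can send the expected auth message and then a chat_request. This is a sketch, assuming the server above is listening on ws://localhost:8765.

import asyncio
import json
import websockets

async def demo_client():
    async with websockets.connect("ws://localhost:8765") as ws:
        # The handler expects an auth/setup message before anything else
        await ws.send(json.dumps({"user_id": "demo-user"}))
        print(await ws.recv())  # connection_established
        await ws.send(json.dumps({
            "type": "chat_request",
            "messages": [{"role": "user", "content": "Explain quantum computing"}]
        }))
        async for raw in ws:
            event = json.loads(raw)
            if event["type"] == "content_chunk":
                print(event["content"], end="", flush=True)
            elif event["type"] in ("stream_complete", "error"):
                break

asyncio.run(demo_client())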
The WebSocket implementation provides several advantages over SSE: the channel is bidirectional, so clients can send acknowledgments, follow-up requests, or cancellations over the same socket; per-connection metadata makes it easy to track individual users and meter usage; and chunk buffering and delivery pacing are configurable per request.
The client side of streaming responses involves complex state management challenges. You're not just displaying text—you're managing partial renders, handling formatting, and maintaining smooth UX during network interruptions.
class StreamingTextRenderer {
constructor(containerElement, options = {}) {
this.container = containerElement;
this.buffer = '';
this.renderedLength = 0;
this.options = {
typewriterDelay: options.typewriterDelay || 0,
chunkSize: options.chunkSize || 10,
enableMarkdown: options.enableMarkdown !== false,
...options
};
// For markdown parsing of partial content
this.markdownParser = new marked.Renderer();
this.codeBlockPattern = /```(\w+)?\n([\s\S]*?)\n```/g;
this.incompleteCodeBlock = false;
}
appendContent(newContent) {
this.buffer += newContent;
if (this.options.typewriterDelay > 0) {
this.renderWithTypewriter();
} else {
this.renderImmediate();
}
}
renderImmediate() {
if (this.options.enableMarkdown) {
this.renderMarkdownSafe();
} else {
this.renderPlainText();
}
}
renderMarkdownSafe() {
// Handle partial markdown gracefully
let renderableContent = this.buffer;
let tempContainer = document.createElement('div');
// Check for incomplete code blocks
const codeBlockMatches = [...this.buffer.matchAll(this.codeBlockPattern)];
const lastTripleBacktick = this.buffer.lastIndexOf('```');
if (lastTripleBacktick > -1) {
const afterLastBacktick = this.buffer.slice(lastTripleBacktick + 3);
// If we don't have a closing ```, treat as incomplete
if (!afterLastBacktick.includes('```')) {
this.incompleteCodeBlock = true;
// Only render up to the incomplete code block
renderableContent = this.buffer.slice(0, lastTripleBacktick);
} else {
this.incompleteCodeBlock = false;
}
}
try {
// Parse markdown but handle incomplete structures
let parsed = marked.parse(renderableContent);
tempContainer.innerHTML = parsed;
// If we have an incomplete code block, add it as plain text
if (this.incompleteCodeBlock) {
const incompleteBlock = this.buffer.slice(lastTripleBacktick);
const codeElement = document.createElement('pre');
codeElement.className = 'incomplete-code-block';
codeElement.textContent = incompleteBlock;
tempContainer.appendChild(codeElement);
}
this.container.innerHTML = tempContainer.innerHTML;
} catch (e) {
// Fallback to plain text if markdown parsing fails
this.renderPlainText();
}
}
renderPlainText() {
// Simple but fast text rendering with basic formatting preservation
const lines = this.buffer.split('\n');
let html = '';
for (let i = 0; i < lines.length; i++) {
const line = this.escapeHtml(lines[i]);
if (i < lines.length - 1) {
html += line + '<br>';
} else {
html += line;
}
}
this.container.innerHTML = html;
}
renderWithTypewriter() {
// Smooth typewriter effect for better UX
const currentVisible = this.container.textContent.length;
const targetLength = Math.min(
currentVisible + this.options.chunkSize,
this.buffer.length
);
if (currentVisible < targetLength) {
const nextChunk = this.buffer.slice(currentVisible, targetLength);
this.container.textContent += nextChunk;
if (targetLength < this.buffer.length) {
setTimeout(() => this.renderWithTypewriter(), this.options.typewriterDelay);
}
}
}
escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
complete() {
// Final render pass for complete content
this.incompleteCodeBlock = false;
this.renderImmediate();
// Trigger syntax highlighting if available
if (typeof Prism !== 'undefined') {
Prism.highlightAllUnder(this.container);
}
}
}
import React, { useState, useEffect, useRef, useCallback } from 'react';
const StreamingChatInterface = ({ apiEndpoint, messages }) => {
const [streamingResponse, setStreamingResponse] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [streamError, setStreamError] = useState(null);
const [connectionStatus, setConnectionStatus] = useState('disconnected');
const abortControllerRef = useRef(null);
const rendererRef = useRef(null);
const responseContainerRef = useRef(null);
// Initialize text renderer
useEffect(() => {
if (responseContainerRef.current) {
rendererRef.current = new StreamingTextRenderer(
responseContainerRef.current,
{
enableMarkdown: true,
typewriterDelay: 0,
chunkSize: 15
}
);
}
}, []);
const startStreaming = useCallback(async () => {
if (isStreaming) return;
setIsStreaming(true);
setStreamError(null);
setStreamingResponse('');
setConnectionStatus('connecting');
try {
const abortController = new AbortController();
abortControllerRef.current = abortController;
const response = await fetch(`${apiEndpoint}/chat/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
},
body: JSON.stringify({ messages }),
signal: abortController.signal
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
setConnectionStatus('connected');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
setConnectionStatus('completed');
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop(); // Keep incomplete line in buffer
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
handleStreamData(data);
} catch (e) {
console.warn('Failed to parse SSE data:', line);
}
}
}
}
} catch (error) {
if (error.name !== 'AbortError') {
setStreamError(error.message);
setConnectionStatus('error');
}
} finally {
setIsStreaming(false);
}
}, [apiEndpoint, messages, isStreaming]);
const handleStreamData = useCallback((data) => {
switch (data.type) {
case 'content':
setStreamingResponse(prev => {
const newContent = prev + data.content;
// Update renderer
if (rendererRef.current) {
rendererRef.current.appendContent(data.content);
}
return newContent;
});
break;
case 'complete':
setIsStreaming(false);
setConnectionStatus('completed');
if (rendererRef.current) {
rendererRef.current.complete();
}
break;
case 'error':
setStreamError(data.message);
setIsStreaming(false);
setConnectionStatus('error');
break;
default:
console.log('Unknown stream data type:', data.type);
}
}, []);
const stopStreaming = useCallback(() => {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
setIsStreaming(false);
setConnectionStatus('disconnected');
}, []);
return (
<div className="streaming-chat-interface">
<div className="connection-status">
Status: <span className={`status-${connectionStatus}`}>
{connectionStatus}
</span>
{isStreaming && (
<button onClick={stopStreaming} className="stop-button">
Stop Generation
</button>
)}
</div>
<div
ref={responseContainerRef}
className="streaming-response"
style={{
minHeight: '200px',
padding: '16px',
border: '1px solid #ddd',
borderRadius: '8px',
whiteSpace: 'pre-wrap',
fontFamily: 'monospace'
}}
/>
{streamError && (
<div className="error-display">
<strong>Error:</strong> {streamError}
<button onClick={() => setStreamError(null)}>Dismiss</button>
</div>
)}
{!isStreaming && (
<button
onClick={startStreaming}
className="start-streaming-button"
disabled={messages.length === 0}
>
Start Streaming Response
</button>
)}
</div>
);
};
export default StreamingChatInterface;
One of the most critical aspects of production streaming systems is handling backpressure—what happens when the client can't process data as fast as the server sends it. This becomes especially important when dealing with fast LLMs or slower client devices.
import asyncio
import json
import websockets
from collections import deque
import time
from typing import AsyncIterator
import logging
class BackpressureAwareStreamer:
def __init__(self, max_buffer_size: int = 1000,
flow_control_window: int = 50):
self.max_buffer_size = max_buffer_size
self.flow_control_window = flow_control_window
self.client_ack_buffer = deque()
self.pending_chunks = 0
async def stream_with_backpressure(self, websocket, llm_response_iterator: AsyncIterator[str]):
"""Stream LLM response with intelligent backpressure management."""
chunk_id = 0
send_buffer = deque()
async def send_worker():
"""Async worker that sends chunks respecting flow control."""
nonlocal chunk_id
while True:
try:
# Wait for data or completion signal
if not send_buffer:
await asyncio.sleep(0.01) # Small delay to prevent busy waiting
continue
# Check if we're within flow control window
if self.pending_chunks >= self.flow_control_window:
# Wait for acknowledgments before sending more
await self.wait_for_acks(websocket)
chunk_data = send_buffer.popleft()
if chunk_data is None: # Completion signal
await websocket.send(json.dumps({
"type": "stream_complete",
"final_chunk_id": chunk_id - 1
}))
break
# Send chunk with flow control metadata
chunk_id += 1
await websocket.send(json.dumps({
"type": "content_chunk",
"content": chunk_data,
"chunk_id": chunk_id,
"pending_acks": self.pending_chunks,
"requires_ack": True
}))
self.pending_chunks += 1
except websockets.exceptions.ConnectionClosed:
logging.info("Connection closed during send")
break
except Exception as e:
logging.error(f"Send error: {e}")
break
async def receive_worker():
"""Handle client acknowledgments and flow control."""
async for message in websocket:
try:
data = json.loads(message)
if data.get('type') == 'chunk_ack':
self.handle_chunk_ack(data['chunk_id'])
except Exception as e:
logging.error(f"Ack processing error: {e}")
# Start concurrent workers
send_task = asyncio.create_task(send_worker())
receive_task = asyncio.create_task(receive_worker())
try:
# Feed LLM response into send buffer
async for chunk in llm_response_iterator:
# Apply backpressure if buffer is full
while len(send_buffer) >= self.max_buffer_size:
await asyncio.sleep(0.1)
logging.warning("Send buffer full, applying backpressure")
send_buffer.append(chunk)
# Signal completion
send_buffer.append(None)
# Wait for send completion
await send_task
finally:
receive_task.cancel()
def handle_chunk_ack(self, chunk_id: int):
"""Process chunk acknowledgment from client."""
self.pending_chunks = max(0, self.pending_chunks - 1)
self.client_ack_buffer.append({
"chunk_id": chunk_id,
"ack_time": time.time()
})
# Clean old acks
cutoff_time = time.time() - 30 # Keep 30 seconds of ack history
while (self.client_ack_buffer and
self.client_ack_buffer[0]["ack_time"] < cutoff_time):
self.client_ack_buffer.popleft()
async def wait_for_acks(self, websocket, timeout: float = 5.0):
"""Wait for client acknowledgments to clear flow control window."""
start_time = time.time()
while self.pending_chunks >= self.flow_control_window:
if time.time() - start_time > timeout:
logging.warning("Flow control timeout, forcing continue")
# Reset pending count to prevent permanent blocking
self.pending_chunks = 0
break
await asyncio.sleep(0.1)
def get_flow_control_stats(self) -> dict:
"""Return current flow control statistics."""
return {
"pending_chunks": self.pending_chunks,
"buffer_utilization": len(self.client_ack_buffer),
"flow_control_window": self.flow_control_window,
"recent_ack_rate": self.calculate_ack_rate()
}
def calculate_ack_rate(self) -> float:
"""Calculate recent acknowledgment rate for adaptive flow control."""
if len(self.client_ack_buffer) < 2:
return 0.0
recent_acks = [ack for ack in self.client_ack_buffer
if ack["ack_time"] > time.time() - 5]
if len(recent_acks) < 2:
return 0.0
time_span = recent_acks[-1]["ack_time"] - recent_acks[0]["ack_time"]
return len(recent_acks) / max(time_span, 0.1)
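Wiring the streamer into a WebSocket handler looks roughly like this. It's a sketch: fake_llm_chunks is a stand-in async generator that is not part of the original code, and in practice you would pass the OpenAI stream instead.

import asyncio

streamer = BackpressureAwareStreamer(max_buffer_size=500, flow_control_window=25)

async def backpressure_handler(websocket, path):
    async def fake_llm_chunks():
        # Stand-in for a real streaming LLM response
        for word in "streaming responses with backpressure control".split():
            yield word + " "
            await asyncio.sleep(0.05)

    await streamer.stream_with_backpressure(websocket, fake_llm_chunks())

The browser-side counterpart below applies the same idea from the other direction: it queues incoming chunks, processes them at its own pace, and sends rate-limited acknowledgments back so the server knows how fast it can go.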
class FlowControlledStreamClient {
constructor(websocket) {
this.websocket = websocket;
this.processingQueue = [];
this.maxProcessingQueue = 100;
this.isProcessing = false;
this.ackDelay = 50; // ms delay between acks to prevent flooding
this.lastAckTime = 0;
this.setupMessageHandler();
}
setupMessageHandler() {
this.websocket.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'content_chunk') {
this.handleContentChunk(data);
}
});
}
async handleContentChunk(chunkData) {
// Add to processing queue
this.processingQueue.push(chunkData);
// Apply backpressure if queue is full
if (this.processingQueue.length >= this.maxProcessingQueue) {
console.warn('Client processing queue full, dropping chunks');
// Keep only the most recent chunks
this.processingQueue = this.processingQueue.slice(-this.maxProcessingQueue / 2);
}
// Start processing if not already running
if (!this.isProcessing) {
this.processQueue();
}
// Send acknowledgment with rate limiting
if (chunkData.requires_ack) {
await this.sendAcknowledgment(chunkData.chunk_id);
}
}
async processQueue() {
this.isProcessing = true;
while (this.processingQueue.length > 0) {
const chunk = this.processingQueue.shift();
try {
// Simulate processing time (rendering, DOM updates, etc.)
await this.processChunk(chunk);
// Small delay to prevent overwhelming the browser
await this.sleep(10);
} catch (error) {
console.error('Chunk processing error:', error);
// Continue processing other chunks
}
}
this.isProcessing = false;
}
async processChunk(chunk) {
// Your actual chunk processing logic here
// This might involve DOM updates, state changes, etc.
return new Promise((resolve) => {
requestAnimationFrame(() => {
// Update UI
this.updateDisplay(chunk.content);
resolve();
});
});
}
async sendAcknowledgment(chunkId) {
const now = Date.now();
// Rate limit acknowledgments
if (now - this.lastAckTime < this.ackDelay) {
return;
}
this.lastAckTime = now;
try {
this.websocket.send(JSON.stringify({
type: 'chunk_ack',
chunk_id: chunkId,
client_time: now,
queue_length: this.processingQueue.length
}));
} catch (error) {
console.error('Failed to send acknowledgment:', error);
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
updateDisplay(content) {
// Implement your display update logic
console.log('Processing chunk:', content);
}
getClientStats() {
return {
queueLength: this.processingQueue.length,
isProcessing: this.isProcessing,
maxQueueSize: this.maxProcessingQueue,
lastAckTime: this.lastAckTime
};
}
}
Streaming connections are inherently fragile—network issues, server overloads, or client-side problems can interrupt the flow at any time. Robust error handling and recovery mechanisms are essential for production systems.
import asyncio
import openai
import websockets
import json
import logging
import time
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Dict, Any
class StreamState(Enum):
IDLE = "idle"
CONNECTING = "connecting"
STREAMING = "streaming"
PAUSED = "paused"
ERROR = "error"
COMPLETED = "completed"
@dataclass
class StreamError:
error_type: str
message: str
recoverable: bool
retry_after: Optional[float] = None
context: Optional[Dict[str, Any]] = None
class ResilientStreamManager:
def __init__(self):
self.state = StreamState.IDLE
self.retry_counts = {}
self.max_retries = 3
self.base_retry_delay = 1.0
self.recovery_callbacks = {}
async def stream_with_recovery(self, connection_id: str, request_data: dict):
"""Main streaming method with comprehensive error handling."""
self.state = StreamState.CONNECTING
retry_count = 0
while retry_count <= self.max_retries:
try:
await self.attempt_stream(connection_id, request_data)
# If we reach here, streaming completed successfully
self.state = StreamState.COMPLETED
return
except StreamingError as e:
await self.handle_streaming_error(connection_id, e, retry_count)
if not e.recoverable or retry_count >= self.max_retries:
self.state = StreamState.ERROR
raise e
retry_count += 1
delay = self.calculate_retry_delay(retry_count)
await asyncio.sleep(delay)
except asyncio.CancelledError:
logging.info(f"Streaming cancelled for {connection_id}")
self.state = StreamState.IDLE
raise
except Exception as e:
# Unexpected error - treat as non-recoverable
error = StreamingError(
error_type="unexpected_error",
message=str(e),
recoverable=False
)
await self.handle_streaming_error(connection_id, error, retry_count)
self.state = StreamState.ERROR
raise error
async def attempt_stream(self, connection_id: str, request_data: dict):
"""Single streaming attempt with granular error detection."""
websocket = self.get_connection(connection_id)
if not websocket:
raise StreamingError(
error_type="connection_lost",
message="WebSocket connection not found",
recoverable=True
)
self.state = StreamState.STREAMING
chunk_count = 0
last_chunk_time = time.time()
try:
# Initialize LLM stream
llm_stream = await self.create_llm_stream(request_data)
# Send stream start notification
await self.send_safe(websocket, {
"type": "stream_start",
"request_id": connection_id,
"timestamp": time.time()
})
async for chunk in llm_stream:
# Check for timeout between chunks
current_time = time.time()
if current_time - last_chunk_time > 30: # 30 second timeout
raise StreamingError(
error_type="chunk_timeout",
message="No chunks received in 30 seconds",
recoverable=True,
context={"last_chunk_time": last_chunk_time, "chunk_count": chunk_count}
)
# Process and send chunk
processed_chunk = await self.process_chunk(chunk, chunk_count)
await self.send_safe(websocket, processed_chunk)
chunk_count += 1
last_chunk_time = current_time
# Health check every 50 chunks
if chunk_count % 50 == 0:
await self.health_check(websocket, connection_id)
except openai.error.RateLimitError as e:
raise StreamingError(
error_type="rate_limit",
message="OpenAI API rate limit exceeded",
recoverable=True,
retry_after=60.0, # Wait 60 seconds
context={"openai_error": str(e)}
)
except openai.error.APIError as e:
# Determine if API error is recoverable
recoverable = "server_error" in str(e).lower() or "503" in str(e)
raise StreamingError(
error_type="api_error",
message=f"OpenAI API error: {str(e)}",
recoverable=recoverable,
context={"openai_error": str(e)}
)
except websockets.exceptions.ConnectionClosed:
raise StreamingError(
error_type="connection_closed",
message="WebSocket connection closed unexpectedly",
recoverable=True,
context={"chunks_sent": chunk_count}
)
async def handle_streaming_error(self, connection_id: str,
error: StreamingError, retry_count: int):
"""Comprehensive error handling with context preservation."""
logging.error(f"Streaming error for {connection_id}: {error.message}")
# Update retry tracking
self.retry_counts[connection_id] = retry_count
# Send error notification to client if connection still exists
websocket = self.get_connection(connection_id)
if websocket:
try:
await self.send_safe(websocket, {
"type": "stream_error",
"error_type": error.error_type,
"message": error.message,
"recoverable": error.recoverable,
"retry_count": retry_count,
"retry_after": error.retry_after,
"context": error.context
})
except:
# If we can't send error notification, connection is truly lost
pass
# Execute recovery callbacks
recovery_callback = self.recovery_callbacks.get(error.error_type)
if recovery_callback:
try:
await recovery_callback(connection_id, error, retry_count)
except Exception as callback_error:
logging.error(f"Recovery callback failed: {callback_error}")
async def send_safe(self, websocket, data: dict):
"""Safe WebSocket send with connection validation."""
if websocket.closed:
raise StreamingError(
error_type="connection_closed",
message="Cannot send to closed WebSocket",
recoverable=True
)
try:
await websocket.send(json.dumps(data))
except websockets.exceptions.ConnectionClosed:
raise StreamingError(
error_type="connection_closed",
message="Connection closed during send",
recoverable=True
)
def calculate_retry_delay(self, retry_count: int) -> float:
"""Exponential backoff with jitter."""
base_delay = self.base_retry_delay * (2 ** retry_count)
# Add jitter to prevent thundering herd
jitter = base_delay * 0.1 * (time.time() % 1)
return min(base_delay + jitter, 60.0) # Cap at 60 seconds
def register_recovery_callback(self, error_type: str, callback):
"""Register custom recovery logic for specific error types."""
self.recovery_callbacks[error_type] = callback
class StreamingError(Exception):
def __init__(self, error_type: str, message: str, recoverable: bool = True,
retry_after: Optional[float] = None, context: Optional[Dict] = None):
self.error_type = error_type
self.message = message
self.recoverable = recoverable
self.retry_after = retry_after
self.context = context or {}
super().__init__(message)
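Registering custom recovery logic is then a one-liner per error type. Here is a sketch with a hypothetical callback that simply logs the backoff decision for rate-limit errors:

import logging

manager = ResilientStreamManager()

async def pause_on_rate_limit(connection_id: str, error: StreamingError, retry_count: int):
    # Hypothetical recovery hook: log the backoff before the manager retries
    logging.warning("Rate limited on %s (retry %d); waiting %ss before retrying",
                    connection_id, retry_count, error.retry_after)

manager.register_recovery_callback("rate_limit", pause_on_rate_limit)

The client needs the same discipline. The JavaScript class below reconnects with exponential backoff and jitter, preserves stream state so it can resume from the last processed chunk, and surfaces every failure through an event system: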
class ResilientStreamingClient {
constructor(wsUrl, options = {}) {
this.wsUrl = wsUrl;
this.options = {
maxRetries: 5,
initialRetryDelay: 1000,
maxRetryDelay: 30000,
heartbeatInterval: 30000,
...options
};
this.state = 'disconnected';
this.retryCount = 0;
this.websocket = null;
this.heartbeatTimer = null;
this.reconnectTimer = null;
// Stream state preservation
this.streamBuffer = [];
this.lastProcessedChunk = -1;
this.streamMetadata = {};
this.eventHandlers = new Map();
}
async connect() {
if (this.state === 'connecting' || this.state === 'connected') {
return;
}
this.state = 'connecting';
this.emit('stateChange', { state: this.state });
try {
this.websocket = new WebSocket(this.wsUrl);
this.setupWebSocketHandlers();
// Wait for connection
await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Connection timeout'));
}, 10000);
this.websocket.addEventListener('open', () => {
clearTimeout(timeout);
resolve();
});
this.websocket.addEventListener('error', (error) => {
clearTimeout(timeout);
reject(error);
});
});
this.state = 'connected';
this.retryCount = 0;
this.startHeartbeat();
this.emit('connected');
} catch (error) {
this.state = 'disconnected';
this.emit('connectionError', { error, retryCount: this.retryCount });
await this.handleConnectionFailure(error);
}
}
setupWebSocketHandlers() {
this.websocket.addEventListener('message', (event) => {
this.handleMessage(JSON.parse(event.data));
});
this.websocket.addEventListener('close', (event) => {
this.handleConnectionClose(event);
});
this.websocket.addEventListener('error', (error) => {
this.handleConnectionError(error);
});
}
handleMessage(data) {
switch (data.type) {
case 'stream_start':
this.handleStreamStart(data);
break;
case 'content_chunk':
this.handleContentChunk(data);
break;
case 'stream_error':
this.handleStreamError(data);
break;
case 'stream_complete':
this.handleStreamComplete(data);
break;
case 'heartbeat':
this.handleHeartbeat(data);
break;
default:
this.emit('unknownMessage', data);
}
}
handleContentChunk(data) {
// Check for missing chunks
if (data.chunk_id !== undefined &&
data.chunk_id !== this.lastProcessedChunk + 1) {
this.emit('chunkGap', {
expected: this.lastProcessedChunk + 1,
received: data.chunk_id
});
// Request missing chunks if possible
this.requestMissingChunks(this.lastProcessedChunk + 1, data.chunk_id - 1);
}
// Buffer chunk for processing
this.streamBuffer.push(data);
this.lastProcessedChunk = data.chunk_id || this.lastProcessedChunk + 1;
// Process buffered chunks in order
this.processBufferedChunks();
this.emit('contentChunk', data);
}
handleStreamError(data) {
this.emit('streamError', data);
if (data.recoverable && this.retryCount < this.options.maxRetries) {
const retryDelay = data.retry_after ?
data.retry_after * 1000 :
this.calculateRetryDelay();
setTimeout(() => {
this.attemptRecovery(data);
}, retryDelay);
} else {
this.state = 'failed';
this.emit('streamFailed', data);
}
}
async attemptRecovery(errorData) {
this.emit('recoveryAttempt', { retryCount: this.retryCount });
if (errorData.error_type === 'connection_closed') {
// Reconnect and resume stream
await this.connect();
if (this.state === 'connected') {
await this.resumeStream();
}
} else if (errorData.error_type === 'rate_limit') {
// Wait and retry the original request
await this.retryCurrentRequest();
} else {
// Generic recovery attempt
await this.connect();
}
}
async resumeStream() {
// Request to resume from last processed chunk
this.send({
type: 'resume_stream',
last_chunk_id: this.lastProcessedChunk,
stream_metadata: this.streamMetadata
});
}
handleConnectionClose(event) {
this.stopHeartbeat();
if (this.state === 'connected') {
this.state = 'disconnected';
this.emit('connectionLost', { code: event.code, reason: event.reason });
// Attempt automatic reconnection
this.scheduleReconnect();
}
}
scheduleReconnect() {
if (this.retryCount >= this.options.maxRetries) {
this.state = 'failed';
this.emit('maxRetriesExceeded');
return;
}
const delay = this.calculateRetryDelay();
this.retryCount++;
this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
}
calculateRetryDelay() {
const delay = Math.min(
this.options.initialRetryDelay * Math.pow(2, this.retryCount),
this.options.maxRetryDelay
);
// Add jitter
return delay + (Math.random() * delay * 0.1);
}
startHeartbeat() {
this.heartbeatTimer = setInterval(() => {
if (this.state === 'connected') {
this.send({ type: 'heartbeat', timestamp: Date.now() });
}
}, this.options.heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
send(data) {
if (this.state !== 'connected' || !this.websocket) {
throw new Error('Not connected');
}
this.websocket.send(JSON.stringify(data));
}
// Event system
on(event, handler) {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, new Set());
}
this.eventHandlers.get(event).add(handler);
}
emit(event, data) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
handlers.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`Error in event handler for ${event}:`, error);
}
});
}
}
disconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.stopHeartbeat();
if (this.websocket) {
this.websocket.close();
}
this.state = 'disconnected';
this.emit('disconnected');
}
}
Production streaming systems must handle hundreds or thousands of concurrent streams while maintaining low latency and high throughput. This requires careful attention to performance bottlenecks and scaling patterns.
import asyncio
import websockets
import uvloop
import json
import time
from collections import defaultdict
import weakref
import psutil
import logging
from typing import Dict, List, Set
from dataclasses import dataclass, field
@dataclass
class StreamMetrics:
total_streams: int = 0
active_streams: int = 0
bytes_sent: int = 0
chunks_sent: int = 0
avg_chunk_size: float = 0.0
peak_concurrent_streams: int = 0
error_count: int = 0
start_time: float = field(default_factory=time.time)
class HighPerformanceStreamServer:
def __init__(self, max_concurrent_streams: int = 1000):
self.max_concurrent_streams = max_concurrent_streams
self.active_streams: Dict[str, 'StreamContext'] = {}
self.connection_pool = weakref.WeakSet()
self.metrics = StreamMetrics()
# Performance optimization settings
self.chunk_batch_size = 10
self.send_queue_size = 1000
self.memory_pressure_threshold = 0.85
# Load balancing
self.worker_pools = {}
self.current_worker = 0
# Initialize event loop with optimizations
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def start_optimized_server(self, host: str = "localhost", port: int = 8765):
"""Start server with performance optimizations."""
# Configure asyncio for high performance
loop = asyncio.get_event_loop()
loop.set_debug(False) # Disable debug mode for production
# Create optimized WebSocket server
server = await websockets.serve(
self.handle_connection,
host,
port,
# Performance optimizations
max_size=10**7, # 10MB max message size
max_queue=100, # Limit queued messages
compression=None, # Disable compression for speed
ping_interval=30,
ping_timeout=10,
# Use SO_REUSEPORT for better load distribution
reuse_port=True
)
# Start background tasks
asyncio.create_task(self.metrics_collector())
asyncio.create_task(self.memory_monitor())
asyncio.create_task(self.connection_cleaner())
logging.info(f"High-performance streaming server started on {host}:{port}")
return server
async def handle_connection(self, websocket, path):
"""Optimized connection handler with resource management."""
# Check if we're at capacity
if len(self.active_streams) >= self.max_concurrent_streams:
await websocket.close(code=1013, reason="Server at capacity")
return
connection_id = self.generate_connection_id()
stream_context = StreamContext(connection_id, websocket)
try:
self.active_streams[connection_id] = stream_context
self.connection_pool.add(websocket)
self.metrics.active_streams += 1
self.metrics.peak_concurrent_streams = max(
self.metrics.peak_concurrent_streams,
self.metrics.active_streams
)
await self.process_stream_optimized(stream_context)
except Exception as e:
self.metrics.error_count += 1
logging.error(f"Stream error for {connection_id}: {e}")
finally:
self.cleanup_stream(connection_id)
async def process_stream_optimized(self, context: 'StreamContext'):
"""Optimized streaming with batching and memory management."""
send_queue = asyncio.Queue(maxsize=self.send_queue_size)
async def optimized_sender():
"""Batched sender to reduce syscalls."""
batch = []
done = False  # set once the completion signal (None) is received
while True:
try:
# Collect batch of chunks
while len(batch) < self.chunk_batch_size:
try:
chunk = await asyncio.wait_for(
send_queue.get(), timeout=0.1
)
if chunk is None:  # Completion signal
    done = True
    break
batch.append(chunk)
except asyncio.TimeoutError:
break
if not batch:
    if done:
        break
    continue
# Send batch efficiently
if len(batch) == 1:
# Single chunk - send directly
await context.websocket.send(batch[0])
else:
# Multiple chunks - batch into single message
batched_message = json.dumps({
"type": "chunk_batch",
"chunks": [json.loads(msg) for msg in batch]
})
await context.websocket.send(batched_message)
# Update metrics
for sent_chunk in batch:
    self.metrics.bytes_sent += len(sent_chunk)
    self.metrics.chunks_sent += 1
batch.clear()
if done:  # Completion signal was received
    break
except websockets.exceptions.ConnectionClosed:
break
except Exception as e:
logging.error(f"Sender error: {e}")
break
async def llm_processor():
"""Process LLM stream with memory-efficient handling."""
try:
# Get LLM response stream
llm_stream = await self.get_llm_stream(context.request_data)
chunk_buffer = []
buffer_size = 0
max_buffer_size = 8192 # 8KB buffer
async for raw_chunk in llm_stream:
chunk_data = self.process_chunk(raw_chunk)
chunk_json = json.dumps(chunk_data)
chunk_buffer.append(chunk_json)
buffer_size += len(chunk_json)
# Flush buffer when size limit reached
if buffer_size >= max_buffer_size:
for buffered_chunk in chunk_buffer:
await send_queue.put(buffered_chunk)
chunk_buffer.clear()
buffer_size = 0
# Check memory pressure
if self.is_memory_pressure():
await asyncio.sleep(0.01) # Brief pause
# Flush remaining buffer
for buffered_chunk in chunk_buffer:
await send_queue.put(buffered_chunk)
# Signal completion
await send_queue.put(None)
except Exception as e:
logging.error(f"LLM processing error: {e}")
await send_queue.put(json.dumps({
"type": "error",
"message": str(e)
}))
# Run sender and processor concurrently
sender_task = asyncio.create_task(optimized_sender())
processor_task = asyncio.create_task(llm_processor())
await asyncio.gather(sender_task, processor_task, return_exceptions=True)
def is_memory_pressure(self) -> bool:
"""Check if system is under memory pressure."""
memory_percent = psutil.virtual_memory().percent / 100
return memory_percent > self.memory_pressure_threshold
async def memory_monitor(self):
"""Background task to monitor and manage memory usage."""
while True:
try:
if self.is_memory_pressure():
# Implement memory pressure relief
await self.handle_memory_pressure()
await asyncio.sleep(5) # Check every 5 seconds
except Exception as e:
logging.error(f"Memory monitor error: {e}")
async def handle_memory_pressure(self):
"""Handle memory pressure by throttling or dropping connections."""
logging.warning("Memory pressure detected, implementing relief measures")
# Sort streams by age, close oldest ones
sorted_streams = sorted(
self.active_streams.items(),
key=lambda x: x[1].start_time
)
# Close 10% of oldest streams
to_close = max(1, len(sorted_streams) // 10)
for connection_id, context in sorted_streams[:to_close]:
try:
await context.websocket.close(
code=1000,
reason="Server memory pressure"
)
logging.info(f"Closed stream {connection_id} due to memory pressure")
except:
pass # Connection might already be closed
async def metrics_collector(self):
"""Collect and log performance metrics."""
while True:
try:
current_time = time.time()
uptime = current_time - self.metrics.start_time
if self.metrics.chunks_sent > 0:
self.metrics.avg_chunk_size = (
self.metrics.bytes_sent / self.metrics.chunks_sent
)
# Log metrics every minute
logging.info(f"Streaming Metrics - "
f"Active: {self.metrics.active_streams}, "
f"Peak: {self.metrics.peak_concurrent_streams}, "
f"Total Chunks: {self.metrics.chunks_sent}, "
f"Avg Size: {self.metrics.avg_chunk_size:.1f}B, "
f"Errors: {self.metrics.error_count}, "
f"Uptime: {uptime:.1f}s")
await asyncio.sleep(60)
except Exception as e:
logging.error(f"Metrics collector error: {e}")
@dataclass
class StreamContext:
connection_id: str
websocket: websockets.WebSocketServerProtocol
start_time: float = field(default_factory=time.time)
request_data: dict = field(default_factory=dict)
bytes_sent: int = 0
chunks_sent: int = 0
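Booting the server is then straightforward. This is a sketch that assumes this module is the entry point; the server is instantiated before asyncio.run so the uvloop policy set in its constructor takes effect when the loop is created.

if __name__ == "__main__":
    server = HighPerformanceStreamServer(max_concurrent_streams=500)

    async def main():
        await server.start_optimized_server(host="0.0.0.0", port=8765)
        await asyncio.Future()  # keep the process alive

    asyncio.run(main())

On the receiving end, the client below batches rendering work behind requestAnimationFrame, caps its render frequency, and virtualizes long transcripts to keep the DOM small: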
class OptimizedStreamingClient {
constructor(containerElement, options = {}) {
this.container = containerElement; // DOM node that receives rendered chunks
this.options = {
bufferSize: 50,
renderBatchSize: 5,
maxRenderFreq: 60, // Max FPS for rendering
enableVirtualScrolling: true,
...options
};
this.renderQueue = [];
this.isRendering = false;
this.lastRenderTime = 0;
this.virtualScrollOffset = 0;
// Performance monitoring
this.perfMetrics = {
chunksReceived: 0,
chunksRendered: 0,
avgRenderTime: 0,
droppedFrames: 0
};
// Use requestAnimationFrame for smooth rendering
this.frameId = null;
}
handleContentChunk(chunk) {
this.perfMetrics.chunksReceived++;
// Add to render queue
this.renderQueue.push(chunk);
// Limit queue size to prevent memory issues
if (this.renderQueue.length > this.options.bufferSize) {
this.renderQueue = this.renderQueue.slice(-this.options.bufferSize);
this.perfMetrics.droppedFrames++;
}
// Schedule rendering if not already scheduled
this.scheduleRender();
}
scheduleRender() {
if (this.frameId) {
return; // Already scheduled
}
this.frameId = requestAnimationFrame(() => {
this.performBatchRender();
this.frameId = null;
});
}
performBatchRender() {
const startTime = performance.now();
const targetFrameTime = 1000 / this.options.maxRenderFreq;
// Check if enough time has passed since last render
if (startTime - this.lastRenderTime < targetFrameTime) {
// Schedule for next frame
this.scheduleRender();
return;
}
try {
// Process batch of chunks
const batchSize = Math.min(
this.options.renderBatchSize,
this.renderQueue.length
);
const batch = this.renderQueue.splice(0, batchSize);
if (batch.length > 0) {
this.renderBatch(batch);
this.perfMetrics.chunksRendered += batch.length;
}
// Continue if more chunks waiting
if (this.renderQueue.length > 0) {
this.scheduleRender();
}
} finally {
const endTime = performance.now();
const renderTime = endTime - startTime;
// Update performance metrics
this.perfMetrics.avgRenderTime = (
(this.perfMetrics.avgRenderTime * 0.9) + (renderTime * 0.1)
);
this.lastRenderTime = endTime;
}
}
renderBatch(chunks) {
// Efficient DOM manipulation using document fragments
const fragment = document.createDocumentFragment();
for (const chunk of chunks) {
const element = this.createChunkElement(chunk);
fragment.appendChild(element);
}
// Single DOM update
this.container.appendChild(fragment);
// Update virtual scrolling if enabled
if (this.options.enableVirtualScrolling) {
this.updateVirtualScrolling();
}
// Trigger syntax highlighting in batches
this.scheduleHighlighting();
}
updateVirtualScrolling() {
const containerHeight = this.container.clientHeight;
const scrollTop = this.container.scrollTop;
const itemHeight = 20; // Approximate line height
const visibleStart = Math.floor(scrollTop / itemHeight);
const visibleEnd = visibleStart + Math.ceil(containerHeight / itemHeight);
// Hide elements outside visible range
const children = this.container.children;
for (let i = 0; i < children.length; i++) {
const child = children[i];
if (i < visibleStart || i > visibleEnd) {
child.style.display = 'none';
} else {
child.style.display = '';
}
}
}
scheduleHighlighting() {
// Debounced syntax highlighting to avoid performance issues
clearTimeout(this.highlightTimeout);
this.highlightTimeout = setTimeout(() => {
this.performHighlighting();
}, 100);
}
performHighlighting() {
// Use Intersection Observer for efficient highlighting
if (!this.highlightObserver) {
this.highlightObserver = new IntersectionObserver((entries) => {
entries.forEach(entry => {
if (entry.isIntersecting && !entry.target.highlighted) {
this.highlightElement(entry.target);
entry.target.highlighted = true;
}
});
}, { threshold: 0.1 });
}
// Observe code blocks for highlighting
const codeBlocks = this.container.querySelectorAll('pre:not([highlighted])');
codeBlocks.forEach(block => {
this.highlightObserver.observe(block);
});
}
highlightElement(element) {
// Async syntax highlighting to avoid blocking
setTimeout(() => {
if (typeof Prism !== 'undefined') {
Prism.highlightElement(element);
}
}, 0);
}
// Performance monitoring
getPerformanceMetrics() {
return {
...this.perfMetrics,
queueSize: this.renderQueue.length,
memoryUsage: this.estimateMemoryUsage()
};
}
estimateMemoryUsage() {
// Rough estimate of buffered content size; the original listing is truncated at this point
return this.renderQueue.reduce((total, chunk) => total + ((chunk.content || '').length), 0);
}
}