Tutorial 14: Streaming and Server-Sent Events (SSE) - Real-Time Responses
This tutorial demonstrates ADK's real streaming APIs with a working implementation built on ADK v1.16.0.
- ADK v1.16.0 includes full streaming support with `StreamingMode`, `Runner`, `Session`, and `LiveRequestQueue` classes
- The tutorial implementation uses actual ADK streaming APIs, not simulation
- `Runner.run_async()` with `StreamingMode.SSE` provides real progressive output
- `Session` management maintains conversation context across streaming responses
- `LiveRequestQueue` enables bidirectional communication for advanced streaming scenarios (a minimal sketch appears in Section 2)
Working Implementation Available: Check out the working implementation which demonstrates real ADK streaming APIs with proper session management and error handling.
Tutorial 14: Streaming with Server-Sent Events (SSE)
Goal: Implement streaming responses using Server-Sent Events (SSE) to provide real-time, progressive output for better user experience in your AI agents.
Prerequisites:
- Tutorial 01 (Hello World Agent)
- Tutorial 02 (Function Tools)
- Basic understanding of async/await in Python
What You'll Learn:
- Understanding streaming vs. non-streaming responses
- Implementing SSE streaming with `StreamingMode.SSE`
- Using `RunConfig` for streaming configuration
- Building real-time chat interfaces
- Aggregating streaming responses
- Best practices for production streaming applications
Time to Complete: 45-60 minutes
Why Streaming Matters
Traditional AI responses are blocking - users wait for the complete answer before seeing anything. Streaming provides progressive output as the model generates text.
Without Streaming (Blocking):
User: "Explain quantum computing"
Agent: [5 seconds of waiting...]
[Complete response appears at once]
With Streaming (Progressive):
User: "Explain quantum computing"
Agent: "Quantum computing is a revolutionary..."
[Text appears word-by-word or chunk-by-chunk]
[User sees progress immediately]
Benefits:
- ✅ Better UX: Users see progress immediately
- ✅ Perceived Speed: Feels faster even if total time is similar (measured in the sketch below)
- ✅ Early Feedback: Users can interrupt if needed
- ✅ Real-Time Feel: More conversational and engaging
- ✅ Long Responses: Essential for lengthy outputs
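Perceived speed is measurable: compare time-to-first-chunk against total generation time. Here is a small self-contained sketch; the stand-in stream below only simulates chunk arrival, so swap in a real one such as the `stream_response()` async generator from Section 3:

import asyncio
import time

async def measure(stream):
    """Report time-to-first-chunk vs. total time for any async text stream."""
    start = time.monotonic()
    first = None
    async for _chunk in stream:
        if first is None:
            first = time.monotonic() - start
    total = time.monotonic() - start
    print(f"First chunk: {first:.2f}s, complete: {total:.2f}s")

async def fake_stream():
    # Stand-in for a real agent stream
    for word in ["Quantum ", "computing ", "is..."]:
        await asyncio.sleep(0.3)
        yield word

asyncio.run(measure(fake_stream()))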
Response Flow Comparison:
BLOCKING RESPONSE (Traditional):
User --> [Agent Processing: 5 seconds] --> Complete Response Displayed

STREAMING RESPONSE (Progressive):
User --> Agent: "Quantum computing..." --> "is revolutionary..." --> "...approach..."
         [Immediate feedback]             [Progressive chunks]      [Continues...]
Data Flow Architecture:

+--------------+      +------------------+      +------------------+
|   Client     |----->|   ADK Runner     |----->|   Gemini Model   |
|              |      |                  |      |                  |
| Displays     |      | StreamingMode    |      | Generates text   |
| chunks as    |      | .SSE enabled     |      | chunks as        |
| they arrive  |      |                  |      | they're ready    |
+--------------+      +------------------+      +------------------+
       ^                                                 |
       +-------- chunks flow back progressively --------+
1. Streaming Basics
What Are Server-Sent Events (SSE)?
SSE is a standard protocol that lets servers push data to clients over HTTP. In ADK, streaming enables the model to send response chunks as they're generated using `StreamingMode.SSE`.
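On the wire, each SSE event is simply a `data:` line followed by a blank line, pushed over a single long-lived HTTP response. A one-line helper makes the framing concrete:

def sse_frame(payload: str) -> str:
    # Each SSE event is a "data:" line terminated by a blank line
    return f"data: {payload}\n\n"

print(sse_frame('{"text": "Quantum computing is..."}'), end='')
# Prints: data: {"text": "Quantum computing is..."}  (followed by a blank line)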
Basic Streaming Implementation
import asyncio
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types
# Create agent
agent = Agent(
    model='gemini-2.0-flash',
    name='streaming_assistant',
    instruction='Provide detailed, helpful responses.'
)

# Configure streaming
run_config = RunConfig(
    streaming_mode=StreamingMode.SSE
)

async def stream_response(query: str):
    """Stream agent response using real ADK APIs."""
    # Create session service and runner
    session_service = InMemorySessionService()
    runner = Runner(app_name="streaming_demo", agent=agent, session_service=session_service)
    # Create session
    session = await session_service.create_session(
        app_name="streaming_demo",
        user_id="demo_user"
    )
    print(f"User: {query}\n")
    print("Agent: ", end='', flush=True)
    # Run with streaming
    async for event in runner.run_async(
        user_id="demo_user",
        session_id=session.id,
        new_message=types.Content(role="user", parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        # Print each chunk as it arrives
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print(part.text, end='', flush=True)
    print("\n")

# Usage
asyncio.run(stream_response("Explain how neural networks work"))
Output (progressive):
User: Explain how neural networks work
Agent: Neural networks are computational models inspired by...
[Text appears progressively as generated]
...the human brain. They consist of interconnected nodes...
[Continues streaming...]
...making them powerful for pattern recognition tasks.
How Streaming Works (Actual Implementation)
Current ADK v1.16.0 Implementation Flow:
- Setup Components → Create `Runner`, `SessionService`, and `Session` for context
- Configure Streaming → Use `RunConfig` with `StreamingMode.SSE`
- Send Message → Use `types.Content` with proper role and parts structure
- Process Events → Iterate through `runner.run_async()` events
- Extract Chunks → Get text from `event.content.parts`
- Display Progressively → Yield/print chunks as they arrive
- Complete → Final event signals completion (see the sketch after this list)

Key Components:
- `Runner`: Executes agent runs with streaming support
- `SessionService`: Manages conversation sessions and context
- `RunConfig`: Configures streaming mode and parameters
- `StreamingMode.SSE`: Enables Server-Sent Events streaming
- `types.Content`: Properly structured message format
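To make the final step concrete, here is a sketch of detecting completion inside the event loop. It assumes ADK events expose `is_final_response()`; verify against your installed version. `runner`, `session`, `run_config`, and `query` are reused from the example above:

async for event in runner.run_async(
    user_id="demo_user",
    session_id=session.id,
    new_message=types.Content(role="user", parts=[types.Part(text=query)]),
    run_config=run_config
):
    if event.content and event.content.parts:
        for part in event.content.parts:
            if part.text:
                print(part.text, end='', flush=True)
    # Assumption: is_final_response() marks the closing event of the run
    if event.is_final_response():
        print('\n[response complete]')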
ADK Streaming Flow:
Initialize components
  -> Create session service & runner
  -> Configure StreamingMode.SSE
  -> Send user message (types.Content)
  -> Process events (async loop)
  -> Extract text chunks from event.content
  -> Display progressively (flush=True)
  -> Final event signals end -> Complete response
2. StreamingMode Configuration
Available Streaming Modes
from google.adk.agents.run_config import StreamingMode

# SSE - Server-Sent Events (one-way streaming)
StreamingMode.SSE
# BIDI - Bidirectional streaming (two-way, for Live API)
StreamingMode.BIDI
# NONE - No streaming (default, blocking)
StreamingMode.NONE
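BIDI is covered in depth in Tutorial 15, but for orientation here is a minimal, hedged sketch of how `LiveRequestQueue` pairs with `Runner.run_live()`. The method names and event fields reflect the v1.16.0 Live API surface and should be treated as assumptions until checked against your installed version:

from google.adk.agents.live_request_queue import LiveRequestQueue
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types

async def bidi_sketch(runner, session):
    """Hypothetical BIDI loop: send a message, read events, then close the queue."""
    queue = LiveRequestQueue()
    queue.send_content(types.Content(role='user', parts=[types.Part(text='Hi')]))
    async for event in runner.run_live(
        user_id='demo_user',
        session_id=session.id,
        live_request_queue=queue,
        run_config=RunConfig(streaming_mode=StreamingMode.BIDI),
    ):
        if event.content and event.content.parts and event.content.parts[0].text:
            print(event.content.parts[0].text, end='', flush=True)
        if getattr(event, 'turn_complete', False):  # assumption: live events flag turn end
            break
    queue.close()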
RunConfig Setup
from google.adk.agents.run_config import RunConfig, StreamingMode

# SSE Streaming
sse_config = RunConfig(
    streaming_mode=StreamingMode.SSE
)

# No Streaming (blocking)
blocking_config = RunConfig(
    streaming_mode=StreamingMode.NONE
)

# Use with a runner and session created as in Section 1
new_message = types.Content(role="user", parts=[types.Part(text=query)])

# Streaming
async for event in runner.run_async(
    user_id="demo_user",
    session_id=session.id,
    new_message=new_message,
    run_config=sse_config
):
    process_event(event)

# Blocking: run_async still returns an async iterator, but with
# StreamingMode.NONE each event carries a complete (non-partial) response
async for event in runner.run_async(
    user_id="demo_user",
    session_id=session.id,
    new_message=new_message,
    run_config=blocking_config
):
    process_complete_result(event)
StreamingMode Decision Tree:

StreamingMode Selection Guide

What type of streaming?
|
+-- SSE (one-way server push)
|   Use cases: chat apps, text streaming, progressive responses, web APIs
|
+-- BIDI (two-way, Live API)
    Use cases: voice agents, real-time audio/video, interactive conversations, live sessions
Data Flow Patterns:

SSE (Server-Sent Events):
Client <--- text chunk --- Server
       <--- text chunk ---
       <--- text chunk ---

BIDI (Bidirectional):
Client --- audio/text ---> Server
       <-- audio/text ----
       --- audio/text --->
       <-- audio/text ----

NONE (Blocking):
Client --- request ------> Server
       <-- full response -- (after complete processing)

A helper for picking between these modes at runtime is sketched below.
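Mode selection often depends on the caller. A small helper (the names are illustrative, not part of ADK) that degrades gracefully to blocking mode for clients that can't consume SSE:

from google.adk.agents.run_config import RunConfig, StreamingMode

def choose_run_config(client_supports_sse: bool) -> RunConfig:
    """Pick SSE when the client can consume it, otherwise fall back to blocking."""
    mode = StreamingMode.SSE if client_supports_sse else StreamingMode.NONE
    return RunConfig(streaming_mode=mode)

run_config = choose_run_config(client_supports_sse=True)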
3. Real-World Example: Streaming Chat Application
Complete Implementation
"""
Streaming Chat Application with SSE
Real-time interactive chat with progressive responses.
"""
import asyncio
import os
from datetime import datetime
from typing import AsyncIterator
from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
# Environment setup
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = '1'
os.environ['GOOGLE_CLOUD_PROJECT'] = 'your-project-id'
os.environ['GOOGLE_CLOUD_LOCATION'] = 'us-central1'
class StreamingChatApp:
    """Interactive streaming chat application."""

    def __init__(self):
        """Initialize chat application."""
        # Create chat agent
        self.agent = Agent(
            model='gemini-2.0-flash',
            name='chat_assistant',
            description='A friendly conversational assistant.',
            instruction="""
You are a helpful, friendly assistant engaging in natural conversation.
Guidelines:
- Be conversational and engaging
- Provide detailed explanations when asked
- Ask clarifying questions if needed
- Remember conversation context
- Be concise for simple queries, detailed for complex ones
""".strip(),
            generate_content_config=types.GenerateContentConfig(
                temperature=0.7,  # Conversational
                max_output_tokens=2048
            )
        )
        # Session service maintains conversation context
        self.session_service = InMemorySessionService()
        self.session = None  # Created lazily on the first turn
        # Configure streaming
        self.run_config = RunConfig(
            streaming_mode=StreamingMode.SSE
        )
        self.runner = Runner(
            app_name='streaming_chat',
            agent=self.agent,
            session_service=self.session_service
        )

    async def stream_response(self, user_message: str) -> AsyncIterator[str]:
        """
        Stream agent response to user message.
        Args:
            user_message: User's input message
        Yields:
            Text chunks as they're generated
        """
        # Create the session on first use
        if self.session is None:
            self.session = await self.session_service.create_session(
                app_name='streaming_chat',
                user_id='chat_user'
            )
        # Run agent with streaming
        async for event in self.runner.run_async(
            user_id='chat_user',
            session_id=self.session.id,
            new_message=types.Content(
                role='user',
                parts=[types.Part(text=user_message)]
            ),
            run_config=self.run_config
        ):
            # Extract text from event
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        yield part.text
    async def chat_turn(self, user_message: str):
        """
        Execute one chat turn with streaming display.
        Args:
            user_message: User's input message
        """
        # Display user message
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"\n[{timestamp}] User: {user_message}")
        # Display agent response with streaming
        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"[{timestamp}] Agent: ", end='', flush=True)
        # Stream response chunks
        async for chunk in self.stream_response(user_message):
            print(chunk, end='', flush=True)
        print()  # New line after complete response

    async def run_interactive(self):
        """Run interactive chat loop."""
        print("="*70)
        print("STREAMING CHAT APPLICATION")
        print("="*70)
        print("Type 'exit' or 'quit' to end conversation")
        print("="*70)
        while True:
            try:
                # Get user input
                user_input = input("\nYou: ").strip()
                if not user_input:
                    continue
                # Check for exit
                if user_input.lower() in ['exit', 'quit']:
                    print("\nGoodbye!")
                    break
                # Process chat turn
                await self.chat_turn(user_input)
            except KeyboardInterrupt:
                print("\n\nInterrupted. Goodbye!")
                break
            except Exception as e:
                print(f"\nError: {e}")

    async def run_demo(self):
        """Run demo conversation."""
        print("="*70)
        print("STREAMING CHAT DEMO")
        print("="*70)
        demo_messages = [
            "Hello! What can you help me with?",
            "Explain quantum computing in simple terms",
            "What are the practical applications?",
            "How does it compare to classical computing?"
        ]
        for message in demo_messages:
            await self.chat_turn(message)
            await asyncio.sleep(1)  # Pause between turns

async def main():
    """Main entry point."""
    chat = StreamingChatApp()
    # Run demo
    await chat.run_demo()
    # Uncomment for interactive mode:
    # await chat.run_interactive()

if __name__ == '__main__':
    asyncio.run(main())
Streaming Chat Application Architecture:

StreamingChatApp
+-- Agent (gemini-2.0-flash): model config, instructions, temperature
+-- Session (context): conversation history, user state
+-- Runner (executes): run_async, streaming mode
+-- RunConfig(streaming_mode=SSE)

Streaming response flow:
User input -> Runner.run_async() -> events -> chunks yielded progressively
     |                  |                           |
 chat_turn()       stream_response()           display (flush=True)
 (format message,  (async generator,           (progressive, real-time
 timestamp,         yields chunks)              output)
 error handling)
Expected Output
======================================================================
STREAMING CHAT DEMO
======================================================================
[14:23:15] User: Hello! What can you help me with?
[14:23:15] Agent: Hello! I'm here to help with a wide variety of tasks...
[Streams progressively...]
...I can explain concepts, answer questions, help with writing, assist
with problem-solving, provide information on various topics, and much more.
What would you like to explore today?
[14:23:18] User: Explain quantum computing in simple terms
[14:23:18] Agent: Imagine regular computers use bits, which are like...
[Streams progressively...]
...light switches that are either ON (1) or OFF (0). Quantum computers use
quantum bits, or "qubits," which can be both ON and OFF at the same time...
[Continues streaming...]
...This allows quantum computers to explore many possibilities simultaneously,
making them potentially much faster for certain types of problems.
[14:23:25] User: What are the practical applications?
[14:23:25] Agent: Great question! Here are some key applications...
[Streams progressively...]
1. **Drug Discovery**: Simulating molecular interactions...
2. **Cryptography**: Breaking current encryption and creating quantum-safe...
3. **Optimization**: Solving complex logistics and scheduling...
4. **Financial Modeling**: Analyzing risk and portfolio optimization...
5. **Artificial Intelligence**: Training more sophisticated ML models...
[14:23:32] User: How does it compare to classical computing?
[14:23:32] Agent: Let me break down the key differences...
[Streams progressively...]
**Classical Computing:**
- Sequential processing (one calculation at a time)
- Deterministic (same input → same output)
- Excellent for everyday tasks...
**Quantum Computing:**
- Parallel exploration (many paths simultaneously)
- Probabilistic (results have probabilities)
- Excels at specific complex problems...
Think of it this way: a classical computer is like checking...
4. Advanced Streaming Patterns
Pattern 1: Response Aggregation
Collect the complete response while streaming:
from typing import List

async def stream_and_aggregate(query: str) -> tuple[str, List[str]]:
    """
    Stream response while collecting chunks.
    Returns:
        (complete_text, chunks_list)
    """
    # Assumes `runner` and `session` were created as in Section 1
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    chunks = []
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    chunks.append(part.text)
                    print(part.text, end='', flush=True)
    complete_text = ''.join(chunks)
    return complete_text, chunks

# Usage
complete, chunks = await stream_and_aggregate("Explain machine learning")
print(f"\n\nTotal chunks: {len(chunks)}")
print(f"Total length: {len(complete)} characters")
Pattern 1: Response Aggregation Flow:
Start streaming
  -> Initialize RunConfig (runner & session from Section 1)
  -> Create empty chunks list
  -> Async loop over events
  -> Extract text from each event
  -> Append to chunks list
  -> Display progress (optional)
  -> Continue loop until the final event
  -> Join chunks into the complete response
Pattern 2: Streaming with Progress Indicators
Show progress during streaming:
import sys

async def stream_with_progress(query: str):
    """Stream with a visual progress indicator."""
    # Assumes `runner` and `session` were created as in Section 1
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    print("Agent: ", end='', flush=True)
    chunk_count = 0
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print(part.text, end='', flush=True)
                    chunk_count += 1
                    # Show progress indicator every 10 chunks
                    if chunk_count % 10 == 0:
                        sys.stderr.write('.')
                        sys.stderr.flush()
    print()  # New line

# Usage
await stream_with_progress("Write a long essay on AI")
Pattern 3: Streaming to Multiple Outputs
Send a streaming response to multiple destinations:
from typing import Callable, List

async def stream_to_multiple(
    query: str,
    outputs: List[Callable[[str], None]]
):
    """
    Stream response to multiple output handlers.
    Args:
        query: User query
        outputs: List of functions to handle each chunk
    """
    # Assumes `runner` and `session` were created as in Section 1
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    # Send to all outputs
                    for output_fn in outputs:
                        output_fn(part.text)

# Usage - handlers are plain callables so they can be invoked synchronously
def console_output(chunk: str):
    print(chunk, end='', flush=True)

def file_output(chunk: str):
    with open('response.txt', 'a') as f:
        f.write(chunk)

def websocket_output(chunk: str):
    # Async destinations need scheduling; see the queue-based sketch after the diagram
    pass

await stream_to_multiple(
    "Explain AI safety",
    outputs=[console_output, file_output, websocket_output]
)
Pattern 3: Multiple Output Destinations:

User query ("Explain AI safety")
        |
        v
Runner.run_async() -> stream events -> extract text chunk from event
        |
        v
Distribute each chunk to all outputs:
   +----------------+------------------+
   |                |                  |
   v                v                  v
Console          File               WebSocket
(terminal:       (response.txt:     (real-time UI:
print(chunk,     open().write())    send(chunk))
flush=True))
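The handler list above is synchronous by design; truly async destinations (like the WebSocket sender) are better served by fanning chunks out through per-consumer queues. A self-contained sketch, where `stream` is any async generator of text chunks (e.g., `stream_response()` from Section 3) and each handler is an async callable:

import asyncio

async def fan_out(stream, handlers):
    """Feed each async handler from its own queue; None marks end-of-stream."""
    queues = [asyncio.Queue() for _ in handlers]

    async def produce():
        async for chunk in stream:
            for q in queues:
                q.put_nowait(chunk)
        for q in queues:
            q.put_nowait(None)

    async def consume(q, handler):
        while (chunk := await q.get()) is not None:
            await handler(chunk)

    await asyncio.gather(
        produce(),
        *(consume(q, h) for q, h in zip(queues, handlers))
    )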
Pattern 4: Streaming with Timeout
Add timeout protection (asyncio.timeout requires Python 3.11+):
import asyncio

async def stream_with_timeout(
    query: str,
    timeout_seconds: float = 30.0
):
    """Stream response with a total timeout."""
    # Assumes `runner` and `session` were created as in Section 1
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    try:
        async with asyncio.timeout(timeout_seconds):
            async for event in runner.run_async(
                user_id='demo_user',
                session_id=session.id,
                new_message=types.Content(role='user', parts=[types.Part(text=query)]),
                run_config=run_config
            ):
                if event.content and event.content.parts:
                    for part in event.content.parts:
                        if part.text:
                            print(part.text, end='', flush=True)
    except asyncio.TimeoutError:
        print("\n\n[Timeout: Response took too long]")
    print()

# Usage
await stream_with_timeout("Explain the universe", timeout_seconds=10.0)
Pattern 4: Timeout Protection Flow:
Start streaming request
  -> Set timeout (e.g., 30s) and enter the asyncio timeout context
  -> Start streaming loop, processing events inside the timeout
  -> Display each chunk progressively
  -> Check for completion or timeout
       |
       +-- Completed in time --> complete response displayed
       +-- Timeout reached   --> TimeoutError handled, message shown
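A whole-response deadline is one option; another is an inactivity timeout that only trips when no chunk arrives for a while. A sketch using asyncio.wait_for on the stream's iterator (the anext() builtin requires Python 3.10+), where `stream` is any async generator of text chunks:

import asyncio

async def stream_with_idle_timeout(stream, idle_seconds: float = 5.0):
    """Abort only if no chunk arrives within idle_seconds."""
    iterator = stream.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(anext(iterator), timeout=idle_seconds)
        except StopAsyncIteration:
            break  # Stream finished normally
        except asyncio.TimeoutError:
            print(f"\n[No chunk for {idle_seconds}s - aborting]")
            break
        print(chunk, end='', flush=True)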
5. StreamingResponseAggregator
ADK provides `StreamingResponseAggregator` for handling streaming responses:
from google.adk.models.streaming_response_aggregator import StreamingResponseAggregator

async def stream_with_aggregator(query: str):
    """Use StreamingResponseAggregator for cleaner code."""
    # Assumes `runner` and `session` were created as in Section 1
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    aggregator = StreamingResponseAggregator()
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        # Aggregator handles chunk collection
        aggregator.add(event)
        # Display chunk
        if event.content and event.content.parts and event.content.parts[0].text:
            print(event.content.parts[0].text, end='', flush=True)
    # Get complete response
    complete_response = aggregator.get_response()
    print(f"\n\nComplete response has {len(complete_response.content.parts[0].text)} characters")
    return complete_response

# Usage
response = await stream_with_aggregator("Explain blockchain")
6. Building Web APIs with Streaming
FastAPI SSE Endpoint
"""
FastAPI endpoint with SSE streaming.
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
import json

app = FastAPI()

# Create agent
agent = Agent(
    model='gemini-2.0-flash',
    name='api_assistant'
)
session_service = InMemorySessionService()
runner = Runner(app_name='sse_api', agent=agent, session_service=session_service)

async def generate_stream(query: str):
    """Generate SSE stream."""
    run_config = RunConfig(streaming_mode=StreamingMode.SSE)
    # One session per request here; reuse sessions to preserve conversation context
    session = await session_service.create_session(
        app_name='sse_api',
        user_id='api_user'
    )
    async for event in runner.run_async(
        user_id='api_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=run_config
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    # Format as SSE
                    data = json.dumps({'text': part.text})
                    yield f"data: {data}\n\n"
    # Send completion signal
    yield "data: [DONE]\n\n"

# GET, because the browser EventSource API only issues GET requests
@app.get("/chat/stream")
async def chat_stream(query: str):
    """Streaming chat endpoint."""
    return StreamingResponse(
        generate_stream(query),
        media_type="text/event-stream"
    )

if __name__ == '__main__':
    # adk api_server also sets up streaming endpoints automatically
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Web API SSE Architecture:

Browser client                          FastAPI server
(EventSource(), onmessage,  <---------  (SSE endpoint, StreamingResponse,
 onerror, close())           HTTP/1.1    media type text/event-stream)
                             data: {"text": "chunk"}
                             data: [DONE]
                                            |
                                            v
                             ADK Runner (run_async, SSE events) <--> Agent (Gemini)

Data flow:
1. Client connects: GET /chat/stream?query=Hello
2. Server starts: Runner.run_async() with SSE
3. Agent generates: text chunks progressively
4. Server sends: data: {"text": "chunk"}\n\n
5. Client receives: EventSource.onmessage()
6. UI updates: document.getElementById().innerHTML += ...
7. Completion: data: [DONE]\n\n closes the connection
Client-Side JavaScript
// Connect to SSE endpoint
const eventSource = new EventSource(
    "http://localhost:8000/chat/stream?query=Hello"
);

eventSource.onmessage = (event) => {
    if (event.data === "[DONE]") {
        eventSource.close();
        return;
    }
    const data = JSON.parse(event.data);
    // Display chunk in UI
    document.getElementById("response").innerHTML += data.text;
};

eventSource.onerror = (error) => {
    console.error("SSE Error:", error);
    eventSource.close();
};
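For testing the endpoint without a browser, here is a minimal Python client sketch. It assumes the `httpx` package is installed; the URL and [DONE] sentinel match the server above:

import asyncio
import json
import httpx

async def consume_sse(query: str):
    """Read the SSE stream line by line and print each text chunk."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", "http://localhost:8000/chat/stream", params={"query": query}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["text"], end="", flush=True)

asyncio.run(consume_sse("Hello"))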
7. Best Practices
Streaming Implementation Guidelines:

| Category | ✅ DO | ❌ DON'T |
| --- | --- | --- |
| Response Type | Use streaming for long/verbose output | Block on long responses (>10 seconds wait time) |
| Async Handling | Proper async/await throughout the chain | Mix sync/async contexts (blocks event loop) |
| Output Display | flush=True for immediate terminal display | Buffered output (delayed display, poor UX) |
| Error Handling | Try/catch streaming failures | Ignore streaming errors (silent failures) |
| Session Mgmt | Use sessions for conversation context | Stateless conversations (lose context) |
| Timeout Control | Set reasonable timeouts (10-60s) | Infinite streaming waits (resource exhaustion) |
| Resource Usage | Monitor chunk sizes and latency | Unbounded memory usage (memory leaks) |
| Testing Strategy | Test with real streaming data | Mock all streaming (misses real issues) |
| Production Ready | Graceful degradation to blocking mode | Fail fast without fallbacks (brittle deployments) |
✅ DO: Use Streaming for Long Responses
# ✅ Good - Stream long responses
run_config = RunConfig(streaming_mode=StreamingMode.SSE)
async for event in runner.run_async(
    user_id='demo_user',
    session_id=session.id,
    new_message=types.Content(
        role='user',
        parts=[types.Part(text="Write a detailed essay on climate change")]
    ),
    run_config=run_config
):
    display_chunk(event)

# ❌ Bad - Blocking for long response
events = list(runner.run(
    user_id='demo_user',
    session_id=session.id,
    new_message=types.Content(
        role='user',
        parts=[types.Part(text="Write a detailed essay on climate change")]
    )
))
# User waits 10+ seconds for the complete response
✅ DO: Handle Async Properly
# ✅ Good - Proper async handling
async def handle_stream():
    async for event in runner.run_async(...):
        await process_event(event)

asyncio.run(handle_stream())

# ❌ Bad - Blocking in an async context
async def handle_stream():
    for event in runner.run(...):  # Synchronous iteration blocks the event loop
        process_event(event)
✅ DO: Flush Output Immediately
# ✅ Good - Flush for immediate display
print(chunk, end='', flush=True)

# ❌ Bad - Buffered output (delayed display)
print(chunk, end='')  # No flush
✅ DO: Handle Streaming Errors
# ✅ Good - Error handling
async def safe_stream(query):
    try:
        run_config = RunConfig(streaming_mode=StreamingMode.SSE)
        async for event in runner.run_async(
            user_id='demo_user',
            session_id=session.id,
            new_message=types.Content(role='user', parts=[types.Part(text=query)]),
            run_config=run_config
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        print(part.text, end='', flush=True)
    except Exception as e:
        print(f"\n[Error during streaming: {e}]")

# ❌ Bad - No error handling
async for event in runner.run_async(...):
    print(event.content.parts[0].text)  # Crashes on error
✅ DO: Use Sessions for Context
# ✅ Good - One session maintains conversation context
session = await session_service.create_session(
    app_name='streaming_demo',
    user_id='demo_user'
)
for message in conversation:
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,  # Context preserved across turns
        new_message=types.Content(role='user', parts=[types.Part(text=message)]),
        run_config=run_config
    ):
        process_event(event)

# ❌ Bad - Fresh session per message (loses context)
for message in conversation:
    session = await session_service.create_session(
        app_name='streaming_demo',
        user_id='demo_user'
    )
    async for event in runner.run_async(
        user_id='demo_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=message)]),
        run_config=run_config
    ):
        process_event(event)
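The matrix above also recommends graceful degradation to blocking mode. One way to sketch that, reusing the `runner` and `session` from Section 1, is to retry the whole request without streaming if the SSE loop fails:

async def stream_or_block(query: str) -> str:
    """Try SSE streaming; on failure, rerun the request in blocking mode."""
    new_message = types.Content(role='user', parts=[types.Part(text=query)])
    collected = []
    try:
        async for event in runner.run_async(
            user_id='demo_user', session_id=session.id,
            new_message=new_message,
            run_config=RunConfig(streaming_mode=StreamingMode.SSE)
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        collected.append(part.text)
                        print(part.text, end='', flush=True)
    except Exception:
        collected = []  # Discard partial output from the failed attempt
        async for event in runner.run_async(
            user_id='demo_user', session_id=session.id,
            new_message=new_message,
            run_config=RunConfig(streaming_mode=StreamingMode.NONE)
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        collected.append(part.text)
        print(''.join(collected))
    return ''.join(collected)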
8. Troubleshooting
Issue: "Streaming classes not available"
Problem: Import errors for `StreamingMode`, `Runner`, `Session`, or `RunConfig`
Solution: Ensure you're using ADK v1.16.0 or later. These classes are available in current ADK:
# ✅ Works with ADK v1.16.0+
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types

# Use the working implementation
from streaming_agent import stream_agent_response

async for chunk in stream_agent_response("Hello"):
    print(chunk, end='', flush=True)
Issue: "No streaming happening"β
Problem: Response appears all at once instead of progressively
Solutions:
- Check ADK version:
pip show google-adk # Should be 1.16.0 or later
- Verify streaming configuration:
# β
Correct streaming config
run_config = RunConfig(streaming_mode=StreamingMode.SSE)
# β Wrong - no streaming
run_config = RunConfig(streaming_mode=StreamingMode.NONE)
- Check output flushing:
# β
Flushed immediately
print(chunk, end='', flush=True)
# β Buffered (delayed display)
print(chunk, end='')
Issue: "Agent.run_async method not found"β
Problem: AttributeError: 'LlmAgent' object has no attribute 'run_async'
Solution: The Agent class uses Runner.run_async()
. The working
implementation uses real ADK streaming with fallback to simulation:
# β
Working approach - uses real ADK streaming
from streaming_agent import stream_agent_response
async for chunk in stream_agent_response("Hello"):
print(chunk, end='', flush=True)
# β Won't work - Agent doesn't have run_async directly
# agent.run_async(query) # AttributeError
Issue: "Streaming performance issues"β
Problem: Real streaming feels too slow or has performance issues
Solution: The implementation uses real ADK streaming. If performance issues occur, the code falls back to simulated streaming. Check your network and API key:
# Check ADK version
pip show google-adk # Should be 1.16.0+
# Test basic connectivity
python -c "import google.genai; print('GenAI import OK')"
If real streaming fails, you'll see a warning and fallback to simulation.
Issue: "Import errors with google.adk.agents"β
Problem: Cannot import expected classes from ADK
Solution: Check your ADK version and use the working implementation:
# Check ADK version
pip show google.adk.agents
# Use working implementation instead
cd tutorial_implementation/tutorial14
python -c "from streaming_agent import stream_agent_response"
9. Testing Streaming Agents
Unit Tests
import pytest
from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

async def collect_chunks(runner, session, query):
    """Helper: stream a query and collect its text chunks."""
    chunks = []
    async for event in runner.run_async(
        user_id='test_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text=query)]),
        run_config=RunConfig(streaming_mode=StreamingMode.SSE)
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    chunks.append(part.text)
    return chunks

@pytest.mark.asyncio
async def test_streaming_response():
    """Test that streaming returns multiple chunks."""
    agent = Agent(
        model='gemini-2.0-flash',
        name='test_agent',
        instruction='Provide detailed responses.'
    )
    session_service = InMemorySessionService()
    runner = Runner(app_name='test_app', agent=agent, session_service=session_service)
    session = await session_service.create_session(app_name='test_app', user_id='test_user')
    chunks = await collect_chunks(runner, session, "Explain machine learning in detail")
    # Should receive multiple chunks
    assert len(chunks) > 1
    # Complete text should be a reasonable length
    complete = ''.join(chunks)
    assert len(complete) > 100

@pytest.mark.asyncio
async def test_streaming_aggregation():
    """Test that streaming chunks combine correctly."""
    agent = Agent(model='gemini-2.0-flash', name='test_agent')
    session_service = InMemorySessionService()
    runner = Runner(app_name='test_app', agent=agent, session_service=session_service)
    session = await session_service.create_session(app_name='test_app', user_id='test_user')
    chunks = await collect_chunks(runner, session, "Count to 10")
    complete = ''.join(chunks)
    # Should contain all numbers
    for i in range(1, 11):
        assert str(i) in complete
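A complementary check, under the same setup assumptions as the tests above: with StreamingMode.NONE the same loop should still yield the full response text, just without partial chunks:

@pytest.mark.asyncio
async def test_blocking_mode_single_response():
    """With StreamingMode.NONE the loop completes without partial chunks."""
    agent = Agent(model='gemini-2.0-flash', name='test_agent')
    session_service = InMemorySessionService()
    runner = Runner(app_name='test_app', agent=agent, session_service=session_service)
    session = await session_service.create_session(app_name='test_app', user_id='test_user')
    texts = []
    async for event in runner.run_async(
        user_id='test_user',
        session_id=session.id,
        new_message=types.Content(role='user', parts=[types.Part(text='Say hello')]),
        run_config=RunConfig(streaming_mode=StreamingMode.NONE)
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    texts.append(part.text)
    assert ''.join(texts)  # One complete, non-empty response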
Summary
You've mastered streaming responses with SSE:
Key Takeaways:
- ✅ `StreamingMode.SSE` enables progressive response output
- ✅ Use `RunConfig` to configure streaming
- ✅ `runner.run_async()` with `async for` for streaming
- ✅ Better UX - users see progress immediately
- ✅ Essential for long responses and real-time applications
- ✅ Works with sessions for conversation context
- ✅ Can combine with tools and code execution
- ✅ `flush=True` for immediate terminal output
Production Checklist:
- Using `RunConfig(streaming_mode=StreamingMode.SSE)`
- Proper async/await handling
- Error handling for streaming failures
- Session management for context
- Output flushing (`flush=True`)
- Timeout protection for long streams
- Testing streaming with multiple queries
- Monitoring chunk sizes and latency
Next Steps:
- Tutorial 15: Learn Live API for bidirectional streaming with audio
- Tutorial 16: Explore MCP Integration for extended tool ecosystem
- Tutorial 17: Implement Agent-to-Agent (A2A) communication
Resources:
🎉 Tutorial 14 Complete! You now know how to implement streaming responses for better user experience. Continue to Tutorial 15 to learn about bidirectional streaming with the Live API.
💬 Join the Discussion
Have questions or feedback? Discuss this tutorial with the community on GitHub Discussions.