Tutorial 34: Google Cloud Pub/Sub + Event-Driven Agents
This tutorial implements a real event-driven document processing system using Google Cloud Pub/Sub and ADK agents. It demonstrates a coordinator + specialist agents pattern with structured JSON output using Pydantic models. Verified as of October 2025 with the latest ADK and Gemini 2.5 Flash.
Estimated Reading Time: 50-60 minutes
Difficulty Level: Advanced
Prerequisites: Tutorials 01-03 (ADK Basics), Google Cloud project
Quick Start - Working Implementation
The easiest way to get started is with our complete working implementation:
cd tutorial_implementation/tutorial34
make setup # Install dependencies
make test # Run all tests
What's included:
- root_agent: Coordinator agent that routes documents to specialists
- 4 specialist agents: Financial, Technical, Sales, Marketing analyzers
- Pydantic output schemas: Structured JSON results
- 66 comprehensive tests (all passing)
- Real-world example code ready to run
Table of Contents
- Overview
- Prerequisites & Setup
- Understanding the Architecture
- Core Components
- Running Locally
- Google Cloud Deployment
- Troubleshooting
- Next Steps
Overview
What You'll Build
In this tutorial, you'll build an event-driven document processing system using:
- Google Cloud Pub/Sub (Event messaging)
- Google ADK (Multi-agent coordination)
- Gemini 2.5 Flash (Document analysis)
- Pydantic Models (Structured JSON output)
Architecture:
   +---------------------------------------+
   | Publisher: Sends documents to Pub/Sub |
   +-------------------+-------------------+
                       |
         +-------------v-------------+
         | Google Cloud Pub/Sub      |
         | (document-uploads)        |
         +-------------+-------------+
                       |
         +-------------v-------------+
         | root_agent (Coordinator)  |
         | - Routes documents        |
         | - Coordinates analysis    |
         +-------------+-------------+
                       |
     +-----------+-----+-----+-----------+
     |           |           |           |
+----v----+ +----v----+ +----v----+ +----v----+
| Fin.    | | Tech.   | | Sales   | |Marketing|
| Anal.   | | Anal.   | | Analyst | | Analyst |
+---------+ +---------+ +---------+ +---------+
Why Pub/Sub + ADK?
Feature | Benefit |
---|---|
Asynchronous | Non-blocking processing |
Decoupled | Publishers and subscribers independent |
Scalable | Auto-scales with message volume |
Structured | Pydantic models for JSON |
Reliable | At-least-once delivery, retries |
When to use Pub/Sub + ADK:
- Asynchronous document processing
- Multi-step workflows
- Event-driven architectures
- Systems with strict output schemas
- Google Cloud deployments
When not to use it:
- Real-time chat interfaces: use Next.js/WebSocket instead
- Simple synchronous calls: use a direct API call instead
Prerequisites & Setup
Local Testing (No GCP Required)
To get started without Google Cloud:
# Install dependencies
cd tutorial_implementation/tutorial34
make setup
# Run tests - verifies agent configuration
make test
# This works completely locally using in-memory processing
Google Cloud Setup (Optional - For Real Pub/Sub)
To deploy with real Google Cloud Pub/Sub:
1. Install gcloud CLI
# macOS
brew install --cask google-cloud-sdk
# Then initialize
gcloud init
2. Authenticate
# Login to Google Cloud
gcloud auth login
# Set default project
gcloud config set project your-project-id
# Verify authentication
gcloud auth list
3. Create Pub/Sub Resources
# Enable Pub/Sub API
gcloud services enable pubsub.googleapis.com
# Create topic
gcloud pubsub topics create document-uploads
# Create subscription
gcloud pubsub subscriptions create document-processor \
--topic=document-uploads \
--ack-deadline=600
4. Set Environment Variables
# Set your GCP project
export GCP_PROJECT="your-project-id"
# Set Gemini API key
export GOOGLE_API_KEY="your_gemini_api_key"
# Set application credentials
gcloud auth application-default login
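Before starting a long-running subscriber, it can help to fail fast when a variable is missing. The following is a minimal sketch, not part of the tutorial's implementation; the helper name `missing_env` is hypothetical, and the two variable names are the ones exported above.

```python
import os
from typing import Mapping, Optional

# Variable names taken from the exports above.
REQUIRED_VARS = ("GCP_PROJECT", "GOOGLE_API_KEY")


def missing_env(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real environment.
example_missing = missing_env({"GCP_PROJECT": "your-project-id"})
```

Calling `missing_env()` with no argument checks the real process environment; a non-empty result is a reasonable cue to stop and print the missing names.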
Understanding the Architecture
The Coordinator + Specialist Pattern
This implementation uses a coordinator agent that intelligently routes documents to specialized analyzers:
+---------------------------------------------+
| root_agent (Coordinator)                    |
| - Analyzes document type                    |
| - Routes to appropriate analyzer            |
| - Coordinates specialized agents            |
+----------------------+----------------------+
                       |
     +-----------+-----+-----+-----------+
     |           |           |           |
+----v----+ +----v----+ +----v----+ +----v----+
| Finan.  | | Tech.   | | Sales   | |Marketing|
| Anal.   | | Anal.   | | Analyst | | Analyst |
+----+----+ +----+----+ +----+----+ +----+----+
     |           |           |           |
     +-----------+-----+-----+-----------+
                       |
            Structured JSON Output
               (Pydantic Models)
Key Components
- root_agent (pubsub_agent/agent.py):
  - Coordinator that routes documents to specialists
  - Analyzes document type and content
  - Calls appropriate sub-agent tool
  - Returns structured analysis
- Sub-Agents (financial, technical, sales, marketing):
  - Specialized analyzers for document types
  - Enforce structured JSON via Pydantic output_schema
  - Extract type-specific metrics and insights
- Pydantic Output Schemas:
  - FinancialAnalysisOutput: Revenue, profit, metrics
  - TechnicalAnalysisOutput: Technologies, components
  - SalesAnalysisOutput: Deals, pipeline value
  - MarketingAnalysisOutput: Campaigns, engagement metrics
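To make the routing contract concrete, here is a deterministic stand-in for the decision the coordinator makes. In the tutorial the choice is made by the Gemini model, not by keyword matching; the keyword lists and the `route_document` function below are hypothetical and only illustrate the idea of mapping a document to one of the four specialists.

```python
import re
from collections import Counter
from typing import Optional

# Hypothetical keyword lists; the real coordinator relies on the LLM instead.
KEYWORDS: dict[str, tuple[str, ...]] = {
    "financial": ("revenue", "profit", "fiscal", "earnings"),
    "technical": ("api", "architecture", "deployment", "database"),
    "sales": ("deal", "pipeline", "quota", "prospect"),
    "marketing": ("campaign", "engagement", "brand", "audience"),
}


def route_document(text: str) -> Optional[str]:
    """Return the specialist with the most whole-word keyword hits, or None."""
    words = Counter(re.findall(r"[a-z]+", text.lower()))
    scores = {
        name: sum(words[keyword] for keyword in keywords)
        for name, keywords in KEYWORDS.items()
    }
    best = max(scores, key=lambda name: scores[name])
    # No keyword matched at all: leave the decision to the caller.
    return best if scores[best] > 0 else None


chosen = route_document("Revenue $1.2M, Profit 33%, Q4 2024")
```

A rule-based router like this is cheap and predictable but brittle; delegating the decision to the model, as the tutorial does, trades that predictability for tolerance of varied wording.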
Pub/Sub Guarantees
Feature | Description |
---|---|
At-least-once | Messages are delivered one or more times |
Asynchronous | Non-blocking processing |
Scalable | Auto-scales with message volume |
Durable | Messages stored in topics |
Reliable | Automatic retries on failure |
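At-least-once delivery means the same message can arrive more than once, so a subscriber generally needs to tolerate duplicates. The sketch below shows one common approach, deduplicating by message ID. It is an illustration under stated assumptions: `DedupingProcessor` and `analyze` are hypothetical names, and the in-memory dictionary only works within a single process (a shared store would be needed across several subscribers).

```python
from typing import Callable


class DedupingProcessor:
    """Runs a handler at most once per message ID (in-memory, single process)."""

    def __init__(self, handler: Callable[[str], str]) -> None:
        self._handler = handler
        self._results: dict[str, str] = {}  # message_id -> cached result

    def process(self, message_id: str, payload: str) -> str:
        # A redelivered message returns the cached result instead of re-running.
        if message_id not in self._results:
            self._results[message_id] = self._handler(payload)
        return self._results[message_id]


calls: list[str] = []


def analyze(payload: str) -> str:
    # Hypothetical stand-in for the agent call; records each real invocation.
    calls.append(payload)
    return f"analyzed:{payload}"


processor = DedupingProcessor(analyze)
first = processor.process("msg-1", "Q4 report")
second = processor.process("msg-1", "Q4 report")  # simulated redelivery
third = processor.process("msg-2", "API spec")
```

Here the second call for `msg-1` does not trigger another analysis, which matters when each analysis costs a model call.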
Core Components
Agent Configuration
View the agent at pubsub_agent/agent.py:
from google.adk.agents import LlmAgent

# Coordinator agent. The four *_tool objects wrap the specialist
# sub-agents; their definitions are omitted from this excerpt.
root_agent = LlmAgent(
    name="pubsub_processor",
    model="gemini-2.5-flash",
    description="Event-driven document processing coordinator",
    instruction="Routes documents to specialized analyzers",
    tools=[financial_tool, technical_tool, sales_tool, marketing_tool],
)

# Sub-agents (financial, technical, sales, marketing)
# Each configured with output_schema for structured JSON
Output Schemas
All sub-agents return structured Pydantic models:
from pydantic import BaseModel

# DocumentSummary, EntityExtraction, and FinancialMetrics are nested
# types from the implementation; their definitions are not shown here.

# Financial documents return:
class FinancialAnalysisOutput(BaseModel):
    summary: DocumentSummary
    entities: EntityExtraction
    financial_metrics: FinancialMetrics
    fiscal_periods: list[str]
    recommendations: list[str]

# Technical documents return:
class TechnicalAnalysisOutput(BaseModel):
    summary: DocumentSummary
    entities: EntityExtraction
    technologies: list[str]
    components: list[str]
    recommendations: list[str]

# Similar for Sales and Marketing analyzers
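The schema listing above can be made concrete with a small runnable sketch. It requires the `pydantic` package. The top-level field names follow the listing, but the nested models' fields (`title`, `main_points`, `revenue`, `profit_margin`) are assumptions, since the tutorial does not show them, and `entities` is left out to keep the example short. The class is named `FinancialAnalysisSketch` to make clear it is not the tutorial's own class.

```python
import json

from pydantic import BaseModel, ValidationError


class DocumentSummary(BaseModel):
    # Hypothetical fields; the tutorial does not list them.
    title: str
    main_points: list[str]


class FinancialMetrics(BaseModel):
    # Hypothetical fields; the tutorial does not list them.
    revenue: float
    profit_margin: float


class FinancialAnalysisSketch(BaseModel):
    """Trimmed stand-in for FinancialAnalysisOutput (entities omitted)."""

    summary: DocumentSummary
    financial_metrics: FinancialMetrics
    fiscal_periods: list[str]
    recommendations: list[str]


# JSON as a sub-agent might return it.
raw = json.dumps({
    "summary": {"title": "Q4 results", "main_points": ["Revenue grew"]},
    "financial_metrics": {"revenue": 1200000, "profit_margin": 0.33},
    "fiscal_periods": ["Q4 2024"],
    "recommendations": ["Monitor margins"],
})
result = FinancialAnalysisSketch(**json.loads(raw))

# A payload missing required fields is rejected instead of passing silently.
try:
    FinancialAnalysisSketch(**{"summary": {"title": "x", "main_points": []}})
    rejected = False
except ValidationError:
    rejected = True
```

Validation at this boundary is what lets downstream code rely on fields such as `result.financial_metrics.revenue` without defensive checks.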
Example Usage
Locally without GCP:
cd tutorial_implementation/tutorial34
make test
Test the agent in code:
import asyncio

from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from pubsub_agent.agent import root_agent


async def test_document_analysis():
    # In-memory sessions keep the example free of external services
    session_service = InMemorySessionService()
    runner = Runner(
        app_name="document_analyzer",
        agent=root_agent,
        session_service=session_service
    )
    session = await session_service.create_session(
        app_name="document_analyzer",
        user_id="test_user"
    )
    prompt = types.Content(
        role="user",
        parts=[types.Part(
            text="Analyze: Revenue $1.2M, Profit 33%, Q4 2024"
        )]
    )
    # run_async yields events as the coordinator and sub-agents work
    async for event in runner.run_async(
        user_id="test_user",
        session_id=session.id,
        new_message=prompt
    ):
        print("Response:", event)


asyncio.run(test_document_analysis())
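Printing every event is noisy when only the final answer matters. The helper below is a sketch of one way to keep just the final response text. It relies on ADK events exposing `is_final_response()` and `content.parts[...].text`; the `final_text` helper and the fake events used to exercise it are hypothetical and stand in for a real `runner.run_async(...)` stream.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator, Optional


async def final_text(events: AsyncIterator) -> Optional[str]:
    """Return the joined text of the last final-response event, if any."""
    text = None
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            # Some parts carry no text (for example tool calls), so skip None.
            text = "".join(part.text or "" for part in event.content.parts)
    return text


def _fake_event(text: str, final: bool) -> SimpleNamespace:
    # Test double mimicking only the attributes final_text reads.
    part = SimpleNamespace(text=text)
    return SimpleNamespace(
        content=SimpleNamespace(parts=[part]),
        is_final_response=lambda: final,
    )


async def _fake_stream():
    yield _fake_event("calling financial analyzer", final=False)
    yield _fake_event('{"recommendations": []}', final=True)


answer = asyncio.run(final_text(_fake_stream()))
```

With a real runner, the same helper could be awaited as `await final_text(runner.run_async(...))` inside the async function above.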
Using ADK Web Interface:
adk web
Then visit http://localhost:8000 and select pubsub_processor from the agent dropdown.
Running Locally
Without Pub/Sub (Local Testing)
cd tutorial_implementation/tutorial34
# Run all tests
make test
# See test coverage
make test-cov
Tests validate:
- Agent configuration
- Sub-agent setup
- Pydantic output schemas
- Agent imports and structure
With Pub/Sub (Google Cloud)
After setting up GCP (see Prerequisites), run publisher and subscriber:
Terminal 1 - Start subscriber:
export GCP_PROJECT="your-project-id"
export GOOGLE_API_KEY="your_api_key"
python subscriber.py
Terminal 2 - Publish documents:
export GCP_PROJECT="your-project-id"
python publisher.py
The subscriber will process each document with the coordinator agent.
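The contents of publisher.py and subscriber.py are not shown here, so the following is only a sketch of the message handling such scripts typically need. Pub/Sub message data is bytes, so documents are serialized on publish and parsed on receipt. The JSON field names (`document_id`, `content`) and all function names are assumptions; `FakeMessage` mimics the `data`, `ack()`, and `nack()` surface of a received Pub/Sub message so the logic can run without GCP.

```python
import json
from typing import Callable


def encode_document(doc_id: str, content: str) -> bytes:
    """Serialize a document into the bytes payload of a Pub/Sub message."""
    # Field names are hypothetical, not taken from the tutorial's publisher.
    return json.dumps({"document_id": doc_id, "content": content}).encode("utf-8")


def decode_document(data: bytes) -> dict:
    """Inverse of encode_document; raises ValueError on malformed payloads."""
    doc = json.loads(data.decode("utf-8"))
    if not isinstance(doc, dict) or "content" not in doc:
        raise ValueError("payload must be a JSON object with a 'content' field")
    return doc


def handle_message(message, process: Callable[[dict], None]) -> None:
    """Ack after successful processing; nack so Pub/Sub redelivers on failure."""
    try:
        process(decode_document(message.data))
    except Exception:
        message.nack()
    else:
        message.ack()


class FakeMessage:
    """Test double exposing the data/ack/nack surface used above."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.status = "pending"

    def ack(self) -> None:
        self.status = "acked"

    def nack(self) -> None:
        self.status = "nacked"


seen: list[str] = []
good = FakeMessage(encode_document("doc-1", "Revenue $1.2M"))
bad = FakeMessage(b"not json")
handle_message(good, lambda doc: seen.append(doc["document_id"]))
handle_message(bad, lambda doc: seen.append(doc["document_id"]))
```

One design caveat: a payload that can never be parsed will be redelivered repeatedly if it is always nacked, so a dead-letter topic or an explicit ack-and-log path for malformed messages is worth considering.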
Google Cloud Deployment
Step 1: Set Up Pub/Sub Resources
gcloud pubsub topics create document-uploads
gcloud pubsub subscriptions create document-processor \
--topic=document-uploads \
--ack-deadline=600
Step 2: Run Subscriber
export GCP_PROJECT=$(gcloud config get-value project)
export GOOGLE_API_KEY="your_api_key"
python subscriber.py
Step 3: Publish Documents
python publisher.py
The subscriber will automatically process each Pub/Sub message using the coordinator agent.
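Pub/Sub subscriber callbacks are ordinary synchronous functions, while the ADK runner is asynchronous, so a subscriber needs some bridge between the two. The sketch below shows one possible bridge with a timeout kept under the 600-second ack deadline configured in Step 1. It is not taken from the tutorial's subscriber.py; `run_with_deadline`, `fake_analyze`, and the 540-second default are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable


def run_with_deadline(
    analyze: Callable[[str], Awaitable[str]],
    content: str,
    timeout_s: float = 540.0,
) -> str:
    """Run an async analysis from synchronous code, bounded by a timeout.

    The default stays under the 600-second ack deadline.
    Raises asyncio.TimeoutError if the analysis does not finish in time.
    """

    async def bounded() -> str:
        return await asyncio.wait_for(analyze(content), timeout_s)

    # asyncio.run creates a fresh event loop, so this must be called from
    # a thread that is not already running one.
    return asyncio.run(bounded())


async def fake_analyze(content: str) -> str:
    # Hypothetical stand-in for awaiting the ADK runner.
    await asyncio.sleep(0)
    return f"analyzed {len(content)} chars"


async def slow_analyze(content: str) -> str:
    # Simulates an analysis that overruns its time budget.
    await asyncio.sleep(1.0)
    return "too late"


result = run_with_deadline(fake_analyze, "abcd")
try:
    run_with_deadline(slow_analyze, "abcd", timeout_s=0.01)
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True
```

On timeout, a subscriber would typically nack the message so that it is retried, rather than letting the deadline lapse silently.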
Troubleshooting
Common Issues
Issue 1: gcloud command not found
Cause: Google Cloud CLI not installed
Solution:
# macOS
brew install --cask google-cloud-sdk
# After installation, verify
gcloud --version
Issue 2: Agent not found when running locally
Cause: Agent module not properly installed
Solution:
cd tutorial_implementation/tutorial34
# Install in development mode
pip install -e .
# Verify agent imports
python -c "from pubsub_agent.agent import root_agent; print(root_agent.name)"
Issue 3: Tests fail with import errors
Cause: Dependencies not installed
Solution:
cd tutorial_implementation/tutorial34
# Install dependencies
make setup
# Or manually
pip install -r requirements.txt
# Run tests
make test
Issue 4: Messages Not Delivered on Pub/Sub
Cause: Subscription not receiving published messages
Solution:
# Verify subscription exists
gcloud pubsub subscriptions list
# Check subscription details
gcloud pubsub subscriptions describe document-processor
# Manually pull a message to test
gcloud pubsub subscriptions pull document-processor --limit=1
# Check IAM permissions
gcloud pubsub subscriptions get-iam-policy document-processor
Issue 5: Pub/Sub Authentication Error
Error: DefaultCredentialsError: Could not automatically determine credentials
Solution:
# Set up application default credentials
gcloud auth application-default login
# Or set explicit credentials
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"
# Verify setup
gcloud auth list
Issue 6: Tests fail with "GOOGLE_API_KEY not set"
Cause: Gemini API key not configured
Solution:
# Set your Gemini API key
export GOOGLE_API_KEY="your_actual_api_key"
# Verify it's set
echo $GOOGLE_API_KEY
# Run tests again
make test
Issue 7: Agent processes documents but returns empty results
Cause: Model not returning expected output format
Solution:
- Verify GOOGLE_API_KEY is set and valid
- Check that the document content is clear and valid
- Review agent instructions in pubsub_agent/agent.py
- Test with a simple document first
# Test the agent directly
import asyncio

from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from pubsub_agent.agent import root_agent


async def test():
    session_service = InMemorySessionService()
    runner = Runner(
        app_name="test",
        agent=root_agent,
        session_service=session_service
    )
    session = await session_service.create_session(
        app_name="test",
        user_id="test"
    )
    message = types.Content(
        role="user",
        parts=[types.Part(text="Revenue $1M, Profit 30%")]
    )
    # Print every event to see where the output goes missing
    async for event in runner.run_async(
        user_id="test",
        session_id=session.id,
        new_message=message
    ):
        print(event)


asyncio.run(test())
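When debugging output format problems, it can help to check whether the model's text is valid JSON at all, including the case where it arrives wrapped in a Markdown code fence. The helper below is a diagnostic sketch only; `extract_json` is a hypothetical name, and it is not part of ADK or of the tutorial's implementation.

```python
import json
import re
from typing import Optional

# Built indirectly so this example can itself sit inside a code fence.
FENCE = "`" * 3


def extract_json(text: str) -> Optional[dict]:
    """Parse a JSON object from model output, tolerating a Markdown fence."""
    pattern = re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE)
    match = re.search(pattern, text, flags=re.DOTALL)
    # Prefer the fenced body when a fence is present, else use the raw text.
    candidate = match.group(1) if match else text
    try:
        parsed = json.loads(candidate.strip())
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


plain = extract_json('{"revenue": 1000000}')
fenced = extract_json(FENCE + 'json\n{"revenue": 1000000}\n' + FENCE)
garbage = extract_json("Sorry, I could not analyze this document.")
```

A `None` result here points at the model's output rather than at Pub/Sub or the subscriber, which narrows down where to look next.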
Next Steps
You've Mastered Event-Driven Agents with Pub/Sub!
You now know how to:
- Build multi-agent coordinator systems
- Use Pydantic for structured JSON output
- Implement async agent processing
- Route documents to specialized analyzers
- Use Google Cloud Pub/Sub for event-driven processing
- Test agents locally without GCP
- Deploy to production with Pub/Sub integration
Key Patterns Learned
- Coordinator + Specialist: One agent routes to many specialized agents
- Structured Output: Pydantic models enforce JSON schemas
- Async Processing: Non-blocking document analysis
- Event-Driven: Pub/Sub handles message buffering and retries
- Tool Composition: Sub-agents as tools within coordinator
Continue Learning
Tutorial 29: UI Integration Overview
Compare all integration approaches (Next.js, Vite, Streamlit, etc.)
Tutorial 30: Next.js + CopilotKit Integration
Build real-time chat interfaces with React
Tutorial 35+: Advanced Patterns
Master deployment, scaling, and production optimization
Tutorial 34 Complete!
You've successfully built an event-driven document processing system with a multi-agent coordinator architecture. Because Pub/Sub decouples publishers from subscribers, the pattern can scale to large document volumes while keeping output structured and validated.
Questions or feedback? Open an issue on the ADK Training Repository.