
Tutorial 34: Google Cloud Pub/Sub + Event-Driven Agents

This tutorial implements a working event-driven document processing system using Google Cloud Pub/Sub and ADK agents. It demonstrates a coordinator + specialist agent pattern with structured JSON output defined by Pydantic models. Verified as of October 2025 with the latest ADK and Gemini 2.5 Flash.

Estimated Reading Time: 50-60 minutes
Difficulty Level: Advanced
Prerequisites: Tutorials 01-03 (ADK Basics); a Google Cloud project for the Pub/Sub sections (local testing needs no GCP)


🚀 Quick Start: Working Implementation

The easiest way to get started is with our complete working implementation:

```bash
cd tutorial_implementation/tutorial34
make setup   # Install dependencies
make test    # Run all tests
```


What's included:

  • ✅ root_agent: Coordinator agent that routes documents to specialists
  • ✅ 4 specialist agents: Financial, Technical, Sales, and Marketing analyzers
  • ✅ Pydantic output schemas: Structured JSON results
  • ✅ 66 comprehensive tests (all passing)
  • ✅ Real-world example code, ready to run

Table of Contents

  1. Overview
  2. Prerequisites & Setup
  3. Understanding the Architecture
  4. Core Components
  5. Running Locally
  6. Google Cloud Deployment
  7. Troubleshooting
  8. Next Steps

Overview

What You'll Build

In this tutorial, you'll build an event-driven document processing system using:

  • Google Cloud Pub/Sub (Event messaging)
  • Google ADK (Multi-agent coordination)
  • Gemini 2.5 Flash (Document analysis)
  • Pydantic Models (Structured JSON output)

Architecture:

```text
Publisher (sends documents to Pub/Sub)
        │
        ▼
Google Cloud Pub/Sub topic: document-uploads
        │
        ▼
root_agent (Coordinator)
  - Routes documents
  - Coordinates analysis
        │
        ├──► Financial Analyst
        ├──► Technical Analyst
        ├──► Sales Analyst
        └──► Marketing Analyst
```

Why Pub/Sub + ADK?

| Feature | Benefit |
|---|---|
| Asynchronous | Non-blocking processing |
| Decoupled | Publishers and subscribers are independent |
| Scalable | Scales automatically with message volume |
| Structured | Pydantic models enforce the JSON output |
| Reliable | At-least-once delivery with retries |

When to use Pub/Sub + ADK:

✅ Asynchronous document processing
✅ Multi-step workflows
✅ Event-driven architectures
✅ Systems with strict output schemas
✅ Google Cloud deployments

❌ Real-time chat interfaces → use Next.js/WebSocket
❌ Simple synchronous calls → use a direct API call


Prerequisites & Setup

Local Testing (No GCP Required)

To get started without Google Cloud:

```bash
# Install dependencies
cd tutorial_implementation/tutorial34
make setup

# Run tests (verifies agent configuration)
make test
```

These steps run entirely locally, using in-memory processing; no Pub/Sub resources are needed.

Google Cloud Setup (Optional, for Real Pub/Sub)

To deploy with real Google Cloud Pub/Sub:

1. Install gcloud CLI

```bash
# macOS
brew install --cask google-cloud-sdk

# Then initialize
gcloud init
```

2. Authenticate

```bash
# Log in to Google Cloud
gcloud auth login

# Set default project
gcloud config set project your-project-id

# Verify authentication
gcloud auth list
```

3. Create Pub/Sub Resources

```bash
# Enable the Pub/Sub API
gcloud services enable pubsub.googleapis.com

# Create the topic
gcloud pubsub topics create document-uploads

# Create the subscription (600 seconds is the maximum ack deadline)
gcloud pubsub subscriptions create document-processor \
  --topic=document-uploads \
  --ack-deadline=600
```
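If you prefer a re-runnable setup script to one-off gcloud commands, the same two resources can be created from Python. This is a minimal sketch, not part of the tutorial's code. It assumes the `google-cloud-pubsub` library is installed, the Pub/Sub API is enabled, and Application Default Credentials are configured. `resource_paths` and `create_resources` are hypothetical helper names.

```python
from typing import Tuple


def resource_paths(project_id: str,
                   topic_id: str = "document-uploads",
                   subscription_id: str = "document-processor") -> Tuple[str, str]:
    """Fully qualified resource names, in the format Pub/Sub expects."""
    return (f"projects/{project_id}/topics/{topic_id}",
            f"projects/{project_id}/subscriptions/{subscription_id}")


def create_resources(project_id: str, ack_deadline_seconds: int = 600) -> None:
    """Create the topic and subscription, skipping any that already exist."""
    # Imported lazily so resource_paths() works without the library installed.
    from google.api_core.exceptions import AlreadyExists
    from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

    topic_path, subscription_path = resource_paths(project_id)

    publisher = pubsub_v1.PublisherClient()
    try:
        publisher.create_topic(request={"name": topic_path})
    except AlreadyExists:
        pass  # re-running the script is harmless

    subscriber = pubsub_v1.SubscriberClient()
    with subscriber:  # closes the underlying gRPC channel when done
        try:
            subscriber.create_subscription(request={
                "name": subscription_path,
                "topic": topic_path,
                "ack_deadline_seconds": ack_deadline_seconds,
            })
        except AlreadyExists:
            pass


print(resource_paths("your-project-id")[0])  # projects/your-project-id/topics/document-uploads
```

Call `create_resources("your-project-id")` once per project. Treating `AlreadyExists` as success makes the script idempotent, unlike the plain `create` commands, which fail on a second run.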

4. Set Environment Variables and Credentials

```bash
# Set your GCP project ID
export GCP_PROJECT="your-project-id"

# Set your Gemini API key
export GOOGLE_API_KEY="your_gemini_api_key"

# Create Application Default Credentials (used by the Pub/Sub client library)
gcloud auth application-default login
```

Understanding the Architecture

The Coordinator + Specialist Pattern

This implementation uses a coordinator agent that routes each document to a specialized analyzer:

```text
root_agent (Coordinator)
  - Analyzes document type
  - Routes to appropriate analyzer
  - Coordinates specialized agents
        │
        ├──► Financial Analyst ──┐
        ├──► Technical Analyst ──┤
        ├──► Sales Analyst ──────┤
        └──► Marketing Analyst ──┤
                                 ▼
                    Structured JSON output
                      (Pydantic models)
```

Key Components

  1. root_agent (pubsub_agent/agent.py):

    • Coordinator that routes documents to specialists
    • Analyzes document type and content
    • Calls the appropriate sub-agent tool
    • Returns structured analysis
  2. Sub-Agents (financial, technical, sales, marketing):

    • Specialized analyzers for document types
    • Enforce structured JSON via Pydantic output_schema
    • Extract type-specific metrics and insights
  3. Pydantic Output Schemas:

    • FinancialAnalysisOutput: Revenue, profit, metrics
    • TechnicalAnalysisOutput: Technologies, components
    • SalesAnalysisOutput: Deals, pipeline value
    • MarketingAnalysisOutput: Campaigns, engagement metrics

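Pub/Sub Guarantees

| Feature | Description |
|---|---|
| At-least-once | Each message is delivered one or more times, so duplicates are possible |
| Asynchronous | Publishers do not wait for subscribers to process messages |
| Scalable | Scales automatically with message volume |
| Durable | Unacknowledged messages are retained by the subscription (7 days by default) |
| Reliable | Messages that are nacked or not acknowledged before the ack deadline are redelivered |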
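At-least-once delivery means the same document can reach the subscriber twice, so processing should be idempotent. A redelivered message keeps its Pub/Sub message ID, which makes the ID a natural deduplication key.

The sketch below makes several assumptions. The seen-ID set lives in memory, so it only catches redeliveries to one subscriber process; a real deployment would use a shared store. Publisher-side retries can also produce copies with different IDs, and deduplicating on an application key such as a document ID covers that case. `IdempotentProcessor` is a hypothetical name, not part of the tutorial's code.

```python
from typing import Callable, Optional, Set


class IdempotentProcessor:
    """Runs `process` at most once per Pub/Sub message ID.

    Hypothetical helper: the seen-ID set lives in memory, so it only
    catches redeliveries to this one subscriber process.
    """

    def __init__(self, process: Callable[[bytes], str]) -> None:
        self._process = process
        self._seen: Set[str] = set()

    def handle(self, message_id: str, data: bytes) -> Optional[str]:
        if message_id in self._seen:
            return None  # duplicate delivery: skip the work, just ack again
        result = self._process(data)  # an exception here leaves the ID unrecorded
        self._seen.add(message_id)    # record only after success
        return result


calls = []


def analyze(data: bytes) -> str:
    calls.append(data)  # stand-in for the coordinator agent call
    return data.decode("utf-8")


processor = IdempotentProcessor(analyze)
print(processor.handle("msg-1", b"Revenue $1.2M"))  # Revenue $1.2M
print(processor.handle("msg-1", b"Revenue $1.2M"))  # None
print(len(calls))                                   # 1
```

The ID is recorded only after `process` returns. If the analysis raises, the message can still be nacked and retried instead of being silently dropped as a "duplicate".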

Core Components

Agent Configuration

The coordinator is defined in pubsub_agent/agent.py:

```python
from google.adk.agents import LlmAgent

# financial_tool, technical_tool, sales_tool and marketing_tool are the four
# specialist sub-agents (financial, technical, sales, marketing) wrapped as
# tools; they are defined elsewhere in pubsub_agent/agent.py, each configured
# with an output_schema for structured JSON.

# Coordinator agent
root_agent = LlmAgent(
    name="pubsub_processor",
    model="gemini-2.5-flash",
    description="Event-driven document processing coordinator",
    instruction="Routes documents to specialized analyzers",
    tools=[financial_tool, technical_tool, sales_tool, marketing_tool],
)
```

Output Schemas

Each sub-agent returns a structured Pydantic model:

```python
from pydantic import BaseModel

# DocumentSummary, EntityExtraction and FinancialMetrics are nested Pydantic
# models defined elsewhere in the implementation (not shown here).


# Financial documents return:
class FinancialAnalysisOutput(BaseModel):
    summary: DocumentSummary
    entities: EntityExtraction
    financial_metrics: FinancialMetrics
    fiscal_periods: list[str]
    recommendations: list[str]


# Technical documents return:
class TechnicalAnalysisOutput(BaseModel):
    summary: DocumentSummary
    entities: EntityExtraction
    technologies: list[str]
    components: list[str]
    recommendations: list[str]

# SalesAnalysisOutput and MarketingAnalysisOutput follow the same pattern.
```
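Consumers of a specialist's result, such as a subscriber writing results downstream, usually receive it as JSON text. Before using that text, a quick structural check is worthwhile.

The sketch below is dependency-free and checks only top-level keys, assuming the field names listed above for FinancialAnalysisOutput. The tutorial itself enforces the full schema through Pydantic `output_schema`, and `missing_fields` is a hypothetical helper.

```python
import json

# Top-level fields of FinancialAnalysisOutput, as listed above.
FINANCIAL_FIELDS = ("summary", "entities", "financial_metrics",
                    "fiscal_periods", "recommendations")


def missing_fields(raw: str, required=FINANCIAL_FIELDS) -> list:
    """Return required top-level fields absent from a JSON object string.

    Raises ValueError (json.JSONDecodeError is a subclass) if `raw` is not
    valid JSON, and TypeError if it is valid JSON but not an object.
    """
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise TypeError("expected a JSON object at the top level")
    return [name for name in required if name not in data]


result = ('{"summary": {}, "entities": {}, "financial_metrics": {}, '
          '"fiscal_periods": ["Q4 2024"]}')
print(missing_fields(result))  # ['recommendations']
```

For full validation (types and nested models), parsing with the Pydantic model itself is the stronger check. This version is only meant for logging or alerting in components that do not import the agent package.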

Example Usage

Locally, without GCP:

```bash
cd tutorial_implementation/tutorial34
make test
```

Test the agent in code (requires GOOGLE_API_KEY, since it calls Gemini):

```python
import asyncio

from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from pubsub_agent.agent import root_agent


async def test_document_analysis():
    session_service = InMemorySessionService()
    runner = Runner(
        app_name="document_analyzer",
        agent=root_agent,
        session_service=session_service,
    )

    # The session must use the same app_name as the runner
    session = await session_service.create_session(
        app_name="document_analyzer",
        user_id="test_user",
    )

    prompt = types.Content(
        role="user",
        parts=[types.Part(text="Analyze: Revenue $1.2M, Profit 33%, Q4 2024")],
    )

    # The runner streams events: sub-agent tool calls, then the final answer
    async for event in runner.run_async(
        user_id="test_user",
        session_id=session.id,
        new_message=prompt,
    ):
        print("Response:", event)


asyncio.run(test_document_analysis())
```
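Printing every event is noisy, because the runner yields intermediate events (for example, the coordinator's calls to specialist tools) before the final answer. The sketch below keeps only the final response text.

It assumes ADK's `Event.is_final_response()` method and the `event.content.parts[*].text` layout shown in the ADK documentation. `final_response_text` is a hypothetical helper. The usage below feeds it stand-in objects, so it runs without a model call.

```python
from types import SimpleNamespace
from typing import Iterable


def final_response_text(events: Iterable) -> str:
    """Concatenate the text parts of the final-response event(s)."""
    chunks = []
    for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            chunks.extend(p.text for p in event.content.parts if getattr(p, "text", None))
    return "".join(chunks)


def fake_event(text, final):
    """Stand-in for an ADK Event, with only the attributes used above."""
    content = SimpleNamespace(parts=[SimpleNamespace(text=text)])
    return SimpleNamespace(content=content, is_final_response=lambda: final)


events = [fake_event("calling financial analyzer", False),
          fake_event('{"summary": "Q4 2024 results"}', True)]
print(final_response_text(events))  # {"summary": "Q4 2024 results"}
```

Inside `test_document_analysis()`, collect the stream with `events = [e async for e in runner.run_async(...)]` and pass the list to `final_response_text`.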

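Using ADK Web Interface:

```bash
# Run from tutorial_implementation/tutorial34, the directory containing pubsub_agent/
adk web
```

Then visit http://localhost:8000 and select pubsub_agent (the agent folder; the agent itself is named pubsub_processor) from the agent dropdown.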


Running Locally

Without Pub/Sub (Local Testing)

```bash
cd tutorial_implementation/tutorial34

# Run all tests
make test

# See test coverage
make test-cov
```

Tests validate:

  • Agent configuration
  • Sub-agent setup
  • Pydantic output schemas
  • Agent imports and structure

With Pub/Sub (Google Cloud)

After setting up GCP (see Prerequisites & Setup), run the subscriber and the publisher in two separate terminals.

Terminal 1 - Start the subscriber:

```bash
export GCP_PROJECT="your-project-id"
export GOOGLE_API_KEY="your_api_key"

python subscriber.py
```

Terminal 2 - Publish documents:

```bash
export GCP_PROJECT="your-project-id"

python publisher.py
```

The subscriber will process each document with the coordinator agent.
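The tutorial's subscriber.py is not reproduced here. As a rough sketch of what such a subscriber does, the code below decodes each message, runs the analysis, and acks only on success so that failures are redelivered.

It assumes the `google-cloud-pubsub` client library and a hypothetical JSON payload with a `content` field. `handle_message` and `run_subscriber` are illustrative names, not the tutorial's actual code.

```python
import json
from typing import Callable


def handle_message(message, analyze: Callable[[str], str]) -> None:
    """Pub/Sub callback body: decode, analyze, then ack or nack.

    Assumes a JSON payload like {"document_id": ..., "content": ...};
    `analyze` runs the coordinator agent and returns its response text.
    """
    try:
        payload = json.loads(message.data.decode("utf-8"))
        result = analyze(payload["content"])
    except Exception as exc:  # any failure: nack so Pub/Sub redelivers it
        print(f"Failed {message.message_id}: {exc}")
        message.nack()
        return
    print(f"Processed {message.message_id}: {result[:80]}")
    message.ack()  # ack only after the work succeeded


def run_subscriber(project_id: str, subscription_id: str,
                   analyze: Callable[[str], str]) -> None:
    """Stream messages from the subscription into handle_message until Ctrl+C."""
    from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)
    future = subscriber.subscribe(path, callback=lambda m: handle_message(m, analyze))
    with subscriber:
        try:
            future.result()  # blocks; raises if the stream fails
        except KeyboardInterrupt:
            future.cancel()  # stop pulling new messages
            future.result()  # wait for shutdown to finish
```

`analyze` could wrap the Runner loop from Example Usage in `asyncio.run(...)`. The Python client invokes callbacks on worker threads that have no running event loop, so `asyncio.run` is safe there. Acking after processing, rather than on receipt, is what preserves at-least-once semantics end to end.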


Google Cloud Deployment

Step 1: Set Up Pub/Sub Resources

```bash
gcloud pubsub topics create document-uploads
gcloud pubsub subscriptions create document-processor \
  --topic=document-uploads \
  --ack-deadline=600
```

Step 2: Run Subscriber

```bash
export GCP_PROJECT=$(gcloud config get-value project)
export GOOGLE_API_KEY="your_api_key"

python subscriber.py
```

Step 3: Publish Documents

In a second terminal:

```bash
export GCP_PROJECT=$(gcloud config get-value project)

python publisher.py
```

The subscriber will automatically process each Pub/Sub message using the coordinator agent.
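The tutorial's publisher.py is not reproduced here either. The sketch below shows the publishing side under stated assumptions. It uses the `google-cloud-pubsub` library, a hypothetical JSON payload with `document_id` and `content` fields, and an optional hypothetical `doc_type` attribute. Message attributes are string key/value pairs that subscribers can read, or filter on, without decoding the body. `encode_document` and `publish_document` are illustrative names.

```python
import json
from typing import Dict, Optional, Tuple


def encode_document(document_id: str, content: str,
                    doc_type: Optional[str] = None) -> Tuple[bytes, Dict[str, str]]:
    """Build (data, attributes) for one Pub/Sub message (hypothetical schema)."""
    data = json.dumps({"document_id": document_id, "content": content}).encode("utf-8")
    attributes = {"document_id": document_id}
    if doc_type:
        attributes["doc_type"] = doc_type  # attribute values must be strings
    return data, attributes


def publish_document(project_id: str, topic_id: str, document_id: str,
                     content: str, doc_type: Optional[str] = None) -> str:
    """Publish one document and return the server-assigned message ID."""
    from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    data, attributes = encode_document(document_id, content, doc_type)
    future = publisher.publish(topic_path, data, **attributes)
    return future.result()  # blocks until Pub/Sub accepts the message
```

For example, `publish_document(project, "document-uploads", "doc-1", "Revenue $1.2M, Profit 33%, Q4 2024")` sends one financial document to the topic created in Step 1.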


Troubleshooting

Common Issues

Issue 1: gcloud command not found

Cause: Google Cloud CLI not installed

Solution:

```bash
# macOS
brew install --cask google-cloud-sdk

# After installation, verify
gcloud --version
```

Issue 2: Agent not found when running locally

Cause: Agent module not properly installed

Solution:

```bash
cd tutorial_implementation/tutorial34

# Install in development mode
pip install -e .

# Verify agent imports
python -c "from pubsub_agent.agent import root_agent; print(root_agent.name)"
```

Issue 3: Tests fail with import errors

Cause: Dependencies not installed

Solution:

```bash
cd tutorial_implementation/tutorial34

# Install dependencies
make setup

# Or manually
pip install -r requirements.txt

# Run tests
make test
```

Issue 4: Messages Not Delivered on Pub/Sub

Cause: Subscription not receiving published messages (a subscription only receives messages published after it was created)

Solution:

```bash
# Verify the subscription exists
gcloud pubsub subscriptions list

# Check subscription details (topic, ack deadline)
gcloud pubsub subscriptions describe document-processor

# Manually pull a message to test (without --auto-ack it is redelivered later)
gcloud pubsub subscriptions pull document-processor --limit=1

# Check IAM permissions
gcloud pubsub subscriptions get-iam-policy document-processor
```

Issue 5: Pub/Sub Authentication Error

Error: DefaultCredentialsError: Could not automatically determine credentials

Solution:

```bash
# Set up Application Default Credentials
gcloud auth application-default login

# Or point to a service account key explicitly
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/key.json"

# Verify that ADC can mint a token (gcloud auth list only shows gcloud's own accounts)
gcloud auth application-default print-access-token
```
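To see which local credential source the client libraries are likely to pick up, the stdlib-only sketch below mirrors the first two steps of the ADC lookup order. It checks GOOGLE_APPLICATION_CREDENTIALS first, then the file written by `gcloud auth application-default login` at `~/.config/gcloud/application_default_credentials.json` (the Linux/macOS location).

The check is an approximation, not the google-auth implementation. It only confirms that the files exist, not that they are valid. `adc_source` is a hypothetical helper.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def adc_source(env: Optional[Mapping[str, str]] = None,
               home: Optional[Path] = None) -> str:
    """Report which local credential source ADC lookup would use (Linux/macOS)."""
    env = os.environ if env is None else env
    explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        if Path(explicit).is_file():
            return f"GOOGLE_APPLICATION_CREDENTIALS -> {explicit}"
        return f"GOOGLE_APPLICATION_CREDENTIALS is set, but {explicit} does not exist"
    well_known = ((home or Path.home()) / ".config" / "gcloud"
                  / "application_default_credentials.json")
    if well_known.is_file():
        return f"gcloud ADC file -> {well_known}"
    return "no local credentials (on GCP, ADC falls back to the metadata server)"


print(adc_source())
```

A variable that points at a missing file is a common cause of DefaultCredentialsError. In that case, unset GOOGLE_APPLICATION_CREDENTIALS or fix the path.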

Issue 6: Tests fail with "GOOGLE_API_KEY not set"

Cause: Gemini API key not configured

Solution:

```bash
# Set your Gemini API key
export GOOGLE_API_KEY="your_actual_api_key"

# Verify it's set
echo $GOOGLE_API_KEY

# Run tests again
make test
```
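`echo $GOOGLE_API_KEY` prints the secret into your terminal history and scrollback. A small preflight check that reports only whether each variable is set avoids that, and it can run at the top of subscriber.py or publisher.py.

This is a sketch. The variable list reflects the exports used in this tutorial, and `missing_env` is a hypothetical helper.

```python
import os
from typing import Dict, Mapping

# Variables this tutorial exports before running the agent or Pub/Sub scripts.
REQUIRED_VARS = {
    "GOOGLE_API_KEY": "Gemini API key (needed for any model call)",
    "GCP_PROJECT": "Google Cloud project ID (needed for real Pub/Sub only)",
}


def missing_env(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return {name: purpose} for required variables that are unset or blank."""
    return {name: why for name, why in REQUIRED_VARS.items()
            if not env.get(name, "").strip()}


for name, why in missing_env().items():
    print(f"{name} is not set: {why}")
```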

Issue 7: Agent processes documents but returns empty results

Cause: Model not returning the expected output format

Solution:

  • Verify GOOGLE_API_KEY is set and valid
  • Check that the document content is clear and valid
  • Review agent instructions in pubsub_agent/agent.py
  • Test with a simple document first:

```python
# Test the agent directly with a minimal financial document
import asyncio

from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from pubsub_agent.agent import root_agent


async def test():
    session_service = InMemorySessionService()
    runner = Runner(
        app_name="test",
        agent=root_agent,
        session_service=session_service,
    )
    session = await session_service.create_session(
        app_name="test",
        user_id="test",
    )
    message = types.Content(
        role="user",
        parts=[types.Part(text="Revenue $1M, Profit 30%")],
    )
    # Print every event to see where the output goes missing
    async for event in runner.run_async(
        user_id="test",
        session_id=session.id,
        new_message=message,
    ):
        print(event)


asyncio.run(test())
```

Next Steps

You've Mastered Event-Driven Agents with Pub/Sub! 🎉

You now know how to:

✅ Build multi-agent coordinator systems
✅ Use Pydantic for structured JSON output
✅ Implement async agent processing
✅ Route documents to specialized analyzers
✅ Use Google Cloud Pub/Sub for event-driven processing
✅ Test agents locally without GCP
✅ Run the agents against real Google Cloud Pub/Sub

Key Patterns Learned

  • Coordinator + Specialist: One agent routes to many specialized agents
  • Structured Output: Pydantic models enforce JSON schemas
  • Async Processing: Non-blocking document analysis
  • Event-Driven: Pub/Sub handles message buffering and retries
  • Tool Composition: Sub-agents as tools within coordinator

Continue Learning

Tutorial 29: UI Integration Overview
Compare all integration approaches (Next.js, Vite, Streamlit, etc.)

Tutorial 30: Next.js + CopilotKit Integration
Build real-time chat interfaces with React

Tutorial 35+: Advanced Patterns
Master deployment, scaling, and production optimization


🎉 Tutorial 34 Complete!

You've successfully built an event-driven document processing system with a multi-agent coordinator architecture. Because Pub/Sub decouples publishers from subscribers, throughput can grow by adding subscriber instances, while the Pydantic schemas keep every result structured and validated.


Questions or feedback? Open an issue on the ADK Training Repository.
