From 349e00a1bcfac0acc6006e51f6b69ef3043b14be Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 22 Nov 2025 15:55:35 +0000 Subject: [PATCH 1/2] Add Google ADK runner support - Implement GoogleADKRunner class extending BaseAgentRunner - Add support for Google Agent Development Kit (ADK) integration - Include both run() and run_streamed() methods with event forwarding - Add conditional imports to make google-adk an optional dependency - Create example usage file demonstrating GoogleADKRunner The runner follows the same pattern as OpenAIRunner and provides: - Automatic activity tracking - Session management with InMemorySessionService - Streaming support with async event forwarding - Integration with runner.tools.report_status for progress tracking --- examples/google_adk_example.py | 133 ++++++++++++ src/agentexec/__init__.py | 8 + src/agentexec/runners/__init__.py | 8 + src/agentexec/runners/google_adk.py | 302 ++++++++++++++++++++++++++++ 4 files changed, 451 insertions(+) create mode 100644 examples/google_adk_example.py create mode 100644 src/agentexec/runners/google_adk.py diff --git a/examples/google_adk_example.py b/examples/google_adk_example.py new file mode 100644 index 0000000..a110caa --- /dev/null +++ b/examples/google_adk_example.py @@ -0,0 +1,133 @@ +"""Example usage of GoogleADKRunner with the Google Agent Development Kit. + +This example demonstrates how to: +1. Create a Google ADK agent with tools +2. Use GoogleADKRunner for execution +3. Track activity with the built-in report_status tool + +Requirements: + pip install google-adk agentexec + +Note: You'll need to set up Google Cloud credentials for this to work. +Set the GOOGLE_API_KEY environment variable or configure ADC. +""" + +import asyncio +import uuid + +from google.adk.agents import LlmAgent + +import agentexec as ax + + +# Example tool for the agent +def search_company_info(company_name: str) -> str: + """Search for basic information about a company. + + Args: + company_name: Name of the company to search for. + + Returns: + Basic company information. + """ + # This is a mock implementation + return f"Company: {company_name}\nIndustry: Technology\nFounded: 2020\nEmployees: 500" + + +async def main(): + """Run a simple Google ADK agent with activity tracking.""" + + # Generate a unique agent ID for tracking + agent_id = uuid.uuid4() + + # Create the runner with activity tracking + runner = ax.GoogleADKRunner( + agent_id=agent_id, + app_name="company_research", + max_turns_recovery=False, + report_status_prompt="Use report_activity(message, percentage) to report your progress.", + ) + + # Create a Google ADK agent + # Note: Include the runner's prompts and tools for activity tracking + research_agent = LlmAgent( + name="Company Research Agent", + model="gemini-2.0-flash", + instruction=f"""You are a thorough company research analyst. + +When researching a company: +1. Use the search_company_info tool to gather information +2. Analyze the data you find +3. Provide a concise summary + +{runner.prompts.report_status}""", + tools=[ + search_company_info, + runner.tools.report_status, # Add the activity tracking tool + ], + ) + + # Run the agent + print(f"Starting research with agent_id: {agent_id}") + print("-" * 60) + + result = await runner.run( + agent=research_agent, + input="Research Acme Corporation and provide a brief overview.", + max_turns=10, + ) + + # Extract and display the result + print("\n" + "=" * 60) + print("FINAL RESULT:") + print("=" * 60) + if result.final_output: + print(result.final_output) + else: + print("No final output received") + + print(f"\nTotal events: {len(result.events)}") + + +async def streaming_example(): + """Example of using the streaming mode with event forwarding.""" + + agent_id = uuid.uuid4() + + runner = ax.GoogleADKRunner( + agent_id=agent_id, + app_name="streaming_demo", + ) + + agent = LlmAgent( + name="Streaming Agent", + model="gemini-2.0-flash", + instruction="You are a helpful assistant. Be concise.", + tools=[runner.tools.report_status], + ) + + # Define an event forwarder to process events in real-time + async def handle_event(event): + """Process each event as it arrives.""" + print(f"[EVENT] {type(event).__name__}") + if hasattr(event, "is_final_response") and event.is_final_response(): + print(f"[FINAL] {event.content.parts[0].text if event.content.parts else 'No text'}") + + print(f"Starting streaming agent with agent_id: {agent_id}") + print("-" * 60) + + result = await runner.run_streamed( + agent=agent, + input="What is 2 + 2?", + forwarder=handle_event, + ) + + print(f"\nStreaming complete. Total events: {len(result.events)}") + + +if __name__ == "__main__": + # Run the basic example + asyncio.run(main()) + + # Uncomment to run the streaming example + # asyncio.run(streaming_example()) diff --git a/src/agentexec/__init__.py b/src/agentexec/__init__.py index 579a101..d1ddfd7 100644 --- a/src/agentexec/__init__.py +++ b/src/agentexec/__init__.py @@ -71,3 +71,11 @@ async def research_company(payload: dict, agent_id: str): __all__.append("OpenAIRunner") except ImportError: pass + +# Google ADK runner is only available if google-adk package is installed +try: + from agentexec.runners import GoogleADKRunner + + __all__.append("GoogleADKRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/__init__.py b/src/agentexec/runners/__init__.py index d1af0fd..dc511db 100644 --- a/src/agentexec/runners/__init__.py +++ b/src/agentexec/runners/__init__.py @@ -11,3 +11,11 @@ __all__.append("OpenAIRunner") except ImportError: pass + +# Google ADK runner is only available if google-adk package is installed +try: + from agentexec.runners.google_adk import GoogleADKRunner + + __all__.append("GoogleADKRunner") +except ImportError: + pass diff --git a/src/agentexec/runners/google_adk.py b/src/agentexec/runners/google_adk.py new file mode 100644 index 0000000..1cbe5fd --- /dev/null +++ b/src/agentexec/runners/google_adk.py @@ -0,0 +1,302 @@ +import logging +import uuid +from typing import Any, Callable + +from google.adk.agents import LlmAgent +from google.adk.runners import InMemoryRunner, Runner +from google.adk.sessions import InMemorySessionService +from google.genai import types + +from agentexec.runners.base import BaseAgentRunner, _RunnerTools + + +logger = logging.getLogger(__name__) + + +class _GoogleADKRunnerTools(_RunnerTools): + """Google ADK-specific tools wrapper. + + Note: Google ADK tools are typically defined as regular Python functions. + The agent framework handles tool registration automatically. + """ + + @property + def report_status(self) -> Any: + """Get the status update tool for Google ADK. + + Returns the plain function as Google ADK handles tool registration. + """ + return super().report_status + + +class GoogleADKRunResult: + """Result wrapper for Google ADK agent execution.""" + + def __init__(self, final_content: types.Content | None, events: list[Any]): + """Initialize the result. + + Args: + final_content: The final response content from the agent. + events: List of all events generated during execution. + """ + self.final_content = final_content + self.events = events + + @property + def final_output(self) -> str | None: + """Extract the final text output from the result.""" + if self.final_content and self.final_content.parts: + # Extract text from the first part + for part in self.final_content.parts: + if hasattr(part, "text") and part.text: + return part.text + return None + + +class GoogleADKRunner(BaseAgentRunner): + """Runner for Google Agent Development Kit (ADK) with automatic activity tracking. + + This runner wraps the Google ADK and provides: + - Automatic agent_id generation + - Activity lifecycle management (QUEUED -> RUNNING -> COMPLETE/ERROR) + - Session management with InMemorySessionService + - Status update tool with agent_id pre-baked + - Streaming event support + + Example: + runner = agentexec.GoogleADKRunner( + agent_id=agent_id, + app_name="my_app", + max_turns_recovery=False, + report_status_prompt="Use report_activity(message, percentage) to report progress.", + ) + + agent = LlmAgent( + name="Research Agent", + model="gemini-2.0-flash", + instruction=f"Research companies. {runner.prompts.report_status}", + tools=[runner.tools.report_status], + ) + + result = await runner.run( + agent=agent, + input="Research Acme Corp", + max_turns=15, + ) + """ + + app_name: str + session_service: InMemorySessionService + _runner: Runner | None + + def __init__( + self, + agent_id: uuid.UUID, + *, + app_name: str = "agentexec", + max_turns_recovery: bool = False, + wrap_up_prompt: str | None = None, + recovery_turns: int = 5, + report_status_prompt: str | None = None, + ) -> None: + """Initialize the Google ADK runner. + + Args: + agent_id: UUID for tracking this agent's activity. + app_name: Application name for session management. + max_turns_recovery: Enable automatic recovery when max turns exceeded. + wrap_up_prompt: Prompt to use for recovery run. + recovery_turns: Number of turns allowed for recovery. + report_status_prompt: Instruction snippet about using the status tool. + """ + super().__init__( + agent_id, + max_turns_recovery=max_turns_recovery, + recovery_turns=recovery_turns, + wrap_up_prompt=wrap_up_prompt, + report_status_prompt=report_status_prompt, + ) + # Override with Google ADK-specific tools + self.tools = _GoogleADKRunnerTools(self.agent_id) + + # Initialize session service + self.app_name = app_name + self.session_service = InMemorySessionService() + self._runner = None + + def _get_or_create_runner(self, agent: LlmAgent) -> Runner: + """Get or create a Runner instance for the agent. + + Args: + agent: The LlmAgent to create a runner for. + + Returns: + Configured Runner instance. + """ + # Create a new runner for this agent + return Runner( + agent=agent, + app_name=self.app_name, + session_service=self.session_service, + ) + + async def _ensure_session(self, user_id: str, session_id: str) -> None: + """Ensure a session exists in the session service. + + Args: + user_id: User identifier for the session. + session_id: Session identifier. + """ + # Create session if it doesn't exist + try: + await self.session_service.create_session( + app_name=self.app_name, + user_id=user_id, + session_id=session_id, + ) + except Exception as e: + # Session might already exist, which is fine + logger.debug(f"Session creation note: {e}") + + async def run( + self, + agent: LlmAgent, + input: str | types.Content, + max_turns: int = 10, + context: Any | None = None, + user_id: str | None = None, + session_id: str | None = None, + ) -> GoogleADKRunResult: + """Run the agent with automatic activity tracking. + + Args: + agent: LlmAgent instance. + input: User input/prompt for the agent (string or Content object). + max_turns: Maximum number of agent iterations (Note: Google ADK manages this internally). + context: Optional context for the agent run (not used by Google ADK currently). + user_id: User ID for session management (defaults to agent_id). + session_id: Session ID (defaults to agent_id). + + Returns: + GoogleADKRunResult with final output and events. + """ + # Use agent_id as default for user_id and session_id + user_id = user_id or str(self.agent_id) + session_id = session_id or str(self.agent_id) + + # Ensure session exists + await self._ensure_session(user_id, session_id) + + # Create runner + runner = self._get_or_create_runner(agent) + + # Convert input to Content if it's a string + if isinstance(input, str): + content = types.Content( + role="user", + parts=[types.Part(text=input)] + ) + else: + content = input + + # Run the agent and collect events + events = [] + final_content = None + + try: + async for event in runner.run_async( + user_id=user_id, + session_id=session_id, + new_message=content, + ): + events.append(event) + if hasattr(event, "is_final_response") and event.is_final_response(): + final_content = event.content + + except Exception as e: + logger.error(f"Error during Google ADK agent execution: {e}") + raise + + return GoogleADKRunResult(final_content=final_content, events=events) + + async def run_streamed( + self, + agent: LlmAgent, + input: str | types.Content, + max_turns: int = 10, + context: Any | None = None, + forwarder: Callable | None = None, + user_id: str | None = None, + session_id: str | None = None, + ) -> GoogleADKRunResult: + """Run the agent in streaming mode with automatic activity tracking. + + The forwarder callback receives each event as it's generated, allowing + real-time processing of agent execution. + + Args: + agent: LlmAgent instance. + input: User input/prompt for the agent (string or Content object). + max_turns: Maximum number of agent iterations (Note: Google ADK manages this internally). + context: Optional context for the agent run (not used by Google ADK currently). + forwarder: Optional async callback to process each event as it arrives. + user_id: User ID for session management (defaults to agent_id). + session_id: Session ID (defaults to agent_id). + + Returns: + GoogleADKRunResult with final output and events. + + Example: + async def handle_event(event): + print(f"Event: {event}") + + result = await runner.run_streamed( + agent=agent, + input="Research XYZ", + forwarder=handle_event + ) + """ + # Use agent_id as default for user_id and session_id + user_id = user_id or str(self.agent_id) + session_id = session_id or str(self.agent_id) + + # Ensure session exists + await self._ensure_session(user_id, session_id) + + # Create runner + runner = self._get_or_create_runner(agent) + + # Convert input to Content if it's a string + if isinstance(input, str): + content = types.Content( + role="user", + parts=[types.Part(text=input)] + ) + else: + content = input + + # Run the agent in streaming mode + events = [] + final_content = None + + try: + async for event in runner.run_async( + user_id=user_id, + session_id=session_id, + new_message=content, + ): + events.append(event) + + # Forward event if forwarder is provided + if forwarder: + await forwarder(event) + + # Check if this is the final response + if hasattr(event, "is_final_response") and event.is_final_response(): + final_content = event.content + + except Exception as e: + logger.error(f"Error during Google ADK agent streaming execution: {e}") + raise + + return GoogleADKRunResult(final_content=final_content, events=events) From a335c07d2563cc229131856305a7e2e68b8ae58e Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 07:54:57 +0000 Subject: [PATCH 2/2] Refactor GoogleADKRunner to follow ADK conventions Updated implementation to properly use Google ADK's built-in features: - Replace max_turns with RunConfig (ADK's execution control mechanism) - Use max_llm_calls parameter (default 500) instead of max_turns - Remove max_turns_recovery logic (not applicable to ADK) - Pass run_config parameter through to runner.run_async() - Defer to ADK's internal execution control - Update example to demonstrate RunConfig usage - Simplify API to match ADK conventions ADK uses RunConfig for runtime behavior control including: - max_llm_calls: safeguard against runaway loops - streaming_mode: control streaming behavior - support_cfc: function calling configuration This aligns with ADK best practices and reduces unnecessary abstraction layers. --- examples/google_adk_example.py | 12 ++++-- src/agentexec/runners/google_adk.py | 57 ++++++++++++++++++----------- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/examples/google_adk_example.py b/examples/google_adk_example.py index a110caa..ed9ff39 100644 --- a/examples/google_adk_example.py +++ b/examples/google_adk_example.py @@ -4,6 +4,7 @@ 1. Create a Google ADK agent with tools 2. Use GoogleADKRunner for execution 3. Track activity with the built-in report_status tool +4. Control execution limits using RunConfig Requirements: pip install google-adk agentexec @@ -16,6 +17,7 @@ import uuid from google.adk.agents import LlmAgent +from google.adk.core.run_config import RunConfig import agentexec as ax @@ -44,7 +46,6 @@ async def main(): runner = ax.GoogleADKRunner( agent_id=agent_id, app_name="company_research", - max_turns_recovery=False, report_status_prompt="Use report_activity(message, percentage) to report your progress.", ) @@ -67,14 +68,19 @@ async def main(): ], ) - # Run the agent + # Run the agent with execution control + # RunConfig controls execution limits (default max_llm_calls=500) print(f"Starting research with agent_id: {agent_id}") print("-" * 60) + run_config = RunConfig( + max_llm_calls=100, # Limit to 100 LLM calls to prevent runaway execution + ) + result = await runner.run( agent=research_agent, input="Research Acme Corporation and provide a brief overview.", - max_turns=10, + run_config=run_config, ) # Extract and display the result diff --git a/src/agentexec/runners/google_adk.py b/src/agentexec/runners/google_adk.py index 1cbe5fd..edf9b12 100644 --- a/src/agentexec/runners/google_adk.py +++ b/src/agentexec/runners/google_adk.py @@ -3,7 +3,8 @@ from typing import Any, Callable from google.adk.agents import LlmAgent -from google.adk.runners import InMemoryRunner, Runner +from google.adk.core.run_config import RunConfig +from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService from google.genai import types @@ -62,12 +63,14 @@ class GoogleADKRunner(BaseAgentRunner): - Session management with InMemorySessionService - Status update tool with agent_id pre-baked - Streaming event support + - Execution control via RunConfig (max_llm_calls, etc.) Example: + from google.adk.core.run_config import RunConfig + runner = agentexec.GoogleADKRunner( agent_id=agent_id, app_name="my_app", - max_turns_recovery=False, report_status_prompt="Use report_activity(message, percentage) to report progress.", ) @@ -78,10 +81,13 @@ class GoogleADKRunner(BaseAgentRunner): tools=[runner.tools.report_status], ) + # Use RunConfig to control execution limits (default max_llm_calls=500) + run_config = RunConfig(max_llm_calls=100) + result = await runner.run( agent=agent, input="Research Acme Corp", - max_turns=15, + run_config=run_config, ) """ @@ -94,9 +100,6 @@ def __init__( agent_id: uuid.UUID, *, app_name: str = "agentexec", - max_turns_recovery: bool = False, - wrap_up_prompt: str | None = None, - recovery_turns: int = 5, report_status_prompt: str | None = None, ) -> None: """Initialize the Google ADK runner. @@ -104,16 +107,15 @@ def __init__( Args: agent_id: UUID for tracking this agent's activity. app_name: Application name for session management. - max_turns_recovery: Enable automatic recovery when max turns exceeded. - wrap_up_prompt: Prompt to use for recovery run. - recovery_turns: Number of turns allowed for recovery. report_status_prompt: Instruction snippet about using the status tool. """ + # Google ADK handles execution control via RunConfig, so we disable + # the base class's max_turns_recovery feature super().__init__( agent_id, - max_turns_recovery=max_turns_recovery, - recovery_turns=recovery_turns, - wrap_up_prompt=wrap_up_prompt, + max_turns_recovery=False, + recovery_turns=0, + wrap_up_prompt=None, report_status_prompt=report_status_prompt, ) # Override with Google ADK-specific tools @@ -162,8 +164,7 @@ async def run( self, agent: LlmAgent, input: str | types.Content, - max_turns: int = 10, - context: Any | None = None, + run_config: RunConfig | None = None, user_id: str | None = None, session_id: str | None = None, ) -> GoogleADKRunResult: @@ -172,8 +173,8 @@ async def run( Args: agent: LlmAgent instance. input: User input/prompt for the agent (string or Content object). - max_turns: Maximum number of agent iterations (Note: Google ADK manages this internally). - context: Optional context for the agent run (not used by Google ADK currently). + run_config: Optional RunConfig for execution control (max_llm_calls, etc.). + Defaults to RunConfig() with max_llm_calls=500. user_id: User ID for session management (defaults to agent_id). session_id: Session ID (defaults to agent_id). @@ -199,6 +200,10 @@ async def run( else: content = input + # Use default RunConfig if not provided + if run_config is None: + run_config = RunConfig() + # Run the agent and collect events events = [] final_content = None @@ -208,6 +213,7 @@ async def run( user_id=user_id, session_id=session_id, new_message=content, + run_config=run_config, ): events.append(event) if hasattr(event, "is_final_response") and event.is_final_response(): @@ -223,8 +229,7 @@ async def run_streamed( self, agent: LlmAgent, input: str | types.Content, - max_turns: int = 10, - context: Any | None = None, + run_config: RunConfig | None = None, forwarder: Callable | None = None, user_id: str | None = None, session_id: str | None = None, @@ -232,13 +237,15 @@ async def run_streamed( """Run the agent in streaming mode with automatic activity tracking. The forwarder callback receives each event as it's generated, allowing - real-time processing of agent execution. + real-time processing of agent execution. This method is functionally + equivalent to run() but emphasizes the streaming nature of ADK's + run_async() method. Args: agent: LlmAgent instance. input: User input/prompt for the agent (string or Content object). - max_turns: Maximum number of agent iterations (Note: Google ADK manages this internally). - context: Optional context for the agent run (not used by Google ADK currently). + run_config: Optional RunConfig for execution control (max_llm_calls, etc.). + Defaults to RunConfig() with max_llm_calls=500. forwarder: Optional async callback to process each event as it arrives. user_id: User ID for session management (defaults to agent_id). session_id: Session ID (defaults to agent_id). @@ -247,12 +254,15 @@ async def run_streamed( GoogleADKRunResult with final output and events. Example: + from google.adk.core.run_config import RunConfig + async def handle_event(event): print(f"Event: {event}") result = await runner.run_streamed( agent=agent, input="Research XYZ", + run_config=RunConfig(max_llm_calls=100), forwarder=handle_event ) """ @@ -275,6 +285,10 @@ async def handle_event(event): else: content = input + # Use default RunConfig if not provided + if run_config is None: + run_config = RunConfig() + # Run the agent in streaming mode events = [] final_content = None @@ -284,6 +298,7 @@ async def handle_event(event): user_id=user_id, session_id=session_id, new_message=content, + run_config=run_config, ): events.append(event)