diff --git a/go/api/v1alpha1/agent_types.go b/go/api/v1alpha1/agent_types.go index ffd98b31c..dab1386fe 100644 --- a/go/api/v1alpha1/agent_types.go +++ b/go/api/v1alpha1/agent_types.go @@ -37,7 +37,7 @@ type AgentSpec struct { // +optional ModelConfig string `json:"modelConfig,omitempty"` // Whether to stream the response from the model. - // If not specified, the default value is true. + // If not specified, the default value is false. // +optional Stream *bool `json:"stream,omitempty"` // +kubebuilder:validation:MaxItems=20 diff --git a/go/api/v1alpha2/agent_types.go b/go/api/v1alpha2/agent_types.go index f28c3f01d..5e1702b71 100644 --- a/go/api/v1alpha2/agent_types.go +++ b/go/api/v1alpha2/agent_types.go @@ -86,7 +86,7 @@ type DeclarativeAgentSpec struct { // +optional ModelConfig string `json:"modelConfig,omitempty"` // Whether to stream the response from the model. - // If not specified, the default value is true. + // If not specified, the default value is false. // +optional Stream *bool `json:"stream,omitempty"` // +kubebuilder:validation:MaxItems=20 diff --git a/go/config/crd/bases/kagent.dev_agents.yaml b/go/config/crd/bases/kagent.dev_agents.yaml index 7b6bbc1ea..530a77aaa 100644 --- a/go/config/crd/bases/kagent.dev_agents.yaml +++ b/go/config/crd/bases/kagent.dev_agents.yaml @@ -2204,7 +2204,7 @@ spec: stream: description: |- Whether to stream the response from the model. - If not specified, the default value is true. + If not specified, the default value is false. type: boolean systemMessage: minLength: 1 @@ -8828,7 +8828,7 @@ spec: stream: description: |- Whether to stream the response from the model. - If not specified, the default value is true. + If not specified, the default value is false. type: boolean systemMessage: description: SystemMessage is a string specifying the system message diff --git a/go/internal/adk/types.go b/go/internal/adk/types.go index ca08f8ab3..e6be4a959 100644 --- a/go/internal/adk/types.go +++ b/go/internal/adk/types.go @@ -253,6 +253,7 @@ type AgentConfig struct { SseTools []SseMcpServerConfig `json:"sse_tools"` RemoteAgents []RemoteAgentConfig `json:"remote_agents"` ExecuteCode bool `json:"execute_code,omitempty"` + Stream bool `json:"stream,omitempty"` } func (a *AgentConfig) UnmarshalJSON(data []byte) error { @@ -263,6 +264,8 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error { HttpTools []HttpMcpServerConfig `json:"http_tools"` SseTools []SseMcpServerConfig `json:"sse_tools"` RemoteAgents []RemoteAgentConfig `json:"remote_agents"` + ExecuteCode bool `json:"execute_code"` + Stream bool `json:"stream"` } if err := json.Unmarshal(data, &tmp); err != nil { return err @@ -277,6 +280,8 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error { a.HttpTools = tmp.HttpTools a.SseTools = tmp.SseTools a.RemoteAgents = tmp.RemoteAgents + a.ExecuteCode = tmp.ExecuteCode + a.Stream = tmp.Stream return nil } diff --git a/go/internal/controller/translator/agent/adk_api_translator.go b/go/internal/controller/translator/agent/adk_api_translator.go index a215ad383..7a81b88c7 100644 --- a/go/internal/controller/translator/agent/adk_api_translator.go +++ b/go/internal/controller/translator/agent/adk_api_translator.go @@ -512,6 +512,7 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al Instruction: systemMessage, Model: model, ExecuteCode: ptr.Deref(agent.Spec.Declarative.ExecuteCodeBlocks, false), + Stream: ptr.Deref(agent.Spec.Declarative.Stream, false), } for _, tool := range agent.Spec.Declarative.Tools { diff --git a/go/pkg/app/app.go b/go/pkg/app/app.go index deee627da..8ee3b9589 100644 --- a/go/pkg/app/app.go +++ b/go/pkg/app/app.go @@ -453,7 +453,6 @@ func Start(getExtensionConfig GetExtensionConfig) { } } - //nolint:govet if webhookCertWatcher != nil { setupLog.Info("Adding webhook certificate watcher to manager") if err := mgr.Add(webhookCertWatcher); err != nil { diff --git a/go/test/e2e/invoke_api_test.go b/go/test/e2e/invoke_api_test.go index b01af46f3..c2e2c0ff8 100644 --- a/go/test/e2e/invoke_api_test.go +++ b/go/test/e2e/invoke_api_test.go @@ -668,7 +668,7 @@ func TestE2EInvokeSTSIntegration(t *testing.T) { agent := setupAgentWithOptions(t, cli, tools, AgentOptions{ Name: "test-sts-agent", SystemMessage: "You are an agent that adds numbers using the add tool available to you through the everything-mcp-server.", - Stream: &[]bool{true}[0], + Stream: ptr.To(true), Env: []corev1.EnvVar{ { Name: "STS_WELL_KNOWN_URI", @@ -685,7 +685,7 @@ func TestE2EInvokeSTSIntegration(t *testing.T) { // access token for test user with the may act claim allowing system:serviceaccount:kagent:test-sts to // perform operations on behalf of the test user - subjectToken := "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0LXVzZXIiLCJtYXlfYWN0Ijp7InN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprYWdlbnQ6dGVzdC1zdHMifSwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTc2MDEzNDM3M30.f3BcH4mGgmx0v9SCrZAfmg9uB_pP523AChoW-VfEpIdOncyis1OQWPwfQaIzmDOyclKKSYdeOS6j3znWDjAhWDbX3oJtxahy2sE5UVUjiknyAeN2YoNarK3n97gOHLuS6_Whabm8IuZVR78a0c5cIBlbOHv6M9g9LJZOofxozoOOmtMA5Qr4J3gXrrl5WBH52l6TqkdM3ak79mWYTmjijs4FLndKpqjRGvVaP2GRLJ9hkNRKsh40klIud6LXl7SePt3gTXD1Vtmv8WLqmpHrpiOMOsLfTpryA9OSFFKP0Ju7lLtUdfa_ZukH13ZuOnYVA6v0lOs6_7Ic75elc7YCOQ" + subjectToken := "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0LXVzZXIiLCJtYXlfYWN0Ijp7InN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprYWdlbnQ6dGVzdC1zdHMifSwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTc2MDEzNDM3M3M.f3BcH4mGgmx0v9SCrZAfmg9uB_pP523AChoW-VfEpIdOncyis1OQWPwfQaIzmDOyclKKSYdeOS6j3znWDjAhWDbX3oJtxahy2sE5UVUjiknyAeN2YoNarK3n97gOHLuS6_Whabm8IuZVR78a0c5cIBlbOHv6M9g9LJZOofxozoOOmtMA5Qr4J3gXrrl5WBH52l6TqkdM3ak79mWYTmjijs4FLndKpqjRGvVaP2GRLJ9hkNRKsh40klIud6LXl7SePt3gTXD1Vtmv8WLqmpHrpiOMOsLfTpryA9OSFFKP0Ju7lLtUdfa_ZukH13ZuOnYVA6v0lOs6_7Ic75elc7YCOQ" // create custom http client with the access token // to be exchanged with the STS server @@ -705,12 +705,12 @@ func TestE2EInvokeSTSIntegration(t *testing.T) { a2aclient.WithHTTPClient(httpClient)) require.NoError(t, err) - t.Run("sync_invocation", func(t *testing.T) { - runSyncTest(t, a2aClient, "add 3 and 5", "8", nil) + t.Run("streaming_invocation", func(t *testing.T) { + runStreamingTest(t, a2aClient, "add 3 and 5", "8") // verify our mock STS server received the token exchange request stsRequests := stsServer.GetRequests() - require.Len(t, stsRequests, 2, "Expected 2 STS token exchange requests") + require.Len(t, stsRequests, 1, "Expected 1 STS token exchange requests") // ensure the subject token is the same as the one we sent // which contains the may act claim diff --git a/helm/kagent-crds/templates/kagent.dev_agents.yaml b/helm/kagent-crds/templates/kagent.dev_agents.yaml index 7b6bbc1ea..530a77aaa 100644 --- a/helm/kagent-crds/templates/kagent.dev_agents.yaml +++ b/helm/kagent-crds/templates/kagent.dev_agents.yaml @@ -2204,7 +2204,7 @@ spec: stream: description: |- Whether to stream the response from the model. - If not specified, the default value is true. + If not specified, the default value is false. type: boolean systemMessage: minLength: 1 @@ -8828,7 +8828,7 @@ spec: stream: description: |- Whether to stream the response from the model. - If not specified, the default value is true. + If not specified, the default value is false. type: boolean systemMessage: description: SystemMessage is a string specifying the system message diff --git a/python/packages/kagent-adk/src/kagent/adk/_a2a.py b/python/packages/kagent-adk/src/kagent/adk/_a2a.py index c9a6c4a7e..7ff6ef37d 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_a2a.py +++ b/python/packages/kagent-adk/src/kagent/adk/_a2a.py @@ -54,12 +54,14 @@ def __init__( kagent_url: str, app_name: str, plugins: List[BasePlugin] = None, + stream: bool = False, ): self.root_agent = root_agent self.kagent_url = kagent_url self.app_name = app_name self.agent_card = agent_card self.plugins = plugins if plugins is not None else [] + self.stream = stream def build(self) -> FastAPI: token_service = KAgentTokenService(self.app_name) @@ -83,6 +85,7 @@ def create_runner() -> Runner: agent_executor = A2aAgentExecutor( runner=create_runner, + stream=self.stream, ) kagent_task_store = KAgentTaskStore(http_client) @@ -122,6 +125,7 @@ def create_runner() -> Runner: agent_executor = A2aAgentExecutor( runner=create_runner, + stream=self.stream, ) task_store = InMemoryTaskStore() diff --git a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py index 9c265c72e..abb45d778 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py +++ b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py @@ -59,10 +59,12 @@ def __init__( *, runner: Callable[..., Runner | Awaitable[Runner]], config: Optional[A2aAgentExecutorConfig] = None, + stream: bool = False, ): super().__init__() self._runner = runner self._config = config + self._stream = stream async def _resolve_runner(self) -> Runner: """Resolve the runner, handling cases where it's a callable that returns a Runner.""" @@ -110,7 +112,7 @@ async def execute( raise ValueError("A2A request must have a message") # Convert the a2a request to ADK run args - run_args = convert_a2a_request_to_adk_run_args(context) + run_args = convert_a2a_request_to_adk_run_args(context, stream=self._stream) # Prepare span attributes. span_attributes = {} diff --git a/python/packages/kagent-adk/src/kagent/adk/cli.py b/python/packages/kagent-adk/src/kagent/adk/cli.py index a203c3f2e..ad38c45df 100644 --- a/python/packages/kagent-adk/src/kagent/adk/cli.py +++ b/python/packages/kagent-adk/src/kagent/adk/cli.py @@ -44,7 +44,12 @@ def static( plugins = [SkillsPlugin(skills_directory=skills_directory)] kagent_app = KAgentApp( - root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins if skills_directory else None + root_agent, + agent_card, + app_cfg.url, + app_cfg.app_name, + plugins=plugins if skills_directory else None, + stream=agent_config.stream, ) server = kagent_app.build() diff --git a/python/packages/kagent-adk/src/kagent/adk/converters/request_converter.py b/python/packages/kagent-adk/src/kagent/adk/converters/request_converter.py index 5007c654e..e2a71291d 100644 --- a/python/packages/kagent-adk/src/kagent/adk/converters/request_converter.py +++ b/python/packages/kagent-adk/src/kagent/adk/converters/request_converter.py @@ -2,6 +2,7 @@ from a2a.server.agent_execution import RequestContext from google.adk.runners import RunConfig +from google.adk.agents.run_config import StreamingMode from google.genai import types as genai_types from .part_converter import convert_a2a_part_to_genai_part @@ -18,10 +19,14 @@ def _get_user_id(request: RequestContext) -> str: def convert_a2a_request_to_adk_run_args( request: RequestContext, + stream: bool = False, ) -> dict[str, Any]: if not request.message: raise ValueError("Request message cannot be None") + # Map bool to StreamingMode enum + streaming_mode = StreamingMode.SSE if stream else StreamingMode.NONE + return { "user_id": _get_user_id(request), "session_id": request.context_id, @@ -29,5 +34,5 @@ def convert_a2a_request_to_adk_run_args( role="user", parts=[convert_a2a_part_to_genai_part(part) for part in request.message.parts], ), - "run_config": RunConfig(), + "run_config": RunConfig(streaming_mode=streaming_mode), } diff --git a/python/packages/kagent-adk/src/kagent/adk/types.py b/python/packages/kagent-adk/src/kagent/adk/types.py index 8370c2f9c..790e6b017 100644 --- a/python/packages/kagent-adk/src/kagent/adk/types.py +++ b/python/packages/kagent-adk/src/kagent/adk/types.py @@ -101,6 +101,7 @@ class AgentConfig(BaseModel): sse_tools: list[SseMcpServerConfig] | None = None # SSE MCP tools remote_agents: list[RemoteAgentConfig] | None = None # remote agents execute_code: bool | None = None + stream: bool = False # Enable streaming responses from the model def to_agent(self, name: str) -> Agent: if name is None or not str(name).strip(): diff --git a/python/packages/kagent-adk/tests/unittests/converters/test_request_converter.py b/python/packages/kagent-adk/tests/unittests/converters/test_request_converter.py new file mode 100644 index 000000000..c321cc4f7 --- /dev/null +++ b/python/packages/kagent-adk/tests/unittests/converters/test_request_converter.py @@ -0,0 +1,48 @@ +from unittest.mock import Mock + +import pytest +from a2a.server.agent_execution import RequestContext +from google.adk.runners import RunConfig +from google.adk.agents.run_config import StreamingMode + +from kagent.adk.converters.request_converter import convert_a2a_request_to_adk_run_args + + +def _create_mock_request_context(context_id="test_session"): + """Create a mock request context for testing.""" + context = Mock(spec=RequestContext) + context.context_id = context_id + context.message = Mock() + context.message.parts = [] # Empty parts for simplicity + context.call_context = Mock() + context.call_context.user = Mock() + context.call_context.user.user_name = "test_user" + return context + + +class TestRequestConverter: + """Test cases for request converter functions.""" + + def test_convert_request_streaming_modes(self): + """Test that the stream parameter correctly maps to StreamingMode.""" + request = _create_mock_request_context() + + # Test case 1: Stream = False (default) + result_default = convert_a2a_request_to_adk_run_args(request, stream=False) + assert isinstance(result_default["run_config"], RunConfig) + assert result_default["run_config"].streaming_mode == StreamingMode.NONE + + # Test case 2: Stream = True + result_stream = convert_a2a_request_to_adk_run_args(request, stream=True) + assert isinstance(result_stream["run_config"], RunConfig) + assert result_stream["run_config"].streaming_mode == StreamingMode.SSE + + def test_convert_request_basic_fields(self): + """Test that basic fields are correctly mapped.""" + request = _create_mock_request_context(context_id="my_session_123") + + result = convert_a2a_request_to_adk_run_args(request) + + assert result["user_id"] == "test_user" + assert result["session_id"] == "my_session_123" + assert result["new_message"].role == "user"