Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/api/v1alpha1/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type AgentSpec struct {
// +optional
ModelConfig string `json:"modelConfig,omitempty"`
// Whether to stream the response from the model.
// If not specified, the default value is true.
// If not specified, the default value is false.
// +optional
Stream *bool `json:"stream,omitempty"`
// +kubebuilder:validation:MaxItems=20
Expand Down
2 changes: 1 addition & 1 deletion go/api/v1alpha2/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type DeclarativeAgentSpec struct {
// +optional
ModelConfig string `json:"modelConfig,omitempty"`
// Whether to stream the response from the model.
// If not specified, the default value is true.
// If not specified, the default value is false.
// +optional
Stream *bool `json:"stream,omitempty"`
// +kubebuilder:validation:MaxItems=20
Expand Down
4 changes: 2 additions & 2 deletions go/config/crd/bases/kagent.dev_agents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,7 @@ spec:
stream:
description: |-
Whether to stream the response from the model.
If not specified, the default value is true.
If not specified, the default value is false.
type: boolean
systemMessage:
minLength: 1
Expand Down Expand Up @@ -8828,7 +8828,7 @@ spec:
stream:
description: |-
Whether to stream the response from the model.
If not specified, the default value is true.
If not specified, the default value is false.
type: boolean
systemMessage:
description: SystemMessage is a string specifying the system message
Expand Down
5 changes: 5 additions & 0 deletions go/internal/adk/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ type AgentConfig struct {
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
ExecuteCode bool `json:"execute_code,omitempty"`
Stream bool `json:"stream,omitempty"`
}

func (a *AgentConfig) UnmarshalJSON(data []byte) error {
Expand All @@ -263,6 +264,8 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error {
HttpTools []HttpMcpServerConfig `json:"http_tools"`
SseTools []SseMcpServerConfig `json:"sse_tools"`
RemoteAgents []RemoteAgentConfig `json:"remote_agents"`
ExecuteCode bool `json:"execute_code"`
Stream bool `json:"stream"`
}
if err := json.Unmarshal(data, &tmp); err != nil {
return err
Expand All @@ -277,6 +280,8 @@ func (a *AgentConfig) UnmarshalJSON(data []byte) error {
a.HttpTools = tmp.HttpTools
a.SseTools = tmp.SseTools
a.RemoteAgents = tmp.RemoteAgents
a.ExecuteCode = tmp.ExecuteCode
a.Stream = tmp.Stream
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al
Instruction: systemMessage,
Model: model,
ExecuteCode: ptr.Deref(agent.Spec.Declarative.ExecuteCodeBlocks, false),
Stream: ptr.Deref(agent.Spec.Declarative.Stream, false),
}

for _, tool := range agent.Spec.Declarative.Tools {
Expand Down
1 change: 0 additions & 1 deletion go/pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,6 @@ func Start(getExtensionConfig GetExtensionConfig) {
}
}

//nolint:govet
if webhookCertWatcher != nil {
setupLog.Info("Adding webhook certificate watcher to manager")
if err := mgr.Add(webhookCertWatcher); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions go/test/e2e/invoke_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ func TestE2EInvokeSTSIntegration(t *testing.T) {
agent := setupAgentWithOptions(t, cli, tools, AgentOptions{
Name: "test-sts-agent",
SystemMessage: "You are an agent that adds numbers using the add tool available to you through the everything-mcp-server.",
Stream: &[]bool{true}[0],
Stream: ptr.To(true),
Env: []corev1.EnvVar{
{
Name: "STS_WELL_KNOWN_URI",
Expand All @@ -685,7 +685,7 @@ func TestE2EInvokeSTSIntegration(t *testing.T) {

// access token for test user with the may act claim allowing system:serviceaccount:kagent:test-sts to
// perform operations on behalf of the test user
subjectToken := "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0LXVzZXIiLCJtYXlfYWN0Ijp7InN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprYWdlbnQ6dGVzdC1zdHMifSwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTc2MDEzNDM3M30.f3BcH4mGgmx0v9SCrZAfmg9uB_pP523AChoW-VfEpIdOncyis1OQWPwfQaIzmDOyclKKSYdeOS6j3znWDjAhWDbX3oJtxahy2sE5UVUjiknyAeN2YoNarK3n97gOHLuS6_Whabm8IuZVR78a0c5cIBlbOHv6M9g9LJZOofxozoOOmtMA5Qr4J3gXrrl5WBH52l6TqkdM3ak79mWYTmjijs4FLndKpqjRGvVaP2GRLJ9hkNRKsh40klIud6LXl7SePt3gTXD1Vtmv8WLqmpHrpiOMOsLfTpryA9OSFFKP0Ju7lLtUdfa_ZukH13ZuOnYVA6v0lOs6_7Ic75elc7YCOQ"
subjectToken := "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0LXVzZXIiLCJtYXlfYWN0Ijp7InN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprYWdlbnQ6dGVzdC1zdHMifSwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWUsImlhdCI6MTc2MDEzNDM3M3M.f3BcH4mGgmx0v9SCrZAfmg9uB_pP523AChoW-VfEpIdOncyis1OQWPwfQaIzmDOyclKKSYdeOS6j3znWDjAhWDbX3oJtxahy2sE5UVUjiknyAeN2YoNarK3n97gOHLuS6_Whabm8IuZVR78a0c5cIBlbOHv6M9g9LJZOofxozoOOmtMA5Qr4J3gXrrl5WBH52l6TqkdM3ak79mWYTmjijs4FLndKpqjRGvVaP2GRLJ9hkNRKsh40klIud6LXl7SePt3gTXD1Vtmv8WLqmpHrpiOMOsLfTpryA9OSFFKP0Ju7lLtUdfa_ZukH13ZuOnYVA6v0lOs6_7Ic75elc7YCOQ"

// create custom http client with the access token
// to be exchanged with the STS server
Expand All @@ -705,12 +705,12 @@ func TestE2EInvokeSTSIntegration(t *testing.T) {
a2aclient.WithHTTPClient(httpClient))
require.NoError(t, err)

t.Run("sync_invocation", func(t *testing.T) {
runSyncTest(t, a2aClient, "add 3 and 5", "8", nil)
t.Run("streaming_invocation", func(t *testing.T) {
runStreamingTest(t, a2aClient, "add 3 and 5", "8")

// verify our mock STS server received the token exchange request
stsRequests := stsServer.GetRequests()
require.Len(t, stsRequests, 2, "Expected 2 STS token exchange requests")
require.Len(t, stsRequests, 1, "Expected 1 STS token exchange requests")

// ensure the subject token is the same as the one we sent
// which contains the may act claim
Expand Down
4 changes: 2 additions & 2 deletions helm/kagent-crds/templates/kagent.dev_agents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,7 @@ spec:
stream:
description: |-
Whether to stream the response from the model.
If not specified, the default value is true.
If not specified, the default value is false.
type: boolean
systemMessage:
minLength: 1
Expand Down Expand Up @@ -8828,7 +8828,7 @@ spec:
stream:
description: |-
Whether to stream the response from the model.
If not specified, the default value is true.
If not specified, the default value is false.
type: boolean
systemMessage:
description: SystemMessage is a string specifying the system message
Expand Down
4 changes: 4 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ def __init__(
kagent_url: str,
app_name: str,
plugins: List[BasePlugin] = None,
stream: bool = False,
):
self.root_agent = root_agent
self.kagent_url = kagent_url
self.app_name = app_name
self.agent_card = agent_card
self.plugins = plugins if plugins is not None else []
self.stream = stream

def build(self) -> FastAPI:
token_service = KAgentTokenService(self.app_name)
Expand All @@ -83,6 +85,7 @@ def create_runner() -> Runner:

agent_executor = A2aAgentExecutor(
runner=create_runner,
stream=self.stream,
)

kagent_task_store = KAgentTaskStore(http_client)
Expand Down Expand Up @@ -122,6 +125,7 @@ def create_runner() -> Runner:

agent_executor = A2aAgentExecutor(
runner=create_runner,
stream=self.stream,
)

task_store = InMemoryTaskStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ def __init__(
*,
runner: Callable[..., Runner | Awaitable[Runner]],
config: Optional[A2aAgentExecutorConfig] = None,
stream: bool = False,
):
super().__init__()
self._runner = runner
self._config = config
self._stream = stream

async def _resolve_runner(self) -> Runner:
"""Resolve the runner, handling cases where it's a callable that returns a Runner."""
Expand Down Expand Up @@ -110,7 +112,7 @@ async def execute(
raise ValueError("A2A request must have a message")

# Convert the a2a request to ADK run args
run_args = convert_a2a_request_to_adk_run_args(context)
run_args = convert_a2a_request_to_adk_run_args(context, stream=self._stream)

# Prepare span attributes.
span_attributes = {}
Expand Down
7 changes: 6 additions & 1 deletion python/packages/kagent-adk/src/kagent/adk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ def static(
plugins = [SkillsPlugin(skills_directory=skills_directory)]

kagent_app = KAgentApp(
root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins if skills_directory else None
root_agent,
agent_card,
app_cfg.url,
app_cfg.app_name,
plugins=plugins if skills_directory else None,
stream=agent_config.stream,
)

server = kagent_app.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from a2a.server.agent_execution import RequestContext
from google.adk.runners import RunConfig
from google.adk.agents.run_config import StreamingMode
from google.genai import types as genai_types

from .part_converter import convert_a2a_part_to_genai_part
Expand All @@ -18,16 +19,20 @@ def _get_user_id(request: RequestContext) -> str:

def convert_a2a_request_to_adk_run_args(
request: RequestContext,
stream: bool = False,
) -> dict[str, Any]:
if not request.message:
raise ValueError("Request message cannot be None")

# Map bool to StreamingMode enum
streaming_mode = StreamingMode.SSE if stream else StreamingMode.NONE

return {
"user_id": _get_user_id(request),
"session_id": request.context_id,
"new_message": genai_types.Content(
role="user",
parts=[convert_a2a_part_to_genai_part(part) for part in request.message.parts],
),
"run_config": RunConfig(),
"run_config": RunConfig(streaming_mode=streaming_mode),
}
1 change: 1 addition & 0 deletions python/packages/kagent-adk/src/kagent/adk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class AgentConfig(BaseModel):
sse_tools: list[SseMcpServerConfig] | None = None # SSE MCP tools
remote_agents: list[RemoteAgentConfig] | None = None # remote agents
execute_code: bool | None = None
stream: bool = False # Enable streaming responses from the model

def to_agent(self, name: str) -> Agent:
if name is None or not str(name).strip():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from unittest.mock import Mock

import pytest
from a2a.server.agent_execution import RequestContext
from google.adk.runners import RunConfig
from google.adk.agents.run_config import StreamingMode

from kagent.adk.converters.request_converter import convert_a2a_request_to_adk_run_args


def _create_mock_request_context(context_id="test_session"):
"""Create a mock request context for testing."""
context = Mock(spec=RequestContext)
context.context_id = context_id
context.message = Mock()
context.message.parts = [] # Empty parts for simplicity
context.call_context = Mock()
context.call_context.user = Mock()
context.call_context.user.user_name = "test_user"
return context


class TestRequestConverter:
"""Test cases for request converter functions."""

def test_convert_request_streaming_modes(self):
"""Test that the stream parameter correctly maps to StreamingMode."""
request = _create_mock_request_context()

# Test case 1: Stream = False (default)
result_default = convert_a2a_request_to_adk_run_args(request, stream=False)
assert isinstance(result_default["run_config"], RunConfig)
assert result_default["run_config"].streaming_mode == StreamingMode.NONE

# Test case 2: Stream = True
result_stream = convert_a2a_request_to_adk_run_args(request, stream=True)
assert isinstance(result_stream["run_config"], RunConfig)
assert result_stream["run_config"].streaming_mode == StreamingMode.SSE

def test_convert_request_basic_fields(self):
"""Test that basic fields are correctly mapped."""
request = _create_mock_request_context(context_id="my_session_123")

result = convert_a2a_request_to_adk_run_args(request)

assert result["user_id"] == "test_user"
assert result["session_id"] == "my_session_123"
assert result["new_message"].role == "user"
Loading