From e157fa5e1879a3d900fcc8762e89a052b1764a96 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 13:54:42 +0800 Subject: [PATCH 01/22] add rocketmq-a2a Change-Id: Idec4d097683e5bd691f65ba1a40b5f48ba5312ab --- pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pom.xml b/pom.xml index 591fd8c1..60ed0ee9 100644 --- a/pom.xml +++ b/pom.xml @@ -293,6 +293,18 @@ 2.0.1 + + org.apache.rocketmq + rocketmq-a2a + 1.0.5 + + + org.jboss.slf4j + slf4j-jboss-logmanager + + + + org.springframework.boot spring-boot-starter-test From bc6fc79329a5f005642461a299f01f45a2e9874e Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 14:05:46 +0800 Subject: [PATCH 02/22] add rocketmq service Change-Id: I6b6f412a072d585ce1695cc80547ff26c0a4d11d --- web/pom.xml | 5 + .../a2a/controller/A2aController.java | 10 +- .../a2a/controller/A2aRocketMQController.java | 247 ++++++++++++++++++ 3 files changed, 258 insertions(+), 4 deletions(-) create mode 100644 web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java diff --git a/web/pom.xml b/web/pom.xml index 6b527be6..e1c6999c 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -113,6 +113,11 @@ test + + org.apache.rocketmq + rocketmq-a2a + + diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aController.java index 45086bb0..ecd83739 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aController.java @@ -76,13 +76,15 @@ public Object handleRequest(@RequestBody String body, HttpServletRequest httpReq return result; } - private boolean isStreamingRequest(String requestBody, ServerCallContext context) { + protected boolean isStreamingRequest(String requestBody, ServerCallContext context) { try { JsonNode node = Utils.OBJECT_MAPPER.readTree(requestBody); JsonNode method = node != null ? node.get("method") : null; String methodName = method != null ? method.asText() : null; if (methodName != null) { - context.getState().put(ContextKeys.METHOD_NAME_KEY, methodName); + if (null != context && null != context.getState()) { + context.getState().put(ContextKeys.METHOD_NAME_KEY, methodName); + } return SendStreamingMessageRequest.METHOD.equals(methodName) || TaskResubscriptionRequest.METHOD.equals(methodName); } return false; @@ -91,7 +93,7 @@ private boolean isStreamingRequest(String requestBody, ServerCallContext context } } - private Flux> handleStreamRequest(String body, ServerCallContext context) + protected Flux> handleStreamRequest(String body, ServerCallContext context) throws JsonProcessingException { StreamingJSONRPCRequest request = Utils.OBJECT_MAPPER.readValue(body, StreamingJSONRPCRequest.class); Flow.Publisher> publisher; @@ -131,7 +133,7 @@ private ServerSentEvent createErrorSSE(JSONRPCResponse errorResponse) } } - private JSONRPCResponse handleNonStreamRequest(String body, ServerCallContext context) + protected JSONRPCResponse handleNonStreamRequest(String body, ServerCallContext context) throws JsonProcessingException { NonStreamingJSONRPCRequest request = Utils.OBJECT_MAPPER.readValue(body, NonStreamingJSONRPCRequest.class); if (request instanceof GetTaskRequest req) { diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java new file mode 100644 index 00000000..56347b82 --- /dev/null +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -0,0 +1,247 @@ +package io.agentscope.runtime.protocol.a2a.controller; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.logging.Logger; +import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.a2a.spec.AgentCard; +import io.a2a.spec.JSONParseError; +import io.a2a.spec.JSONRPCErrorResponse; +import io.a2a.spec.JSONRPCResponse; +import io.agentscope.runtime.engine.Runner; +import io.agentscope.runtime.protocol.ProtocolConfig; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; +import jakarta.annotation.PostConstruct; +import org.apache.rocketmq.a2a.common.RocketMQRequest; +import org.apache.rocketmq.a2a.common.RocketMQResponse; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.ProducerBuilder; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.apache.rocketmq.shaded.commons.lang3.StringUtils; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.http.codec.ServerSentEvent; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import reactor.core.publisher.Flux; +import static io.a2a.util.Utils.OBJECT_MAPPER; + +@Controller +@RequestMapping("/a2a-rocketmq") +public class A2aRocketMQController extends A2aController { + private static Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); + private static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); + private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); + private static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); + private static final String BIZ_CONSUMER_GROUP = System.getProperty("bizConsumerGroup", ""); + private static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); + private static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); + + private Producer producer; + private PushConsumer pushConsumer; + private FluxSseSupport fluxSseSupport; + Executor executor = Executors.newFixedThreadPool(6); + + public A2aRocketMQController(Runner runner, AgentCard agentCard, ObjectProvider protocolConfigs) { + super(runner, agentCard, protocolConfigs); + } + + @PostConstruct + public void init() { + try { + if (!checkConfigParam()) { + logger.info("checkConfigParam rocketmq config param is not ready, ignore start rocketmq server!!!"); + return; + } + producer = buildProducer(); + fluxSseSupport = new FluxSseSupport(producer); + pushConsumer = buildConsumer(); + } catch (Exception e) { + logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); + } + } + + private static class FluxSseSupport { + private final Producer producer; + + private FluxSseSupport(Producer producer) { + this.producer = producer; + } + + public void subscribeObjectRocketmq(Flux multi, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + AtomicLong count = new AtomicLong(); + Flux map = multi.map(new Function() { + @Override + public Buffer apply(Object o) { + if (o instanceof ServerSentEvent ev) { + String id = !StringUtils.isEmpty(ev.id()) ? ev.id() : String.valueOf(count.getAndIncrement()); + return Buffer.buffer("data: " + ev.data() + "\nid: " + id + "\n\n"); + } + return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); + } + }); + writeRocketmq(map, rc, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); + } + + public void writeRocketmq(Flux flux, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + flux.subscribe( + event -> { + // onNext: 接收到事件 + try { + SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); + logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + } catch (ClientException error) { + logger.info("rocketmq send stream error: " + error.getMessage()); + } + }, + error -> { + logger.info("send stream error: " + error.getMessage()); + completableFuture.complete(false); + }, + () -> { + logger.info("send stream completed."); + try { + SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); + logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + } catch (ClientException e) { + logger.info("rocketmq send stream error: " + e.getMessage() ); + } + completableFuture.complete(true); + } + ); + } + } + + private Producer buildProducer() throws ClientException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(ROCKETMQ_ENDPOINT) + .setNamespace(ROCKETMQ_NAMESPACE) + .setCredentialProvider(sessionCredentialsProvider) + .setRequestTimeout(Duration.ofSeconds(15)) + .build(); + final ProducerBuilder builder = provider.newProducerBuilder().setClientConfiguration(clientConfiguration); + return builder.build(); + } + + private PushConsumer buildConsumer() throws ClientException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(ROCKETMQ_ENDPOINT) + .setNamespace(ROCKETMQ_NAMESPACE) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String tag = "*"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + return provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(BIZ_CONSUMER_GROUP) + .setSubscriptionExpressions(Collections.singletonMap(BIZ_TOPIC, filterExpression)) + .setMessageListener(messageView -> { + try { + byte[] result = new byte[messageView.getBody().remaining()]; + messageView.getBody().get(result); + RocketMQRequest request = JSON.parseObject(new String(result, StandardCharsets.UTF_8), RocketMQRequest.class); + String body = request.getRequestBody(); + JSONRPCResponse nonStreamingResponse = null; + JSONRPCErrorResponse error = null; + + boolean streaming = isStreamingRequest(body, null); + try { + if (streaming) { + Flux> serverSentEventFlux = handleStreamRequest(body, null); + CompletableFuture completableFuture = new CompletableFuture(); + executor.execute(() -> { + fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); + }); + Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); + if (null != streamResult && streamResult) { + return ConsumeResult.SUCCESS; + } else { + return ConsumeResult.FAILURE; + } + } else { + nonStreamingResponse = handleNonStreamRequest(body, null); + } + } catch (JsonProcessingException e) { + logger.info("JSON parsing error: " + e.getMessage()); + error = new JSONRPCErrorResponse(null, new JSONParseError()); + } finally { + if (!streaming) { + RocketMQResponse response = null; + if (error != null) { + response = new RocketMQResponse(request.getLiteTopic(), null, JSON.toJSONString(error), messageView.getMessageId().toString(), false, true); + } else { + response = new RocketMQResponse(request.getLiteTopic(), null, toJsonString(nonStreamingResponse), messageView.getMessageId().toString(), false, true); + } + SendReceipt send = this.producer.send(buildMessage(request.getWorkAgentResponseTopic(), request.getLiteTopic(), response)); + logger.info("send response success:" + send.getMessageId() + ", time: " + System.currentTimeMillis() ); + } + } + } catch (Exception e) { + logger.info("error " + e.getMessage()); + return ConsumeResult.FAILURE; + } + return ConsumeResult.SUCCESS; + }).build(); + } + + private static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { + if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) { + logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); + return null; + } + String missionJsonStr = JSON.toJSONString(response); + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + final Message message = provider.newMessageBuilder() + .setTopic(topic) + .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8)) + .setLiteTopic(liteTopic) + .build(); + return message; + } + + private static String toJsonString(Object o) { + try { + return OBJECT_MAPPER.writeValueAsString(o); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + + private static boolean checkConfigParam() { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { + logger.info("rocketMQEndpoint is empty"); + } + if (StringUtils.isEmpty(BIZ_TOPIC)) { + logger.info("bizTopic is empty"); + } + if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { + logger.info("bizConsumerGroup is empty"); + } + return false; + } + return true; + } +} From 9aa5786a6bdfed729ab55809390352f2488a888e Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 14:13:55 +0800 Subject: [PATCH 03/22] optimize the code Change-Id: Ib27fec64f35f3a196ac58689b65e0af63cf9e4c8 --- .../runtime/protocol/a2a/RocketMQUtils.java | 63 ++++++++ .../a2a/controller/A2aRocketMQController.java | 144 +++++++----------- 2 files changed, 116 insertions(+), 91 deletions(-) create mode 100644 web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java new file mode 100644 index 00000000..8a431797 --- /dev/null +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -0,0 +1,63 @@ +package io.agentscope.runtime.protocol.a2a; + +import java.nio.charset.StandardCharsets; +import java.util.logging.Logger; +import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.rocketmq.a2a.common.RocketMQResponse; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.shaded.commons.lang3.StringUtils; + +import static io.a2a.util.Utils.OBJECT_MAPPER; + +public class RocketMQUtils { + private static Logger logger = Logger.getLogger(RocketMQUtils.class.getName()); + + public static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); + public static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); + public static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); + public static final String BIZ_CONSUMER_GROUP = System.getProperty("bizConsumerGroup", ""); + public static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); + public static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); + + public static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { + if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) { + logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); + return null; + } + String missionJsonStr = JSON.toJSONString(response); + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + final Message message = provider.newMessageBuilder() + .setTopic(topic) + .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8)) + .setLiteTopic(liteTopic) + .build(); + return message; + } + + public static String toJsonString(Object o) { + try { + return OBJECT_MAPPER.writeValueAsString(o); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + + public static boolean checkConfigParam() { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { + logger.info("rocketMQEndpoint is empty"); + } + if (StringUtils.isEmpty(BIZ_TOPIC)) { + logger.info("bizTopic is empty"); + } + if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { + logger.info("bizConsumerGroup is empty"); + } + return false; + } + return true; + } + +} diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 56347b82..82fbeb33 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -32,7 +32,6 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; -import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.client.apis.producer.SendReceipt; @@ -42,18 +41,20 @@ import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import reactor.core.publisher.Flux; -import static io.a2a.util.Utils.OBJECT_MAPPER; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ACCESS_KEY; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_CONSUMER_GROUP; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_TOPIC; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_ENDPOINT; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_NAMESPACE; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.SECRET_KEY; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.buildMessage; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.checkConfigParam; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.toJsonString; @Controller @RequestMapping("/a2a-rocketmq") public class A2aRocketMQController extends A2aController { private static Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); - private static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); - private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); - private static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); - private static final String BIZ_CONSUMER_GROUP = System.getProperty("bizConsumerGroup", ""); - private static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); - private static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); private Producer producer; private PushConsumer pushConsumer; @@ -79,57 +80,6 @@ public void init() { } } - private static class FluxSseSupport { - private final Producer producer; - - private FluxSseSupport(Producer producer) { - this.producer = producer; - } - - public void subscribeObjectRocketmq(Flux multi, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { - AtomicLong count = new AtomicLong(); - Flux map = multi.map(new Function() { - @Override - public Buffer apply(Object o) { - if (o instanceof ServerSentEvent ev) { - String id = !StringUtils.isEmpty(ev.id()) ? ev.id() : String.valueOf(count.getAndIncrement()); - return Buffer.buffer("data: " + ev.data() + "\nid: " + id + "\n\n"); - } - return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); - } - }); - writeRocketmq(map, rc, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); - } - - public void writeRocketmq(Flux flux, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { - flux.subscribe( - event -> { - // onNext: 接收到事件 - try { - SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); - logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); - } catch (ClientException error) { - logger.info("rocketmq send stream error: " + error.getMessage()); - } - }, - error -> { - logger.info("send stream error: " + error.getMessage()); - completableFuture.complete(false); - }, - () -> { - logger.info("send stream completed."); - try { - SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); - logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); - } catch (ClientException e) { - logger.info("rocketmq send stream error: " + e.getMessage() ); - } - completableFuture.complete(true); - } - ); - } - } - private Producer buildProducer() throws ClientException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); @@ -206,42 +156,54 @@ private PushConsumer buildConsumer() throws ClientException { }).build(); } - private static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { - if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) { - logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); - return null; + private static class FluxSseSupport { + private final Producer producer; + + private FluxSseSupport(Producer producer) { + this.producer = producer; } - String missionJsonStr = JSON.toJSONString(response); - final ClientServiceProvider provider = ClientServiceProvider.loadService(); - final Message message = provider.newMessageBuilder() - .setTopic(topic) - .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8)) - .setLiteTopic(liteTopic) - .build(); - return message; - } - private static String toJsonString(Object o) { - try { - return OBJECT_MAPPER.writeValueAsString(o); - } catch (JsonProcessingException ex) { - throw new RuntimeException(ex); + public void subscribeObjectRocketmq(Flux multi, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + AtomicLong count = new AtomicLong(); + Flux map = multi.map(new Function() { + @Override + public Buffer apply(Object o) { + if (o instanceof ServerSentEvent ev) { + String id = !StringUtils.isEmpty(ev.id()) ? ev.id() : String.valueOf(count.getAndIncrement()); + return Buffer.buffer("data: " + ev.data() + "\nid: " + id + "\n\n"); + } + return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); + } + }); + writeRocketmq(map, rc, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); } - } - private static boolean checkConfigParam() { - if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { - if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { - logger.info("rocketMQEndpoint is empty"); - } - if (StringUtils.isEmpty(BIZ_TOPIC)) { - logger.info("bizTopic is empty"); - } - if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { - logger.info("bizConsumerGroup is empty"); - } - return false; + public void writeRocketmq(Flux flux, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + flux.subscribe( + event -> { + try { + SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); + logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + } catch (ClientException error) { + logger.info("rocketmq send stream error: " + error.getMessage()); + } + }, + error -> { + logger.info("send stream error: " + error.getMessage()); + completableFuture.complete(false); + }, + () -> { + logger.info("send stream completed."); + try { + SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); + logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + } catch (ClientException e) { + logger.info("rocketmq send stream error: " + e.getMessage() ); + } + completableFuture.complete(true); + } + ); } - return true; } + } From 91bf9337a8cfd858e1406bf0566d6e4cd015953d Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 14:20:27 +0800 Subject: [PATCH 04/22] optimize the code Change-Id: I33f5c050d19c318ff251ca80548e764c0270d589 --- .../runtime/protocol/a2a/RocketMQUtils.java | 43 +++++- .../a2a/controller/A2aRocketMQController.java | 141 +++++++----------- 2 files changed, 93 insertions(+), 91 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index 8a431797..97831637 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -1,19 +1,30 @@ package io.agentscope.runtime.protocol.a2a; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; import java.util.logging.Logger; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.rocketmq.a2a.common.RocketMQResponse; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; import static io.a2a.util.Utils.OBJECT_MAPPER; public class RocketMQUtils { private static Logger logger = Logger.getLogger(RocketMQUtils.class.getName()); - public static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); public static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); public static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); @@ -21,6 +32,36 @@ public class RocketMQUtils { public static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); public static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); + public static Producer buildProducer() throws ClientException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(ROCKETMQ_ENDPOINT) + .setNamespace(ROCKETMQ_NAMESPACE) + .setCredentialProvider(sessionCredentialsProvider) + .setRequestTimeout(Duration.ofSeconds(15)) + .build(); + final ProducerBuilder builder = provider.newProducerBuilder().setClientConfiguration(clientConfiguration); + return builder.build(); + } + + public static PushConsumer buildConsumer(MessageListener messageListener) throws ClientException { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(ROCKETMQ_ENDPOINT) + .setNamespace(ROCKETMQ_NAMESPACE) + .setCredentialProvider(sessionCredentialsProvider) + .build(); + String tag = "*"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + return provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(BIZ_CONSUMER_GROUP) + .setSubscriptionExpressions(Collections.singletonMap(BIZ_TOPIC, filterExpression)) + .setMessageListener(messageListener).build(); + } + public static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) { logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 82fbeb33..267c47cb 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -1,8 +1,6 @@ package io.agentscope.runtime.protocol.a2a.controller; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -23,17 +21,11 @@ import jakarta.annotation.PostConstruct; import org.apache.rocketmq.a2a.common.RocketMQRequest; import org.apache.rocketmq.a2a.common.RocketMQResponse; -import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; -import org.apache.rocketmq.client.apis.ClientServiceProvider; -import org.apache.rocketmq.client.apis.SessionCredentialsProvider; -import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.FilterExpression; -import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.MessageListener; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.producer.Producer; -import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; import org.springframework.beans.factory.ObjectProvider; @@ -41,13 +33,9 @@ import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import reactor.core.publisher.Flux; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ACCESS_KEY; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_CONSUMER_GROUP; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_TOPIC; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_ENDPOINT; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_NAMESPACE; -import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.SECRET_KEY; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.buildConsumer; import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.buildMessage; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.buildProducer; import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.checkConfigParam; import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.toJsonString; @@ -55,7 +43,6 @@ @RequestMapping("/a2a-rocketmq") public class A2aRocketMQController extends A2aController { private static Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); - private Producer producer; private PushConsumer pushConsumer; private FluxSseSupport fluxSseSupport; @@ -74,86 +61,60 @@ public void init() { } producer = buildProducer(); fluxSseSupport = new FluxSseSupport(producer); - pushConsumer = buildConsumer(); + pushConsumer = buildConsumer(buildMessageListener()); } catch (Exception e) { logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); } } - private Producer buildProducer() throws ClientException { - final ClientServiceProvider provider = ClientServiceProvider.loadService(); - SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); - ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(ROCKETMQ_ENDPOINT) - .setNamespace(ROCKETMQ_NAMESPACE) - .setCredentialProvider(sessionCredentialsProvider) - .setRequestTimeout(Duration.ofSeconds(15)) - .build(); - final ProducerBuilder builder = provider.newProducerBuilder().setClientConfiguration(clientConfiguration); - return builder.build(); - } - - private PushConsumer buildConsumer() throws ClientException { - final ClientServiceProvider provider = ClientServiceProvider.loadService(); - SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); - ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() - .setEndpoints(ROCKETMQ_ENDPOINT) - .setNamespace(ROCKETMQ_NAMESPACE) - .setCredentialProvider(sessionCredentialsProvider) - .build(); - String tag = "*"; - FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); - return provider.newPushConsumerBuilder() - .setClientConfiguration(clientConfiguration) - .setConsumerGroup(BIZ_CONSUMER_GROUP) - .setSubscriptionExpressions(Collections.singletonMap(BIZ_TOPIC, filterExpression)) - .setMessageListener(messageView -> { - try { - byte[] result = new byte[messageView.getBody().remaining()]; - messageView.getBody().get(result); - RocketMQRequest request = JSON.parseObject(new String(result, StandardCharsets.UTF_8), RocketMQRequest.class); - String body = request.getRequestBody(); - JSONRPCResponse nonStreamingResponse = null; - JSONRPCErrorResponse error = null; + private MessageListener buildMessageListener() { + return messageView -> { + try { + byte[] result = new byte[messageView.getBody().remaining()]; + messageView.getBody().get(result); + RocketMQRequest request = JSON.parseObject(new String(result, StandardCharsets.UTF_8), RocketMQRequest.class); + String body = request.getRequestBody(); + JSONRPCResponse nonStreamingResponse = null; + JSONRPCErrorResponse error = null; - boolean streaming = isStreamingRequest(body, null); - try { - if (streaming) { - Flux> serverSentEventFlux = handleStreamRequest(body, null); - CompletableFuture completableFuture = new CompletableFuture(); - executor.execute(() -> { - fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); - }); - Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); - if (null != streamResult && streamResult) { - return ConsumeResult.SUCCESS; - } else { - return ConsumeResult.FAILURE; - } - } else { - nonStreamingResponse = handleNonStreamRequest(body, null); - } - } catch (JsonProcessingException e) { - logger.info("JSON parsing error: " + e.getMessage()); - error = new JSONRPCErrorResponse(null, new JSONParseError()); - } finally { - if (!streaming) { - RocketMQResponse response = null; - if (error != null) { - response = new RocketMQResponse(request.getLiteTopic(), null, JSON.toJSONString(error), messageView.getMessageId().toString(), false, true); - } else { - response = new RocketMQResponse(request.getLiteTopic(), null, toJsonString(nonStreamingResponse), messageView.getMessageId().toString(), false, true); - } - SendReceipt send = this.producer.send(buildMessage(request.getWorkAgentResponseTopic(), request.getLiteTopic(), response)); - logger.info("send response success:" + send.getMessageId() + ", time: " + System.currentTimeMillis() ); - } - } - } catch (Exception e) { - logger.info("error " + e.getMessage()); - return ConsumeResult.FAILURE; - } - return ConsumeResult.SUCCESS; - }).build(); + boolean streaming = isStreamingRequest(body, null); + try { + if (streaming) { + Flux> serverSentEventFlux = handleStreamRequest(body, null); + CompletableFuture completableFuture = new CompletableFuture(); + executor.execute(() -> { + fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); + }); + Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); + if (null != streamResult && streamResult) { + return ConsumeResult.SUCCESS; + } else { + return ConsumeResult.FAILURE; + } + } else { + nonStreamingResponse = handleNonStreamRequest(body, null); + } + } catch (JsonProcessingException e) { + logger.info("JSON parsing error: " + e.getMessage()); + error = new JSONRPCErrorResponse(null, new JSONParseError()); + } finally { + if (!streaming) { + RocketMQResponse response = null; + if (error != null) { + response = new RocketMQResponse(request.getLiteTopic(), null, JSON.toJSONString(error), messageView.getMessageId().toString(), false, true); + } else { + response = new RocketMQResponse(request.getLiteTopic(), null, toJsonString(nonStreamingResponse), messageView.getMessageId().toString(), false, true); + } + SendReceipt send = producer.send(buildMessage(request.getWorkAgentResponseTopic(), request.getLiteTopic(), response)); + logger.info("send response success:" + send.getMessageId() + ", time: " + System.currentTimeMillis() ); + } + } + } catch (Exception e) { + logger.info("error " + e.getMessage()); + return ConsumeResult.FAILURE; + } + return ConsumeResult.SUCCESS; + }; } private static class FluxSseSupport { From ca1044f870757b48f3afffd4afbd188adb76e83e Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 14:43:54 +0800 Subject: [PATCH 05/22] optimize the code Change-Id: Iba3174e66f27d09209e3f66aff6f7467457ad271 --- .../runtime/protocol/a2a/RocketMQUtils.java | 8 +++- .../a2a/controller/A2aRocketMQController.java | 44 ++++++------------- 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index 97831637..1b2090fe 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -46,6 +46,9 @@ public static Producer buildProducer() throws ClientException { } public static PushConsumer buildConsumer(MessageListener messageListener) throws ClientException { + if (null == messageListener) { + throw new RuntimeException("buildConsumer messageListener is null"); + } final ClientServiceProvider provider = ClientServiceProvider.loadService(); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() @@ -63,7 +66,7 @@ public static PushConsumer buildConsumer(MessageListener messageListener) throws } public static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { - if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) { + if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic) || null == response) { logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); return null; } @@ -78,6 +81,9 @@ public static Message buildMessage(String topic, String liteTopic, RocketMQRespo } public static String toJsonString(Object o) { + if (o == null) { + return null; + } try { return OBJECT_MAPPER.writeValueAsString(o); } catch (JsonProcessingException ex) { diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 267c47cb..20975c0d 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -17,7 +17,6 @@ import io.agentscope.runtime.engine.Runner; import io.agentscope.runtime.protocol.ProtocolConfig; import io.vertx.core.buffer.Buffer; -import io.vertx.ext.web.RoutingContext; import jakarta.annotation.PostConstruct; import org.apache.rocketmq.a2a.common.RocketMQRequest; import org.apache.rocketmq.a2a.common.RocketMQResponse; @@ -26,7 +25,6 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.producer.Producer; -import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; import org.springframework.beans.factory.ObjectProvider; import org.springframework.http.codec.ServerSentEvent; @@ -59,8 +57,8 @@ public void init() { logger.info("checkConfigParam rocketmq config param is not ready, ignore start rocketmq server!!!"); return; } - producer = buildProducer(); - fluxSseSupport = new FluxSseSupport(producer); + this.producer = buildProducer(); + fluxSseSupport = new FluxSseSupport(this.producer); pushConsumer = buildConsumer(buildMessageListener()); } catch (Exception e) { logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); @@ -76,21 +74,16 @@ private MessageListener buildMessageListener() { String body = request.getRequestBody(); JSONRPCResponse nonStreamingResponse = null; JSONRPCErrorResponse error = null; - boolean streaming = isStreamingRequest(body, null); try { if (streaming) { Flux> serverSentEventFlux = handleStreamRequest(body, null); CompletableFuture completableFuture = new CompletableFuture(); executor.execute(() -> { - fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); + fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); }); Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); - if (null != streamResult && streamResult) { - return ConsumeResult.SUCCESS; - } else { - return ConsumeResult.FAILURE; - } + return Boolean.TRUE.equals(streamResult) ? ConsumeResult.SUCCESS : ConsumeResult.FAILURE; } else { nonStreamingResponse = handleNonStreamRequest(body, null); } @@ -99,14 +92,8 @@ private MessageListener buildMessageListener() { error = new JSONRPCErrorResponse(null, new JSONParseError()); } finally { if (!streaming) { - RocketMQResponse response = null; - if (error != null) { - response = new RocketMQResponse(request.getLiteTopic(), null, JSON.toJSONString(error), messageView.getMessageId().toString(), false, true); - } else { - response = new RocketMQResponse(request.getLiteTopic(), null, toJsonString(nonStreamingResponse), messageView.getMessageId().toString(), false, true); - } - SendReceipt send = producer.send(buildMessage(request.getWorkAgentResponseTopic(), request.getLiteTopic(), response)); - logger.info("send response success:" + send.getMessageId() + ", time: " + System.currentTimeMillis() ); + String responseBody = (error != null) ? JSON.toJSONString(error) : toJsonString(nonStreamingResponse); + producer.send(buildMessage(request.getWorkAgentResponseTopic(), request.getLiteTopic(), new RocketMQResponse(request.getLiteTopic(), null, responseBody, messageView.getMessageId().toString(), false, true))); } } } catch (Exception e) { @@ -124,7 +111,7 @@ private FluxSseSupport(Producer producer) { this.producer = producer; } - public void subscribeObjectRocketmq(Flux multi, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + public void subscribeObjectRocketmq(Flux multi, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { AtomicLong count = new AtomicLong(); Flux map = multi.map(new Function() { @Override @@ -136,30 +123,27 @@ public Buffer apply(Object o) { return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); } }); - writeRocketmq(map, rc, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); + writeRocketmq(map, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); } - public void writeRocketmq(Flux flux, RoutingContext rc, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + public void writeRocketmq(Flux flux, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { flux.subscribe( event -> { try { - SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); - logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); } catch (ClientException error) { - logger.info("rocketmq send stream error: " + error.getMessage()); + logger.info("writeRocketmq send stream error: " + error.getMessage()); } }, error -> { - logger.info("send stream error: " + error.getMessage()); + logger.info("writeRocketmq send stream error: " + error.getMessage()); completableFuture.complete(false); }, () -> { - logger.info("send stream completed."); try { - SendReceipt send = producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); - logger.info("rocketmq send stream success: " + send.getMessageId() + " time " + System.currentTimeMillis()); + producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); } catch (ClientException e) { - logger.info("rocketmq send stream error: " + e.getMessage() ); + logger.info("writeRocketmq send stream error: " + e.getMessage() ); } completableFuture.complete(true); } From 2357467a4e26917a86436d3976d3e0f15fbbc9e4 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 14:49:11 +0800 Subject: [PATCH 06/22] update Change-Id: I8cf36089d53971ffdac673040dde16cbc447431b --- .../a2a/controller/A2aRocketMQController.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 20975c0d..4d92459a 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -1,9 +1,10 @@ package io.agentscope.runtime.protocol.a2a.controller; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -44,7 +45,13 @@ public class A2aRocketMQController extends A2aController { private Producer producer; private PushConsumer pushConsumer; private FluxSseSupport fluxSseSupport; - Executor executor = Executors.newFixedThreadPool(6); + private final ThreadPoolExecutor executor = new ThreadPoolExecutor( + 6, + 6, + 60, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(10_0000), + new CallerRunsPolicy() + ); public A2aRocketMQController(Runner runner, AgentCard agentCard, ObjectProvider protocolConfigs) { super(runner, agentCard, protocolConfigs); @@ -88,7 +95,6 @@ private MessageListener buildMessageListener() { nonStreamingResponse = handleNonStreamRequest(body, null); } } catch (JsonProcessingException e) { - logger.info("JSON parsing error: " + e.getMessage()); error = new JSONRPCErrorResponse(null, new JSONParseError()); } finally { if (!streaming) { From fce589b7bd605738293364c6d72007dee26717f0 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 15:00:53 +0800 Subject: [PATCH 07/22] optimize the code Change-Id: I2c43992ef9f66f9898f017f7c76f2ba31745f619 --- .../a2a/controller/A2aRocketMQController.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 4d92459a..0c447b65 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -61,12 +61,12 @@ public A2aRocketMQController(Runner runner, AgentCard agentCard, ObjectProvider< public void init() { try { if (!checkConfigParam()) { - logger.info("checkConfigParam rocketmq config param is not ready, ignore start rocketmq server!!!"); + logger.info("checkConfigParam rocketmq config param is not ok, ignore rocketmq server!!!"); return; } this.producer = buildProducer(); fluxSseSupport = new FluxSseSupport(this.producer); - pushConsumer = buildConsumer(buildMessageListener()); + this.pushConsumer = buildConsumer(buildMessageListener()); } catch (Exception e) { logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); } @@ -103,7 +103,7 @@ private MessageListener buildMessageListener() { } } } catch (Exception e) { - logger.info("error " + e.getMessage()); + logger.info("consumer error " + e.getMessage()); return ConsumeResult.FAILURE; } return ConsumeResult.SUCCESS; @@ -117,7 +117,13 @@ private FluxSseSupport(Producer producer) { this.producer = producer; } - public void subscribeObjectRocketmq(Flux multi, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + public void subscribeObjectRocketmq(Flux multi, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + if (null == multi || StringUtils.isEmpty(workAgentResponseTopic) || StringUtils.isEmpty(liteTopic) || StringUtils.isEmpty(msgId)) { + logger.info("subscribeObjectRocketmq param error"); + completableFuture.complete(false); + return; + } + AtomicLong count = new AtomicLong(); Flux map = multi.map(new Function() { @Override @@ -129,14 +135,14 @@ public Buffer apply(Object o) { return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); } }); - writeRocketmq(map, WorkAgentResponseTopic, liteTopic, msgId, completableFuture); + writeRocketmq(map, workAgentResponseTopic, liteTopic, msgId, completableFuture); } - public void writeRocketmq(Flux flux, String WorkAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + private void writeRocketmq(Flux flux, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { flux.subscribe( event -> { try { - producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); + producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); } catch (ClientException error) { logger.info("writeRocketmq send stream error: " + error.getMessage()); } @@ -147,7 +153,7 @@ public void writeRocketmq(Flux flux, String WorkAgentResponseTopic, Stri }, () -> { try { - producer.send(buildMessage(WorkAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); + producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); } catch (ClientException e) { logger.info("writeRocketmq send stream error: " + e.getMessage() ); } From 1f1741093daa3da1f097bfdbe40596fc253dad62 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 15:09:03 +0800 Subject: [PATCH 08/22] optimize the code Change-Id: I4ded067f7a22f0e573ed3c96b99ae334bbb8ec31 --- .../io/agentscope/runtime/protocol/a2a/RocketMQUtils.java | 1 - .../protocol/a2a/controller/A2aRocketMQController.java | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index 1b2090fe..ae644901 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -20,7 +20,6 @@ import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; - import static io.a2a.util.Utils.OBJECT_MAPPER; public class RocketMQUtils { diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 0c447b65..3e80c9c5 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -65,7 +65,7 @@ public void init() { return; } this.producer = buildProducer(); - fluxSseSupport = new FluxSseSupport(this.producer); + this.fluxSseSupport = new FluxSseSupport(this.producer); this.pushConsumer = buildConsumer(buildMessageListener()); } catch (Exception e) { logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); @@ -85,7 +85,7 @@ private MessageListener buildMessageListener() { try { if (streaming) { Flux> serverSentEventFlux = handleStreamRequest(body, null); - CompletableFuture completableFuture = new CompletableFuture(); + CompletableFuture completableFuture = new CompletableFuture<>(); executor.execute(() -> { fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); }); @@ -123,7 +123,6 @@ public void subscribeObjectRocketmq(Flux multi, String workAgentResponse completableFuture.complete(false); return; } - AtomicLong count = new AtomicLong(); Flux map = multi.map(new Function() { @Override @@ -140,6 +139,7 @@ public Buffer apply(Object o) { private void writeRocketmq(Flux flux, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { flux.subscribe( + //next event -> { try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); @@ -147,10 +147,12 @@ private void writeRocketmq(Flux flux, String workAgentResponseTopic, Str logger.info("writeRocketmq send stream error: " + error.getMessage()); } }, + //error error -> { logger.info("writeRocketmq send stream error: " + error.getMessage()); completableFuture.complete(false); }, + //complete () -> { try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); From 7250b7b7f1238810b02aded019e936a7fa36bfb0 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Wed, 24 Dec 2025 16:23:47 +0800 Subject: [PATCH 09/22] optimize the code Change-Id: I712a5413ab43393e398a00cf12afe4e916e361fb --- .../runtime/protocol/a2a/RocketMQUtils.java | 2 +- .../a2a/controller/A2aRocketMQController.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index ae644901..674c198d 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -23,7 +23,7 @@ import static io.a2a.util.Utils.OBJECT_MAPPER; public class RocketMQUtils { - private static Logger logger = Logger.getLogger(RocketMQUtils.class.getName()); + private static final Logger logger = Logger.getLogger(RocketMQUtils.class.getName()); public static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); public static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); public static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 3e80c9c5..48ec80d0 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -41,7 +41,7 @@ @Controller @RequestMapping("/a2a-rocketmq") public class A2aRocketMQController extends A2aController { - private static Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); + private static final Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); private Producer producer; private PushConsumer pushConsumer; private FluxSseSupport fluxSseSupport; @@ -87,7 +87,7 @@ private MessageListener buildMessageListener() { Flux> serverSentEventFlux = handleStreamRequest(body, null); CompletableFuture completableFuture = new CompletableFuture<>(); executor.execute(() -> { - fluxSseSupport.subscribeObjectRocketmq(serverSentEventFlux.map(i -> (Object)i), request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); + fluxSseSupport.subscribeObjectRocketMQ(serverSentEventFlux.map(i -> (Object)i), request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); }); Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); return Boolean.TRUE.equals(streamResult) ? ConsumeResult.SUCCESS : ConsumeResult.FAILURE; @@ -117,9 +117,9 @@ private FluxSseSupport(Producer producer) { this.producer = producer; } - public void subscribeObjectRocketmq(Flux multi, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + public void subscribeObjectRocketMQ(Flux multi, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { if (null == multi || StringUtils.isEmpty(workAgentResponseTopic) || StringUtils.isEmpty(liteTopic) || StringUtils.isEmpty(msgId)) { - logger.info("subscribeObjectRocketmq param error"); + logger.info("subscribeObjectRocketMQ param error"); completableFuture.complete(false); return; } @@ -134,22 +134,22 @@ public Buffer apply(Object o) { return Buffer.buffer("data: " + toJsonString(o) + "\nid: " + count.getAndIncrement() + "\n\n"); } }); - writeRocketmq(map, workAgentResponseTopic, liteTopic, msgId, completableFuture); + writeRocketMQ(map, workAgentResponseTopic, liteTopic, msgId, completableFuture); } - private void writeRocketmq(Flux flux, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { + private void writeRocketMQ(Flux flux, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { flux.subscribe( //next event -> { try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); } catch (ClientException error) { - logger.info("writeRocketmq send stream error: " + error.getMessage()); + logger.info("writeRocketMQ send stream error: " + error.getMessage()); } }, //error error -> { - logger.info("writeRocketmq send stream error: " + error.getMessage()); + logger.info("writeRocketMQ send stream error: " + error.getMessage()); completableFuture.complete(false); }, //complete @@ -157,7 +157,7 @@ private void writeRocketmq(Flux flux, String workAgentResponseTopic, Str try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); } catch (ClientException e) { - logger.info("writeRocketmq send stream error: " + e.getMessage() ); + logger.info("writeRocketMQ send stream error: " + e.getMessage() ); } completableFuture.complete(true); } From 343a95c0716d9ebe3993188c0c556220239b8449 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Thu, 25 Dec 2025 16:13:20 +0800 Subject: [PATCH 10/22] update Change-Id: Ib7dc4eb71d63b8d236e2a9a091e10eb61e94ac8d --- .../runtime/protocol/a2a/controller/A2aRocketMQController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 48ec80d0..a696805d 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -164,5 +164,4 @@ private void writeRocketMQ(Flux flux, String workAgentResponseTopic, Str ); } } - } From 56b44cd06adbbd5f536c3f1b3652a47e4be92f6f Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Thu, 25 Dec 2025 20:21:50 +0800 Subject: [PATCH 11/22] add rocketmq example Change-Id: I212ee343bce187cde7d37ab897cd31cad4ccbca1 --- .../agentscope_use_example/pom.xml | 6 + .../AgentScopeDeployRocketMQExample.java | 132 ++++++++++++++++++ pom.xml | 2 +- .../a2a/controller/A2aRocketMQController.java | 7 +- .../a2a/controller/AgentCardController.java | 4 + 5 files changed, 148 insertions(+), 3 deletions(-) create mode 100755 examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java diff --git a/examples/simple_agent_use_examples/agentscope_use_example/pom.xml b/examples/simple_agent_use_examples/agentscope_use_example/pom.xml index 36d9d9fb..a0168b4f 100755 --- a/examples/simple_agent_use_examples/agentscope_use_example/pom.xml +++ b/examples/simple_agent_use_examples/agentscope_use_example/pom.xml @@ -53,6 +53,12 @@ 1.0.0 + + io.agentscope + spring-boot-starter-agentscope-runtime-a2a-nacos + 1.0.3 + + org.slf4j diff --git a/examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java b/examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java new file mode 100755 index 00000000..ff660099 --- /dev/null +++ b/examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java @@ -0,0 +1,132 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.agentscope; + +import java.util.List; +import io.a2a.spec.AgentInterface; +import io.agentscope.core.ReActAgent; +import io.agentscope.core.agent.EventType; +import io.agentscope.core.agent.StreamOptions; +import io.agentscope.core.message.Msg; +import io.agentscope.core.message.MsgRole; +import io.agentscope.core.model.DashScopeChatModel; +import io.agentscope.runtime.LocalDeployManager; +import io.agentscope.runtime.adapters.agentscope.AgentScopeAgentHandler; +import io.agentscope.runtime.app.AgentApp; +import io.agentscope.runtime.engine.schemas.AgentRequest; +import io.agentscope.runtime.protocol.a2a.A2aProtocolConfig; +import io.agentscope.runtime.protocol.a2a.ConfigurableAgentCard; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.a2a.common.RocketMQA2AConstant; +import reactor.core.publisher.Flux; + +/** + * Example demonstrating how to use AgentScope to proxy ReActAgent + */ +public class AgentScopeDeployRocketMQExample { + private static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); + private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); + private static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); + private static final String BIZ_CONSUMER_GROUP = System.getProperty("bizConsumerGroup", ""); + private static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); + private static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); + private static final String DASHSCOPE_API_KEY = System.getProperty("apiKey", ""); + + public static void main(String[] args) { + if (!checkConfigParam()) { + System.exit(1); + } + runAgent(); + } + + private static void runAgent() { + AgentInterface agentInterface = new AgentInterface(RocketMQA2AConstant.ROCKETMQ_PROTOCOL, buildRocketMQUrl()); + ConfigurableAgentCard agentCard = new ConfigurableAgentCard.Builder().url(buildRocketMQUrl()).preferredTransport(RocketMQA2AConstant.ROCKETMQ_PROTOCOL).additionalInterfaces(List.of(agentInterface)).description("use rocketmq as transport").build(); + AgentApp agentApp = new AgentApp(agent(agentBuilder(dashScopeChatModel(DASHSCOPE_API_KEY)))); + agentApp.deployManager(LocalDeployManager.builder().protocolConfigs(List.of(new A2aProtocolConfig(agentCard, 60, 10))).port(10001).build()); + agentApp.cors(registry -> registry.addMapping("/**").allowedOriginPatterns("*").allowedMethods("GET", "POST", "PUT", "DELETE").allowCredentials(true)); + agentApp.run(); + } + + public static ReActAgent.Builder agentBuilder(DashScopeChatModel model) { + return ReActAgent.builder().model(model).name("agentscope-a2a-example-agent").sysPrompt("You are an example of A2A(Agent2Agent) Protocol Agent. You can answer some simple question according to your knowledge."); + } + + public static AgentScopeAgentHandler agent(ReActAgent.Builder builder) { + return new AgentScopeAgentHandler() { + @Override + public boolean isHealthy() { + return true; + } + + @Override + public Flux streamQuery(AgentRequest request, Object messages) { + ReActAgent agent = builder.build(); + StreamOptions streamOptions = StreamOptions.builder().eventTypes(EventType.REASONING, EventType.TOOL_RESULT).incremental(true).build(); + if (messages instanceof List) { + return agent.stream((List)messages, streamOptions); + } else if (messages instanceof Msg) { + return agent.stream((Msg)messages, streamOptions); + } else { + Msg msg = Msg.builder().role(MsgRole.USER).build(); + return agent.stream(msg, streamOptions); + } + } + @Override + public String getName() { + return builder.build().getName(); + } + + @Override + public String getDescription() { + return builder.build().getDescription(); + } + }; + } + public static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) { + if (StringUtils.isEmpty(dashScopeApiKey)) { + throw new IllegalStateException("DashScope API Key is empty, please set environment variable `AI_DASHSCOPE_API_KEY`"); + } + return DashScopeChatModel.builder().apiKey(dashScopeApiKey).modelName("qwen-max").stream(true).enableThinking(true).build(); + } + + private static String buildRocketMQUrl() { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC)) { + throw new RuntimeException("buildRocketMQUrl param error, please check rocketmq config"); + } + return "http://" + ROCKETMQ_ENDPOINT + "/" + ROCKETMQ_NAMESPACE + "/" + BIZ_TOPIC; + } + + private static boolean checkConfigParam() { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP) || StringUtils.isEmpty(DASHSCOPE_API_KEY)) { + if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { + System.err.println("rocketMQEndpoint is empty"); + } + if (StringUtils.isEmpty(BIZ_TOPIC)) { + System.err.println("bizTopic is empty"); + } + if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { + System.err.println("bizConsumerGroup is empty"); + } + if (StringUtils.isEmpty(DASHSCOPE_API_KEY)) { + System.err.println("apiKey is empty"); + } + return false; + } + return true; + } +} diff --git a/pom.xml b/pom.xml index 60ed0ee9..2fe77191 100644 --- a/pom.xml +++ b/pom.xml @@ -296,7 +296,7 @@ org.apache.rocketmq rocketmq-a2a - 1.0.5 + 1.0.8-SNAPSHOT org.jboss.slf4j diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index a696805d..866ad1d1 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -1,6 +1,8 @@ package io.agentscope.runtime.protocol.a2a.controller; import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -11,6 +13,7 @@ import java.util.logging.Logger; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException; +import io.a2a.server.ServerCallContext; import io.a2a.spec.AgentCard; import io.a2a.spec.JSONParseError; import io.a2a.spec.JSONRPCErrorResponse; @@ -84,7 +87,7 @@ private MessageListener buildMessageListener() { boolean streaming = isStreamingRequest(body, null); try { if (streaming) { - Flux> serverSentEventFlux = handleStreamRequest(body, null); + Flux> serverSentEventFlux = handleStreamRequest(body, new ServerCallContext(null, Map.of(ContextKeys.IS_STREAM_KEY, true), Set.of())); CompletableFuture completableFuture = new CompletableFuture<>(); executor.execute(() -> { fluxSseSupport.subscribeObjectRocketMQ(serverSentEventFlux.map(i -> (Object)i), request.getWorkAgentResponseTopic(), request.getLiteTopic(), messageView.getMessageId().toString(), completableFuture); @@ -92,7 +95,7 @@ private MessageListener buildMessageListener() { Boolean streamResult = completableFuture.get(15, TimeUnit.MINUTES); return Boolean.TRUE.equals(streamResult) ? ConsumeResult.SUCCESS : ConsumeResult.FAILURE; } else { - nonStreamingResponse = handleNonStreamRequest(body, null); + nonStreamingResponse = handleNonStreamRequest(body, new ServerCallContext(null, Map.of(), Set.of())); } } catch (JsonProcessingException e) { error = new JSONRPCErrorResponse(null, new JSONParseError()); diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/AgentCardController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/AgentCardController.java index a3d005a2..5588b8e3 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/AgentCardController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/AgentCardController.java @@ -42,4 +42,8 @@ public AgentCard getAgentCard() { return jsonRpcHandler.getAgentCard(); } + @GetMapping(value = "/.well-known/agent-card.json", produces = MediaType.APPLICATION_JSON_VALUE) + public AgentCard getAgentCardInfo() { + return jsonRpcHandler.getAgentCard(); + } } From cf72b1c714e9caf4af221044a641c971382128c2 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 10:32:31 +0800 Subject: [PATCH 12/22] add use rocketmq transport example Change-Id: Ic0c12d9137ea6212e59beab28adbc1a58ec04d6a --- .../pom.xml | 52 +++++++++ .../io/agentscope/A2aAgentCallerExample.java | 101 +++++++++++++++++ .../src/main/resources/logback.xml | 29 +++++ .../pom.xml | 103 ++++++++++++++++++ .../AgentScopeDeployRocketMQExample.java | 3 +- pom.xml | 2 + .../a2a/controller/A2aRocketMQController.java | 2 +- 7 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml create mode 100644 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java create mode 100644 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml create mode 100644 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml rename examples/{simple_agent_use_examples/agentscope_use_example => simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example}/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java (95%) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml new file mode 100644 index 00000000..1e282009 --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml @@ -0,0 +1,52 @@ + + + 4.0.0 + + io.agentscope + agentscope-runtime + 1.0.0 + ../../../pom.xml + + + agentscope_use_rocketmq_client_example + + + 17 + 17 + UTF-8 + + 1.0.3 + 1.0.3 + ${revision} + + 1.5.20 + + + + + io.agentscope + agentscope-extensions-a2a-nacos + ${agentscope-extensions.version} + + + + io.agentscope + agentscope-core + ${agentscope.version} + + + + ch.qos.logback + logback-classic + ${logback.version} + + + org.apache.rocketmq + rocketmq-a2a + 1.0.8-SNAPSHOT + + + + diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java new file mode 100644 index 00000000..dd89f089 --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java @@ -0,0 +1,101 @@ +/* + * Copyright 1999-2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.agentscope; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import com.alibaba.nacos.api.exception.NacosException; +import io.a2a.client.http.JdkA2AHttpClient; +import io.agentscope.core.a2a.agent.A2aAgent; +import io.agentscope.core.a2a.agent.A2aAgentConfig; +import io.agentscope.core.a2a.agent.A2aAgentConfig.A2aAgentConfigBuilder; +import io.agentscope.core.a2a.agent.card.WellKnownAgentCardResolver; +import io.agentscope.core.message.Msg; +import io.agentscope.core.message.MsgRole; +import io.agentscope.core.message.TextBlock; +import org.apache.rocketmq.a2a.transport.RocketMQTransport; +import org.apache.rocketmq.a2a.transport.RocketMQTransportConfig; +import reactor.core.publisher.Flux; + +public class A2aAgentCallerExample { + private static final String USER_INPUT_PREFIX = "\u001B[34mYou>\u001B[0m "; + private static final String AGENT_RESPONSE_PREFIX = "\u001B[32mAgent>\u001B[0m "; + private static final String ACCESS_KEY = System.getProperty("rocketMQAK"); + private static final String SECRET_KEY = System.getProperty("rocketMQSK"); + private static final String WORK_AGENT_RESPONSE_TOPIC = System.getProperty("workAgentResponseTopic"); + private static final String WORK_AGENT_RESPONSE_GROUP_ID = System.getProperty("workAgentResponseGroupID"); + private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace"); + private static final String agentName = "agentscope-a2a-rocketmq-example-agent"; + + + // Can change this to false disable streaming. + static boolean streaming = true; + + public static void main(String[] args) throws NacosException { + RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig(); + rocketMQTransportConfig.setAccessKey(ACCESS_KEY); + rocketMQTransportConfig.setSecretKey(SECRET_KEY); + rocketMQTransportConfig.setWorkAgentResponseTopic(WORK_AGENT_RESPONSE_TOPIC); + rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID); + rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE); + rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient()); + A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build(); + A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name("agentscope-a2a-example-agent").agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://localhost:10001").relativeCardPath("/.well-known/agent.json").build()).build(); + startExample(agent); + } + + private static void startExample(A2aAgent agent) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) { + while (true) { + // 用户输入提示 + System.out.print(USER_INPUT_PREFIX); + String input = reader.readLine(); + // 退出条件检测 + if (input == null || input.trim().equalsIgnoreCase("exit") || input.trim().equalsIgnoreCase("quit")) { + System.out.println(AGENT_RESPONSE_PREFIX + "Bye!"); + break; + } + System.out.println(AGENT_RESPONSE_PREFIX + "I have received your question: " + input); + // 流式输出带前缀 + System.out.print(AGENT_RESPONSE_PREFIX); + // 处理输入并获取响应 + processInput(agent, input).doOnNext(System.out::print).then().block(); + // 换行分隔 + System.out.println(); + } + } catch (IOException e) { + System.err.println("input error: " + e.getMessage()); + } + } + + private static Flux processInput(A2aAgent agent, String input) { + Msg msg = Msg.builder().role(MsgRole.USER).content(TextBlock.builder().text(input).build()).build(); + return agent.stream(msg).map(event -> { + if (streaming && event.isLast()) { + // The last message is whole artifact message result, which has been solved and print in before event handle. + // Weather need to handle the last message, depends on the use case. + return ""; + } + Msg message = event.getMessage(); + StringBuilder partText = new StringBuilder(); + message.getContent().stream().filter(block -> block instanceof TextBlock).map(block -> (TextBlock) block) + .forEach(block -> partText.append(block.getText())); + return partText.toString(); + }); + } +} diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml new file mode 100644 index 00000000..5157ba8f --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml @@ -0,0 +1,29 @@ + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml new file mode 100644 index 00000000..3d9b53a4 --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + io.agentscope + agentscope-runtime + 1.0.0 + ../../../pom.xml + + + agentscope_use_rocketmq_server_example + + + 17 + 17 + 17 + UTF-8 + + + + + io.agentscope + agentscope-runtime-agentscope + 1.0.0 + + + + io.agentscope + agentscope-runtime-web + 1.0.0 + + + + io.agentscope + spring-boot-starter-agentscope-runtime-a2a-nacos + 1.0.3 + + + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + false + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + false + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + io.agentscope.browser.BrowserAgentApplication + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 17 + 17 + + + + + org.springframework.boot + spring-boot-maven-plugin + + io.agentscope.AgentScopeDeployExample + + + + + + diff --git a/examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java similarity index 95% rename from examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java rename to examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java index ff660099..e06b0723 100755 --- a/examples/simple_agent_use_examples/agentscope_use_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java @@ -45,6 +45,7 @@ public class AgentScopeDeployRocketMQExample { private static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); private static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); private static final String DASHSCOPE_API_KEY = System.getProperty("apiKey", ""); + private static final String AGENT_NAME = "agentscope-a2a-rocketmq-example-agent"; public static void main(String[] args) { if (!checkConfigParam()) { @@ -63,7 +64,7 @@ private static void runAgent() { } public static ReActAgent.Builder agentBuilder(DashScopeChatModel model) { - return ReActAgent.builder().model(model).name("agentscope-a2a-example-agent").sysPrompt("You are an example of A2A(Agent2Agent) Protocol Agent. You can answer some simple question according to your knowledge."); + return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("You are an example of A2A(Agent2Agent) Protocol Agent. You can answer some simple question according to your knowledge."); } public static AgentScopeAgentHandler agent(ReActAgent.Builder builder) { diff --git a/pom.xml b/pom.xml index 2fe77191..046c5efa 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,8 @@ web maven_plugin starters/spring-boot-starter-runtime-a2a + examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example + examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example git://github.com/agentscope-ai/agentscope-runtime-java.git diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 866ad1d1..0497dd2e 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -145,7 +145,7 @@ private void writeRocketMQ(Flux flux, String workAgentResponseTopic, Str //next event -> { try { - producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); + producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); } catch (ClientException error) { logger.info("writeRocketMQ send stream error: " + error.getMessage()); } From ff30b09877d8ef7143113bbeabd49665d1783150 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 14:47:43 +0800 Subject: [PATCH 13/22] add rocketmq example Change-Id: I48b4426d86061e0b3c8ec0f9c44fd313dd494e2d --- .../README.md | 42 +++++++++++ .../pom.xml | 15 ++++ .../io/agentscope/A2aAgentCallerExample.java | 5 +- .../README.md | 75 +++++++++++++++++++ .../pom.xml | 17 +++++ .../AgentScopeDeployRocketMQExample.java | 2 +- 6 files changed, 152 insertions(+), 4 deletions(-) create mode 100755 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/README.md create mode 100755 examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/README.md diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/README.md b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/README.md new file mode 100755 index 00000000..e6832a87 --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/README.md @@ -0,0 +1,42 @@ +# AgentScope Usage Example For Client + +This directory contains example demonstrating the Agent-to-Agent (A2A) protocol implementation with RocketMQ integration in AgentScope. + +## Prerequisites + +- Java 17+ +- Maven 3.6+ + +## Basic preliminary preparations + +### Deploy Apache RocketMQ + +Deploy the LiteTopic version of [Apache RocketMQ](http://rocketmq.apache.org/) (the open-source version is expected to be released by the end of December), or purchase a commercial RocketMQ instance that supports LiteTopic, and create the following resources: +- **1.1** Create a LiteTopic: `WorkerAgentResponse` +- **1.2** Create a bound Lite consumer group ID for WorkerAgentResponse: `CID_HOST_AGENT_LITE` + +## Quick Start + +| Parameter Name | Description | Required | +|-------|------------------|------| +| rocketMQNamespace | RocketMQ namespace | No | +| rocketMQAK | RocketMQ access key | No | +| rocketMQSK | RocketMQ secret key | No | +| workAgentResponseTopic | LiteTopic | Yes | +| workAgentResponseGroupID | LiteConsumer CID | Yes | + +### Build the Project + +```bash +mvn clean compile +``` + +### Run the Client Example + +```bash +mvn compile exec:java -Dexec.mainClass=io.agentscope.A2aAgentCallerExample -DrocketMQNamespace= -DworkAgentResponseTopic=WorkerAgentResponse -DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DrocketMQAK= -DrocketMQSK= +``` + +## Contributing + +If you'd like to contribute to these examples or report issues, please submit a pull request or open an issue in the repository. diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml index 1e282009..a98ed49e 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml @@ -1,4 +1,19 @@ + diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java index dd89f089..f5053387 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java @@ -40,8 +40,7 @@ public class A2aAgentCallerExample { private static final String WORK_AGENT_RESPONSE_TOPIC = System.getProperty("workAgentResponseTopic"); private static final String WORK_AGENT_RESPONSE_GROUP_ID = System.getProperty("workAgentResponseGroupID"); private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace"); - private static final String agentName = "agentscope-a2a-rocketmq-example-agent"; - + private static final String AGENT_NAME = "agentscope-a2a-rocketmq-example-agent"; // Can change this to false disable streaming. static boolean streaming = true; @@ -55,7 +54,7 @@ public static void main(String[] args) throws NacosException { rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE); rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient()); A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build(); - A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name("agentscope-a2a-example-agent").agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://localhost:10001").relativeCardPath("/.well-known/agent.json").build()).build(); + A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://localhost:10001").relativeCardPath("/.well-known/agent.json").build()).build(); startExample(agent); } diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/README.md b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/README.md new file mode 100755 index 00000000..05e0de5d --- /dev/null +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/README.md @@ -0,0 +1,75 @@ +# AgentScope Usage Example For Server + +A minimal, end-to-end example showing how to build and deploy an intelligent agent with AgentScope Runtime Java using a ReAct-style agent, sandboxed tools, and local A2A deployment. + +## Features + +- Build agents using AgentScope's `ReActAgent` +- Integrate DashScope (Qwen) large language models +- Use RocketMQ for asynchronous communication between agents +- Call tools: Python execution, Shell command execution, browser navigation +- Deploy locally as an A2A application +- Stream responses and show thinking process +- One-click Docker image packaging + +## Prerequisites + +- Java 17+ +- Maven 3.6+ + +## Basic preliminary preparations + +### 1. Deploy Apache RocketMQ + +Deploy the LiteTopic version of [Apache RocketMQ](http://rocketmq.apache.org/) (the open-source version is expected to be released by the end of December), or purchase a commercial RocketMQ instance that supports LiteTopic, and create the following resources: +- **1.1** Create a standard topic for the AI assistant: `AgentTask` +- **1.2** Create a standard consumer group ID for the AI assistant: `AgentTaskConsumerGroup` + +### 2. Get Qwen API key +- Go to the Bailian platform to obtain the corresponding Qwen API key. + +## Quick Start + +| Parameter Name | Description | Required | +|-------|------------------|------| +| rocketMQEndpoint | RocketMQ service endpoint | Yes | +| rocketMQNamespace | RocketMQ namespace | No | +| bizTopic | Standard topic | Yes | +| bizConsumerGroup | Standard consumer group ID (CID) | Yes | +| rocketMQAK | RocketMQ access key | No | +| rocketMQSK | RocketMQ secret key | No | +| apiKey | API key for calling Bailian platform | Yes | + +### 1.Build the Project + +```bash +mvn clean compile +``` + +### 2.Run the Example + +```bash +mvn exec:java -Dexec.mainClass=io.agentscope.AgentScopeDeployRocketMQExample -DrocketMQEndpoint= -DrocketMQNamespace= -DbizTopic=AgentTask -DbizConsumerGroup=AgentTaskConsumerGroup -DrocketMQAK= -DrocketMQSK= -DapiKey= +``` + +### 3.Test the Deployed Agent +After deployment, the agent listens on `http://localhost:10001`. +Query AgentCard Info, `http://localhost:10001/.well-known/agent-card.json` + +## Notes + +1. Default deployment port is `10001` (change in code if needed) +2. The sandbox manager uses default settings; customize via `ManagerConfig` as required +3. This example uses in-memory storage; use persistent storage for production + +## Extension Ideas + +- Add custom tools +- Configure persistent storage (Redis, OSS, etc.) +- Deploy sandboxes with Kubernetes +- Integrate additional MCP tools + +## Related Documentation + +- [AgentScope Runtime Java (root)](../../README.md) +- [Examples Overview](../README.md) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml index 3d9b53a4..77be8ba8 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml @@ -1,4 +1,19 @@ + @@ -42,11 +57,13 @@ org.slf4j slf4j-api + 2.0.17 ch.qos.logback logback-classic + 1.5.18 diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java index e06b0723..41506eb8 100755 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java @@ -64,7 +64,7 @@ private static void runAgent() { } public static ReActAgent.Builder agentBuilder(DashScopeChatModel model) { - return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("You are an example of A2A(Agent2Agent) Protocol Agent. You can answer some simple question according to your knowledge."); + return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("You are an example of A2A(Agent2Agent) Protocol(use RocketmqTransport) Agent. You can answer some simple question according to your knowledge."); } public static AgentScopeAgentHandler agent(ReActAgent.Builder builder) { From 7706085b1d2d54632c675d5727bcdd0e0fb1c309 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 14:48:54 +0800 Subject: [PATCH 14/22] update Change-Id: I20cd1318bc2a3652d7c492bf0d59e79ce5179f41 --- .../agentscope_use_rocketmq_client_example/pom.xml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml index a98ed49e..9d46c19f 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml @@ -60,7 +60,7 @@ org.apache.rocketmq rocketmq-a2a - 1.0.8-SNAPSHOT + 1.0.7 diff --git a/pom.xml b/pom.xml index 046c5efa..e8d4f44f 100644 --- a/pom.xml +++ b/pom.xml @@ -298,7 +298,7 @@ org.apache.rocketmq rocketmq-a2a - 1.0.8-SNAPSHOT + 1.0.7 org.jboss.slf4j From b1438e1f7f9c4727c06a0490c5c76449f53c47c7 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:00:24 +0800 Subject: [PATCH 15/22] update Change-Id: I5bdecce5c0c6c00b108a88992eeb76a1e342983c --- .../src/main/java/io/agentscope/A2aAgentCallerExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java index f5053387..30aa2706 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/java/io/agentscope/A2aAgentCallerExample.java @@ -54,7 +54,7 @@ public static void main(String[] args) throws NacosException { rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE); rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient()); A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build(); - A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://localhost:10001").relativeCardPath("/.well-known/agent.json").build()).build(); + A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://localhost:10001").build()).build(); startExample(agent); } From c86bc9d77b6ab0a77f795dbdef19a2ed0a5bd7ac Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:16:08 +0800 Subject: [PATCH 16/22] update Change-Id: Icb1fefa23cbfb54d261a2ec5db3cfc7b31c80cbd --- .../io/agentscope/AgentScopeDeployRocketMQExample.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java index 41506eb8..f8c27c9f 100755 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/src/main/java/io/agentscope/AgentScopeDeployRocketMQExample.java @@ -33,17 +33,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.a2a.common.RocketMQA2AConstant; import reactor.core.publisher.Flux; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_CONSUMER_GROUP; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.BIZ_TOPIC; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_ENDPOINT; +import static io.agentscope.runtime.protocol.a2a.RocketMQUtils.ROCKETMQ_NAMESPACE; /** * Example demonstrating how to use AgentScope to proxy ReActAgent */ public class AgentScopeDeployRocketMQExample { - private static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); - private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); - private static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); - private static final String BIZ_CONSUMER_GROUP = System.getProperty("bizConsumerGroup", ""); - private static final String ACCESS_KEY = System.getProperty("rocketMQAK", ""); - private static final String SECRET_KEY = System.getProperty("rocketMQSK", ""); private static final String DASHSCOPE_API_KEY = System.getProperty("apiKey", ""); private static final String AGENT_NAME = "agentscope-a2a-rocketmq-example-agent"; From 2654d8bf0d49a9a1ff65429ed1b58e8160f96d13 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:23:43 +0800 Subject: [PATCH 17/22] update Change-Id: I8ddb17456babb1e481311064b1fadeca58e05462 --- .../runtime/protocol/a2a/RocketMQUtils.java | 15 +++++++++------ .../a2a/controller/A2aRocketMQController.java | 19 ++++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index 674c198d..a0cd78f8 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -3,7 +3,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; -import java.util.logging.Logger; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.rocketmq.a2a.common.RocketMQResponse; @@ -20,10 +19,13 @@ import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static io.a2a.util.Utils.OBJECT_MAPPER; public class RocketMQUtils { - private static final Logger logger = Logger.getLogger(RocketMQUtils.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(RocketMQUtils.class.getName()); public static final String ROCKETMQ_ENDPOINT = System.getProperty("rocketMQEndpoint", ""); public static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace", ""); public static final String BIZ_TOPIC = System.getProperty("bizTopic", ""); @@ -46,6 +48,7 @@ public static Producer buildProducer() throws ClientException { public static PushConsumer buildConsumer(MessageListener messageListener) throws ClientException { if (null == messageListener) { + logger.error("buildConsumer error, messageListener is null"); throw new RuntimeException("buildConsumer messageListener is null"); } final ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -66,7 +69,7 @@ public static PushConsumer buildConsumer(MessageListener messageListener) throws public static Message buildMessage(String topic, String liteTopic, RocketMQResponse response) { if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic) || null == response) { - logger.info("buildMessage param error topic: " + topic + ", liteTopic: " + liteTopic + ", response: " + response); + logger.error("buildMessage param error topic: {}, liteTopic: {}, response: {}", topic, liteTopic, JSON.toJSONString(response)); return null; } String missionJsonStr = JSON.toJSONString(response); @@ -93,13 +96,13 @@ public static String toJsonString(Object o) { public static boolean checkConfigParam() { if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { - logger.info("rocketMQEndpoint is empty"); + logger.error("rocketMQEndpoint is empty"); } if (StringUtils.isEmpty(BIZ_TOPIC)) { - logger.info("bizTopic is empty"); + logger.error("bizTopic is empty"); } if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { - logger.info("bizConsumerGroup is empty"); + logger.error("bizConsumerGroup is empty"); } return false; } diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 0497dd2e..03a0d6ba 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.logging.Logger; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.core.JsonProcessingException; import io.a2a.server.ServerCallContext; @@ -30,6 +29,8 @@ import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.shaded.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.ObjectProvider; import org.springframework.http.codec.ServerSentEvent; import org.springframework.stereotype.Controller; @@ -44,7 +45,7 @@ @Controller @RequestMapping("/a2a-rocketmq") public class A2aRocketMQController extends A2aController { - private static final Logger logger = Logger.getLogger(A2aRocketMQController.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(A2aRocketMQController.class.getName()); private Producer producer; private PushConsumer pushConsumer; private FluxSseSupport fluxSseSupport; @@ -64,14 +65,14 @@ public A2aRocketMQController(Runner runner, AgentCard agentCard, ObjectProvider< public void init() { try { if (!checkConfigParam()) { - logger.info("checkConfigParam rocketmq config param is not ok, ignore rocketmq server!!!"); + logger.error("checkConfigParam rocketmq config param is not ok, ignore rocketmq server!!!"); return; } this.producer = buildProducer(); this.fluxSseSupport = new FluxSseSupport(this.producer); this.pushConsumer = buildConsumer(buildMessageListener()); } catch (Exception e) { - logger.info("A2aRocketMQController init error, please check the rocketmq config, e: " + e.getMessage()); + logger.error("A2aRocketMQController init error, please check the rocketmq config, e: {}", e.getMessage()); } } @@ -106,7 +107,7 @@ private MessageListener buildMessageListener() { } } } catch (Exception e) { - logger.info("consumer error " + e.getMessage()); + logger.error("consumer error: {}", e.getMessage()); return ConsumeResult.FAILURE; } return ConsumeResult.SUCCESS; @@ -122,7 +123,7 @@ private FluxSseSupport(Producer producer) { public void subscribeObjectRocketMQ(Flux multi, String workAgentResponseTopic, String liteTopic, String msgId, CompletableFuture completableFuture) { if (null == multi || StringUtils.isEmpty(workAgentResponseTopic) || StringUtils.isEmpty(liteTopic) || StringUtils.isEmpty(msgId)) { - logger.info("subscribeObjectRocketMQ param error"); + logger.error("subscribeObjectRocketMQ param error, multi: {}, workAgentResponseTopic: {}, liteTopic: {}, msgId: {}", multi, workAgentResponseTopic, liteTopic, msgId); completableFuture.complete(false); return; } @@ -147,12 +148,12 @@ private void writeRocketMQ(Flux flux, String workAgentResponseTopic, Str try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, event.toString(), msgId, true, false))); } catch (ClientException error) { - logger.info("writeRocketMQ send stream error: " + error.getMessage()); + logger.error("writeRocketMQ send stream error: {}", error.getMessage()); } }, //error error -> { - logger.info("writeRocketMQ send stream error: " + error.getMessage()); + logger.error("writeRocketMQ send stream error: {}", error.getMessage()); completableFuture.complete(false); }, //complete @@ -160,7 +161,7 @@ private void writeRocketMQ(Flux flux, String workAgentResponseTopic, Str try { producer.send(buildMessage(workAgentResponseTopic, liteTopic, new RocketMQResponse(liteTopic, null, null, msgId, true, true))); } catch (ClientException e) { - logger.info("writeRocketMQ send stream error: " + e.getMessage() ); + logger.error("writeRocketMQ send stream error: {}", e.getMessage()); } completableFuture.complete(true); } From bdd3c43f796bcb2f5998856441fd6e2e21f7d64c Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:40:30 +0800 Subject: [PATCH 18/22] update Change-Id: I4980cf5d4a4ed7a527853bf57be5d93f5ffd8656 --- .../agentscope_use_rocketmq_server_example/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml index 77be8ba8..845c5cff 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_server_example/pom.xml @@ -47,12 +47,6 @@ 1.0.0 - - io.agentscope - spring-boot-starter-agentscope-runtime-a2a-nacos - 1.0.3 - - org.slf4j From f4462acc144eacdea68595289b4f7c9353ae06fc Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:53:14 +0800 Subject: [PATCH 19/22] optimize the code Change-Id: Idf248fc1dde8614e9683af04404776f2094144e9 --- .../agentscope_use_example/pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/examples/simple_agent_use_examples/agentscope_use_example/pom.xml b/examples/simple_agent_use_examples/agentscope_use_example/pom.xml index a0168b4f..c6b5a906 100755 --- a/examples/simple_agent_use_examples/agentscope_use_example/pom.xml +++ b/examples/simple_agent_use_examples/agentscope_use_example/pom.xml @@ -52,13 +52,6 @@ agentscope-runtime-web 1.0.0 - - - io.agentscope - spring-boot-starter-agentscope-runtime-a2a-nacos - 1.0.3 - - org.slf4j From b4beda6454a41ee04788c8d220dd654c533be984 Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 15:58:29 +0800 Subject: [PATCH 20/22] optimize the code Change-Id: Ie96f2a9ca86c13f6e45e6277f37be7aa53610df1 --- .../io/agentscope/runtime/protocol/a2a/RocketMQUtils.java | 6 +++--- .../protocol/a2a/controller/A2aRocketMQController.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java index a0cd78f8..672cc9b1 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/RocketMQUtils.java @@ -96,13 +96,13 @@ public static String toJsonString(Object o) { public static boolean checkConfigParam() { if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) { - logger.error("rocketMQEndpoint is empty"); + logger.info("rocketMQEndpoint is empty"); } if (StringUtils.isEmpty(BIZ_TOPIC)) { - logger.error("bizTopic is empty"); + logger.info("bizTopic is empty"); } if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) { - logger.error("bizConsumerGroup is empty"); + logger.info("bizConsumerGroup is empty"); } return false; } diff --git a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java index 03a0d6ba..55384dc1 100644 --- a/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java +++ b/web/src/main/java/io/agentscope/runtime/protocol/a2a/controller/A2aRocketMQController.java @@ -65,7 +65,7 @@ public A2aRocketMQController(Runner runner, AgentCard agentCard, ObjectProvider< public void init() { try { if (!checkConfigParam()) { - logger.error("checkConfigParam rocketmq config param is not ok, ignore rocketmq server!!!"); + logger.info("checkConfigParam rocketmq config param is not ok, ignore rocketmq server!!!"); return; } this.producer = buildProducer(); From 9aa7cb3d1b8fe6388123bae5f93c06841077ee0f Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 17:17:36 +0800 Subject: [PATCH 21/22] update rocketmq-a2a version Change-Id: I96165c43cb40222dedd05316c49f2ac2443978c6 --- .../frontend/package-lock.json | 42 +++++++++---------- .../pom.xml | 2 +- pom.xml | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/examples/browser_use_fullstack_runtime/frontend/package-lock.json b/examples/browser_use_fullstack_runtime/frontend/package-lock.json index cad942fa..6e24b18d 100755 --- a/examples/browser_use_fullstack_runtime/frontend/package-lock.json +++ b/examples/browser_use_fullstack_runtime/frontend/package-lock.json @@ -5372,7 +5372,7 @@ "integrity": "sha512-p6Fx8B7b7ZhL/gmUsAy0D15WhvDccw3mnGNbZpi3pmeJdxtWsj2jEaI4Y6oo3XiHfzuSgPwKc04MYt6KgvC/wA==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-abstract": "^1.23.3", "es-errors": "^1.3.0", @@ -5483,8 +5483,8 @@ } }, "node_modules/available-typed-arrays": { - "version": "1.0.7", - "resolved": "https://registry.npmmirror.com/available-typed-arrays/-/available-typed-arrays-1.0.7.tgz", + "version": "1.0.8-SNAPSHOT", + "resolved": "https://registry.npmmirror.com/available-typed-arrays/-/available-typed-arrays-1.0.8-SNAPSHOT.tgz", "integrity": "sha512-wvUjBtSGN7+7SjNpq/9M2Tg350UZD3q62IFZLbRAR1bSMlCo1ZaeW+BJ+D090e4hIIZLBcTDWe4Mh4jvUDajzQ==", "license": "MIT", "dependencies": { @@ -7679,7 +7679,7 @@ "dependencies": { "array-buffer-byte-length": "^1.0.2", "arraybuffer.prototype.slice": "^1.0.4", - "available-typed-arrays": "^1.0.7", + "available-typed-arrays": "^1.0.8-SNAPSHOT", "call-bind": "^1.0.8", "call-bound": "^1.0.4", "data-view-buffer": "^1.0.2", @@ -7728,7 +7728,7 @@ "typed-array-buffer": "^1.0.3", "typed-array-byte-length": "^1.0.3", "typed-array-byte-offset": "^1.0.4", - "typed-array-length": "^1.0.7", + "typed-array-length": "^1.0.8-SNAPSHOT", "unbox-primitive": "^1.1.0", "which-typed-array": "^1.1.19" }, @@ -8237,7 +8237,7 @@ "license": "MIT", "dependencies": { "is-core-module": "^2.13.0", - "path-parse": "^1.0.7", + "path-parse": "^1.0.8-SNAPSHOT", "supports-preserve-symlinks-flag": "^1.0.0" }, "bin": { @@ -11062,7 +11062,7 @@ "jest-util": "^27.5.1", "jest-worker": "^27.5.1", "micromatch": "^4.0.4", - "walker": "^1.0.7" + "walker": "^1.0.8-SNAPSHOT" }, "engines": { "node": "^10.13.0 || ^12.13.0 || ^14.15.0 || >=15.0.0" @@ -11724,7 +11724,7 @@ "integrity": "sha512-okMH7OXXJ7YrN9Ok3/SXrnu4iX9yOk+25nqX4imS2npuvTYDmo/QEZoqwZkYaIDk3jVvBOTOIEgEhaLOynBS9g==", "license": "MIT", "dependencies": { - "argparse": "^1.0.7", + "argparse": "^1.0.8-SNAPSHOT", "esprima": "^4.0.0" }, "bin": { @@ -13197,7 +13197,7 @@ "integrity": "sha512-k6E21FzySsSK5a21KRADBd/NGneRegFO5pLHfdQLpRDETUNJueLXs3WCzyQ3tFRDYgbq3KHGXfTbi2bs8WQ6rQ==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-abstract": "^1.23.2", "es-object-atoms": "^1.0.0" @@ -13216,7 +13216,7 @@ "license": "MIT", "dependencies": { "array.prototype.reduce": "^1.0.6", - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-abstract": "^1.23.2", "es-object-atoms": "^1.0.0", @@ -13236,7 +13236,7 @@ "integrity": "sha512-+Lhy3TQTuzXI5hevh8sBGqbmurHbbIjAi0Z4S63nthVLmLxfbj4T54a4CfZrXIrt9iP4mVAPYMo/v99taj3wjQ==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-abstract": "^1.23.2" }, @@ -13537,8 +13537,8 @@ } }, "node_modules/path-parse": { - "version": "1.0.7", - "resolved": "https://registry.npmmirror.com/path-parse/-/path-parse-1.0.7.tgz", + "version": "1.0.8-SNAPSHOT", + "resolved": "https://registry.npmmirror.com/path-parse/-/path-parse-1.0.8-SNAPSHOT.tgz", "integrity": "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw==", "license": "MIT" }, @@ -16377,7 +16377,7 @@ "license": "MIT", "dependencies": { "is-core-module": "^2.16.1", - "path-parse": "^1.0.7", + "path-parse": "^1.0.8-SNAPSHOT", "supports-preserve-symlinks-flag": "^1.0.0" }, "bin": { @@ -17580,7 +17580,7 @@ "integrity": "sha512-o7+c9bW6zpAdJHTtujeePODAhkuicdAryFsfVKwA+wGw89wJ4GTY484WTucM9hLtDEOpOvI+aHnzqnC5lHp4Rg==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-abstract": "^1.23.3" }, @@ -17670,7 +17670,7 @@ "integrity": "sha512-UXSH262CSZY1tfu3G3Secr6uGLCFVPMhIqHjlgCUtCCcgihYc/xKs9djMTMUOb2j1mVSeU8EU6NWc/iQKU6Gfg==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "define-properties": "^1.2.1", "es-object-atoms": "^1.0.0" }, @@ -18636,7 +18636,7 @@ "integrity": "sha512-bTlAFB/FBYMcuX81gbL4OcpH5PmlFHqlCCpAl8AlEzMz5k53oNDvN8p1PNOWLEmI2x4orp3raOFB51tv9X+MFQ==", "license": "MIT", "dependencies": { - "available-typed-arrays": "^1.0.7", + "available-typed-arrays": "^1.0.8-SNAPSHOT", "call-bind": "^1.0.8", "for-each": "^0.3.3", "gopd": "^1.2.0", @@ -18652,12 +18652,12 @@ } }, "node_modules/typed-array-length": { - "version": "1.0.7", - "resolved": "https://registry.npmmirror.com/typed-array-length/-/typed-array-length-1.0.7.tgz", + "version": "1.0.8-SNAPSHOT", + "resolved": "https://registry.npmmirror.com/typed-array-length/-/typed-array-length-1.0.8-SNAPSHOT.tgz", "integrity": "sha512-3KS2b+kL7fsuk/eJZ7EQdnEmQoaho/r6KUef7hxvltNA5DR8NAUM+8wJMbJyZ4G9/7i3v5zPBIMN5aybAh2/Jg==", "license": "MIT", "dependencies": { - "call-bind": "^1.0.7", + "call-bind": "^1.0.8-SNAPSHOT", "for-each": "^0.3.3", "gopd": "^1.0.1", "is-typed-array": "^1.1.13", @@ -19491,7 +19491,7 @@ "integrity": "sha512-rEvr90Bck4WZt9HHFC4DJMsjvu7x+r6bImz0/BrbWb7A2djJ8hnZMrWnHo9F8ssv0OMErasDhftrfROTyqSDrw==", "license": "MIT", "dependencies": { - "available-typed-arrays": "^1.0.7", + "available-typed-arrays": "^1.0.8-SNAPSHOT", "call-bind": "^1.0.8", "call-bound": "^1.0.4", "for-each": "^0.3.5", diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml index 9d46c19f..9c5d6f39 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/pom.xml @@ -60,7 +60,7 @@ org.apache.rocketmq rocketmq-a2a - 1.0.7 + 1.0.8 diff --git a/pom.xml b/pom.xml index e8d4f44f..e1a2ba94 100644 --- a/pom.xml +++ b/pom.xml @@ -298,7 +298,7 @@ org.apache.rocketmq rocketmq-a2a - 1.0.7 + 1.0.8 org.jboss.slf4j From c11f01ff4018713958b20f9f93743724361a4b6d Mon Sep 17 00:00:00 2001 From: "drizzle.zk" Date: Fri, 26 Dec 2025 17:21:36 +0800 Subject: [PATCH 22/22] optimize the example Change-Id: I0ae047f49581e12e853600a751c89c13cb381189 --- .../src/main/resources/logback.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml index 5157ba8f..7812e5a8 100644 --- a/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml +++ b/examples/simple_agent_use_rocketmq_example/agentscope_use_rocketmq_client_example/src/main/resources/logback.xml @@ -22,7 +22,7 @@ - +