Skip to content

Commit 38fbbe3

Browse files
zk-drizzledrizzle.zkchickenlj
authored
Main add rocketmqa2a component (#990)
## AgentScope-Java Version [The version of AgentScope-Java you are working on, e.g. 1.0.9, check your pom.xml dependency version or run `mvn dependency:tree | grep agentscope-parent:pom`(only mac/linux)] ## Description add the rocketmq a2a component ## Checklist Please check the following items before code is ready to be reviewed. - [ ] Code has been formatted with `mvn spotless:apply` - [ ] All tests are passing (`mvn test`) - [ ] Javadoc comments are complete and follow project conventions - [ ] Related documentation has been updated (e.g. links, examples, etc.) - [ ] Code is ready for review --------- Co-authored-by: drizzle.zk <drizzle.zk@alibaba-inc.com> Co-authored-by: Ken Liu <ken.lj.hz@gmail.com>
1 parent cef7546 commit 38fbbe3

File tree

26 files changed

+2280
-6
lines changed

26 files changed

+2280
-6
lines changed

agentscope-distribution/agentscope-all/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,14 @@
228228
<optional>true</optional>
229229
</dependency>
230230

231+
<dependency>
232+
<groupId>io.agentscope</groupId>
233+
<artifactId>agentscope-extensions-rocketmq</artifactId>
234+
<scope>compile</scope>
235+
<optional>true</optional>
236+
</dependency>
237+
238+
231239
<!-- Model Context Protocol (MCP) SDK -->
232240
<dependency>
233241
<groupId>io.modelcontextprotocol.sdk</groupId>

agentscope-distribution/agentscope-bom/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,13 @@
269269
<version>${project.version}</version>
270270
</dependency>
271271

272+
<!-- AgentScope RocketMQ Extension -->
273+
<dependency>
274+
<groupId>io.agentscope</groupId>
275+
<artifactId>agentscope-extensions-rocketmq</artifactId>
276+
<version>${project.version}</version>
277+
</dependency>
278+
272279
<!-- AgentScope Nacos Extension -->
273280
<dependency>
274281
<groupId>io.agentscope</groupId>
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# Quick Start
2+
3+
本案例展示了一个基于 [Apache RocketMQ](http://rocketmq.apache.org/) Lite 版本与 [AgentScope](https://github.com/agentscope-ai) 集成的演示案例,通过消息中间件实现跨 Agent
4+
的解耦通信与事件驱动交互,支持高并发、低延迟的分布式智能体协作场景。
5+
6+
## 基本的前期准备工作
7+
8+
### 1. 部署 Apache RocketMQ
9+
10+
部署 [Apache RocketMQ](http://rocketmq.apache.org/) 的 LiteTopic 版本,或购买支持 LiteTopic 的 RocketMQ 商业版实例,并创建以下资源:
11+
12+
- **1.1** 创建接收响应请求的轻量级Topic:`WorkerAgentResponse`(agentscope-client用于接收响应结果)
13+
- **1.2** 创建与`WorkerAgentResponse` 绑定的轻量级消费者ID:`CID_HOST_AGENT_LITE`(agentscope-client中用于订阅`WorkerAgentResponse`)
14+
- **1.3** 创建普通Topic:`LLM_TOPIC`(agentscope-server用于接收请求)
15+
- **1.4** 创建普通消费者CID:`LLM_CID`(agentscope-server中用于订阅`LLM_TOPIC`)
16+
- **1.5** 创建接收定向请求的轻量级Topic:`WorkerAgentResponseServer`(agentscope-server用于接收定向请求)
17+
- **1.6** 创建与`WorkerAgentResponseServer` 绑定的轻量级消费者ID:`CID_HOST_AGENT_LITE_SERVER`(agentscope-server中用于订阅`WorkerAgentResponseServer`)
18+
19+
### 2. 获取大模型服务
20+
21+
1. 进入阿里云百炼平台
22+
2. 获取对应调用服务的apiKey
23+
24+
## 运行环境
25+
26+
- JDK 17 及以上
27+
- [Maven](http://maven.apache.org/) 3.9 及以上
28+
29+
## 代码打包与示例运行
30+
31+
### 1. 基本参数介绍
32+
33+
| 参数名称 | 基本介绍 | 是否必填 |
34+
|--------------------------|------------------|------|
35+
| rocketMQEndpoint | rocketmq服务接入点 ||
36+
| rocketMQNamespace | rocketmq命名空间 ||
37+
| bizTopic | 普通Topic ||
38+
| bizConsumerGroup | 普通消费者CID ||
39+
| rocketMQAK | rocketmq账号 ||
40+
| rocketMQSK | rocketmq密码 ||
41+
| apiKey | 百炼平台调用apiKey ||
42+
| workAgentResponseTopic | LiteTopic ||
43+
| workAgentResponseGroupID | LiteConsumer CID ||
44+
45+
### 2.配置agentScope-server端参数
46+
修改application.yml文件中配置参数
47+
```yaml
48+
# Server configuration
49+
server:
50+
port: 8080
51+
# Spring application settings
52+
spring:
53+
application:
54+
name: agentscope-rocketmq-a2a-demo
55+
# AgentScope configuration
56+
agentscope:
57+
# LLM provider API configuration
58+
dashscope:
59+
api-key: ${apiKey:} # DashScope API key for model access
60+
# Agent identity settings
61+
agent:
62+
name: my-assistant
63+
modelName: qwen-plus
64+
# A2A (Agent-to-Agent) protocol configuration
65+
a2a:
66+
server:
67+
enabled: true
68+
card:
69+
name: My Assistant
70+
description: An intelligent assistant based on AgentScope
71+
transports:
72+
# RocketMQ transport for asynchronous message communication
73+
rocketmq:
74+
enabled: true
75+
rocketMQEndpoint: ${rocketMQEndpoint:} # RocketMQ Server endpoint
76+
rocketMQNamespace: ${rocketMQNamespace:} # RocketMQ namespace isolation
77+
biz-topic: LLM_TOPIC # Business message topic name
78+
biz-consumer-group: LLM_CID # Consumer group for business messages
79+
workAgentResponseTopic: WorkerAgentResponseServer # Agent response topic name (lightweight topic)
80+
workAgentResponseGroupId: CID_HOST_AGENT_LITE_SERVER # Consumer group for agent responses
81+
accessKey: ${rocketMQAK:} # Authentication access key
82+
secret-key: ${rocketMQSK:} # Authentication secret key
83+
# JSON-RPC transport for synchronous HTTP communication
84+
jsonrpc:
85+
enabled: true
86+
```
87+
88+
### 3. 编译打包
89+
```shell
90+
mvn clean package -Dmaven.test.skip=true -Dcheckstyle.skip=true
91+
```
92+
以下2个Agent进程建议分别在不同的窗口中运行
93+
94+
### 4.运行agentscope-server
95+
```shell
96+
cd agentscope-server/target
97+
```
98+
```shell
99+
java -jar agentscope-server.jar
100+
```
101+
102+
### 5.运行agentscope-client
103+
104+
```shell
105+
cd agentscope-client/target
106+
```
107+
```shell
108+
java -DrocketMQNamespace= -DworkAgentResponseTopic=WorkerAgentResponse -DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DrocketMQAK= -DrocketMQSK= -jar agentscope-client-jar-with-dependencies.jar
109+
```
110+
111+
### 6.开始体验
112+
启动后可在终端与agentscope构建的agent进行会话
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2024-2026 the original author or authors.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
17+
<project xmlns="http://maven.apache.org/POM/4.0.0"
18+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>io.agentscope.examples</groupId>
23+
<artifactId>a2a-rocketmq</artifactId>
24+
<version>${revision}</version>
25+
</parent>
26+
27+
<artifactId>agentscope-client</artifactId>
28+
29+
<properties>
30+
<maven.compiler.source>17</maven.compiler.source>
31+
<maven.compiler.target>17</maven.compiler.target>
32+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
33+
</properties>
34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.rocketmq</groupId>
38+
<artifactId>rocketmq-a2a</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.agentscope</groupId>
42+
<artifactId>agentscope-extensions-a2a-client</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>io.agentscope</groupId>
46+
<artifactId>agentscope-core</artifactId>
47+
</dependency>
48+
<dependency>
49+
<groupId>ch.qos.logback</groupId>
50+
<artifactId>logback-classic</artifactId>
51+
</dependency>
52+
</dependencies>
53+
<build>
54+
<plugins>
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-compiler-plugin</artifactId>
58+
<configuration>
59+
<source>17</source>
60+
<target>17</target>
61+
</configuration>
62+
</plugin>
63+
<plugin>
64+
<groupId>org.apache.maven.plugins</groupId>
65+
<artifactId>maven-assembly-plugin</artifactId>
66+
<version>3.6.0</version>
67+
<configuration>
68+
<finalName>agentscope-client</finalName>
69+
<archive>
70+
<manifest>
71+
<mainClass>io.agentscope.examples.a2a.rocketmq.client.A2aAgentCallerExample</mainClass>
72+
</manifest>
73+
</archive>
74+
<descriptorRefs>
75+
<descriptorRef>jar-with-dependencies</descriptorRef>
76+
</descriptorRefs>
77+
</configuration>
78+
<executions>
79+
<execution>
80+
<id>make-assembly</id>
81+
<phase>package</phase>
82+
<goals>
83+
<goal>single</goal>
84+
</goals>
85+
</execution>
86+
</executions>
87+
</plugin>
88+
</plugins>
89+
</build>
90+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.agentscope.examples.a2a.rocketmq.client;
18+
19+
import io.a2a.client.http.JdkA2AHttpClient;
20+
import io.agentscope.core.a2a.agent.A2aAgent;
21+
import io.agentscope.core.a2a.agent.A2aAgentConfig;
22+
import io.agentscope.core.a2a.agent.A2aAgentConfig.A2aAgentConfigBuilder;
23+
import io.agentscope.core.a2a.agent.card.WellKnownAgentCardResolver;
24+
import io.agentscope.core.message.Msg;
25+
import io.agentscope.core.message.MsgRole;
26+
import io.agentscope.core.message.TextBlock;
27+
import java.io.BufferedReader;
28+
import java.io.IOException;
29+
import java.io.InputStreamReader;
30+
import org.apache.rocketmq.a2a.transport.config.RocketMQTransportConfig;
31+
import org.apache.rocketmq.a2a.transport.impl.RocketMQTransport;
32+
import reactor.core.publisher.Flux;
33+
34+
/**
35+
* A command-line example demonstrating how to interact with an A2A agent via RocketMQ.
36+
* Supports streaming responses and runs as an interactive chat interface.
37+
*/
38+
public class A2aAgentCallerExample {
39+
/**
40+
* ANSI color codes for terminal output
41+
*/
42+
private static final String USER_INPUT_PREFIX =
43+
"\u001B[34mYou>\u001B[0m "; // Blue prefix for user input
44+
45+
private static final String AGENT_RESPONSE_PREFIX =
46+
"\u001B[32mAgent>\u001B[0m "; // Green prefix for agent response
47+
private static final String TARGET_SERVER_BASE_URL = "http://localhost:8080";
48+
49+
/**
50+
* The access key for authenticating with the RocketMQ service.
51+
*/
52+
private static final String ACCESS_KEY = System.getProperty("rocketMQAK");
53+
54+
/**
55+
* The secret key for authenticating with the RocketMQ service.
56+
*/
57+
private static final String SECRET_KEY = System.getProperty("rocketMQSK");
58+
59+
/**
60+
* The dedicated topic for receiving reply messages from the target agent(Typically, a lightweight Topic).
61+
*/
62+
private static final String WORK_AGENT_RESPONSE_TOPIC =
63+
System.getProperty("workAgentResponseTopic");
64+
65+
/**
66+
* The consumer group ID used when subscribing to the {@link #WORK_AGENT_RESPONSE_TOPIC}.
67+
*/
68+
private static final String WORK_AGENT_RESPONSE_GROUP_ID =
69+
System.getProperty("workAgentResponseGroupID");
70+
71+
/**
72+
* The namespace used for logical isolation of RocketMQ resources.
73+
*/
74+
private static final String ROCKETMQ_NAMESPACE = System.getProperty("rocketMQNamespace");
75+
76+
/**
77+
* Logical name of this agent instance.
78+
*/
79+
private static final String AGENT_NAME = "agentscope-a2a-rocketmq-example-agent";
80+
81+
/**
82+
* Main entry point.
83+
* Sets up the agent with RocketMQ-based transport and starts the interactive loop.
84+
*/
85+
public static void main(String[] args) {
86+
RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig();
87+
rocketMQTransportConfig.setAccessKey(ACCESS_KEY);
88+
rocketMQTransportConfig.setSecretKey(SECRET_KEY);
89+
rocketMQTransportConfig.setWorkAgentResponseTopic(WORK_AGENT_RESPONSE_TOPIC);
90+
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
91+
rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE);
92+
rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient());
93+
A2aAgentConfig a2aAgentConfig =
94+
new A2aAgentConfigBuilder()
95+
.withTransport(RocketMQTransport.class, rocketMQTransportConfig)
96+
.build();
97+
A2aAgent agent =
98+
A2aAgent.builder()
99+
.a2aAgentConfig(a2aAgentConfig)
100+
.name(AGENT_NAME)
101+
.agentCardResolver(
102+
WellKnownAgentCardResolver.builder()
103+
.baseUrl(TARGET_SERVER_BASE_URL)
104+
.build())
105+
.build();
106+
startExample(agent);
107+
}
108+
109+
/**
110+
* Starts an interactive CLI loop where the user can send messages to the agent.
111+
* Type 'exit' or 'quit' to terminate.
112+
*
113+
* @param agent the A2A agent to communicate with
114+
*/
115+
private static void startExample(A2aAgent agent) {
116+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {
117+
while (true) {
118+
System.out.print(USER_INPUT_PREFIX);
119+
String input = reader.readLine();
120+
if (input == null
121+
|| input.trim().equalsIgnoreCase("exit")
122+
|| input.trim().equalsIgnoreCase("quit")) {
123+
System.out.println(AGENT_RESPONSE_PREFIX + "Bye!");
124+
break;
125+
}
126+
System.out.println(
127+
AGENT_RESPONSE_PREFIX + "I have received your question: " + input);
128+
System.out.print(AGENT_RESPONSE_PREFIX);
129+
processInput(agent, input).doOnNext(System.out::print).then().block();
130+
System.out.println();
131+
}
132+
} catch (IOException e) {
133+
System.err.println("input error: " + e.getMessage());
134+
}
135+
}
136+
137+
/**
138+
* Sends the user input to the agent and returns a reactive stream of response parts.
139+
* Filters only text content from the streaming events.
140+
*
141+
* @param agent the agent to call.
142+
* @param input the user's input message.
143+
* @return a Flux emitting incremental string parts of the response.
144+
*/
145+
private static Flux<String> processInput(A2aAgent agent, String input) {
146+
Msg msg =
147+
Msg.builder()
148+
.role(MsgRole.USER)
149+
.content(TextBlock.builder().text(input).build())
150+
.build();
151+
return agent.stream(msg)
152+
.map(
153+
event -> {
154+
if (event.isLast()) {
155+
return "";
156+
}
157+
Msg message = event.getMessage();
158+
StringBuilder partText = new StringBuilder();
159+
message.getContent().stream()
160+
.filter(block -> block instanceof TextBlock)
161+
.map(block -> (TextBlock) block)
162+
.forEach(block -> partText.append(block.getText()));
163+
return partText.toString();
164+
});
165+
}
166+
}

0 commit comments

Comments
 (0)