Skip to content

Commit 7e4ec93

Browse files
committed
Adding streaming send message and setPushNotificationConfig
Signed-off-by: Emmanuel Hugonnet <[email protected]>
1 parent ef62516 commit 7e4ec93

File tree

8 files changed

+395
-145
lines changed

8 files changed

+395
-145
lines changed

client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
<artifactId>protobuf-java-util</artifactId>
4747
<type>jar</type>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.slf4j</groupId>
51+
<artifactId>slf4j-jdk14</artifactId>
52+
<scope>test</scope>
53+
</dependency>
4954
</dependencies>
5055

5156
</project>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.a2a.client.sse;
2+
3+
import static io.a2a.grpc.StreamResponse.PayloadCase.ARTIFACT_UPDATE;
4+
import static io.a2a.grpc.StreamResponse.PayloadCase.MSG;
5+
import static io.a2a.grpc.StreamResponse.PayloadCase.STATUS_UPDATE;
6+
import static io.a2a.grpc.StreamResponse.PayloadCase.TASK;
7+
8+
import java.util.concurrent.Future;
9+
import java.util.function.Consumer;
10+
import java.util.logging.Logger;
11+
12+
import com.google.protobuf.InvalidProtocolBufferException;
13+
import com.google.protobuf.util.JsonFormat;
14+
import io.a2a.grpc.StreamResponse;
15+
import io.a2a.grpc.utils.ProtoUtils;
16+
import io.a2a.spec.StreamingEventKind;
17+
import java.util.logging.Level;
18+
19+
public class JSONRestSSEEventListener {
20+
21+
private static final Logger log = Logger.getLogger(JSONRestSSEEventListener.class.getName());
22+
private final Consumer<StreamingEventKind> eventHandler;
23+
private final Consumer<Throwable> errorHandler;
24+
25+
public JSONRestSSEEventListener(Consumer<StreamingEventKind> eventHandler,
26+
Consumer<Throwable> errorHandler) {
27+
this.eventHandler = eventHandler;
28+
this.errorHandler = errorHandler;
29+
}
30+
31+
public void onMessage(String message, Future<Void> completableFuture) {
32+
try {
33+
io.a2a.grpc.StreamResponse.Builder builder = io.a2a.grpc.StreamResponse.newBuilder();
34+
JsonFormat.parser().merge(message, builder);
35+
handleMessage(builder.build(), completableFuture);
36+
} catch (InvalidProtocolBufferException e) {
37+
Logger.getLogger(JSONRestSSEEventListener.class.getName()).log(Level.SEVERE, null, e);
38+
errorHandler.accept(e);
39+
}
40+
}
41+
42+
public void onError(Throwable throwable, Future<Void> future) {
43+
if (errorHandler != null) {
44+
errorHandler.accept(throwable);
45+
}
46+
future.cancel(true); // close SSE channel
47+
}
48+
49+
private void handleMessage(StreamResponse response, Future<Void> future) {
50+
StreamingEventKind event;
51+
switch (response.getPayloadCase()) {
52+
case MSG:
53+
event = ProtoUtils.FromProto.message(response.getMsg());
54+
break;
55+
case TASK:
56+
event = ProtoUtils.FromProto.task(response.getTask());
57+
break;
58+
case STATUS_UPDATE:
59+
event = ProtoUtils.FromProto.taskStatusUpdateEvent(response.getStatusUpdate());
60+
break;
61+
case ARTIFACT_UPDATE:
62+
event = ProtoUtils.FromProto.taskArtifactUpdateEvent(response.getArtifactUpdate());
63+
break;
64+
default:
65+
log.warning("Invalid stream response " + response.getPayloadCase());
66+
errorHandler.accept(new IllegalStateException("Invalid stream response from server: " + response.getPayloadCase()));
67+
return;
68+
}
69+
eventHandler.accept(event);
70+
}
71+
72+
}

client/src/main/java/io/a2a/client/transport/JSONRestTransport.java

Lines changed: 77 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
/*
2-
* Copyright The WildFly Authors
3-
* SPDX-License-Identifier: Apache-2.0
4-
*/
51
package io.a2a.client.transport;
62

73
import static io.a2a.util.Assert.checkNotNullParam;
@@ -13,7 +9,10 @@
139
import io.a2a.client.ClientCallContext;
1410
import io.a2a.client.ClientCallInterceptor;
1511
import io.a2a.client.PayloadAndHeaders;
12+
import io.a2a.client.sse.JSONRestSSEEventListener;
1613
import io.a2a.grpc.CancelTaskRequest;
14+
import io.a2a.grpc.CreateTaskPushNotificationConfigRequest;
15+
import io.a2a.grpc.GetTaskPushNotificationConfigRequest;
1716
import io.a2a.grpc.GetTaskRequest;
1817
import io.a2a.spec.TaskPushNotificationConfig;
1918
import io.a2a.http.A2AHttpClient;
@@ -31,9 +30,13 @@
3130
import io.a2a.spec.TaskIdParams;
3231
import io.a2a.spec.TaskQueryParams;
3332
import io.a2a.grpc.utils.ProtoUtils;
33+
import io.a2a.spec.SendStreamingMessageRequest;
34+
import io.a2a.spec.SetTaskPushNotificationConfigRequest;
3435
import java.io.IOException;
3536
import java.util.List;
3637
import java.util.Map;
38+
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.atomic.AtomicReference;
3740
import java.util.function.Consumer;
3841

3942
public class JSONRestTransport implements ClientTransport {
@@ -62,19 +65,11 @@ public JSONRestTransport(A2AHttpClient httpClient, AgentCard agentCard,
6265
@Override
6366
public EventKind sendMessage(MessageSendParams messageSendParams, ClientCallContext context) throws A2AClientException {
6467
checkNotNullParam("messageSendParams", messageSendParams);
65-
io.a2a.grpc.SendMessageRequest.Builder builder = io.a2a.grpc.SendMessageRequest.newBuilder();
66-
builder.setRequest(ProtoUtils.ToProto.message(messageSendParams.message()));
67-
if (messageSendParams.configuration() != null) {
68-
builder.setConfiguration(ProtoUtils.ToProto.messageSendConfiguration(messageSendParams.configuration()));
69-
}
70-
if (messageSendParams.metadata() != null) {
71-
builder.setMetadata(ProtoUtils.ToProto.struct(messageSendParams.metadata()));
72-
}
68+
io.a2a.grpc.SendMessageRequest.Builder builder = io.a2a.grpc.SendMessageRequest.newBuilder(ProtoUtils.ToProto.sendMessageRequest(messageSendParams));
7369
PayloadAndHeaders payloadAndHeaders = applyInterceptors(io.a2a.spec.SendMessageRequest.METHOD, builder.getRequestOrBuilder(),
7470
agentCard, context);
7571
try {
7672
String httpResponseBody = sendPostRequest(agentUrl + "/v1/message:send", payloadAndHeaders);
77-
System.out.println("Response " + httpResponseBody);
7873
io.a2a.grpc.SendMessageResponse.Builder responseBuilder = io.a2a.grpc.SendMessageResponse.newBuilder();
7974
JsonFormat.parser().merge(httpResponseBody, responseBuilder);
8075
if (responseBuilder.hasMsg()) {
@@ -89,8 +84,28 @@ public EventKind sendMessage(MessageSendParams messageSendParams, ClientCallCont
8984
}
9085

9186
@Override
92-
public void sendMessageStreaming(MessageSendParams request, Consumer<StreamingEventKind> eventConsumer, Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
93-
throw new UnsupportedOperationException("Not supported yet."); // Generated from nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody
87+
public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer<StreamingEventKind> eventConsumer, Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
88+
checkNotNullParam("request", messageSendParams);
89+
checkNotNullParam("eventConsumer", eventConsumer);
90+
checkNotNullParam("messageSendParams", messageSendParams);
91+
io.a2a.grpc.SendMessageRequest.Builder builder = io.a2a.grpc.SendMessageRequest.newBuilder(ProtoUtils.ToProto.sendMessageRequest(messageSendParams));
92+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SendStreamingMessageRequest.METHOD,
93+
builder, agentCard, context);
94+
AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
95+
JSONRestSSEEventListener sseEventListener = new JSONRestSSEEventListener(eventConsumer, errorConsumer);
96+
try {
97+
A2AHttpClient.PostBuilder postBuilder = createPostBuilder(agentUrl + "/v1/message:stream", payloadAndHeaders);
98+
ref.set(postBuilder.postAsyncSSE(
99+
msg -> sseEventListener.onMessage(msg, ref.get()),
100+
throwable -> sseEventListener.onError(throwable, ref.get()),
101+
() -> {
102+
// We don't need to do anything special on completion
103+
}));
104+
} catch (IOException e) {
105+
throw new A2AClientException("Failed to send streaming message request: " + e, e);
106+
} catch (InterruptedException e) {
107+
throw new A2AClientException("Send streaming message request timed out: " + e, e);
108+
}
94109
}
95110

96111
@Override
@@ -102,12 +117,11 @@ public Task getTask(TaskQueryParams taskQueryParams, ClientCallContext context)
102117
agentCard, context);
103118
try {
104119
String url;
105-
if(taskQueryParams.historyLength() != null) {
120+
if (taskQueryParams.historyLength() != null) {
106121
url = agentUrl + String.format("/v1/tasks/%1s?historyLength=%2d", taskQueryParams.id(), taskQueryParams.historyLength());
107122
} else {
108123
url = agentUrl + String.format("/v1/tasks/%1s", taskQueryParams.id());
109124
}
110-
System.out.println("Getting URL: " + url);
111125
A2AHttpClient.GetBuilder getBuilder = httpClient.createGet().url(url);
112126
if (payloadAndHeaders.getHttpHeaders() != null) {
113127
for (Map.Entry<String, String> entry : payloadAndHeaders.getHttpHeaders().entrySet()) {
@@ -120,7 +134,6 @@ public Task getTask(TaskQueryParams taskQueryParams, ClientCallContext context)
120134
throw new A2AClientException("Failed to send message: " + e, e);
121135
}
122136
String httpResponseBody = response.body();
123-
System.out.println("Response " + httpResponseBody);
124137
io.a2a.grpc.Task.Builder responseBuilder = io.a2a.grpc.Task.newBuilder();
125138
JsonFormat.parser().merge(httpResponseBody, responseBuilder);
126139
return ProtoUtils.FromProto.task(responseBuilder);
@@ -140,7 +153,6 @@ public Task cancelTask(TaskIdParams taskIdParams, ClientCallContext context) thr
140153
agentCard, context);
141154
try {
142155
String httpResponseBody = sendPostRequest(agentUrl + String.format("/v1/tasks/%1s:cancel", taskIdParams.id()), payloadAndHeaders);
143-
System.out.println("Response " + httpResponseBody);
144156
io.a2a.grpc.Task.Builder responseBuilder = io.a2a.grpc.Task.newBuilder();
145157
JsonFormat.parser().merge(httpResponseBody, responseBuilder);
146158
return ProtoUtils.FromProto.task(responseBuilder);
@@ -153,12 +165,55 @@ public Task cancelTask(TaskIdParams taskIdParams, ClientCallContext context) thr
153165

154166
@Override
155167
public TaskPushNotificationConfig setTaskPushNotificationConfiguration(TaskPushNotificationConfig request, ClientCallContext context) throws A2AClientException {
156-
throw new UnsupportedOperationException("Not supported yet."); // Generated from nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody
168+
checkNotNullParam("request", request);
169+
CreateTaskPushNotificationConfigRequest.Builder builder = CreateTaskPushNotificationConfigRequest.newBuilder();
170+
builder.setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(request))
171+
.setParent("tasks/" + request.taskId());
172+
if (request.pushNotificationConfig().id() != null) {
173+
builder.setConfigId(request.pushNotificationConfig().id());
174+
}
175+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(SetTaskPushNotificationConfigRequest.METHOD, builder, agentCard, context);
176+
try {
177+
String httpResponseBody = sendPostRequest(agentUrl + String.format("/v1/tasks/%1s/pushNotificationConfigs", request.taskId()), payloadAndHeaders);
178+
io.a2a.grpc.TaskPushNotificationConfig.Builder reponseBuilder = io.a2a.grpc.TaskPushNotificationConfig.newBuilder();
179+
JsonFormat.parser().merge(httpResponseBody, reponseBuilder);
180+
return ProtoUtils.FromProto.taskPushNotificationConfig(reponseBuilder);
181+
} catch (A2AClientException e) {
182+
throw e;
183+
} catch (IOException | InterruptedException e) {
184+
throw new A2AClientException("Failed to set task push notification config: " + e, e);
185+
}
157186
}
158187

159188
@Override
160189
public TaskPushNotificationConfig getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams request, ClientCallContext context) throws A2AClientException {
161-
throw new UnsupportedOperationException("Not supported yet."); // Generated from nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody
190+
checkNotNullParam("request", request);
191+
GetTaskPushNotificationConfigRequest.Builder builder = GetTaskPushNotificationConfigRequest.newBuilder();
192+
builder.setName(String.format("/tasks/%1s/pushNotificationConfigs/%2s", request.id(), request.pushNotificationConfigId()));
193+
PayloadAndHeaders payloadAndHeaders = applyInterceptors(io.a2a.spec.SendMessageRequest.METHOD, builder,
194+
agentCard, context);
195+
try {
196+
String url = agentUrl + String.format("/v1/tasks/%1s/pushNotificationConfigs/%2s", request.id(), request.pushNotificationConfigId());
197+
A2AHttpClient.GetBuilder getBuilder = httpClient.createGet().url(url);
198+
if (payloadAndHeaders.getHttpHeaders() != null) {
199+
for (Map.Entry<String, String> entry : payloadAndHeaders.getHttpHeaders().entrySet()) {
200+
getBuilder.addHeader(entry.getKey(), entry.getValue());
201+
}
202+
}
203+
A2AHttpResponse response = getBuilder.get();
204+
if (!response.success()) {
205+
IOException e = new IOException("Request failed " + response.status());
206+
throw new A2AClientException("Failed to send message: " + e, e);
207+
}
208+
String httpResponseBody = response.body();
209+
io.a2a.grpc.TaskPushNotificationConfig.Builder reponseBuilder = io.a2a.grpc.TaskPushNotificationConfig.newBuilder();
210+
JsonFormat.parser().merge(httpResponseBody, reponseBuilder);
211+
return ProtoUtils.FromProto.taskPushNotificationConfig(reponseBuilder);
212+
} catch (A2AClientException e) {
213+
throw e;
214+
} catch (IOException | InterruptedException e) {
215+
throw new A2AClientException("Failed to send message: " + e, e);
216+
}
162217
}
163218

164219
@Override
@@ -173,7 +228,7 @@ public void deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationC
173228

174229
@Override
175230
public void resubscribe(TaskIdParams request, Consumer<StreamingEventKind> eventConsumer,
176-
Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
231+
Consumer<Throwable> errorConsumer, ClientCallContext context) throws A2AClientException {
177232
throw new UnsupportedOperationException("Not supported yet."); // Generated from nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody
178233
}
179234

@@ -222,7 +277,6 @@ private A2AHttpClient.PostBuilder createPostBuilder(String url, PayloadAndHeader
222277
postBuilder.addHeader(entry.getKey(), entry.getValue());
223278
}
224279
}
225-
226280
return postBuilder;
227281
}
228282

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.a2a.client.transport;
2+
3+
import java.util.List;
4+
5+
import io.a2a.client.ClientCallInterceptor;
6+
import io.a2a.client.ClientConfig;
7+
import io.a2a.spec.AgentCard;
8+
import io.a2a.spec.TransportProtocol;
9+
10+
public class JSONRestTransportProvider implements ClientTransportProvider {
11+
12+
@Override
13+
public ClientTransport create(ClientConfig clientConfig, AgentCard agentCard,
14+
String agentUrl, List<ClientCallInterceptor> interceptors) {
15+
return new JSONRestTransport(clientConfig.getHttpClient(), agentCard, agentUrl, interceptors);
16+
}
17+
18+
@Override
19+
public String getTransportProtocol() {
20+
return TransportProtocol.HTTP_JSON.asString();
21+
}
22+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
io.a2a.client.transport.JSONRPCTransportProvider
2-
io.a2a.client.transport.GrpcTransportProvider
2+
io.a2a.client.transport.GrpcTransportProvider
3+
io.a2a.client.transport.JSONRestTransportProvider

0 commit comments

Comments
 (0)