Skip to content

Commit 31e7fe8

Browse files
Pass options
1 parent 6a37f44 commit 31e7fe8

File tree

2 files changed

+57
-38
lines changed

2 files changed

+57
-38
lines changed

lib/hermes/server/transport/streamable_http.ex

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,20 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
6464
6565
- `:server` - The server process (required)
6666
- `:name` - Name for registering the GenServer (required)
67+
- `:call_timeout` - Timeout for internal GenServer calls in milliseconds (default: 10 minutes)
6768
"""
6869
@type option ::
6970
{:server, GenServer.server()}
7071
| {:name, GenServer.name()}
72+
| {:call_timeout, pos_integer()}
7173
| GenServer.option()
7274

7375
defschema(:parse_options, [
7476
{:server, {:required, Hermes.get_schema(:process_name)}},
7577
{:name, {:required, {:custom, &Hermes.genserver_name/1}}},
7678
{:registry, {:atom, {:default, Hermes.Server.Registry}}},
7779
{:request_timeout, {:integer, {:default, to_timeout(second: 30)}}},
80+
{:call_timeout, {:integer, {:default, to_timeout(second: 30)}}},
7881
{:task_supervisor, {:required, {:custom, &Hermes.genserver_name/1}}}
7982
])
8083

@@ -106,8 +109,9 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
106109
"""
107110
@impl Transport
108111
@spec send_message(GenServer.server(), binary(), keyword()) :: :ok | {:error, term()}
109-
def send_message(transport, message, _opts \\ []) when is_binary(message) do
110-
GenServer.call(transport, {:send_message, message})
112+
def send_message(transport, message, opts \\ []) when is_binary(message) do
113+
timeout = Keyword.get(opts, :call_timeout, 5000)
114+
GenServer.call(transport, {:send_message, message}, timeout)
111115
end
112116

113117
@doc """
@@ -133,10 +137,11 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
133137
Called by the Plug when establishing an SSE connection.
134138
The calling process becomes the SSE handler for the session.
135139
"""
136-
@spec register_sse_handler(GenServer.server(), String.t()) ::
140+
@spec register_sse_handler(GenServer.server(), String.t(), keyword()) ::
137141
:ok | {:error, term()}
138-
def register_sse_handler(transport, session_id) do
139-
GenServer.call(transport, {:register_sse_handler, session_id, self()})
142+
def register_sse_handler(transport, session_id, opts \\ []) do
143+
timeout = Keyword.get(opts, :call_timeout, 5000)
144+
GenServer.call(transport, {:register_sse_handler, session_id, self()}, timeout)
140145
end
141146

142147
@doc """
@@ -154,10 +159,11 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
154159
155160
Called by the Plug when a message is received via HTTP POST.
156161
"""
157-
@spec handle_message(GenServer.server(), String.t(), map() | list(map), map()) ::
162+
@spec handle_message(GenServer.server(), String.t(), map() | list(map), map(), keyword()) ::
158163
{:ok, binary() | nil} | {:error, term()}
159-
def handle_message(transport, session_id, message, context) do
160-
GenServer.call(transport, {:handle_message, session_id, message, context})
164+
def handle_message(transport, session_id, message, context, opts \\ []) do
165+
timeout = Keyword.get(opts, :call_timeout, 5000)
166+
GenServer.call(transport, {:handle_message, session_id, message, context}, timeout)
161167
end
162168

163169
@doc """
@@ -166,12 +172,15 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
166172
This allows the Plug to know whether to stream the response via SSE
167173
or return it as a regular HTTP response.
168174
"""
169-
@spec handle_message_for_sse(GenServer.server(), String.t(), map(), map()) ::
175+
@spec handle_message_for_sse(GenServer.server(), String.t(), map(), map(), keyword()) ::
170176
{:ok, binary()} | {:sse, binary()} | {:error, term()}
171-
def handle_message_for_sse(transport, session_id, message, context) do
177+
def handle_message_for_sse(transport, session_id, message, context, opts \\ []) do
178+
timeout = Keyword.get(opts, :call_timeout, 5000)
179+
172180
GenServer.call(
173181
transport,
174-
{:handle_message_for_sse, session_id, message, context}
182+
{:handle_message_for_sse, session_id, message, context},
183+
timeout
175184
)
176185
end
177186

@@ -181,9 +190,10 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
181190
Returns the pid of the process handling SSE for this session,
182191
or nil if no SSE connection exists.
183192
"""
184-
@spec get_sse_handler(GenServer.server(), String.t()) :: pid() | nil
185-
def get_sse_handler(transport, session_id) do
186-
GenServer.call(transport, {:get_sse_handler, session_id})
193+
@spec get_sse_handler(GenServer.server(), String.t(), keyword()) :: pid() | nil
194+
def get_sse_handler(transport, session_id, opts \\ []) do
195+
timeout = Keyword.get(opts, :call_timeout, 5000)
196+
GenServer.call(transport, {:get_sse_handler, session_id}, timeout)
187197
end
188198

189199
@doc """
@@ -207,6 +217,7 @@ defmodule Hermes.Server.Transport.StreamableHTTP do
207217
server: server,
208218
registry: opts.registry,
209219
request_timeout: opts.request_timeout,
220+
call_timeout: opts.call_timeout,
210221
task_supervisor: opts.task_supervisor,
211222
# Map of session_id => {pid, monitor_ref}
212223
sse_handlers: %{},

lib/hermes/server/transport/streamable_http/plug.ex

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ if Code.ensure_loaded?(Plug) do
1212
1313
## SSE Streaming Architecture
1414
15-
This Plug handles SSE streaming by keeping the request process alive
15+
This Plug handles SSE streaming by keeping the request process alive
1616
and managing the streaming loop for server-to-client communication.
1717
1818
## Usage in Phoenix Router
@@ -81,8 +81,9 @@ if Code.ensure_loaded?(Plug) do
8181
transport = registry.transport(server, :streamable_http)
8282
session_header = Keyword.get(opts, :session_header, @default_session_header)
8383
timeout = Keyword.get(opts, :timeout, @default_timeout)
84+
call_timeout = Keyword.get(opts, :call_timeout, @default_timeout)
8485

85-
%{transport: transport, session_header: session_header, timeout: timeout}
86+
%{transport: transport, session_header: session_header, timeout: timeout, call_timeout: call_timeout}
8687
end
8788

8889
@impl Plug
@@ -97,11 +98,11 @@ if Code.ensure_loaded?(Plug) do
9798

9899
# GET request handler - establishes SSE connection
99100

100-
defp handle_get(conn, %{transport: transport, session_header: session_header}) do
101+
defp handle_get(conn, %{transport: transport, session_header: session_header, call_timeout: call_timeout}) do
101102
if wants_sse?(conn) do
102103
session_id = get_or_create_session_id(conn, session_header)
103104

104-
case StreamableHTTP.register_sse_handler(transport, session_id) do
105+
case StreamableHTTP.register_sse_handler(transport, session_id, call_timeout: call_timeout) do
105106
:ok ->
106107
start_sse_streaming(conn, transport, session_id, session_header)
107108

@@ -129,7 +130,7 @@ if Code.ensure_loaded?(Plug) do
129130
session_id: session_id
130131
})
131132

132-
process_message(message, conn, transport, session_id, context, session_header)
133+
process_message(message, conn, transport, session_id, context, session_header, opts.call_timeout)
133134
else
134135
{:error, :invalid_accept_header} ->
135136
send_error(
@@ -156,20 +157,22 @@ if Code.ensure_loaded?(Plug) do
156157
end
157158
end
158159

159-
defp process_message(message, conn, transport, session_id, context, session_header) when is_map(message) do
160+
defp process_message(message, conn, transport, session_id, context, session_header, call_timeout)
161+
when is_map(message) do
160162
if Message.is_request(message) do
161163
handle_request_with_possible_sse(
162164
conn,
163165
transport,
164166
session_id,
165167
message,
166168
context,
167-
session_header
169+
session_header,
170+
call_timeout
168171
)
169172
else
170173
# Notification
171174
transport
172-
|> StreamableHTTP.handle_message(session_id, message, context)
175+
|> StreamableHTTP.handle_message(session_id, message, context, call_timeout: call_timeout)
173176
|> format_notification_response(conn)
174177
end
175178
end
@@ -212,15 +215,16 @@ if Code.ensure_loaded?(Plug) do
212215

213216
# Handle requests that might need SSE streaming
214217

215-
defp handle_request_with_possible_sse(conn, transport, session_id, body, context, session_header) do
218+
defp handle_request_with_possible_sse(conn, transport, session_id, body, context, session_header, call_timeout) do
216219
if wants_sse?(conn) do
217220
handle_sse_request(
218221
conn,
219222
transport,
220223
session_id,
221224
body,
222225
context,
223-
session_header
226+
session_header,
227+
call_timeout
224228
)
225229
else
226230
handle_json_request(
@@ -229,17 +233,19 @@ if Code.ensure_loaded?(Plug) do
229233
session_id,
230234
body,
231235
context,
232-
session_header
236+
session_header,
237+
call_timeout
233238
)
234239
end
235240
end
236241

237-
defp handle_sse_request(conn, transport, session_id, body, context, session_header) do
242+
defp handle_sse_request(conn, transport, session_id, body, context, session_header, call_timeout) do
238243
case StreamableHTTP.handle_message_for_sse(
239244
transport,
240245
session_id,
241246
body,
242-
context
247+
context,
248+
call_timeout: call_timeout
243249
) do
244250
{:sse, response} ->
245251
route_sse_response(
@@ -249,7 +255,8 @@ if Code.ensure_loaded?(Plug) do
249255
response,
250256
body,
251257
context,
252-
session_header
258+
session_header,
259+
call_timeout
253260
)
254261

255262
{:ok, response} ->
@@ -263,8 +270,8 @@ if Code.ensure_loaded?(Plug) do
263270
end
264271
end
265272

266-
defp handle_json_request(conn, transport, session_id, body, context, session_header) do
267-
case StreamableHTTP.handle_message(transport, session_id, body, context) do
273+
defp handle_json_request(conn, transport, session_id, body, context, session_header, call_timeout) do
274+
case StreamableHTTP.handle_message(transport, session_id, body, context, call_timeout: call_timeout) do
268275
{:ok, response} ->
269276
conn
270277
|> put_resp_content_type("application/json")
@@ -276,8 +283,8 @@ if Code.ensure_loaded?(Plug) do
276283
end
277284
end
278285

279-
defp route_sse_response(conn, transport, session_id, response, body, context, session_header) do
280-
if handler_pid = StreamableHTTP.get_sse_handler(transport, session_id) do
286+
defp route_sse_response(conn, transport, session_id, response, body, context, session_header, call_timeout) do
287+
if handler_pid = StreamableHTTP.get_sse_handler(transport, session_id, call_timeout: call_timeout) do
281288
send(handler_pid, {:sse_message, response})
282289

283290
conn
@@ -290,7 +297,8 @@ if Code.ensure_loaded?(Plug) do
290297
session_id,
291298
body,
292299
context,
293-
session_header
300+
session_header,
301+
call_timeout
294302
)
295303
end
296304
end
@@ -309,10 +317,10 @@ if Code.ensure_loaded?(Plug) do
309317
)
310318
end
311319

312-
defp establish_sse_for_request(conn, transport, session_id, body, context, session_header) do
313-
case StreamableHTTP.register_sse_handler(transport, session_id) do
320+
defp establish_sse_for_request(conn, transport, session_id, body, context, session_header, call_timeout) do
321+
case StreamableHTTP.register_sse_handler(transport, session_id, call_timeout: call_timeout) do
314322
:ok ->
315-
start_background_request(transport, session_id, body, context)
323+
start_background_request(transport, session_id, body, context, call_timeout)
316324
start_sse_streaming(conn, transport, session_id, session_header)
317325

318326
{:error, reason} ->
@@ -326,11 +334,11 @@ if Code.ensure_loaded?(Plug) do
326334
end
327335
end
328336

329-
defp start_background_request(transport, session_id, body, context) do
337+
defp start_background_request(transport, session_id, body, context, call_timeout) do
330338
self_pid = self()
331339

332340
Task.start(fn ->
333-
case StreamableHTTP.handle_message(transport, session_id, body, context) do
341+
case StreamableHTTP.handle_message(transport, session_id, body, context, call_timeout: call_timeout) do
334342
{:ok, response} when is_binary(response) ->
335343
send(self_pid, {:sse_message, response})
336344

0 commit comments

Comments
 (0)