Skip to content

Commit 1885b6d

Browse files
committed
feat: add streamable transport
1 parent ef8356c commit 1885b6d

File tree

5 files changed

+272
-147
lines changed

5 files changed

+272
-147
lines changed

README.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ Here are some example prompts to get you started:
107107

108108
### Claude Desktop Setup
109109

110+
#### Local
111+
110112
1. Open Claude Desktop settings
111113
2. Add the following to your configuration:
112114
```json
@@ -123,8 +125,8 @@ Here are some example prompts to get you started:
123125
> [!TIP]
124126
> You can refer to the [official documentation](https://modelcontextprotocol.io/quickstart/user) for Claude Desktop.
125127
126-
#### SSE Server specific configuration
127-
To run the SSE version, as Claude Desktop doesn't natively support it yet, you'll have to use a gateway:
128+
#### Remote
129+
To run an HTTP server, as Claude Desktop doesn't natively support it yet, you'll have to use a gateway:
128130
```json
129131
{
130132
"mcpServers": {
@@ -133,12 +135,16 @@ To run the SSE version, as Claude Desktop doesn't natively support it yet, you'l
133135
"args": [
134136
"-y",
135137
"mcp-remote",
136-
"http://localhost:4243/sse"
138+
"http://localhost:4243/mcp"
137139
]
138140
}
139141
}
140142
}
141143
```
144+
> [!INFO]
145+
> Our HTTP server leverages the [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http).
146+
> It is also backward compatible with the [SSE transport](https://modelcontextprotocol.io/docs/concepts/transports#server-sent-events-sse).
147+
142148

143149
### CLI Options
144150

src/app.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ program
6262
.command("start-server", { isDefault: true })
6363
.description("Starts the Algolia MCP server")
6464
.option<string[]>(...ALLOW_TOOLS_OPTIONS_TUPLE)
65-
.option("--transport [stdio|sse]", "Transport type, either `stdio` (default) or `sse`", "stdio")
65+
.option("--transport [stdio|http]", "Transport type, either `stdio` (default) or `http`", "stdio")
6666
.action(async (opts: StartServerOptions) => {
6767
switch (opts.transport) {
6868
case "stdio": {
@@ -71,14 +71,14 @@ program
7171
await startServer(opts);
7272
break;
7373
}
74-
case "sse": {
75-
console.info('Starting server with SSE transport');
76-
const { startSseServer } = await import("./commands/start-sse-server.ts");
77-
await startSseServer(opts);
74+
case "http": {
75+
console.info('Starting server with HTTP transport support');
76+
const { startHttpServer } = await import("./commands/start-http-server.ts");
77+
await startHttpServer(opts);
7878
break;
7979
}
8080
default:
81-
console.error(`Unknown transport type: ${opts.transport}\nAllowed values: stdio, sse`);
81+
console.error(`Unknown transport type: ${opts.transport}\nAllowed values: stdio, http`);
8282
process.exit(1);
8383
}
8484
});

src/commands/start-http-server.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import express, { type NextFunction, type Request, type Response } from "express";
2+
import cors from "cors";
3+
import { randomUUID } from "node:crypto";
4+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
5+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
6+
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
7+
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js";
8+
import type { StartServerOptions } from "../server/types.ts";
9+
import { initMCPServer } from "../server/init-server.ts";
10+
import { CONFIG } from "../config.ts";
11+
12+
export async function startHttpServer(opts: StartServerOptions) {
13+
try {
14+
// Create Express application
15+
const app = express();
16+
app.use(express.json());
17+
app.use(
18+
cors({
19+
origin: process.env.ALLOWED_ORIGINS?.split(",") || "*",
20+
methods: ["GET", "POST", "DELETE"],
21+
allowedHeaders: ["Content-Type", "Authorization"],
22+
}),
23+
);
24+
25+
// Store transports by session ID
26+
const transports: Map<string, StreamableHTTPServerTransport | SSEServerTransport> = new Map();
27+
28+
// Health check endpoint
29+
app.get("/health", (_, res) => {
30+
res.status(200).json({
31+
status: "ok",
32+
version: CONFIG.version,
33+
uptime: process.uptime(),
34+
timestamp: new Date().toISOString(),
35+
connections: transports.size,
36+
});
37+
});
38+
39+
//=============================================================================
40+
// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26)
41+
//=============================================================================
42+
43+
// Handle all MCP Streamable HTTP requests (GET, POST, DELETE) on a single endpoint
44+
app.all("/mcp", async (req: Request, res: Response) => {
45+
console.log(`Received ${req.method} request to /mcp`);
46+
47+
try {
48+
// Check for existing session ID
49+
const sessionId = req.headers["mcp-session-id"] as string | undefined;
50+
let transport: StreamableHTTPServerTransport;
51+
52+
if (sessionId != null && transports.has(sessionId)) {
53+
// Check if the transport is of the correct type
54+
const existingTransport = transports.get(sessionId);
55+
if (existingTransport instanceof StreamableHTTPServerTransport) {
56+
// Reuse existing transport
57+
transport = existingTransport;
58+
} else {
59+
// Transport exists but is not a StreamableHTTPServerTransport (could be SSEServerTransport)
60+
res.status(400).json({
61+
jsonrpc: "2.0",
62+
error: {
63+
code: -32000,
64+
message: "Bad Request: Session exists but uses a different transport protocol",
65+
},
66+
id: null,
67+
});
68+
return;
69+
}
70+
} else if (sessionId == null && req.method === "POST" && isInitializeRequest(req.body)) {
71+
const eventStore = new InMemoryEventStore();
72+
transport = new StreamableHTTPServerTransport({
73+
sessionIdGenerator: () => randomUUID(),
74+
eventStore, // Enable resumability
75+
onsessioninitialized: (sessionId) => {
76+
// Store the transport by session ID when session is initialized
77+
console.log(`StreamableHTTP session initialized with ID: ${sessionId}`);
78+
transports.set(sessionId, transport);
79+
},
80+
});
81+
82+
// Set up onclose handler to clean up transport when closed
83+
transport.onclose = () => {
84+
if (transport.sessionId != null && transports.has(transport.sessionId)) {
85+
console.log(`Transport closed for HTTP session ${transport.sessionId}, removing from transports map`);
86+
transports.delete(transport.sessionId);
87+
}
88+
};
89+
90+
// Connect the transport to the MCP server
91+
const server = await initMCPServer(opts);
92+
await server.connect(transport);
93+
} else {
94+
// Invalid request - no session ID or not initialization request
95+
res.status(400).json({
96+
jsonrpc: "2.0",
97+
error: {
98+
code: -32000,
99+
message: "Bad Request: No valid session ID provided",
100+
},
101+
id: null,
102+
});
103+
return;
104+
}
105+
106+
// Handle the request with the transport
107+
await transport.handleRequest(req, res, req.body);
108+
} catch (error) {
109+
console.error("Error handling MCP request:", error);
110+
if (!res.headersSent) {
111+
res.status(500).json({
112+
jsonrpc: "2.0",
113+
error: {
114+
code: -32603,
115+
message: "Internal server error",
116+
},
117+
id: null,
118+
});
119+
}
120+
}
121+
});
122+
123+
//=============================================================================
124+
// DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05)
125+
//=============================================================================
126+
127+
app.get("/sse", async (_: Request, res: Response) => {
128+
console.log("Received GET request to /sse (deprecated SSE transport)");
129+
const transport = new SSEServerTransport("/messages", res);
130+
transports.set(transport.sessionId, transport);
131+
res.on("close", () => {
132+
if (transport.sessionId != null && transports.has(transport.sessionId)) {
133+
console.log(`Transport closed for SSE session ${transport.sessionId}, removing from transports map`);
134+
transports.delete(transport.sessionId);
135+
}
136+
});
137+
const server = await initMCPServer(opts);
138+
await server.connect(transport);
139+
});
140+
141+
app.post("/messages", async (req: Request, res: Response) => {
142+
const sessionId = req.query.sessionId as string;
143+
let transport: SSEServerTransport;
144+
const existingTransport = transports.get(sessionId);
145+
if (existingTransport instanceof SSEServerTransport) {
146+
// Reuse existing transport
147+
transport = existingTransport;
148+
} else {
149+
// Transport exists but is not a SSEServerTransport (could be StreamableHTTPServerTransport)
150+
res.status(400).json({
151+
jsonrpc: "2.0",
152+
error: {
153+
code: -32000,
154+
message: "Bad Request: Session exists but uses a different transport protocol",
155+
},
156+
id: null,
157+
});
158+
return;
159+
}
160+
if (transport != null) {
161+
await transport.handlePostMessage(req, res, req.body);
162+
} else {
163+
res.status(400).send({
164+
jsonrpc: "2.0",
165+
error: {
166+
code: -32000,
167+
message: "Bad Request: No transport found for sessionId",
168+
},
169+
id: null,
170+
});
171+
}
172+
});
173+
174+
//=============================================================================
175+
// END OF SERVER SETUP
176+
//=============================================================================
177+
178+
// Error handling
179+
app.use((err: Error, _: Request, res: Response, __: NextFunction) => {
180+
console.error("Unhandled exception: ", err.stack);
181+
res.status(500).json({
182+
jsonrpc: "2.0",
183+
error: {
184+
code: -32603,
185+
message: "Internal Server Error",
186+
},
187+
id: null,
188+
});
189+
});
190+
191+
// Graceful shutdown of all connections
192+
async function closeAllConnections() {
193+
for (const [sessionId, transport] of transports.entries()) {
194+
try {
195+
console.log(`Closing transport for session ${sessionId}`);
196+
await transport.send({
197+
jsonrpc: "2.0",
198+
method: "notifications/shutdown",
199+
});
200+
await transport.close();
201+
} catch (error) {
202+
console.error(`Error closing transport for session ${sessionId}: `, error);
203+
}
204+
}
205+
transports.clear();
206+
console.log("All connections closed");
207+
}
208+
209+
// Graceful shutdown
210+
process.on("SIGTERM", async () => {
211+
console.log(`Graceful shutdown initiated at ${new Date().toISOString()}`);
212+
await closeAllConnections();
213+
server.close(() => {
214+
console.log(`Graceful shutdown complete at ${new Date().toISOString()}`);
215+
process.exit(0);
216+
});
217+
});
218+
219+
// Handle server shutdown
220+
process.on("SIGINT", async () => {
221+
console.log("Shutting down server...");
222+
await closeAllConnections();
223+
224+
process.exit(0);
225+
});
226+
227+
// Start the server
228+
const PORT = 4243;
229+
const server = app.listen(PORT, () => {
230+
console.log(`HTTP MCP server listening on port ${PORT}`);
231+
console.log(`
232+
==============================================
233+
SUPPORTED TRANSPORT OPTIONS:
234+
235+
1. Streamable Http(Protocol version: 2025-03-26)
236+
Endpoint: /mcp
237+
Methods: GET, POST, DELETE
238+
Usage:
239+
- Initialize with POST to /mcp
240+
- Establish SSE stream with GET to /mcp
241+
- Send requests with POST to /mcp
242+
- Terminate session with DELETE to /mcp
243+
244+
2. Http + SSE (Protocol version: 2024-11-05)
245+
Endpoints: /sse (GET) and /messages (POST)
246+
Usage:
247+
- Establish SSE stream with GET to /sse
248+
- Send requests with POST to /messages?sessionId=<id>
249+
==============================================
250+
`);
251+
});
252+
} catch (err) {
253+
console.error("Error starting server:", err);
254+
process.exit(1);
255+
}
256+
}

0 commit comments

Comments
 (0)