Bug: McpAgent forces internal WebSocket upgrade for SSE connections #172
Comments
This is expected behaviour. We do this internally to enable hibernation, so you don't have to pay for wall clock time when there's no activity on the connection. None of this is user facing. Is there a specific problem or bug you're facing?
Thank you for the clarification. I understand the internal WebSocket connection is intentional for hibernation purposes. The specific problem I'm facing is that when multiple clients try to connect simultaneously via SSE, only the very first client can maintain its connection. Subsequent clients effectively fail because the internal WebSocket connection terminates abnormally. Looking at the logs, for the second client onwards, I can see the internal WebSocket connection attempt (the request with As a result, from the client's perspective, the SSE connection cannot be established or maintained, making the service unusable for subsequent clients. So, the issue isn't the use of the internal WebSocket itself, but rather that this internal WebSocket connection seems unable to handle concurrent connections correctly within my current Durable Object setup, leading to the abnormal termination. This results in the bug where only the first client can successfully connect.
The issue where only the first client can maintain a connection likely stems from the code in agents/packages/agents/src/mcp/index.ts (lines 221 to 227 in 979394a).
Here's a likely sequence of events:
Therefore, the current implementation of
Hmm, this isn't because of the websockets, but because we have explicit code that prevents multiple connections with the same session id. We might have a bug if you're seeing it across different sessions. Tagging @jmorrell-cloudflare if he gets to see it before I do in a couple of hours.
Thank you for looking into this and clarifying the potential cause regarding the session ID connection logic, not the websockets themselves. I appreciate you tagging @jmorrell-cloudflare as well. Hopefully, we can pinpoint the issue if it's indeed happening across different sessions. Thanks for the quick response!
Do you have a repro for this? What client/app are you using? If you can describe the steps to repro this it'll be very helpful, because the code seems fine and I can't figure out how to trigger the bug normally.
This cannot happen if you have different session IDs since we use the session IDs to address the DO. We do allow the user to pass in a session ID for the initial connection: https://github.com/cloudflare/agents/blob/main/packages/agents/src/mcp/index.ts#L354 which is what I suspect is happening here. I would expect this to fail in the way @kajirita2002 has described. If it's possible to remove the session IDs from the initial connection, that should resolve the issue. If it's not possible, then I'd appreciate some more details about your use-case, and why you need it, to help think through the different scenarios we should support. I need to rethink what our behavior should be here before I recommend a more general fix. In theory we might want to always send a specific user to a DO for every session so it can track state for that user, and the DO should handle multiple connections in that case.
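To make the failure mode described above concrete, here is a minimal sketch of session-ID addressing, assuming one Durable Object per session ID and at most one active connection per session. `SessionRouter`, `connect`, and `disconnect` are hypothetical names for illustration only; this is an in-memory stand-in, not the code in the agents package.

```typescript
// Hypothetical illustration; none of these names come from the agents package.
type ConnectResult =
  | { ok: true; sessionId: string }
  | { ok: false; sessionId: string; reason: string };

class SessionRouter {
  // Each entry stands in for one Durable Object addressed by its session ID.
  private active = new Set<string>();
  private counter = 0;

  connect(requestedSessionId?: string): ConnectResult {
    // With no client-supplied ID, mint a fresh one so the client gets its own instance.
    const sessionId = requestedSessionId ?? `session-${++this.counter}`;
    if (this.active.has(sessionId)) {
      // Assumed rule: a session that already has a connection rejects a second one.
      return { ok: false, sessionId, reason: "session already has an active connection" };
    }
    this.active.add(sessionId);
    return { ok: true, sessionId };
  }

  disconnect(sessionId: string): void {
    this.active.delete(sessionId);
  }
}
```

Under these assumptions, two clients that omit the session ID land on different instances and both connect, while two clients that pass the same session ID collide on the second connection.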
@threepointone @jmorrell-cloudflare, thank you for getting back to me and for your help in resolving this issue. I appreciate you looking into this.
1. Specific Steps to Reproduce
The issue where the second and subsequent clients cannot connect can be reproduced with the following steps:
[Please describe the specific steps to reproduce the issue here]
2. Environment
import type { AuthRequest, OAuthHelpers } from '@cloudflare/workers-oauth-provider'
import { Hono } from 'hono'
import { logError, getUpstreamAuthorizeUrl, createErrorRedirect, fetchUpstreamAuthToken, fetchUpstreamTokenInfo } from './utils'
export type Props = {
accessToken: string
}
const app = new Hono<{ Bindings: Env & { OAUTH_PROVIDER: OAuthHelpers } }>()
app.get('/authorize', async (c) => {
// Initialize the variable to avoid an undefined error
let oauthReqInfo: AuthRequest | undefined = undefined
try {
oauthReqInfo = await c.env.OAUTH_PROVIDER.parseAuthRequest(c.req.raw)
if (!oauthReqInfo.clientId || !oauthReqInfo.redirectUri) {
console.log('Invalid authorization request parameters:', oauthReqInfo)
logError('authorize', `Invalid authorization request parameters: ${JSON.stringify(oauthReqInfo)}`)
return c.text('Invalid authorization request parameters', 400)
}
const state = crypto.randomUUID()
const kvData = JSON.stringify({
originalRequest: oauthReqInfo,
})
await c.env.OAUTH_KV.put(`esa:state:${state}`, kvData, { expirationTtl: 600 })
const redirectUrl = getUpstreamAuthorizeUrl({
upstream_url: c.env.ESA_AUTH_URL,
scope: 'read write',
client_id: c.env.ESA_CLIENT_ID,
redirect_uri: new URL('/callback', c.req.url).href,
state: state,
})
return Response.redirect(redirectUrl)
} catch (e) {
logError('authorize', e)
if (oauthReqInfo?.redirectUri && oauthReqInfo?.state) {
const errorRedirectUrl = createErrorRedirect(
oauthReqInfo.redirectUri,
'server_error',
'Authorization initiation failed',
oauthReqInfo.state,
)
return Response.redirect(errorRedirectUrl.toString())
}
return c.text('Internal server error during authorization initiation', 500)
}
})
app.get('/callback', async (c) => {
const stateFromUrl = c.req.query('state')
console.error('c.req', c.req)
const code = c.req.query('code')
const error = c.req.query('error')
const errorDescription = c.req.query('error_description')
let oauthReqInfo: AuthRequest | undefined
try {
if (error) {
throw new Error(`OIDC callback error: ${error} - ${errorDescription}`)
}
if (!stateFromUrl || !code) {
logError('callback', 'Invalid callback request: Missing state or code')
return c.text('Invalid callback request', 400)
}
const kvKey = `esa:state:${stateFromUrl}`
const storedDataJson = await c.env.OAUTH_KV.get(kvKey)
if (!storedDataJson) {
throw new Error(`Invalid or expired state parameter: ${stateFromUrl}`)
}
await c.env.OAUTH_KV.delete(kvKey)
const storedData = JSON.parse(storedDataJson) as { originalRequest: AuthRequest }
oauthReqInfo = storedData.originalRequest
if (!oauthReqInfo || !oauthReqInfo.clientId || !oauthReqInfo.redirectUri) {
throw new Error('Invalid stored state data retrieved from KV')
}
const [accessToken, tokenErrResponse] = await fetchUpstreamAuthToken({
upstream_url: c.env.ESA_TOKEN_URL,
client_id: c.env.ESA_CLIENT_ID,
client_secret: c.env.ESA_CLIENT_SECRET,
code: code,
redirect_uri: new URL('/callback', c.req.url).href,
})
if (tokenErrResponse) {
throw new Error(`Token exchange failed with status ${tokenErrResponse.status}`)
}
const [userId, userErrResponse] = await fetchUpstreamTokenInfo({
upstream_url: c.env.ESA_TOKEN_INFO_URL,
access_token: accessToken,
})
if (userErrResponse) {
throw new Error(`Failed to fetch user info: ${userErrResponse.status}`)
}
const props: Props = {
accessToken: accessToken,
}
const { redirectTo } = await c.env.OAUTH_PROVIDER.completeAuthorization({
request: oauthReqInfo,
userId: String(userId),
metadata: {
userId: String(userId),
},
scope: oauthReqInfo.scope,
props: props,
})
return Response.redirect(redirectTo)
} catch (e: any) {
logError('callback', `Processing callback failed: ${e.message}`)
if (oauthReqInfo?.redirectUri && oauthReqInfo?.state) {
const errorCode = e.message.includes('state') || e.message.includes('nonce') ? 'invalid_request' : 'server_error'
const redirectUrl = createErrorRedirect(oauthReqInfo.redirectUri, errorCode, e.message, oauthReqInfo.state)
return Response.redirect(redirectUrl.toString(), 302)
}
return c.text('Authentication processing failed', 500)
}
})
export const EsaHandler = app
import OAuthProvider from '@cloudflare/workers-oauth-provider'
import { McpAgent } from 'agents/mcp'
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { z } from 'zod'
import { logError } from './utils'
import { EsaClient } from './EsaClient'
import { EsaHandler } from './esa-handler'
import {
ListPostsArgs,
GetPostArgs,
CreatePostArgs,
UpdatePostArgs,
ListCommentsArgs,
GetMemberArgs,
GetCommentArgs,
CreateCommentArgs,
ListMembersArgs,
} from './types'
import { Props } from './esa-handler'
type McpToolContent = {
content: { type: 'text'; text: string }[]
}
export class EsaMCPServer extends McpAgent<Props, unknown, Env> {
private esaClient: EsaClient | null = null
server = new McpServer({
name: 'Esa MCP Server',
version: '0.0.1',
})
async init() {
// --- Precondition checks before initialization ---
if (!this.props.accessToken) {
console.warn('EsaMCPServer.init: AccessToken is missing. ESA tools will not be initialized.')
return
}
if (!this.env.ESA_TEAM) {
const errorMsg = 'EsaMCPServer.init: ESA_TEAM environment variable is not set. Cannot initialize EsaClient.'
console.error(errorMsg)
throw new Error(errorMsg)
}
// --- Initialize EsaClient ---
this.esaClient = new EsaClient(this.props.accessToken, this.env.ESA_TEAM)
// --- Zod schema definitions ---
const listPostsSchema = {
q: z.string().optional().describe('Search query (see esa API documentation for details)'),
include: z.string().optional().describe("Related data to include in the response (e.g. 'comments,stargazers')"),
sort: z
.enum(['updated', 'created', 'number', 'stars', 'watches', 'comments', 'best_match'])
.optional()
.default('updated')
.describe('Sort method (updated, created, number, stars, watches, comments, best_match)'),
order: z.enum(['desc', 'asc']).optional().default('desc').describe('Sort order (desc, asc)'),
per_page: z.number().optional().default(20).describe('Number of results per page (default: 20, max: 100)'),
page: z.number().optional().default(1).describe('Page number to retrieve'),
}
// --- Tool definitions ---
this.server.tool(
'esa_list_posts',
'Get a list of posts in the team (with pagination support)',
listPostsSchema,
async (args: ListPostsArgs): Promise<McpToolContent> => {
const toolName = 'esa_list_posts'
if (!this.esaClient) {
// Use the shared formatter for the uninitialized-client error as well
return this.formatErrorResponse(toolName, new Error('ESA client not initialized'), args)
}
try {
const response = await this.esaClient.listPosts(args)
// Delegate success-response formatting to the helper
return this.formatSuccessResponse(response)
} catch (error) {
// Delegate error-response formatting and logging to the helper
return this.formatErrorResponse(toolName, error, args)
}
},
)
}
}
export default new OAuthProvider({
apiRoute: '/sse',
// @ts-ignore
apiHandler: EsaMCPServer.mount('/sse'),
// @ts-ignore
defaultHandler: EsaHandler,
authorizeEndpoint: '/authorize',
tokenEndpoint: '/token',
clientRegistrationEndpoint: '/register',
})
3. Session ID Usage
Please let me know if you require any further information or clarification. I'm happy to provide more details or assist in any way I can to help resolve this issue. Best regards,
Summary
The McpAgent (from @cloudflare/agents/mcp) internally forces a WebSocket connection to its underlying Durable Object (DO) stub, even when the client connects to the Worker using Server-Sent Events (SSE) via the /sse endpoint provided by McpAgent.mount().
This behavior prevents the use of McpAgent in environments without WebSocket support for the DO communication leg and generates unnecessary WebSocket-related errors or logs in the DO, even when the intended client communication method is purely SSE.
Problem Description
When a client initiates an SSE connection:
1. The client sends a request to the /sse endpoint with the Accept: text/event-stream header.
2. McpAgent.mount()'s fetch handler correctly identifies the request as SSE.
3. When the handler forwards the request to the Durable Object (doStub.fetch), it always sends a request including an Upgrade: websocket header.
This results in the following internal communication flow:
Client --- (SSE) ---> Worker --- (WebSocket Upgrade) ---> Durable Object
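In a flow like this, the Worker has to turn whatever arrives on the internal leg back into SSE events for the client. The following is a minimal sketch of that framing step only, assuming the Worker receives plain text messages from the Durable Object. `toSseFrame` is a hypothetical helper and is not taken from the agents package; only the `event:` / `data:` field layout and the blank-line terminator come from the SSE format itself.

```typescript
// Hypothetical helper: wrap one text message in Server-Sent Events framing.
function toSseFrame(data: string, event: string = "message"): string {
  // Every line of the payload needs its own "data:" field.
  const dataLines = data
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join("\n");
  // A blank line terminates the event.
  return `event: ${event}\n${dataLines}\n\n`;
}

// Hypothetical relay step: concatenate frames for a batch of internal messages.
function relayToSse(messages: string[]): string {
  return messages.map((message) => toSseFrame(message)).join("");
}
```

The point of the sketch is that the client-facing format is independent of the transport used between the Worker and the Durable Object, which is why the internal leg is normally invisible to SSE clients.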
Problematic Code Location
The issue appears to stem from McpAgent.mount()'s fetch handler, specifically where it calls doStub.fetch. The code unconditionally adds the Upgrade: websocket header.
See approximately: https://github.com/cloudflare/agents/blob/main/packages/agents/src/mcp/index.ts#L373-L384
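To illustrate the difference being reported, here is a small sketch contrasting an unconditional upgrade with a conditional one. It is an assumption-laden illustration, not the code at the link above: `HeaderMap`, `headersAlwaysUpgrade`, and `headersConditionalUpgrade` are hypothetical names, and headers are modeled as a plain object with lowercase keys rather than the runtime's `Headers` class.

```typescript
// Hypothetical model: request headers as a plain object with lowercase keys.
type HeaderMap = Record<string, string>;

// Behavior as described in this report: the Upgrade header is always added
// to the request forwarded to the Durable Object.
function headersAlwaysUpgrade(clientHeaders: HeaderMap): HeaderMap {
  return { ...clientHeaders, upgrade: "websocket" };
}

// Alternative under discussion: forward an Upgrade header only when the
// client itself requested a WebSocket upgrade.
function headersConditionalUpgrade(clientHeaders: HeaderMap): HeaderMap {
  const forwarded: HeaderMap = { ...clientHeaders };
  const wantsWebSocket = (clientHeaders["upgrade"] ?? "").toLowerCase() === "websocket";
  if (wantsWebSocket) {
    forwarded["upgrade"] = "websocket";
  } else {
    delete forwarded["upgrade"];
  }
  return forwarded;
}
```

With an SSE request such as `{ accept: "text/event-stream" }`, the first function yields a forwarded request that carries `upgrade: websocket`, while the second leaves the request without an upgrade header.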
This internal architecture makes WebSocket support mandatory for the Durable Object when using McpAgent, even if the public-facing interface is intended to be SSE only.
Expected Behavior
When a client connects via the /sse endpoint, the communication between the Worker and the Durable Object should ideally not be forced to upgrade to WebSocket. The system should support configurations where the entire path (Client -> Worker -> DO) can operate without requiring an internal WebSocket connection if the client initiated an SSE connection.
Relation to MCP Specification (PR #206)
This mandatory internal WebSocket upgrade seems somewhat contrary to the flexible transport principles discussed in the Model Context Protocol evolution, such as in modelcontextprotocol/modelcontextprotocol#206 ("[RFC] Replace HTTP+SSE with new 'Streamable HTTP' transport"). That discussion highlights the need for transport flexibility and acknowledges limitations of mandating WebSockets universally due to infrastructure or client constraints. While the PR focuses on the client-server specification, the spirit of avoiding unnecessary transport limitations seems applicable here internally as well.
Potential Solutions
1. Modify McpAgent.mount() to only add the Upgrade: "websocket" header to the doStub.fetch call if the original client request was a WebSocket upgrade request.
2. Add an option to McpAgent.mount() (e.g., internalTransport: "sse" | "websocket") allowing developers to specify the preferred Worker-DO communication method.
3. Support in McpAgent specifically for proxying SSE requests to the DO without WebSocket involvement.
Request
McpAgent, even for clients connecting via SSE? If so, what is the rationale?
@cloudflare/agents package?
Thank you for considering this issue.