Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions framework/fel/java/plugins/tool-mcp-server/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,50 @@
# FitMcpStreamableServerTransportProvider类维护文档
# MCP Server 插件维护文档

## 文档概述

本文档用于记录 `FitMcpStreamableServerTransportProvider` 类的设计、实现细节以及维护更新指南。该类是基于 MCP SDK 中的
`HttpServletStreamableServerTransportProvider` 类改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。

**原始参考类**: MCP SDK 中的 `HttpServletStreamableServerTransportProvider`
本文档用于记录 MCP Server 插件的设计、实现细节以及维护更新指南。该插件基于 MCP SDK 改造而来,用于在 FIT 框架中提供 MCP(Model Context Protocol)服务端的传输层实现。

**创建时间**: 2025-11-04

---

## 类的作用和职责
## 架构概览

### DefaultMcpServer 的双 Bean 管理

`DefaultMcpServer` 是 MCP Server 的核心实现类,它同时管理两个 MCP 同步服务器 Bean:

1. **McpSyncSseServer** - 用于 SSE (Server-Sent Events) 传输
2. **McpSyncStreamableServer** - 用于 Streamable 传输

这两个 Bean 分别由 `McpSseServerConfig` 和 `McpStreamableServerConfig` 配置类创建,并通过 `@Fit(alias = "...")` 注解进行区分注入:

```java
// McpSseServerConfig.java
@Bean("McpSyncSseServer")
public McpSyncServer mcpSyncSseServer(...) { ... }

// McpStreamableServerConfig.java
@Bean("McpSyncStreamableServer")
public McpSyncServer mcpSyncStreamableServer(...) { ... }

// FitMcpServer.java
public DefaultMcpServer(ToolExecuteService toolExecuteService,
@Fit(alias = "McpSyncSseServer") McpSyncServer mcpSyncSseServer,
@Fit(alias = "McpSyncStreamableServer") McpSyncServer mcpSyncStreamableServer) {
...
}
```

`DefaultMcpServer` 实现了 `McpServer` 和 `ToolChangedObserver` 接口,负责:
- 统一管理两个传输类型的工具注册和移除
- 确保两个 MCP 同步服务器保持工具列表同步
- 处理工具执行请求并返回结果
- 通知工具变更观察者

---

## FitMcpStreamableServerTransportProvider 类的作用和职责

`FitMcpStreamableServerTransportProvider` 是 MCP 服务端传输层的核心实现类,负责:

Expand Down Expand Up @@ -307,6 +340,45 @@ if(!this.response.isActive()){
}
```

---

## FitMcpSseServerTransportProvider 简要说明

`FitMcpSseServerTransportProvider` 是基于 MCP SDK 中的 `HttpServletSseServerTransportProvider` 改造而来的 FIT 框架实现,用于提供 MCP SSE 传输层。

### 与 Streamable 的主要区别

`FitMcpSseServerTransportProvider` 的实现与 `FitMcpStreamableServerTransportProvider` 非常相似,主要区别在于:

1. **端点路径**:
- SSE: `/mcp/sse` (GET) 和 `/mcp/message` (POST)
- Streamable: `/mcp/streamable` (GET/POST/DELETE)

2. **协议版本支持**:
- SSE: 仅支持 `MCP_2024_11_05`
- Streamable: 支持 `MCP_2024_11_05`、`MCP_2025_03_26`、`MCP_2025_06_18`

3. **请求处理**:
- SSE: GET 请求用于建立 SSE 连接,POST 请求用于发送 JSON-RPC 消息
- Streamable: GET 请求用于建立 SSE 连接或重放消息,POST 请求处理初始化和其他 JSON-RPC 消息,DELETE 请求用于删除会话

4. **会话管理**:
- SSE: 使用 `McpServerSession`,会话通过 GET 请求建立
- Streamable: 使用 `McpStreamableServerSession`,会话通过 POST 初始化请求建立

### 核心改造点

与 Streamable 版本类似,SSE 版本也进行了以下 FIT 框架改造:

- 使用 `HttpClassicServerRequest` 和 `HttpClassicServerResponse` 替代 Servlet API
- 使用 `Choir<TextEvent>` 和 `Emitter<TextEvent>` 实现 SSE 事件流
- 使用 FIT 的 HTTP 注解 (`@GetMapping`, `@PostMapping`) 处理请求
- 使用 `Entity.createText()` 和 `Entity.createObject()` 创建响应

详细的实现细节可以参考 `FitMcpStreamableServerTransportProvider` 的相关章节。

---

## 参考资源

### MCP 协议文档
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.server.support;
package modelengine.fel.tool.mcp.server;

import static modelengine.fel.tool.info.schema.PluginSchema.TYPE;
import static modelengine.fel.tool.info.schema.ToolsSchema.PROPERTIES;
Expand All @@ -15,56 +15,53 @@
import io.modelcontextprotocol.server.McpSyncServer;
import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fel.tool.mcp.server.McpServer;
import modelengine.fel.tool.service.ToolChangedObserver;
import modelengine.fel.tool.service.ToolChangedObserverRegistry;
import modelengine.fel.tool.service.ToolExecuteService;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.ioc.annotation.PreDestroy;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.util.MapUtils;
import modelengine.fitframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Mcp Server implementing interface {@link McpServer}, {@link ToolChangedObserver}
* with MCP Server Bean {@link McpSyncServer}.
* Mcp Server implementing interface {@link ToolChangedObserver}
* with MCP Server {@link McpSyncServer} implemented with SDK.
*
* @author 季聿阶
* @since 2025-05-15
*/
@Component
public class DefaultMcpStreamableServer implements McpServer, ToolChangedObserver {
private static final Logger log = Logger.get(DefaultMcpStreamableServer.class);
public class FitMcpServer implements ToolChangedObserver {
private static final Logger log = Logger.get(FitMcpServer.class);
private final McpSyncServer mcpSyncServer;

private final ToolExecuteService toolExecuteService;
private final List<ToolsChangedObserver> toolsChangedObservers = new ArrayList<>();
private final ToolChangedObserverRegistry toolChangedObserverRegistry;

/**
* Constructs a new instance of the DefaultMcpServer class.
* Constructs a new instance of the FitMcpServer class.
*
* @param toolExecuteService The service used to execute tools when handling tool call requests.
* @throws IllegalArgumentException If {@code toolExecuteService} is null.
* @param mcpSyncServer The MCP sync server.
*/
public DefaultMcpStreamableServer(ToolExecuteService toolExecuteService, McpSyncServer mcpSyncServer) {
public FitMcpServer(ToolExecuteService toolExecuteService, McpSyncServer mcpSyncServer,
ToolChangedObserverRegistry toolChangedObserverRegistry) {
this.toolExecuteService = notNull(toolExecuteService, "The tool execute service cannot be null.");
this.mcpSyncServer = mcpSyncServer;
this.toolChangedObserverRegistry = toolChangedObserverRegistry;
this.toolChangedObserverRegistry.registerToolChangedObserver(this);
}

@Override
public List<Tool> getTools() {
return this.mcpSyncServer.listTools().stream().map(this::convertToFelTool).collect(Collectors.toList());
@PreDestroy
public void onDestroy() {
this.toolChangedObserverRegistry.unregisterToolChangedObserver(this);
}

@Override
public void registerToolsChangedObserver(ToolsChangedObserver observer) {
if (observer != null) {
this.toolsChangedObservers.add(observer);
}
public List<Tool> getTools() {
return this.mcpSyncServer.listTools().stream().map(this::convertToFelTool).collect(Collectors.toList());
}

@Override
Expand All @@ -88,10 +85,13 @@ public void onToolAdded(String name, String description, Map<String, Object> par

McpServerFeatures.SyncToolSpecification toolSpecification =
createToolSpecification(name, description, parameters);

this.mcpSyncServer.addTool(toolSpecification);
try {
this.mcpSyncServer.addTool(toolSpecification);
} catch (Exception e) {
log.error("Failed to added tool to MCP server. [toolName={}, error={}]", name, e.getMessage());
throw e;
}
log.info("Tool added to MCP server. [toolName={}, description={}, schema={}]", name, description, parameters);
this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged);
}

@Override
Expand All @@ -102,22 +102,15 @@ public void onToolRemoved(String name) {
}
this.mcpSyncServer.removeTool(name);
log.info("Tool removed from MCP server. [toolName={}]", name);
this.toolsChangedObservers.forEach(ToolsChangedObserver::onToolsChanged);
}

/**
* Creates a tool specification for the MCP server.
* <p>
* This method constructs a {@link McpServerFeatures.SyncToolSpecification} that includes:
* <ul>
* <li>Tool metadata (name, description, input schema)</li>
* <li>Call handler that executes the tool and handles exceptions</li>
* </ul>
*
* @param name The name of the tool.
* @param description The description of the tool.
* @param parameters The parameter schema containing type, properties, and required fields.
* @return A fully configured {@link McpServerFeatures.SyncToolSpecification}.
* @return A configured {@link McpServerFeatures.SyncToolSpecification}.
*/
private McpServerFeatures.SyncToolSpecification createToolSpecification(String name, String description,
Map<String, Object> parameters) {
Expand All @@ -137,12 +130,6 @@ private McpServerFeatures.SyncToolSpecification createToolSpecification(String n

/**
* Executes a tool and handles any exceptions that may occur.
* <p>
* This method handles two types of exceptions:
* <ul>
* <li>{@link IllegalArgumentException}: Invalid tool arguments (logged as warning)</li>
* <li>{@link Exception}: Any other execution failure (logged as error)</li>
* </ul>
*
* @param toolName The name of the tool to execute.
* @param request The tool call request containing arguments.
Expand Down
Loading