Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fel.tool.mcp.client.support.handler.DefaultMcpClientLogHandler;
import modelengine.fel.tool.mcp.client.support.handler.DefaultMcpElicitationHandler;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fitframework.inspection.Nullable;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.util.StringUtils;
import modelengine.fitframework.util.UuidUtils;
Expand All @@ -26,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -37,41 +42,47 @@
* @author 黄可欣
* @since 2025-11-03
*/
public class DefaultMcpStreamableClient implements McpClient {
private static final Logger log = Logger.get(DefaultMcpStreamableClient.class);
public class DefaultMcpClient implements McpClient {
private static final Logger log = Logger.get(DefaultMcpClient.class);

private final String clientId;
private final McpSyncClient mcpSyncClient;
private final DefaultMcpClientLogHandler logHandler;

private volatile boolean initialized = false;
private volatile boolean closed = false;

/**
* Constructs a new instance of the DefaultMcpStreamableClient.
* Constructs a new instance of the DefaultMcpClient.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
* @param requestTimeoutSeconds The timeout duration of requests. Units: seconds.
*/
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) {
public DefaultMcpClient(String baseUri, String sseEndpoint, McpClientTransport transport, int requestTimeoutSeconds,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
this.clientId = UuidUtils.randomUuidString();
notBlank(baseUri, "The MCP server base URI cannot be blank.");
notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank.");
log.info("Creating MCP client. [clientId={}, baseUri={}]", this.clientId, baseUri);
ObjectMapper mapper = new ObjectMapper();
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(mapper))
.endpoint(sseEndpoint)
.build();

this.logHandler = new DefaultMcpClientLogHandler(this.clientId);
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.capabilities(McpSchema.ClientCapabilities.builder().build())
.loggingConsumer(this.logHandler::handleLoggingMessage)
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
.build();
DefaultMcpClientLogHandler logHandler = new DefaultMcpClientLogHandler(this.clientId);
if (elicitationHandler != null) {
DefaultMcpElicitationHandler mcpElicitationHandler =
new DefaultMcpElicitationHandler(this.clientId, elicitationHandler);
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.capabilities(McpSchema.ClientCapabilities.builder().elicitation().build())
.loggingConsumer(logHandler::handleLoggingMessage)
.elicitation(mcpElicitationHandler::handleElicitationRequest)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.jsonSchemaValidator(new DefaultJsonSchemaValidator(new ObjectMapper()))
.build();
} else {
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
.capabilities(McpSchema.ClientCapabilities.builder().build())
.loggingConsumer(logHandler::handleLoggingMessage)
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
.jsonSchemaValidator(new DefaultJsonSchemaValidator(new ObjectMapper()))
.build();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@

package modelengine.fel.tool.mcp.client.support;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.annotation.Value;
import modelengine.fitframework.inspection.Nullable;

import java.util.function.Function;

/**
* Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}.
* Represents a factory for creating instances of the {@link DefaultMcpClient}.
* This class is responsible for initializing and configuring.
*
* @author 季聿阶
Expand All @@ -32,7 +42,28 @@ public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") i
}

@Override
public McpClient createStreamable(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
.endpoint(sseEndpoint)
.build();
return new DefaultMcpClient(baseUri, sseEndpoint, transport, this.requestTimeoutSeconds, elicitationHandler);
}

@Override
public McpClient createSse(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationHandler) {
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(baseUri)
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
.sseEndpoint(sseEndpoint)
.build();
return new DefaultMcpClient(baseUri, sseEndpoint, transport, this.requestTimeoutSeconds, elicitationHandler);
}

@Override
@Deprecated
public McpClient create(String baseUri, String sseEndpoint) {
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds);
return this.createStreamable(baseUri, sseEndpoint, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.support;
package modelengine.fel.tool.mcp.client.support.handler;

import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fitframework.log.Logger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.support.handler;

import io.modelcontextprotocol.spec.McpSchema;
import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.log.Logger;

import java.util.function.Function;

/**
* Default MCP elicitation handler that delegates to an external handler function.
*
* <p>Converts {@link McpSchema.ElicitRequest} to {@link ElicitRequest},
* calls the user's handler, and converts {@link ElicitResult} back to {@link McpSchema.ElicitResult}.
*
* @author 黄可欣
* @since 2025-11-25
*/
public class DefaultMcpElicitationHandler {
private static final Logger log = Logger.get(DefaultMcpElicitationHandler.class);
private final String clientId;
private final Function<ElicitRequest, ElicitResult> elicitationHandler;

/**
* Constructs a new handler.
*
* @param clientId The client ID.
* @param elicitationHandler The user's handler function that processes {@link ElicitRequest}
* and returns {@link ElicitResult}.
*/
public DefaultMcpElicitationHandler(String clientId, Function<ElicitRequest, ElicitResult> elicitationHandler) {
this.clientId = clientId;
this.elicitationHandler = elicitationHandler;
}

/**
* Handles an elicitation request by converting {@link McpSchema.ElicitRequest} to {@link ElicitRequest},
* delegating to the user's handler, and converting {@link ElicitResult} back to {@link McpSchema.ElicitResult}.
*
* @param request The {@link McpSchema.ElicitRequest} from MCP server.
* @return The {@link McpSchema.ElicitResult} to send back to MCP server.
*/
public McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) {
log.info("Received elicitation request from MCP server. [clientId={}, message={}, requestSchema={}]",
this.clientId,
request.message(),
request.requestedSchema());

try {
ElicitRequest elicitRequest = new ElicitRequest(request.message(), request.requestedSchema());
ElicitResult result = this.elicitationHandler.apply(elicitRequest);
log.info("Successfully handled elicitation request. [clientId={}, action={}, content={}]",
this.clientId,
result.action(),
result.content());

McpSchema.ElicitResult.Action mcpAction = switch (result.action()) {
case ACCEPT -> McpSchema.ElicitResult.Action.ACCEPT;
case DECLINE -> McpSchema.ElicitResult.Action.DECLINE;
case CANCEL -> McpSchema.ElicitResult.Action.CANCEL;
};
return new McpSchema.ElicitResult(mcpAction, result.content());
} catch (Exception e) {
log.error("Failed to handle elicitation request. [clientId={}, error={}]",
this.clientId,
e.getMessage(),
e);
throw new IllegalStateException("Failed to handle elicitation request: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fel.tool.mcp.entity.Tool;
import modelengine.fit.http.annotation.GetMapping;
import modelengine.fit.http.annotation.PostMapping;
Expand All @@ -17,6 +18,7 @@
import modelengine.fitframework.annotation.Component;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -51,9 +53,27 @@ public TestController(McpClientFactory mcpClientFactory) {
* @return A string indicating that the initialization was successful.
*/
@PostMapping(path = "/initialize")
public String initialize(@RequestQuery(name = "baseUri") String baseUri,
public String initializeStreamable(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.create(baseUri, sseEndpoint);
this.client = this.mcpClientFactory.createStreamable(baseUri, sseEndpoint, null);
this.client.initialize();
return "Initialized";
}

@PostMapping(path = "/initialize-sse")
public String initializeSse(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.createSse(baseUri, sseEndpoint, null);
this.client.initialize();
return "Initialized";
}

@PostMapping(path = "/initialize-elicitation")
public String initializeElicitation(@RequestQuery(name = "baseUri") String baseUri,
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
this.client = this.mcpClientFactory.createStreamable(baseUri,
sseEndpoint,
request -> new ElicitResult(ElicitResult.Action.ACCEPT, Collections.emptyMap()));
this.client.initialize();
return "Initialized";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,52 @@

package modelengine.fel.tool.mcp.client;

import modelengine.fel.tool.mcp.client.elicitation.ElicitRequest;
import modelengine.fel.tool.mcp.client.elicitation.ElicitResult;
import modelengine.fitframework.inspection.Nullable;

import java.util.function.Function;

/**
* Indicates the factory of {@link McpClient}.
* <p>
* Each {@link McpClient} instance created by this factory is designed to connect to a single specified MCP server.
* Factory for creating {@link McpClient} instances with SSE or Streamable HTTP transport.
* <p>Each client connects to a single MCP server.</p>
*
* @author 季聿阶
* @since 2025-05-21
*/
public interface McpClientFactory {
/**
* Creates a {@link McpClient} instance.
* Creates a client with streamable HTTP transport.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @param elicitationFunction The function to handle {@link ElicitRequest} and return {@link ElicitResult}.
* If null, elicitation will not be supported in MCP client.
* @return The created {@link McpClient} instance.
*/
public McpClient createStreamable(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationFunction);

/**
* Creates a client with SSE transport.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @param elicitationFunction The function to handle {@link ElicitRequest} and return {@link ElicitResult}.
* If null, elicitation will not be supported in MCP client.
* @return The created {@link McpClient} instance.
*/
public McpClient createSse(String baseUri, String sseEndpoint,
@Nullable Function<ElicitRequest, ElicitResult> elicitationFunction);

/**
* Creates a client with streamable HTTP transport.
*
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The SSE endpoint of the MCP server.
* @return The connected {@link McpClient} instance.
* @return The created {@link McpClient} instance.
* @deprecated Use {@link #createStreamable(String, String, Function)} instead.
*/
McpClient create(String baseUri, String sseEndpoint);
@Deprecated
public McpClient create(String baseUri, String sseEndpoint);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.elicitation;

import java.util.Map;

/**
* Represents an elicitation request from an MCP server.
* This is a simplified version that doesn't depend on MCP SDK types.
*
* @param message The message describing what information is needed from the user
* @param requestedSchema The JSON schema defining the expected data structure
* @author 黄可欣
* @since 2025-11-25
*/
public record ElicitRequest(String message, Map<String, Object> requestedSchema) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.elicitation;

import java.util.Map;

/**
* Represents the result of handling an elicitation request.
* This is a simplified version that doesn't depend on MCP SDK types.
*
* @param action The action to take
* @param content The user-provided data matching the requested schema
* @author 黄可欣
* @since 2025-11-25
*/
public record ElicitResult(Action action, Map<String, Object> content) {
/**
* Action types for elicitation results.
*/
public enum Action {
ACCEPT,
DECLINE,
CANCEL
}
}