11#!/usr/bin/env python3
22"""
3- Database Manager MCP Server
3+ Database MCP Server
44
5- A comprehensive MCP server for database operations.
5+ A comprehensive MCP server for database operations using FastMCP.
6+ Demonstrates:
7+ - SQL query execution
8+ - Database connection management
9+ - Type-safe operations
10+ - Resource providers
611"""
712
813import asyncio
914import sqlite3
10- from typing import Dict , Any
15+ from typing import Dict , List , Any , Optional
16+ from datetime import datetime
1117
12- class DatabaseMCPServer :
13- """Database Manager MCP Server"""
14-
15- def __init__ (self ):
16- self .connections = {}
18+ from mcp .server .fastmcp import FastMCP
19+ import mcp .types as types
20+ from pydantic import BaseModel
21+
22+ # Initialize FastMCP server
23+ mcp = FastMCP ("database_tools" )
24+
25+ class QueryResult (BaseModel ):
26+ """Model for query results"""
27+ columns : List [str ]
28+ rows : List [Dict [str , Any ]]
29+ row_count : int
30+ execution_time : float
31+
32+ class DatabaseConfig (BaseModel ):
33+ """Database configuration model"""
34+ type : str
35+ connection_string : str
36+ max_connections : int = 10
37+
38+ # System prompts
39+ @mcp .prompt ()
40+ def system_prompt () -> str :
41+ """Define the database assistant's role"""
42+ return """
43+ You are a database assistant that helps users interact with databases safely.
44+ Always validate queries before execution and use proper error handling.
45+ Never execute dangerous operations without confirmation.
46+ """
47+
48+ @mcp .prompt ()
49+ def error_prompt () -> str :
50+ """Handle database errors"""
51+ return """
52+ I encountered an error while working with the database.
53+ This could be due to:
54+ - Invalid SQL syntax
55+ - Connection issues
56+ - Permission problems
57+ - Resource constraints
58+ Please check your query and try again.
59+ """
60+
61+ # Database resources
62+ @mcp .resource ("db://{database}/{table}" )
63+ async def get_table_info (database : str , table : str ) -> Dict :
64+ """Get metadata about a database table"""
65+ try :
66+ conn = sqlite3 .connect (f"{ database } .db" )
67+ cursor = conn .cursor ()
68+ cursor .execute (f"PRAGMA table_info({ table } )" )
69+ columns = cursor .fetchall ()
1770
18- async def connect_database (self , database_type : str , connection_string : str ) -> Dict [str , Any ]:
19- """Connect to a database"""
20- try :
21- if database_type == "sqlite" :
22- conn = sqlite3 .connect (connection_string )
23- conn .row_factory = sqlite3 .Row
24- self .connections ["default" ] = {"type" : "sqlite" , "connection" : conn }
25-
26- return {
27- "success" : True ,
28- "message" : f"Connected to { database_type } database" ,
29- "database_type" : database_type
30- }
31- except Exception as e :
32- return {"success" : False , "error" : str (e )}
71+ return {
72+ "table" : table ,
73+ "columns" : [col [1 ] for col in columns ],
74+ "types" : [col [2 ] for col in columns ],
75+ "last_checked" : datetime .now ().isoformat ()
76+ }
77+ except Exception as e :
78+ return {"error" : str (e )}
79+
80+ # Database tools
81+ @mcp .tool ()
82+ async def execute_query (query : str , limit : int = 100 ) -> Dict :
83+ """
84+ Execute a SQL query safely
3385
34- async def execute_query (self , query : str , limit : int = 100 ) -> Dict [str , Any ]:
35- """Execute a SQL query"""
36- try :
37- if "default" not in self .connections :
38- return {"success" : False , "error" : "No database connection" }
86+ Args:
87+ query: SQL query to execute
88+ limit: Maximum number of rows to return
89+
90+ Returns:
91+ Dictionary with query results or error information
92+ """
93+ try :
94+ # In production, use connection pooling
95+ conn = sqlite3 .connect ("demo.db" )
96+ cursor = conn .cursor ()
97+
98+ start_time = datetime .now ()
99+ cursor .execute (query )
100+
101+ if query .lower ().strip ().startswith ("select" ):
102+ rows = cursor .fetchmany (limit )
103+ columns = [desc [0 ] for desc in cursor .description ]
104+ results = [dict (zip (columns , row )) for row in rows ]
39105
40- conn = self .connections ["default" ]["connection" ]
41- cursor = conn .cursor ()
42- cursor .execute (query )
106+ execution_time = (datetime .now () - start_time ).total_seconds ()
43107
44- if query .lower ().strip ().startswith ("select" ):
45- rows = cursor .fetchmany (limit )
46- columns = [desc [0 ] for desc in cursor .description ]
47- results = [dict (zip (columns , row )) for row in rows ]
48-
49- return {
50- "success" : True ,
51- "data" : results ,
52- "row_count" : len (results )
53- }
54- else :
55- conn .commit ()
56- return {
57- "success" : True ,
58- "message" : "Query executed successfully" ,
59- "rows_affected" : cursor .rowcount
108+ return {
109+ "success" : True ,
110+ "data" : QueryResult (
111+ columns = columns ,
112+ rows = results ,
113+ row_count = len (results ),
114+ execution_time = execution_time
115+ ).dict ()
116+ }
117+ else :
118+ conn .commit ()
119+ execution_time = (datetime .now () - start_time ).total_seconds ()
120+
121+ return {
122+ "success" : True ,
123+ "data" : {
124+ "rows_affected" : cursor .rowcount ,
125+ "execution_time" : execution_time
60126 }
61- except Exception as e :
62- return {"success" : False , "error" : str (e )}
127+ }
128+
129+ except Exception as e :
130+ return {
131+ "success" : False ,
132+ "error" : str (e ),
133+ "prompt" : "error_prompt"
134+ }
63135
64- async def main ():
65- """Demo the database MCP server"""
66- db_server = DatabaseMCPServer ()
67-
68- print ("🗄️ Database Manager MCP Server Demo" )
69- print ("=" * 40 )
70-
71- # Connect to demo database
72- result = await db_server .connect_database ("sqlite" , "demo.db" )
73- print (f"Connection: { result } " )
74-
75- # Create demo table
76- create_query = """
77- CREATE TABLE IF NOT EXISTS users (
78- id INTEGER PRIMARY KEY,
79- name TEXT,
80- email TEXT
81- )
136+ @mcp .tool ()
137+ async def get_table_schema (table : str ) -> Dict :
82138 """
83- result = await db_server .execute_query (create_query )
84- print (f"Create table: { result } " )
139+ Get schema information for a table
85140
86- # Insert data
87- insert_query = "INSERT INTO users (name, email) VALUES ('Alice', '[email protected] ')" 88- result = await db_server .execute_query (insert_query )
89- print (f"Insert: { result } " )
141+ Args:
142+ table: Name of the table
143+
144+ Returns:
145+ Dictionary with table schema information
146+ """
147+ try :
148+ # Get schema from resource
149+ schema = await get_table_info ("demo" , table )
150+
151+ if "error" in schema :
152+ raise Exception (schema ["error" ])
153+
154+ return {
155+ "success" : True ,
156+ "data" : schema
157+ }
158+
159+ except Exception as e :
160+ return {
161+ "success" : False ,
162+ "error" : str (e ),
163+ "prompt" : "error_prompt"
164+ }
165+
166+ @mcp .tool ()
167+ async def validate_query (query : str ) -> Dict :
168+ """
169+ Validate a SQL query without executing it
90170
91- # Query data
92- select_query = "SELECT * FROM users"
93- result = await db_server .execute_query (select_query )
94- print (f"Query results: { result } " )
171+ Args:
172+ query: SQL query to validate
173+
174+ Returns:
175+ Dictionary with validation results
176+ """
177+ try :
178+ conn = sqlite3 .connect (":memory:" )
179+ cursor = conn .cursor ()
180+
181+ # SQLite will parse but not execute in prepare mode
182+ cursor .execute (f"EXPLAIN QUERY PLAN { query } " )
183+ plan = cursor .fetchall ()
184+
185+ return {
186+ "success" : True ,
187+ "data" : {
188+ "is_valid" : True ,
189+ "query_plan" : plan ,
190+ "validation_time" : datetime .now ().isoformat ()
191+ }
192+ }
193+
194+ except Exception as e :
195+ return {
196+ "success" : False ,
197+ "error" : str (e ),
198+ "data" : {
199+ "is_valid" : False ,
200+ "validation_time" : datetime .now ().isoformat ()
201+ }
202+ }
95203
96204if __name__ == "__main__" :
97- asyncio .run (main ())
205+ # Run the MCP server
206+ print ("🗄️ Starting Database MCP Server..." )
207+ mcp .run (transport = "streamable-http" )
0 commit comments