Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Expose schema list in database
### Tools

- query_database
- Execute read-only operations against MariDB
- Execute read-only operations against MaraiDB

## dependency

Expand Down Expand Up @@ -120,3 +120,7 @@ Paths to Claude Desktop config file:
## License

This mcp server is licensed under the MIT license. please see the LICENSE file in the repository.

## Testing

Run using `> pytest` in your working directory
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies = [
"mysql-connector-python>=9.2.0",
"pytest>=8.3.4",
"python-dotenv>=1.0.1",
"sqlparse>=0.5.3",
]

[project.scripts]
Expand Down
52 changes: 47 additions & 5 deletions src/mcp_server_mariadb/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import os
from contextlib import closing
from dataclasses import dataclass, field

import mariadb
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

import sqlparse
from sqlparse.tokens import DML


load_dotenv()

mcp = FastMCP(
Expand Down Expand Up @@ -64,11 +67,50 @@ def get_connection():
print(f"Error connecting to MariaDB Platform: {e}")


def is_read_only_query(query: str) -> bool:
"""check if a query is read-only by examining its first word"""
first_word = query.strip().split()[0].upper()
def is_read_only_query(query: str) -> str:
# only allow readonly keywords
READ_ONLY_KEYWORDS = ("SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN")

cleaned = sqlparse.format(query, strip_comments=True, strip_whitespace=True, strip_newlines=True)
statements = sqlparse.parse(cleaned)



# evaluate each statement, to handle multi-statement query
for statement in statements:
first_token = next((token for token in statement.tokens if token.ttype is not token.is_whitespace), None)

# query is empty, invalid
if first_token is None:
return False

# keyword is not a valid DML
if (first_token.value.upper() not in READ_ONLY_KEYWORDS):
return False

tokens = cleaned.upper().split()
blacklisted_keywords = (
"INSERT",
"UPDATE",
"DELETE",
"REPLACE",
"CREATE",
"ALTER",
"DROP",
"INTO",
"TRUNCATE",
"LOAD_FILE",
"DUMPFILE",
"OUTFILE"
)

# if any tokens are in blacklisted keywords, return false
if any(token in tokens for token in blacklisted_keywords):
return False

return True


return first_word in READ_ONLY_KEYWORDS


@mcp.resource("schema://tables")
Expand Down
44 changes: 43 additions & 1 deletion tests/test_db.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,66 @@
from dotenv import load_dotenv
import pytest
from src.mcp_server_mariadb.server import get_connection, is_read_only_query

load_dotenv()


def test_db_connection():
connection = get_connection()

assert connection is not None

@pytest.mark.parametrize("query, expected", [

("SELECT * FROM users", True),
("SHOW TABLES", True),
("DESCRIBE users", True),
("DESC users", True),
("EXPLAIN SELECT * FROM users", True),

# EDGE CASE: fails when using multiple queries, the trailing query being non-read only
# This would have failed with earlier version
("SELECT * FROM users; DROP TABLE users;", False),

("""SELECT
employee_name,
department, status
FROM
Employee
WHERE
status = 'Active'
ORDER BY
employee_name ASC;""", True),

("""DELETE FROM Employee
WHERE status = 'Left';""", False),

("SELECT * FROM Employee; DROP TABLE Employee;", False),

("""
SELECT employee_name, department FROM Employee WHERE status = 'Active';
SELECT department, COUNT(*) FROM Employee GROUP BY department;""", True),

("""
SELECT employee_name, department FROM Employee WHERE status = 'Active';
DELETE FROM Employee WHERE status = 'Left';""", False),
])
def test_query(query, expected):
assert is_read_only_query(query) == expected


def test_is_read_only_query():
assert is_read_only_query("SELECT * FROM users")
assert is_read_only_query("SHOW TABLES")
assert is_read_only_query("DESCRIBE users")
assert is_read_only_query("DESC users")
assert is_read_only_query("EXPLAIN SELECT * FROM users")



def test_is_not_read_only_query():
assert not is_read_only_query(
"INSERT INTO users (name, email) VALUES ('John', '[email protected]');"
)
assert not is_read_only_query(
"INSERT INTO users (name, email) VALUES ('John', '[email protected]')"
)
Expand Down
14 changes: 13 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.