Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions aiosqlite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

__all__ = ["connect", "Connection", "Cursor"]

AuthorizerCallback = Callable[[int, str, str, str, str], int]

LOG = logging.getLogger("aiosqlite")


Expand Down Expand Up @@ -287,6 +289,45 @@ async def set_progress_handler(
async def set_trace_callback(self, handler: Callable) -> None:
await self._execute(self._conn.set_trace_callback, handler)

async def set_authorizer(
self, authorizer_callback: Optional[AuthorizerCallback]
) -> None:
"""
Set an authorizer callback to control database access.

The authorizer callback is invoked for each SQL statement that is prepared,
and controls whether specific operations are permitted. The callback function
receives five string arguments:

Args:
authorizer_callback: A callable that receives:
- action_code (int): The action to be authorized (e.g., SQLITE_READ)
- arg1 (str): First argument, meaning depends on action_code
- arg2 (str): Second argument, meaning depends on action_code
- db_name (str): Database name (e.g., "main", "temp")
- trigger_name (str): Name of trigger or view that is doing the access, or None

The callback should return:
- SQLITE_OK (0): Allow the operation
- SQLITE_DENY (1): Deny the operation, raise sqlite3.DatabaseError
- SQLITE_IGNORE (2): Treat operation as no-op

Pass None to remove the authorizer.

Example:
import sqlite3

async def restrict_drops(action_code, arg1, arg2, db_name, trigger_name):
# Deny all DROP operations
if action_code == sqlite3.SQLITE_DROP_TABLE:
return sqlite3.SQLITE_DENY
# Allow everything else
return sqlite3.SQLITE_OK

await conn.set_authorizer(restrict_drops)
"""
await self._execute(self._conn.set_authorizer, authorizer_callback)

async def iterdump(self) -> AsyncIterator[str]:
"""
Return an async iterator to dump the database in SQL text format.
Expand Down
18 changes: 18 additions & 0 deletions aiosqlite/tests/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
Simple perf tests for aiosqlite and the asyncio run loop.
"""
import sqlite3
import string
import tempfile
import time
Expand Down Expand Up @@ -124,6 +125,23 @@ async def test_inserts(self):
await db.execute("insert into perf (k) values (1), (2), (3)")
await db.commit()

@timed
async def test_inserts_authorized(self):
def deny_drops(action_code, arg1, arg2, db_name, trigger_name):
if action_code == sqlite3.SQLITE_DROP_TABLE:
return sqlite3.SQLITE_DENY
return sqlite3.SQLITE_OK

async with aiosqlite.connect(TEST_DB) as db:
await db.execute("create table perf (i integer primary key asc, k integer)")
await db.set_authorizer(deny_drops)
await db.commit()

while True:
yield
await db.execute("insert into perf (k) values (1), (2), (3)")
await db.commit()

@timed
async def test_insert_ids(self):
async with aiosqlite.connect(TEST_DB) as db:
Expand Down
35 changes: 35 additions & 0 deletions aiosqlite/tests/smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,41 @@ def callback(statement: str):
await db.execute("select 10")
self.assertIn("select 10", statements)

async def test_set_authorizer_deny_drops(self):
"""Test authorizer that denies DROP operations"""

def deny_drops(action_code, arg1, arg2, db_name, trigger_name):
if action_code == sqlite3.SQLITE_DROP_TABLE:
return sqlite3.SQLITE_DENY
return sqlite3.SQLITE_OK

async with aiosqlite.connect(self.db) as db:
await db.set_authorizer(deny_drops)

# Other operations should succeed
await db.execute("CREATE TABLE test_drop (id INTEGER)")
await db.execute("INSERT INTO test_drop VALUES (1)")
await db.execute("SELECT * FROM test_drop")

# DROP should fail
with self.assertRaises(sqlite3.DatabaseError):
await db.execute("DROP TABLE test_drop")

# Disabling the authorizer re-enables DROP
await db.set_authorizer(None)
await db.execute("DROP TABLE test_drop")

async def test_set_authorizer_exception_propagation(self):
"""Test that exceptions raised in authorizer callback are caught by SQLite"""

def raise_exception(action_code, arg1, arg2, db_name, trigger_name):
raise ValueError("Test exception from authorizer")

async with aiosqlite.connect(self.db) as db:
await db.set_authorizer(raise_exception)
with self.assertRaises(sqlite3.DatabaseError):
await db.execute("CREATE TABLE test_exception (id INTEGER)")

async def test_connect_error(self):
bad_db = Path("/something/that/shouldnt/exist.db")
with self.assertRaisesRegex(OperationalError, "unable to open database"):
Expand Down