Skip to content
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions mssql_python/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2451,6 +2451,99 @@
)
return True

def _bulkcopy(self, table_name: str, data, **kwargs):
"""
Perform bulk copy operation using Rust-based implementation.

Args:
table_name: Target table name
data: Iterable of tuples/lists containing row data
**kwargs: Additional options passed to the Rust bulkcopy method
- batch_size: Number of rows per batch (default: 1000)
- timeout: Timeout in seconds (default: 30)
- column_mappings: List of tuples mapping source column index to target column name

Returns:
Dictionary with rows_copied, batch_count, and elapsed_time

Raises:
ImportError: If mssql_py_core is not installed
ValueError: If parameters are invalid
RuntimeError: If connection string is not available
"""
try:
import mssql_py_core
except ImportError as exc:
raise ImportError(
"Bulk copy requires mssql_py_core Rust library. "
"Install from BCPRustWheel directory."
) from exc

# Validate inputs
if not table_name or not isinstance(table_name, str):
raise ValueError("table_name must be a non-empty string")

# Extract and validate kwargs with defaults
batch_size = kwargs.get("batch_size", 1000)
timeout = kwargs.get("timeout", 30)

if batch_size <= 0:
raise ValueError(f"batch_size must be positive, got {batch_size}")
if timeout <= 0:
raise ValueError(f"timeout must be positive, got {timeout}")

# Get and parse connection string
if not hasattr(self.connection, "connection_str"):
raise RuntimeError("Connection string not available for bulk copy")

params = {
k.strip().lower(): v.strip()
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of v.strip() on connection string values is incorrect. Connection strings may have intentional leading or trailing whitespace in values, especially in passwords or paths. Stripping these could cause authentication failures or incorrect configurations. Only the key should be normalized, not the value.

Suggested change
k.strip().lower(): v.strip()
k.strip().lower(): v

Copilot uses AI. Check for mistakes.
for pair in self.connection.connection_str.split(";")
if "=" in pair
for k, v in [pair.split("=", 1)]
}

if not params.get("server"):
raise ValueError("SERVER parameter is required in connection string")

# Build connection context for Rust library
trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true")
context = {
"server": params.get("server", "localhost"),
"database": params.get("database", "master"),
"user_name": params.get("uid", ""),
"password": params.get("pwd", ""),
"trust_server_certificate": trust_cert,
"encryption": "Optional",
}

logger.debug("Bulk copy connecting to %s/%s", context["server"], context["database"])
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log statement could potentially expose sensitive connection details. While it doesn't directly log credentials, logging connection details should be done with caution. Consider using sanitized logging similar to other parts of the codebase that handle connection strings.

Suggested change
logger.debug("Bulk copy connecting to %s/%s", context["server"], context["database"])
logger.debug("Bulk copy establishing connection")

Copilot uses AI. Check for mistakes.

rust_connection = None
rust_cursor = None
try:
rust_connection = mssql_py_core.PyCoreConnection(context)
rust_cursor = rust_connection.cursor()

logger.debug("Bulk copy to '%s' - batch_size=%d", table_name, batch_size)
result = rust_cursor.bulkcopy(table_name, iter(data), kwargs=kwargs)

logger.debug("Bulk copy completed - rows=%d", result.get("rows_copied", 0))
return result

except Exception as e:
logger.error("Bulk copy failed: %s - %s", type(e).__name__, str(e))
raise

finally:
# Clean up Rust resources
for resource in (rust_cursor, rust_connection):
if resource and hasattr(resource, "close"):
try:
resource.close()
except Exception:
pass

def __enter__(self):
"""
Enter the runtime context for the cursor.
Expand Down
Loading