diff --git a/EmployeeFullNames.bcp b/EmployeeFullNames.bcp new file mode 100644 index 000000000..a6a334e80 --- /dev/null +++ b/EmployeeFullNames.bcp @@ -0,0 +1,39 @@ +Abdul "Kalam" 00000000000084D1 +Narendra Modi 00000000000084D2 +Rahul Gandhi 00000000000084D3 +Rabindranath Tagore 00000000000084D4 +Donald Trump 00000000000084D5 +Satya Nadella 00000000000084D6 +Sam Altman 2025-01-28 05:15:30.0000000 00000000000084D7 Wide str 1 210 123456789 4.2743594E+27 +Jeff Bezos 2025-01-28 05:15:30.0000000 00000000000084D8 Wide str2 1 127 123456789 5.8486031E+35 +Jeff Bezos 2025-01-28 05:15:30.0000000 00000000000084D9 Wide str2 1 127 123456789 5.8486031E+35 +direct direct 00000000000084DC 12.34 +Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DD Wide str3 1 127 123456789 12.34 12.34 34.12 +Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DE Wide str3 1 127 123456789 12.34 12.34 12345 +Float Cast 2025-01-28 05:15:30.0000000 00000000000084DF Wide str3 1 127 123456789 12.34 12.34 0.567 +Float Cast 2025-01-28 05:15:30.0000000 00000000000084E0 Wide str3 1 127 123456789 12.34 12.34 +Float Cast 2025-01-28 05:15:30.0000000 00000000000084E1 Wide str3 1 127 123456789 12.34 12.34 +Test fix 2025-01-28 05:15:30.0000000 00000000000084E2 Wide str3 1 127 123456789 12.34 12.34 +Harry Potter2 2025-01-28 05:15:30.0000000 00000000000084E3 Wide str3 1 127 123456789 12.34 12.34 +floattest scientificNotation 2025-01-28 05:15:30.0000000 00000000000084E4 Wide str3 1 127 123456789 1.7899999E+10 12.34 +floattest scientificNotation 2025-01-28 05:15:30.0000000 00000000000084E5 Wide str3 1 127 123456789 12.34 12.34 +is_stmt_prepared test 2025-01-28 05:15:30.0000000 000000000000C355 Wide str3 1 127 123456789 12.34 12.34 +test approx Float 2025-01-28 05:15:30.0000000 000000000000C356 Wide str3 1 127 123456789 12.34 12.34 +test22 approx Float 2025-01-28 05:15:30.0000000 000000000000C357 Wide str3 1 127 123456789 12.34 12.34 +test22 approx Float 2025-01-28 05:15:30.0000000 000000000000C358 Wide str3 
1 127 123456789 12.345 12.34 +FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C359 Wide str3 1 127 123456789 12.345 12.34 +FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35A Wide str3 1 127 123456789 12.345 12.34 +FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35B Wide str3 1 127 123456789 12.345 12.34 +FetchAll Test 2025-01-28 05:15:30.0000000 000000000000C35C Wide str3 1 127 123456789 12.345 12.34 +FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35D Wide str3 0 127 123456789 12.345 12.34 +FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35E Wide str3 0 127 123456789 12.345 12.34 +FetchAll Test2 2025-01-28 05:15:30.0000000 000000000000C35F Wide str3 0 127 123456789 12.345 12.34 +1 Test 000000000000C36A 1.00000 +1 Test 000000000000C36B 1.00000 +Harry Potter 2025-01-28 05:15:30.0000000 00000000000084DA Wide str3 1 127 123456789 5.8486031E+35 12.34 + 2025-01-28 05:15:30.0000000 00000000000084E6 Wide str3 1 127 123456789 12.34 12.34 + 2025-01-28 05:15:30.0000000 00000000000084E7 Wide str3 1 127 123456789 12.34 12.34 +main check 2025-01-28 05:15:30.0000000 00000000000084E8 Wide str3 1 127 123456789 12.34 12.34 +is_stmt_prepared 00000000000084EA +is_stmt_prepared 05:15:30.0000000 00000000000084EB +is_stmt_prepared test 2025-01-28 05:15:30.0000000 00000000000084F6 Wide str3 1 127 123456789 12.34 12.34 diff --git a/EmployeeFullNames.fmt b/EmployeeFullNames.fmt new file mode 100644 index 000000000..f2c42d37c --- /dev/null +++ b/EmployeeFullNames.fmt @@ -0,0 +1,14 @@ +14.0 +12 +1 SQLCHAR 0 50 "\t" 1 FirstName SQL_Latin1_General_CP1_CI_AS +2 SQLCHAR 0 50 "\t" 2 LastName SQL_Latin1_General_CP1_CI_AS +3 SQLCHAR 0 11 "\t" 3 date_ "" +4 SQLCHAR 0 19 "\t" 4 time_ "" +5 SQLCHAR 0 16 "\t" 5 datetime_ "" +6 SQLCHAR 0 20 "\t" 6 wchar_ SQL_Latin1_General_CP1_CI_AS +7 SQLCHAR 0 1 "\t" 7 bool_ "" +8 SQLCHAR 0 5 "\t" 8 tinyint_ "" +9 SQLCHAR 0 21 "\t" 9 bigint_ "" +10 SQLCHAR 0 30 "\t" 10 float_ "" +11 SQLCHAR 0 30 "\t" 11 double_ "" +12 SQLCHAR 0 41 
"\r\n" 12 numeric_ "" diff --git a/TestBCP.fmt b/TestBCP.fmt new file mode 100644 index 000000000..435607af6 --- /dev/null +++ b/TestBCP.fmt @@ -0,0 +1,4 @@ +14.0 +2 +1 SQLCHAR 0 200 "\t" 1 id SQL_Latin1_General_CP1_CI_AS +2 SQLCHAR 0 200 "\r\n" 2 names SQL_Latin1_General_CP1_CI_AS diff --git a/data_unicode.csv b/data_unicode.csv new file mode 100644 index 000000000..d13d48edb --- /dev/null +++ b/data_unicode.csv @@ -0,0 +1,5 @@ +1 Alice +2 Bob +3 Charlie +4 David +5 Eve \ No newline at end of file diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index a4f20a46e..eac1353e5 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -45,7 +45,11 @@ from .logging_config import setup_logging, get_logger # Constants -from .constants import ConstantsDDBC +from .constants import ConstantsDDBC, BCPControlOptions, BCPDataTypes + +# BCP +from .bcp_options import BCPOptions, ColumnFormat +from .bcp_main import BCPClient # GLOBALS # Read-Only @@ -53,6 +57,69 @@ paramstyle = "qmark" threadsafety = 1 +# Create direct variables for easier access to BCP data type constants Read-only +# Character/string types +SQLTEXT = BCPDataTypes.SQLTEXT.value +SQLVARCHAR = BCPDataTypes.SQLVARCHAR.value +SQLCHARACTER = BCPDataTypes.SQLCHARACTER.value +SQLBIGCHAR = BCPDataTypes.SQLBIGCHAR.value +SQLBIGVARCHAR = BCPDataTypes.SQLBIGVARCHAR.value +SQLNCHAR = BCPDataTypes.SQLNCHAR.value +SQLNVARCHAR = BCPDataTypes.SQLNVARCHAR.value +SQLNTEXT = BCPDataTypes.SQLNTEXT.value + +# Binary types +SQLBINARY = BCPDataTypes.SQLBINARY.value +SQLVARBINARY = BCPDataTypes.SQLVARBINARY.value +SQLBIGBINARY = BCPDataTypes.SQLBIGBINARY.value +SQLBIGVARBINARY = BCPDataTypes.SQLBIGVARBINARY.value +SQLIMAGE = BCPDataTypes.SQLIMAGE.value + +# Integer types +SQLBIT = BCPDataTypes.SQLBIT.value +SQLBITN = BCPDataTypes.SQLBITN.value +SQLINT1 = BCPDataTypes.SQLINT1.value +SQLINT2 = BCPDataTypes.SQLINT2.value +SQLINT4 = BCPDataTypes.SQLINT4.value +SQLINT8 = BCPDataTypes.SQLINT8.value +SQLINTN = 
BCPDataTypes.SQLINTN.value + +# Floating point types +SQLFLT4 = BCPDataTypes.SQLFLT4.value +SQLFLT8 = BCPDataTypes.SQLFLT8.value +SQLFLTN = BCPDataTypes.SQLFLTN.value + +# Decimal/numeric types +SQLDECIMAL = BCPDataTypes.SQLDECIMAL.value +SQLNUMERIC = BCPDataTypes.SQLNUMERIC.value +SQLDECIMALN = BCPDataTypes.SQLDECIMALN.value +SQLNUMERICN = BCPDataTypes.SQLNUMERICN.value + +# Money types +SQLMONEY = BCPDataTypes.SQLMONEY.value +SQLMONEY4 = BCPDataTypes.SQLMONEY4.value +SQLMONEYN = BCPDataTypes.SQLMONEYN.value + +# Date/time types +SQLDATETIME = BCPDataTypes.SQLDATETIME.value +SQLDATETIM4 = BCPDataTypes.SQLDATETIM4.value +SQLDATETIMN = BCPDataTypes.SQLDATETIMN.value +SQLDATEN = BCPDataTypes.SQLDATEN.value +SQLTIMEN = BCPDataTypes.SQLTIMEN.value +SQLDATETIME2N = BCPDataTypes.SQLDATETIME2N.value +SQLDATETIMEOFFSETN = BCPDataTypes.SQLDATETIMEOFFSETN.value + +# Special types +SQLUNIQUEID = BCPDataTypes.SQLUNIQUEID.value +SQLVARIANT = BCPDataTypes.SQLVARIANT.value +SQLUDT = BCPDataTypes.SQLUDT.value +SQLXML = BCPDataTypes.SQLXML.value +SQLTABLE = BCPDataTypes.SQLTABLE.value + +# BCP special values +SQL_VARLEN_DATA = BCPDataTypes.SQL_VARLEN_DATA.value +SQL_NULL_DATA = BCPDataTypes.SQL_NULL_DATA.value + from .pooling import PoolingManager def pooling(max_size=100, idle_timeout=600): # """ diff --git a/mssql_python/bcp_main.py b/mssql_python/bcp_main.py new file mode 100644 index 000000000..da37b2fbc --- /dev/null +++ b/mssql_python/bcp_main.py @@ -0,0 +1,268 @@ +import logging +from mssql_python.bcp_options import ( + BCPOptions, + BindData +) +from ddbc_bindings import BCPWrapper +from mssql_python.constants import BCPControlOptions +from typing import Optional, List, Any # Import Optional for type hints + +logger = logging.getLogger(__name__) # Add a logger instance + +# defining constants for BCP control options +SUPPORTED_DIRECTIONS = ("in", "out", "queryout") +# Define SQL_CHAR if not already available, e.g., from a constants module +SQL_CHAR = 1 +SQL_VARLEN_DATA = 
-10 # Define these constants for column binding +SQL_NULL_DATA = -1 + +class BCPClient: + """ + A client for performing bulk copy operations using the BCP (Bulk Copy Program) utility. + This class provides methods to initialize and execute BCP operations. + """ + + def __init__(self, connection): # connection is an instance of mssql_python.connection.Connection + """ + Initializes the BCPClient with a database connection. + Args: + connection: A mssql_python.connection.Connection object. + """ + logger.info("Initializing BCPClient.") + if connection is None: + logger.error("Connection object is None during BCPClient initialization.") + raise ValueError( + "A valid connection object is required to initialize BCPClient." + ) + + # Access the underlying C++ ddbc_bindings.Connection object + # stored in the _conn attribute of your Python Connection wrapper. + if not hasattr(connection, '_conn'): + logger.error("The provided Python connection object does not have the expected '_conn' attribute.") + raise TypeError("The Python Connection object is missing the '_conn' attribute holding the native C++ connection.") + + self.wrapper = BCPWrapper(connection._conn) + print(f"connection: {connection._conn}") + logger.info("BCPClient initialized successfully.") + + def sql_bulk_copy(self, options: BCPOptions, table: str = ""): # options is no longer optional + """ + Executes a bulk copy operation to or from a specified table or using a query. + + Args: + table (str): The name of the table (for 'in', 'out', 'format') or the query string (for 'queryout'). + options (BCPOptions): Configuration for the bulk copy operation. Must be provided. + The options.direction field dictates the BCP operation. + Raises: + ValueError: If 'table' is not provided, or if 'options' are invalid + or use a direction not supported by this client. + TypeError: If 'options' is not an instance of BCPOptions. + RuntimeError: If the BCPWrapper was not initialized. 
+ """ + logger.info(f"Starting sql_bulk_copy for table/query: '{table}', direction: '{options.direction}'.") + if not table and options.direction != "queryout": + # If 'table' is empty and direction is not 'queryout', raise an error. + logger.error("Validation failed: 'table' (or query) not provided for sql_bulk_copy.") + raise ValueError( + "The 'table' name (or query for queryout) must be provided." + ) + + if not isinstance(options, BCPOptions): + logger.error(f"Validation failed: 'options' is not an instance of BCPOptions. Got type: {type(options)}.") + # This check is good practice, though type hints help statically. + raise TypeError("The 'options' argument must be an instance of BCPOptions.") + + # BCPClient can add its own operational constraints: + if options.direction not in SUPPORTED_DIRECTIONS: + logger.error(f"Validation failed: Unsupported BCP direction '{options.direction}'. Supported: {SUPPORTED_DIRECTIONS}") + raise ValueError( + f"BCPClient currently only supports directions: {', '.join(SUPPORTED_DIRECTIONS)}. " + f"Got '{options.direction}'." 
+ ) + + current_options = options # Use the validated options directly + logger.debug(f"Using BCPOptions: {current_options}") + + if not self.wrapper: # Should be caught by __init__ ideally + logger.error("BCPWrapper was not initialized before calling sql_bulk_copy.") + raise RuntimeError("BCPWrapper was not initialized.") + + try: + if not current_options.use_memory_bcp or not current_options.bind_data: + # Standard file-based BCP initialization + logger.info( + f"Initializing BCP operation: table='{table}', data_file='{current_options.data_file}', " + f"error_file='{current_options.error_file}', direction='{current_options.direction}'" + ) + # Initialize BCP operation for file-based operations + self.wrapper.bcp_initialize_operation( + table, + current_options.data_file, + current_options.error_file, + current_options.direction, + ) + logger.debug("BCP operation initialized with BCPWrapper.") + else: + # In-memory BCP initialization (for bind/sendrow) + logger.info(f"Initializing in-memory BCP operation for table: '{table}'") + # For in-memory BCP, initialize with no data file + self.wrapper.bcp_initialize_operation( + table, + "", # No data file for in-memory BCP + current_options.error_file or "", + "in" # Always use "in" for in-memory BCP + ) + logger.debug("In-memory BCP operation initialized with BCPWrapper.") + + if current_options.query: + logger.debug(f"Setting BCPControlOptions.HINTS to '{current_options.query}'") + print(f"current_options.query: {current_options.query}") + self.wrapper.bcp_control( + BCPControlOptions.HINTS.value, current_options.query + ) + + # # Set BCP control options + # if current_options.batch_size is not None: + # logger.debug(f"Setting BCPControlOptions.BATCH_SIZE to {current_options.batch_size}") + # self.wrapper.bcp_control( + # BCPControlOptions.BATCH_SIZE.value, current_options.batch_size + # ) + # if current_options.max_errors is not None: + # logger.debug(f"Setting BCPControlOptions.MAX_ERRORS to 
{current_options.max_errors}") + # self.wrapper.bcp_control( + # BCPControlOptions.MAX_ERRORS.value, current_options.max_errors + # ) + # if current_options.first_row is not None: + # logger.debug(f"Setting BCPControlOptions.FIRST_ROW to {current_options.first_row}") + # self.wrapper.bcp_control( + # BCPControlOptions.FIRST_ROW.value, current_options.first_row + # ) + # if current_options.last_row is not None: + # logger.debug(f"Setting BCPControlOptions.LAST_ROW to {current_options.last_row}") + # self.wrapper.bcp_control( + # BCPControlOptions.LAST_ROW.value, current_options.last_row + # ) + # if current_options.code_page is not None: + # logger.debug(f"Setting BCPControlOptions.FILE_CODE_PAGE to {current_options.code_page}") + # self.wrapper.bcp_control( + # BCPControlOptions.FILE_CODE_PAGE.value, current_options.code_page + # ) + # if current_options.keep_identity: + # logger.debug("Setting BCPControlOptions.KEEP_IDENTITY to 1") + # self.wrapper.bcp_control(BCPControlOptions.KEEP_IDENTITY.value, 1) + # if current_options.keep_nulls: + # logger.debug("Setting BCPControlOptions.KEEP_NULLS to 1") + # self.wrapper.bcp_control(BCPControlOptions.KEEP_NULLS.value, 1) + # if current_options.hints: + # logger.debug(f"Setting BCPControlOptions.HINTS to '{current_options.hints}'") + # self.wrapper.bcp_control( + # BCPControlOptions.HINTS.value, current_options.hints + # ) + # if ( + # current_options.columns + # and current_options.columns[0].row_terminator is not None + # ): # Check if columns list is not empty + # logger.debug(f"Setting BCPControlOptions.SET_ROW_TERMINATOR to '{current_options.columns[0].row_terminator}'") + # self.wrapper.bcp_control( + # BCPControlOptions.SET_ROW_TERMINATOR.value, + # current_options.columns[0].row_terminator, + # ) + + # Handle format file or column definitions + if current_options.format_file: + logger.info(f"Reading format file: '{current_options.format_file}'") + self.wrapper.read_format_file(current_options.format_file) + elif 
current_options.columns: + logger.info(f"Defining {len(current_options.columns)} columns programmatically.") + self.wrapper.define_columns(len(current_options.columns)) + for i, col_fmt_obj in enumerate(current_options.columns): + logger.debug(f"Defining column format for file column {col_fmt_obj.file_col}: {col_fmt_obj}") + print(f"col_fmt_obj: {col_fmt_obj}") + + self.wrapper.define_column_format( + file_col_idx=col_fmt_obj.file_col, + user_data_type=col_fmt_obj.user_data_type, + indicator_length=col_fmt_obj.prefix_len, + user_data_length=col_fmt_obj.data_len, + terminator_bytes=col_fmt_obj.field_terminator, + terminator_length=col_fmt_obj.terminator_len, + server_col_idx=col_fmt_obj.server_col + ) + else: + logger.info("No format file or explicit column definitions provided. Relying on BCP defaults or server types.") + + # Handle in-memory BCP binding + if current_options.use_memory_bcp and current_options.bind_data: + # Check if bind_data is a list of lists (multiple rows) + is_multi_row = (len(current_options.bind_data) > 0 and + isinstance(current_options.bind_data[0], list)) + + if is_multi_row: + # Process multiple rows + row_count = len(current_options.bind_data) + logger.info(f"Processing {row_count} rows in memory") + + for row_idx, row_data in enumerate(current_options.bind_data): + logger.info(f"Processing row {row_idx+1} of {row_count}") + + # Bind each column in this row + col_count = len(row_data) + logger.info(f"Binding {col_count} columns for row {row_idx+1}") + + for bind_data in row_data: + logger.debug(f"Binding column {bind_data.server_col} with data type {bind_data.data_type}") + self.wrapper.bind_column( + data=bind_data.data, + indicator_length=bind_data.indicator_length, + data_length=bind_data.data_length, + terminator=bind_data.terminator, + terminator_length=bind_data.terminator_length, + data_type=bind_data.data_type, + server_col_idx=bind_data.server_col + ) + + # Send this row to the server + logger.info(f"Sending row {row_idx+1} to 
server") + self.wrapper.send_row() + + # Call finish to complete the batch + logger.info("Finishing BCP batch") + self.wrapper.finish() + else: + # Original single-row logic + logger.info(f"Binding data for {len(current_options.bind_data)} columns (single row)") + for bind_data in current_options.bind_data: + logger.debug(f"Binding column {bind_data.server_col} with data type {bind_data.data_type}") + self.wrapper.bind_column( + data=bind_data.data, + indicator_length=bind_data.indicator_length, + data_length=bind_data.data_length, + terminator=bind_data.terminator, + terminator_length=bind_data.terminator_length, + data_type=bind_data.data_type, + server_col_idx=bind_data.server_col + ) + + # For single-row in-memory BCP, send the row to the server + logger.info("Sending row to server") + self.wrapper.send_row() + + # Call finish to complete the batch + logger.info("Finishing BCP batch") + self.wrapper.finish() + else: + # For file-based BCP, execute and finish + logger.info("Executing BCP operation via wrapper.exec_bcp().") + self.wrapper.exec_bcp() + logger.info("BCP operation executed successfully.") + + except Exception as e: + logger.exception(f"An error occurred during BCP operation for table '{table}': {e}") + raise # Re-raise the exception after logging + finally: + if self.wrapper: + logger.info("Finishing and closing BCPWrapper.") + # self.wrapper.finish() + # self.wrapper.close() + logger.debug("BCPWrapper finished and closed.") + logger.info(f"sql_bulk_copy for table/query: '{table}' completed.") diff --git a/mssql_python/bcp_options.py b/mssql_python/bcp_options.py index 7dab82d55..56e95d8b7 100644 --- a/mssql_python/bcp_options.py +++ b/mssql_python/bcp_options.py @@ -1,6 +1,41 @@ from dataclasses import dataclass, field -from typing import List, Optional, Literal +from typing import List, Optional, Union, Any, Dict +from mssql_python.constants import BCPControlOptions +# defining constants for BCP control options +ALLOWED_DIRECTIONS = ("in", "out", 
"queryout") +ALLOWED_FILE_MODES = ("native", "char", "unicode") + +@dataclass +class BindData: + """ + Represents the data binding for a column in a bulk copy operation. + Used with bcp_bind API. + + Attributes: + data (Any): Pointer to the data to be copied. Can be primitive types or bytes. + indicator_length (int): Length of indicator in bytes (0, 1, 2, 4, or 8). + data_length (int): Count of bytes of data in the variable (can be SQL_VARLEN_DATA/SQL_NULL_DATA). + terminator (Optional[bytes]): Byte pattern marking the end of the variable, if any. + terminator_length (int): Count of bytes in the terminator. + data_type (int): The C data type of the variable (using SQL Server type tokens). + server_col (int): Ordinal position of the column in the database table (1-based). + """ + data: Any = None + indicator_length: int = 0 + data_length: int = 0 # Can be SQL_VARLEN_DATA or SQL_NULL_DATA + terminator: Optional[bytes] = None + terminator_length: int = 0 + data_type: int = 0 # SQL Server data type tokens + server_col: int = 0 # 1-based column number in table + + def __post_init__(self): + if self.indicator_length not in [0, 1, 2, 4, 8]: + raise ValueError("indicator_length must be 0, 1, 2, 4, or 8.") + if self.server_col <= 0: + raise ValueError("server_col must be a positive integer (1-based).") + if self.terminator is not None and not isinstance(self.terminator, bytes): + raise TypeError("terminator must be bytes or None.") @dataclass class ColumnFormat: @@ -23,12 +58,13 @@ class ColumnFormat: Must be a positive integer. 
""" - prefix_len: int - data_len: int + file_col: int = 1 + user_data_type: int = 0 + prefix_len: int = 0 + data_len: int = 0 field_terminator: Optional[bytes] = None - row_terminator: Optional[bytes] = None + terminator_len: int = 0 server_col: int = 1 - file_col: int = 1 def __post_init__(self): if self.prefix_len < 0: @@ -43,10 +79,6 @@ def __post_init__(self): self.field_terminator, bytes ): raise TypeError("field_terminator must be bytes or None.") - if self.row_terminator is not None and not isinstance( - self.row_terminator, bytes - ): - raise TypeError("row_terminator must be bytes or None.") @dataclass @@ -69,53 +101,106 @@ class BCPOptions: bulk_mode (str): Bulk mode ('native', 'char', 'unicode'). Option: (-n, -c, -w). Defaults to "native". columns (List[ColumnFormat]): Column formats. + bind_data (List[BindData]): Data bindings for in-memory BCP. """ - direction: Literal["in", "out"] - data_file: str # data_file is mandatory for 'in' and 'out' + direction: str + data_file: Optional[str] = None # data_file is mandatory for 'in' and 'out' error_file: Optional[str] = None format_file: Optional[str] = None - # write_format_file is removed as 'format' direction is not actively supported + query: Optional[str] = None # For 'query' direction + bulk_mode: Optional[str] = "native" # Default to 'native' mode batch_size: Optional[int] = None max_errors: Optional[int] = None first_row: Optional[int] = None last_row: Optional[int] = None - code_page: Optional[str] = None + code_page: Optional[Union[int, str]] = None + hints: Optional[str] = None + columns: Optional[List[ColumnFormat]] = field(default_factory=list) + bind_data: Union[List[BindData], List[List[BindData]]] = field(default_factory=list) # New field for bind data + row_terminator: Optional[bytes] = None keep_identity: bool = False keep_nulls: bool = False - hints: Optional[str] = None - bulk_mode: Literal["native", "char", "unicode"] = "native" - columns: List[ColumnFormat] = field(default_factory=list) + 
use_memory_bcp: bool = False # Flag to indicate if we're using in-memory BCP (bind and sendrow) def __post_init__(self): - if self.direction not in ["in", "out"]: - raise ValueError("direction must be 'in' or 'out'.") - if not self.data_file: - raise ValueError("data_file must be provided and non-empty for 'in' or 'out' directions.") - if self.error_file is None or not self.error_file: # Making error_file mandatory for in/out - raise ValueError("error_file must be provided and non-empty for 'in' or 'out' directions.") - - if self.format_file is not None and not self.format_file: - raise ValueError("format_file, if provided, must not be an empty string.") - if self.batch_size is not None and self.batch_size <= 0: - raise ValueError("batch_size must be a positive integer.") - if self.max_errors is not None and self.max_errors < 0: - raise ValueError("max_errors must be a non-negative integer.") - if self.first_row is not None and self.first_row <= 0: - raise ValueError("first_row must be a positive integer.") - if self.last_row is not None and self.last_row <= 0: - raise ValueError("last_row must be a positive integer.") - if self.last_row is not None and self.first_row is None: - raise ValueError("first_row must be specified if last_row is specified.") + if not self.direction: + raise ValueError("BCPOptions.direction is a required field.") + + if self.bind_data and not self.use_memory_bcp: + self.use_memory_bcp = True # Automatically set if bind_data is provided + + if self.use_memory_bcp and not self.bind_data: + raise ValueError( + "BCPOptions.bind_data must be provided when use_memory_bcp is True." + ) + + if self.direction not in ALLOWED_DIRECTIONS: + raise ValueError( + f"BCPOptions.direction '{self.direction}' is invalid. " + f"Allowed directions are: {', '.join(ALLOWED_DIRECTIONS)}." 
+ ) + + # Add this validation for in-memory BCP requiring 'in' direction + if self.use_memory_bcp and self.direction != "in": + raise ValueError("in-memory BCP operations require direction='in'") + + # Handle in-memory BCP case separately + if self.use_memory_bcp: + if not self.bind_data: + raise ValueError( + "BCPOptions.bind_data must be provided when use_memory_bcp is True." + ) + # For in-memory BCP, data_file is not needed, but error_file is still useful + if not self.error_file: + raise ValueError("error_file must be provided even for in-memory BCP operations.") + else: + # Regular file-based BCP validation + if self.direction in ["in", "out"]: + if not self.data_file: + raise ValueError( + f"BCPOptions.data_file is required for file-based BCP direction '{self.direction}'." + ) + if not self.error_file: + raise ValueError("error_file must be provided for file-based BCP operations.") + + if self.direction == "queryout" and not self.query: + raise ValueError( + "BCPOptions.query is required for BCP direction 'query'." + ) + + if self.columns and self.format_file: + raise ValueError( + "Cannot specify both 'columns' (for bcp_colfmt) and 'format_file' (for bcp_readfmt). Choose one." + ) + + if isinstance(self.code_page, int) and self.code_page < 0: + raise ValueError( + "BCPOptions.code_page, if an integer, must be non-negative." + ) + + if self.bulk_mode not in ALLOWED_FILE_MODES: + raise ValueError( + f"BCPOptions.bulk_mode '{self.bulk_mode}' is invalid. " + f"Allowed modes are: {', '.join(ALLOWED_FILE_MODES)}." + ) + for attr_name in ["batch_size", "max_errors", "first_row", "last_row"]: + attr_value = getattr(self, attr_name) + if attr_value is not None and attr_value < 0: + raise ValueError( + f"BCPOptions.{attr_name} must be non-negative if specified. 
Got {attr_value}" + ) + if ( self.first_row is not None and self.last_row is not None - and self.last_row < self.first_row + and self.first_row > self.last_row + ): + raise ValueError( + "BCPOptions.first_row cannot be greater than BCPOptions.last_row." + ) + + if self.row_terminator is not None and not isinstance( + self.row_terminator, bytes ): - raise ValueError("last_row must be greater than or equal to first_row.") - if self.code_page is not None and not self.code_page: - raise ValueError("code_page, if provided, must not be an empty string.") - if self.hints is not None and not self.hints: - raise ValueError("hints, if provided, must not be an empty string.") - if self.bulk_mode not in ["native", "char", "unicode"]: - raise ValueError("bulk_mode must be 'native', 'char', or 'unicode'.") + raise TypeError("row_terminator must be bytes or None.") \ No newline at end of file diff --git a/mssql_python/constants.py b/mssql_python/constants.py index aade503c7..713de41f2 100644 --- a/mssql_python/constants.py +++ b/mssql_python/constants.py @@ -116,3 +116,90 @@ class ConstantsDDBC(Enum): SQL_C_WCHAR = -8 SQL_NULLABLE = 1 SQL_MAX_NUMERIC_LEN = 16 + +class BCPControlOptions(Enum): + """ + Constants for BCP control options. + The values are the string representations expected by the BCP API. + """ + BATCH_SIZE = "BCPBATCH" + MAX_ERRORS = "BCPMAXERRS" + FIRST_ROW = "BCPFIRST" + LAST_ROW = "BCPLAST" + FILE_CODE_PAGE = "BCPFILECP" + KEEP_IDENTITY = "BCPKEEPIDENTITY" + KEEP_NULLS = "BCPKEEPNULLS" + HINTS = "BCPHINTS" + SET_ROW_TERMINATOR = "BCPSETROWTERM" + +class BCPDataTypes(Enum): + """ + SQL Server data type constants for BCP operations. + These are the native SQL Server data type tokens used with bcp_bind. 
+ """ + # Character/string types + SQLTEXT = 35 + SQLVARCHAR = 39 + SQLCHARACTER = 47 + SQLBIGCHAR = 175 + SQLBIGVARCHAR = 167 + SQLNCHAR = 239 + SQLNVARCHAR = 231 + SQLNTEXT = 99 + + # Binary types + SQLBINARY = 45 + SQLVARBINARY = 37 + SQLBIGBINARY = 173 + SQLBIGVARBINARY = 165 + SQLIMAGE = 34 + + # Integer types + SQLBIT = 50 + SQLBITN = 104 + SQLINT1 = 48 + SQLINT2 = 52 + SQLINT4 = 56 + SQLINT8 = 127 + SQLINTN = 38 + + # Floating point types + SQLFLT4 = 59 + SQLFLT8 = 62 + SQLFLTN = 109 + + # Decimal/numeric types + SQLDECIMAL = 106 + SQLNUMERIC = 108 + SQLDECIMALN = 106 + SQLNUMERICN = 108 + + # Money types + SQLMONEY = 60 + SQLMONEY4 = 122 + SQLMONEYN = 110 + + # Date/time types + SQLDATETIME = 61 + SQLDATETIM4 = 58 + SQLDATETIMN = 111 + SQLDATEN = 40 + SQLTIMEN = 41 + SQLDATETIME2N = 42 + SQLDATETIMEOFFSETN = 43 + + # Special types + SQLUNIQUEID = 36 + SQLVARIANT = 98 + SQLUDT = 240 + SQLXML = 241 + SQLTABLE = 243 + + # BCP special values + SQL_VARLEN_DATA = -10 + SQL_NULL_DATA = -1 + + # BCP direction codes + BCP_IN = 1 + BCP_OUT = 2 + BCP_QUERYOUT = 3 diff --git a/mssql_python/libs/x64/msodbcsql18.dll b/mssql_python/libs/x64/msodbcsql18.dll index 7734343c2..0fca0d066 100644 Binary files a/mssql_python/libs/x64/msodbcsql18.dll and b/mssql_python/libs/x64/msodbcsql18.dll differ diff --git a/mssql_python/pybind/CMakeLists.txt b/mssql_python/pybind/CMakeLists.txt index 4b4c1f990..1b7b2566d 100644 --- a/mssql_python/pybind/CMakeLists.txt +++ b/mssql_python/pybind/CMakeLists.txt @@ -190,7 +190,7 @@ else() set(DDBC_SOURCE "ddbc_bindings.cpp") message(STATUS "Using standard source file: ${DDBC_SOURCE}") # Include connection module for Windows - add_library(ddbc_bindings MODULE ${DDBC_SOURCE} connection/connection.cpp connection/connection_pool.cpp) + add_library(ddbc_bindings MODULE ${DDBC_SOURCE} connection/connection.cpp connection/connection_pool.cpp bcp/bcp_wrapper.cpp) endif() # Set the output name to include Python version and architecture @@ -231,6 +231,7 
@@ message(STATUS "Using ODBC include directory: ${ODBC_INCLUDE_DIR}") target_include_directories(ddbc_bindings PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} # Root directory (for ddbc_bindings.h) ${CMAKE_CURRENT_SOURCE_DIR}/connection # connection directory (for connection.h) + ${CMAKE_CURRENT_SOURCE_DIR}/bcp # connection directory (for bcp_wrapper.h) ${PYTHON_INCLUDE_DIR} ${PYBIND11_INCLUDE_DIR} ) diff --git a/mssql_python/pybind/bcp/bcp_wrapper.cpp b/mssql_python/pybind/bcp/bcp_wrapper.cpp new file mode 100644 index 000000000..690cb20e7 --- /dev/null +++ b/mssql_python/pybind/bcp/bcp_wrapper.cpp @@ -0,0 +1,598 @@ +#include "bcp_wrapper.h" // Includes ddbc_bindings.h (and thus sql.h, sqlext.h, , ) and connection.h + +// Pybind11 headers (needed for py::cast and potentially std::optional if used with pybind types) +#include +#include +#include // Ensure this is included for py::bytes if not transitively + +// Standard C++ headers (not covered by ddbc_bindings.h) +#include +#include +#include +#include // For std::to_string, std::string +#include // Added for std::cout + +namespace py = pybind11; // Alias for pybind11 namespace + +// Helper to manage BCP properties for bcp_control +enum class BCPCtrlPropType { INT, WSTRING }; +struct BCPCtrlPropertyInfo { + INT option_code; + BCPCtrlPropType type; +}; + +// Map property names to their ODBC codes and types +const std::unordered_map bcp_control_properties = { + {L"BCPMAXERRS", {BCPMAXERRS, BCPCtrlPropType::INT}}, + {L"BCPBATCH", {BCPBATCH, BCPCtrlPropType::INT}}, + {L"BCPKEEPNULLS", {BCPKEEPNULLS, BCPCtrlPropType::INT}}, + {L"BCPKEEPIDENTITY",{BCPKEEPIDENTITY, BCPCtrlPropType::INT}}, + {L"BCPHINTS", {BCPHINTS, BCPCtrlPropType::WSTRING}}, + {L"BCPFILECP", {BCPFILECP, BCPCtrlPropType::INT}}, + // Example if you were to add global terminators via bcp_control: + // {L"BCPFIELDTERM", {BCPFIELDTERM_Constant, BCPCtrlPropType::BYTES}}, // Assuming BCPFIELDTERM_Constant is the ODBC int + // {L"BCPROWTERM", {BCPROWTERM_Constant, 
BCPCtrlPropType::BYTES}}, // Assuming BCPROWTERM_Constant is the ODBC int +}; + +// Helper for bcp_init direction string +INT get_bcp_direction_code(const std::wstring& direction_str) { + if (direction_str == L"in") return DB_IN; + if (direction_str == L"out" || direction_str == L"queryout") return DB_OUT; + throw std::runtime_error("Invalid BCP direction string: " + py::cast(direction_str).cast()); +} + +// Helper function (can be a static private method or a lambda in C++11 and later) +// to retrieve ODBC diagnostic messages and populate ErrorInfo. +// This uses SQLGetDiagRec_ptr directly, which is loaded by DriverLoader. +static ErrorInfo get_odbc_diagnostics_for_handle(SQLSMALLINT handle_type, SQLHANDLE handle) { + ErrorInfo error_info; + error_info.sqlState = L""; // Initialize + error_info.ddbcErrorMsg = L""; // Initialize + + if (!SQLGetDiagRec_ptr) { + std::cout << "get_odbc_diagnostics_for_handle: SQLGetDiagRec_ptr is null." << std::endl; + error_info.ddbcErrorMsg = L"SQLGetDiagRec_ptr not loaded. 
Cannot retrieve diagnostics."; + return error_info; + } + + SQLWCHAR sql_state_w[6]; + SQLINTEGER native_error; + SQLWCHAR message_text_w[SQL_MAX_MESSAGE_LENGTH]; + SQLSMALLINT text_length; + SQLSMALLINT rec_number = 1; + std::wstring combined_messages; + + while (SQLGetDiagRec_ptr(handle_type, handle, rec_number, sql_state_w, &native_error, + message_text_w, SQL_MAX_MESSAGE_LENGTH, &text_length) == SQL_SUCCESS) { + if (rec_number == 1) { + error_info.sqlState = std::wstring(sql_state_w); + } + if (!combined_messages.empty()) { + combined_messages += L" | "; + } + combined_messages += std::wstring(message_text_w, text_length) + L" (Native: " + std::to_wstring(native_error) + L")"; + rec_number++; + } + error_info.ddbcErrorMsg = combined_messages; + if (combined_messages.empty() && rec_number == 1) { // No records found + error_info.ddbcErrorMsg = L"No ODBC diagnostic records found for the handle."; + } + return error_info; +} + +BCPWrapper::BCPWrapper(ConnectionHandle& conn) // Changed to Connection& + : _bcp_initialized(false), _bcp_finished(true) { // Initialize reference + try { + _hdbc = conn.getConnection()->getDbcHandle()->get(); + if (!_hdbc || _hdbc == SQL_NULL_HDBC) { + std::cout << "BCPWrapper Error: Invalid HDBC from Connection object." << std::endl; + throw std::runtime_error("BCPWrapper: Invalid HDBC from Connection object."); + } + } catch (const std::runtime_error& e) { + // Re-throw with more context or just let it propagate + throw std::runtime_error(std::string("BCPWrapper Constructor: Failed to get valid HDBC - ") + e.what()); + } +} + +BCPWrapper::~BCPWrapper() { + std::cout << "BCPWrapper: Destructor called." << std::endl; + // try { + // close(); + // } catch (const std::exception& e) { + // std::cout << "BCPWrapper Error: Exception in destructor: " << e.what() << std::endl; + // } catch (...) { + // std::cout << "BCPWrapper Error: Unknown exception in destructor." << std::endl; + // } + std::cout << "BCPWrapper: Destructor finished." 
<< std::endl; +} + +SQLRETURN BCPWrapper::bcp_initialize_operation(const std::wstring& table, + const std::wstring& data_file, + const std::wstring& error_file, + const std::wstring& direction) { + if (_bcp_initialized) { + std::cout << "BCPWrapper Warning: bcp_initialize_operation called but BCP already initialized. Call finish() or close() first." << std::endl; + return SQL_ERROR; + } + + INT dir_code = get_bcp_direction_code(direction); + + LPCWSTR pTable = table.empty() ? nullptr : table.c_str(); + LPCWSTR pDataFile = data_file.empty() ? nullptr : data_file.c_str(); + LPCWSTR pErrorFile = error_file.empty() ? nullptr : error_file.c_str(); + + // std::cout << "BCPWrapper: Calling bcp_initW for table '" << py::cast(table).cast() + // << "', data_file '" << (pDataFile ? py::cast(data_file).cast() : "nullptr") + // << "', error_file '" << (pErrorFile ? py::cast(error_file).cast() : "nullptr") + // << "', direction '" << py::cast(direction).cast() << "'." << std::endl; + // std::cout << "BCPWrapper: BCPInitW_ptr: " << (BCPInitW_ptr ? "Loaded" : "Not Loaded") << std::endl; + // std::cout << "BCPWrapper: BCPInitW_ptr val" << BCPInitW_ptr << std::endl; + + std::cout << "BCPWrapper: Calling BCPInitW_ptr with HDBC: " << _hdbc << ", table: " + << (pTable ? py::cast(table).cast() : "nullptr") + << ", data_file: " << (pDataFile ? py::cast(data_file).cast() : "nullptr") + << ", error_file: " << (pErrorFile ? py::cast(error_file).cast() : "nullptr") + << ", direction code: " << dir_code << std::endl; + // Call BCPInitW with the correct parameters + SQLRETURN ret = BCPInitW_ptr(_hdbc, pTable, pDataFile, pErrorFile, dir_code); + std::cout << "BCPWrapper: HELLOOOO " << ret << std::endl; + + if (ret != FAIL) { + _bcp_initialized = true; + _bcp_finished = false; + std::cout << "BCPWrapper: bcp_initW successful." << std::endl; + } else { + std::cout << "BCPWrapper Error: bcp_initW failed. 
Ret: " << ret << std::endl; + } + return ret; +} + +SQLRETURN BCPWrapper::bcp_control(const std::wstring& property_name, int value) { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: bcp_control(int) called in invalid state (not initialized or already finished)." << std::endl; + // Throw an exception instead of returning SQL_ERROR for better Python-side error handling + throw std::runtime_error("BCPWrapper: bcp_control(int) called in invalid state."); + } + + auto it = bcp_control_properties.find(property_name); + if (it == bcp_control_properties.end() || it->second.type != BCPCtrlPropType::INT) { + std::string msg = "BCPWrapper Error: bcp_control(int) - property '" + py::cast(property_name).cast() + "' not found or type mismatch."; + std::cout << msg << std::endl; + throw std::runtime_error(msg); + } + + std::cout << "BCPWrapper: Calling bcp_controlW for property '" << py::cast(property_name).cast() << "' with int value " << value << "." << std::endl; + // Correctly pass integer values to bcp_control. + // The third argument (pvValue) for integer options is typically the value itself, cast to LPVOID, + // or a pointer to the value. For BCP options like BCPMAXERRS, BCPBATCH, BCPKEEPNULLS, BCPKEEPIDENTITY, + // passing the value directly cast to LPVOID after ensuring it's the correct size (e.g. SQLLEN) is common. + SQLRETURN ret = BCPControlW_ptr(_hdbc, it->second.option_code, (LPVOID)(SQLLEN)value); + if (ret == FAIL) { + std::string msg = "BCPWrapper Error: bcp_controlW (int value) failed for property '" + py::cast(property_name).cast() + "'. 
Ret: " + std::to_string(ret); + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + msg += " ODBC Diag: SQLState: " + py::cast(diag.sqlState).cast() + ", Message: " + py::cast(diag.ddbcErrorMsg).cast(); + std::cout << msg << std::endl; + throw std::runtime_error(msg); + } + return ret; +} + +SQLRETURN BCPWrapper::bcp_control(const std::wstring& property_name, const std::wstring& value) { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: bcp_control(wstring) called in invalid state." << std::endl; + return SQL_ERROR; + } + + auto it = bcp_control_properties.find(property_name); + std::cout << "BCPWrapper: bcp_control(wstring) called for property '" << py::cast(property_name).cast() << "'." << std::endl; + // Check if the property exists and is of type WSTRING + // Note: For WSTRING properties, we expect the value to be a wide string (std::wstring). + // If the property is not found or is not of type WSTRING, we return an error. + std::cout << "BCPWrapper: bcp_control value: '" << py::cast(value).cast() << "'." << std::endl; + std::cout << "BCPWrapper: bcp_control property name: '" << py::cast(property_name).cast() << "'." << std::endl; + if (it == bcp_control_properties.end() || it->second.type != BCPCtrlPropType::WSTRING) { + std::cout << "BCPWrapper Error: bcp_control(wstring) - property '" << py::cast(property_name).cast() << "' not found or type mismatch." << std::endl; + return SQL_ERROR; + } + + std::cout << "BCPWrapper: Calling bcp_controlW for property '" << py::cast(property_name).cast() << "' with wstring value '" << py::cast(value).cast() << "'." << std::endl; + std::string narrow_value = py::cast(value).cast(); // Convert wstring to string for logging + SQLRETURN ret = BCPControlW_ptr(_hdbc, it->second.option_code, (LPVOID)narrow_value.c_str()); + if (ret == FAIL) { + std::cout << "BCPWrapper Error: bcp_controlW (wstring value) failed for property '" << py::cast(property_name).cast() << "'. 
Ret: " << ret << std::endl; + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + std::cout << "BCPWrapper: ODBC Diag: SQLState: " << py::cast(diag.sqlState).cast() << ", Message: " << py::cast(diag.ddbcErrorMsg).cast() << std::endl; + throw std::runtime_error("BCPWrapper: bcp_controlW (wstring value) failed."); + } + return ret; +} + +SQLRETURN BCPWrapper::read_format_file(const std::wstring& file_path) { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: read_format_file called in invalid state." << std::endl; + return SQL_ERROR; + } + if (file_path.empty()) { + std::cout << "BCPWrapper Error: read_format_file - file path cannot be empty." << std::endl; + return SQL_ERROR; + } + + std::cout << "BCPWrapper: Calling bcp_readfmtW for file '" << py::cast(file_path).cast() << "'." << std::endl; + SQLRETURN ret = BCPReadFmtW_ptr(_hdbc, file_path.c_str()); + if (ret == FAIL) { + std::cout << "BCPWrapper Error: bcp_readfmtW failed for file '" << py::cast(file_path).cast() << "'. Ret: " << ret << std::endl; + } + return ret; +} + +SQLRETURN BCPWrapper::define_columns(int num_cols) { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: define_columns called in invalid state." << std::endl; + return SQL_ERROR; + } + if (num_cols <= 0) { + std::cout << "BCPWrapper Error: define_columns - invalid number of columns: " << num_cols << std::endl; + return SQL_ERROR; + } + + std::cout << "BCPWrapper: Calling bcp_columns with " << num_cols << " columns." << std::endl; + SQLRETURN ret = BCPColumns_ptr(_hdbc, num_cols); + if (ret == FAIL) { + std::cout << "BCPWrapper Error: bcp_columns failed for " << num_cols << " columns. 
Ret: " << ret << std::endl; + } + std::cout << "BCPWrapper: bcp_columns returned " << ret << std::endl; + return ret; +} + + +SQLRETURN BCPWrapper::define_column_format(int file_col_idx, + int user_data_type, + int indicator_length, + long long user_data_length, + const std::optional& terminator_bytes_py, + int terminator_length, + int server_col_idx) { + if (!_bcp_initialized || _bcp_finished) { + throw std::runtime_error("BCPWrapper: define_column_format called in invalid state."); + } + + const BYTE* pTerminator = nullptr; + std::string terminator_str_holder; + + if (terminator_bytes_py) { + terminator_str_holder = terminator_bytes_py->cast(); + if (!terminator_str_holder.empty()) { + std::cout << "BCPWrapper: Terminator bytes provided: " << py::cast(terminator_str_holder).cast() << std::endl; + pTerminator = reinterpret_cast(terminator_str_holder.data()); + std::cout << "BCPWrapper: Terminator pointer: " << pTerminator << std::endl; + std::cout << "BCPWRapper: Terminator pointer type: " << typeid(pTerminator).name() << std::endl; + std::cout << "Terminator content hex dump: "; + for (size_t i = 0; i < terminator_str_holder.size(); i++) { + std::cout << std::hex << (int)(unsigned char)terminator_str_holder[i] << " "; + } + std::cout << std::dec << std::endl; + } else { + std::cout << "Warning: Terminator string is empty!" << std::endl; + } + } else { + std::cout << "Warning: No terminator bytes provided!" 
<< std::endl; + } + + DBINT bcp_user_data_len = static_cast(user_data_length); + + std::cout << "BCPWrapper: Calling bcp_colfmtW for file_col " << file_col_idx + << ", server_col " << server_col_idx + << ", user_data_type " << user_data_type + << ", indicator_len " << indicator_length + << ", user_data_len " << static_cast(bcp_user_data_len) + << ", terminator_len " << terminator_length + << ", terminator_ptr " << static_cast(pTerminator) << std::endl; + + std::cout << "BCPWrapper: user_data_type: " << static_cast(user_data_type) << std::endl; + + SQLRETURN ret = BCPColFmtW_ptr(_hdbc, + file_col_idx, + static_cast(user_data_type), + indicator_length, + bcp_user_data_len, + pTerminator, + terminator_length, + server_col_idx + ); + if (ret == FAIL) { + std::string msg = "BCPWrapper Error: bcp_colfmtW failed for file_col " + std::to_string(file_col_idx) + + ", server_col " + std::to_string(server_col_idx) + + ". Ret: " + std::to_string(ret); + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + msg += " ODBC Diag: SQLState: " + py::cast(diag.sqlState).cast() + ", Message: " + py::cast(diag.ddbcErrorMsg).cast(); + std::cout << msg << std::endl; + throw std::runtime_error(msg); + } + return ret; +} + +SQLRETURN BCPWrapper::exec_bcp() { + if (!_bcp_initialized || _bcp_finished) { + throw std::runtime_error("BCPWrapper: exec_bcp called in invalid state."); + } + + DBINT rows_copied_in_batch = 0; + std::cout << "BCPWrapper: Calling bcp_exec." << std::endl; + DBINT bcp_ret = BCPExec_ptr(_hdbc, &rows_copied_in_batch); + + if (bcp_ret == FAIL) { + std::string msg = "BCPWrapper Error: bcp_exec failed (returned -1). 
Rows in this batch (if any before error): " + std::to_string(static_cast(rows_copied_in_batch)); + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + msg += " ODBC Diag: SQLState: " + py::cast(diag.sqlState).cast() + ", Message: " + py::cast(diag.ddbcErrorMsg).cast(); + std::cout << msg << std::endl; + throw std::runtime_error(msg); + } + std::cout << "BCPWrapper: bcp_exec returned " << static_cast(bcp_ret) << ". Rows parameter output: " << static_cast(rows_copied_in_batch) << std::endl; + return SQL_SUCCESS; +} + +SQLRETURN BCPWrapper::finish() { + if (!_bcp_initialized) { + std::cout << "BCPWrapper Info: finish called but BCP not initialized. No action taken." << std::endl; + return SQL_SUCCESS; + } + if (_bcp_finished) { + std::cout << "BCPWrapper Info: finish called but BCP already finished. No action taken." << std::endl; + return SQL_SUCCESS; + } + + std::cout << "BCPWrapper: Calling bcp_done." << std::endl; + SQLRETURN ret = BCPDone_ptr(_hdbc); + if (ret != FAIL) { + _bcp_finished = true; + std::cout << "BCPWrapper: bcp_done successful." << std::endl; + } else { + std::cout << "BCPWrapper Error: bcp_done failed. Ret: " << ret << std::endl; + } + return ret; +} + +SQLRETURN BCPWrapper::close() { + std::cout << "BCPWrapper: close() called." << std::endl; + SQLRETURN ret = SQL_SUCCESS; + if (_bcp_initialized && !_bcp_finished) { + std::cout << "BCPWrapper: Active BCP operation found in close(), calling finish()." 
<< std::endl; + ret = finish(); + } + return ret; +} + +// Add these new methods for bcp_bind and bcp_sendrow + +// Helper to allocate and copy data for different data types +template +T* AllocateAndCopyData(const py::object& data, std::vector>& buffers) { + // Create a shared pointer to hold our value + auto buffer = std::shared_ptr(new T()); + + try { + // For numeric types, handle possible Python type mismatches + if constexpr (std::is_same::value || std::is_same::value) { + // For float/double, accept integers or floats + if (py::isinstance(data)) { + *buffer = static_cast(data.cast()); + } else if (py::isinstance(data)) { + *buffer = data.cast(); + } else { + throw std::runtime_error("Cannot convert Python type to float/double"); + } + } + else if constexpr (std::is_integral::value) { + // For integer types, accept Python integers + if (py::isinstance(data)) { + *buffer = static_cast(data.cast()); + } else { + throw std::runtime_error("Cannot convert Python type to integral type"); + } + } + else { + // Direct cast for other types + *buffer = data.cast(); + } + } + catch (const py::cast_error& e) { + std::string msg = "Cast error: "; + msg += e.what(); + msg += " (Python type: "; + msg += py::str(py::type::of(data)).cast(); + msg += ")"; + throw std::runtime_error(msg); + } + + // Store the shared_ptr in the buffers vector to keep track of it + buffers.push_back(buffer); + + // Return the raw pointer for use with C APIs like bcp_bind + return buffer.get(); +} + +SQLRETURN BCPWrapper::bind_column(const py::object& data, + int indicator_length, + long long data_length, + const std::optional& terminator, + int terminator_length, + int data_type, + int server_col_idx) { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: bind_column called in invalid state." 
<< std::endl; + throw std::runtime_error("BCPWrapper: bind_column called in invalid state."); + } + + LPCBYTE pData = nullptr; + LPCBYTE pTerm = nullptr; + std::string terminator_str_holder; + + // Process terminator bytes if provided + if (terminator) { + terminator_str_holder = terminator->cast(); + if (!terminator_str_holder.empty()) { + pTerm = reinterpret_cast(terminator_str_holder.data()); + } + } + + // Handle different data types and convert Python objects to C++ types + try { + // Note: The allocated memory will be freed in the destructor + if (py::isinstance(data)) { + // Handle string data - convert to either narrow or wide string based on data_type + if (data_type == 239 /* SQLNCHAR */ || + data_type == 231 /* SQLNVARCHAR */ || + data_type == 99 /* SQLNTEXT */) { + // For wide string types + std::wstring wstrValue = data.cast(); + auto buffer = std::shared_ptr(new wchar_t[wstrValue.length() + 1]); + wcscpy_s(buffer.get(), wstrValue.length() + 1, wstrValue.c_str()); + _data_buffers.push_back(buffer); + pData = reinterpret_cast(buffer.get()); + } else { + // For narrow string types (SQLCHAR, SQLVARCHAR, SQLTEXT, etc.) 
+ std::string strValue = data.cast(); + auto buffer = std::shared_ptr(new char[strValue.length() + 1]); + strcpy_s(buffer.get(), strValue.length() + 1, strValue.c_str()); + _data_buffers.push_back(buffer); + pData = reinterpret_cast(buffer.get()); + } + } else if (py::isinstance(data) || py::isinstance(data)) { + // Handle binary data + std::string binValue = data.cast(); + auto buffer = std::shared_ptr(new unsigned char[binValue.length()]); + memcpy(buffer.get(), binValue.data(), binValue.length()); + _data_buffers.push_back(buffer); + pData = reinterpret_cast(buffer.get()); + } else if (py::isinstance(data)) { + // Handle integer types based on data_type + switch (data_type) { + case 48: // SQLINT1 + case 50: // SQLBIT + case 104: // SQLBITN + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + break; + case 52: // SQLINT2 + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + break; + case 56: // SQLINT4 + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + break; + case 127: // SQLINT8 + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + break; + case 38: // SQLINTN - need to determine size from indicator_length + if (indicator_length == 1) { + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } else if (indicator_length == 2) { + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } else if (indicator_length == 4) { + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } else if (indicator_length == 8) { + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } else { + // Default to int + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } + break; + default: + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } + } else if (py::isinstance(data) || py::isinstance(data)) { + // Handle float types - accept both float and int Python types + switch (data_type) { + case 59: // 
SQLFLT4 + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + break; + case 62: // SQLFLT8 + case 109: // SQLFLTN + default: + pData = reinterpret_cast(AllocateAndCopyData(data, _data_buffers)); + } + } else if (py::isinstance(data)) { + // Handle NULL values + if (indicator_length > 0) { + // Create indicator buffer with SQL_NULL_DATA + auto indicator_buffer = std::shared_ptr(new SQLLEN(-1)); // SQL_NULL_DATA is -1 + _data_buffers.push_back(indicator_buffer); + pData = reinterpret_cast(indicator_buffer.get()); + } else { + pData = nullptr; + } + } else { + // Default: try to convert to string + std::cout << "BCPWrapper Warning: Unknown data type, attempting to convert to string" << std::endl; + std::string strValue = py::str(data).cast(); + auto buffer = std::shared_ptr(new char[strValue.length() + 1]); + strcpy_s(buffer.get(), strValue.length() + 1, strValue.c_str()); + _data_buffers.push_back(buffer); + pData = reinterpret_cast(buffer.get()); + } + } catch (const std::exception& e) { + std::string error_msg = "BCPWrapper Error: Failed to convert Python data for binding: "; + error_msg += e.what(); + std::cout << error_msg << std::endl; + throw std::runtime_error(error_msg); + } + + // Call bcp_bind with the prepared data + std::cout << "BCPWrapper: Calling bcp_bind for column " << server_col_idx + << ", data_type " << data_type + << ", indicator_length " << indicator_length + << ", data_length " << data_length << std::endl; + + SQLRETURN ret = BCPBind_ptr(_hdbc, + pData, + indicator_length, + static_cast(data_length), + pTerm, + terminator_length, + data_type, + server_col_idx); + + if (ret == FAIL) { + std::string msg = "BCPWrapper Error: bcp_bind failed for column " + std::to_string(server_col_idx); + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + msg += " ODBC Diag: SQLState: " + py::cast(diag.sqlState).cast() + ", Message: " + py::cast(diag.ddbcErrorMsg).cast(); + std::cout << msg << std::endl; + throw 
std::runtime_error(msg); + } + + std::cout << "BCPWrapper: bcp_bind successful for column " << server_col_idx << std::endl; + + return ret; +} + +SQLRETURN BCPWrapper::send_row() { + if (!_bcp_initialized || _bcp_finished) { + std::cout << "BCPWrapper Warning: send_row called in invalid state." << std::endl; + throw std::runtime_error("BCPWrapper: send_row called in invalid state."); + } + + std::cout << "BCPWrapper: Calling bcp_sendrow" << std::endl; + SQLRETURN ret = BCPSendRow_ptr(_hdbc); + + std::cout << "BCPWrapper: bcp_sendrow returned " << ret << std::endl; + + if (ret == SQL_NO_DATA) { + std::cout << "BCPWrapper: bcp_sendrow returned SQL_NO_DATA, indicating no more rows to send." << std::endl; + return SQL_NO_DATA; // No more rows to send + } + if (ret == SQL_SUCCESS_WITH_INFO) { + std::cout << "BCPWrapper: bcp_sendrow returned SQL_SUCCESS_WITH_INFO, indicating a warning occurred." << std::endl; + // Handle warnings if needed, but still consider it a success + } + + + + if (ret == FAIL) { + std::string msg = "BCPWrapper Error: bcp_sendrow failed"; + ErrorInfo diag = get_odbc_diagnostics_for_handle(SQL_HANDLE_DBC, _hdbc); + msg += " ODBC Diag: SQLState: " + py::cast(diag.sqlState).cast() + ", Message: " + py::cast(diag.ddbcErrorMsg).cast(); + std::cout << msg << std::endl; + throw std::runtime_error(msg); + } + std::cout << "BCPWrapper: bcp_sendrow successful." << std::endl; + + return ret; +} \ No newline at end of file diff --git a/mssql_python/pybind/bcp/bcp_wrapper.h b/mssql_python/pybind/bcp/bcp_wrapper.h new file mode 100644 index 000000000..f86cbfee5 --- /dev/null +++ b/mssql_python/pybind/bcp/bcp_wrapper.h @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +#pragma once + +#include "ddbc_bindings.h" // For SQLRETURN and other ODBC types/macros +#include "../connection/connection.h" // For the Connection class +#include // For std::optional +#include // For py::bytes + +class BCPWrapper { +public: + // Constructor: Requires a reference to an active Connection object. + // The BCPWrapper does not take ownership of the Connection object. + BCPWrapper(ConnectionHandle& conn); // Changed to Connection& + + // Destructor: Ensures BCP operations are properly terminated if active. + ~BCPWrapper(); + + // Initializes a BCP operation for a specific table, data file, error file, and direction. + // Maps to ODBC bcp_init. + SQLRETURN bcp_initialize_operation(const std::wstring& table, + const std::wstring& data_file, + const std::wstring& error_file, + const std::wstring& direction); + + // Sets various BCP control options using an integer value. + // Maps to ODBC bcp_control. + SQLRETURN bcp_control(const std::wstring& property_name, int value); + + // Sets various BCP control options using a string value. + // Maps to ODBC bcp_control. + SQLRETURN bcp_control(const std::wstring& property_name, const std::wstring& value); + + // Sets the bulk copy mode (e.g., "native", "char"). + // This might internally call bcp_control or affect column format definitions. + SQLRETURN set_bulk_mode(const std::wstring& mode, + const std::optional& field_terminator = std::nullopt, // Changed type + const std::optional& row_terminator = std::nullopt // Changed type + ); + + // Reads column format information from a BCP format file. + // Maps to ODBC bcp_readfmt. + SQLRETURN read_format_file(const std::wstring& file_path); + + // Specifies the total number of columns in the user data file. + // Maps to ODBC bcp_columns. + SQLRETURN define_columns(int num_cols); + + // Defines the format of data in the data file for a specific column. + // Maps to ODBC bcp_colfmt. 
+ SQLRETURN define_column_format(int file_col_idx, // User data file column number (1-based), maps to idxUserDataCol + int user_data_type, // Data type in user file (e.g., SQLCHARACTER), maps to eUserDataType + int indicator_length, // Length of prefix/indicator (0, 1, 2, 4, 8, or SQL_VARLEN_DATA), maps to cbIndicator + long long user_data_length, // Max length of data in user file (bytes), maps to cbUserData + const std::optional& terminator_bytes_py, // Changed type + int terminator_length, // Length of terminator bytes (1 for single byte, 2 for double byte, etc.), maps to cbTerminator + int server_col_idx // Server table column number (1-based), maps to idxServerCol + ); + + // Executes the BCP operation, transferring data. + // Maps to ODBC bcp_exec. + SQLRETURN exec_bcp(); + + // Bind column data for in-memory BCP + // Maps to ODBC bcp_bind. + SQLRETURN bind_column(const py::object& data, // Python data object (will be converted to C++ data) + int indicator_length, // Length of indicator + long long data_length, // Length of data + const std::optional& terminator, // Optional terminator + int terminator_length, // Length of terminator + int data_type, // SQL data type + int server_col_idx); // Server column index (1-based) + + // Send the current row to the server + // Maps to ODBC bcp_sendrow. + SQLRETURN send_row(); + + // Completes the BCP operation and releases associated resources. + // Maps to ODBC bcp_done. + SQLRETURN finish(); + + // Optional method to explicitly close/cleanup BCP resources. + // May call finish() if BCP operation is still active. 
+ SQLRETURN close(); + +private: + SQLHDBC _hdbc; + bool _bcp_initialized; // Flag to track if bcp_init has been called successfully + bool _bcp_finished; // Flag to track if bcp_finish (or bcp_done) has been called + + // Vector of allocated pointers that need to be freed in destructor + std::vector> _data_buffers; +}; \ No newline at end of file diff --git a/mssql_python/pybind/connection/connection.cpp b/mssql_python/pybind/connection/connection.cpp index b5184f4e6..4f6d6409f 100644 --- a/mssql_python/pybind/connection/connection.cpp +++ b/mssql_python/pybind/connection/connection.cpp @@ -10,6 +10,9 @@ #include #define SQL_COPT_SS_ACCESS_TOKEN 1256 // Custom attribute ID for access token +// #define SQL_COPT_SS_BCP 1219 // Custom attribute ID for BCP mode +// #define SQL_BCP_OFF 0 // BCP mode off +// #define SQL_BCP_ON 1 // BCP mode on static SqlHandlePtr getEnvHandle() { static SqlHandlePtr envHandle = []() -> SqlHandlePtr { @@ -196,6 +199,12 @@ void Connection::applyAttrsBefore(const py::dict& attrs) { ThrowStdException("Failed to set access token before connect"); } } + if (key == SQL_COPT_SS_BCP) { + SQLRETURN ret = setAttribute(key, py::reinterpret_borrow(item.second)); + if (!SQL_SUCCEEDED(ret)) { + ThrowStdException("Failed to set bcp before connect"); + } + } } } diff --git a/mssql_python/pybind/connection/connection.h b/mssql_python/pybind/connection/connection.h index b9cc50b68..404836d91 100644 --- a/mssql_python/pybind/connection/connection.h +++ b/mssql_python/pybind/connection/connection.h @@ -41,6 +41,10 @@ class Connection { // Allocate a new statement handle on this connection. SqlHandlePtr allocStatementHandle(); + // Get connection handle. 
+ SqlHandlePtr getDbcHandle() const { + return _dbcHandle; + } private: void allocateDbcHandle(); @@ -66,6 +70,9 @@ class ConnectionHandle { void setAutocommit(bool enabled); bool getAutocommit() const; SqlHandlePtr allocStatementHandle(); + std::shared_ptr getConnection() const { + return _conn; + } private: std::shared_ptr _conn; diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index c9525e0a1..1713b552a 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -6,6 +6,7 @@ #include "ddbc_bindings.h" #include "connection/connection.h" #include "connection/connection_pool.h" +#include "bcp/bcp_wrapper.h" #include #include // std::setw, std::setfill @@ -117,6 +118,17 @@ SQLRowCountFunc SQLRowCount_ptr = nullptr; SQLGetStmtAttrFunc SQLGetStmtAttr_ptr = nullptr; SQLSetDescFieldFunc SQLSetDescField_ptr = nullptr; +// BCP APIs +BCPInitWFunc BCPInitW_ptr = nullptr; +BCPControlWFunc BCPControlW_ptr = nullptr; +BCPReadFmtWFunc BCPReadFmtW_ptr = nullptr; +BCPColumnsFunc BCPColumns_ptr = nullptr; +BCPColFmtWFunc BCPColFmtW_ptr = nullptr; +BCPExecFunc BCPExec_ptr = nullptr; +BCPDoneFunc BCPDone_ptr = nullptr; +BCPBindFunc BCPBind_ptr = nullptr; +BCPSendRowFunc BCPSendRow_ptr = nullptr; + // Data retrieval APIs SQLFetchFunc SQLFetch_ptr = nullptr; SQLFetchScrollFunc SQLFetchScroll_ptr = nullptr; @@ -620,6 +632,17 @@ std::wstring LoadDriverOrThrowException() { // Diagnostic record function Loading SQLGetDiagRec_ptr = (SQLGetDiagRecFunc)GetProcAddress(hModule, "SQLGetDiagRecW"); + // Load BCP functions + BCPInitW_ptr = (BCPInitWFunc)GetProcAddress(hModule, "bcp_initW"); + BCPControlW_ptr = (BCPControlWFunc)GetProcAddress(hModule, "bcp_control"); + BCPReadFmtW_ptr = (BCPReadFmtWFunc)GetProcAddress(hModule, "bcp_readfmtW"); + BCPColumns_ptr = (BCPColumnsFunc)GetProcAddress(hModule, "bcp_columns"); + BCPColFmtW_ptr = (BCPColFmtWFunc)GetProcAddress(hModule, "bcp_colfmt"); // Corrected from bcp_colfmtW 
to bcp_colfmt if that's the export name + BCPExec_ptr = (BCPExecFunc)GetProcAddress(hModule, "bcp_exec"); + BCPDone_ptr = (BCPDoneFunc)GetProcAddress(hModule, "bcp_done"); + BCPBind_ptr = (BCPBindFunc)GetProcAddress(hModule, "bcp_bind"); + BCPSendRow_ptr = (BCPSendRowFunc)GetProcAddress(hModule, "bcp_sendrow"); + bool success = SQLAllocHandle_ptr && SQLSetEnvAttr_ptr && SQLSetConnectAttr_ptr && SQLSetStmtAttr_ptr && SQLGetConnectAttr_ptr && SQLDriverConnect_ptr && SQLExecDirect_ptr && SQLPrepare_ptr && SQLBindParameter_ptr && SQLExecute_ptr && @@ -627,13 +650,25 @@ std::wstring LoadDriverOrThrowException() { SQLFetchScroll_ptr && SQLGetData_ptr && SQLNumResultCols_ptr && SQLBindCol_ptr && SQLDescribeCol_ptr && SQLMoreResults_ptr && SQLColAttribute_ptr && SQLEndTran_ptr && SQLFreeHandle_ptr && - SQLDisconnect_ptr && SQLFreeStmt_ptr && SQLGetDiagRec_ptr; + SQLDisconnect_ptr && SQLFreeStmt_ptr && SQLGetDiagRec_ptr && + BCPInitW_ptr && BCPControlW_ptr && BCPReadFmtW_ptr && + BCPColumns_ptr && BCPColFmtW_ptr && BCPExec_ptr && + BCPDone_ptr && BCPBind_ptr && BCPSendRow_ptr; if (!success) { ThrowStdException("Failed to load required function pointers from driver"); } LOG("Successfully loaded function pointers from driver"); + std::cout << "BCPBind_ptr = " << (void*)BCPBind_ptr << std::endl; + std::cout << "BCPSendRow_ptr = " << (void*)BCPSendRow_ptr << std::endl; + + // Try a simple test call to validate the function pointers + if (BCPBind_ptr && BCPSendRow_ptr) { + std::cout << "BCP function pointers loaded, attempting to validate..." 
<< std::endl; + // For BCPBind_ptr and BCPSendRow_ptr, we can't easily validate without a valid HDBC + } + return dllDir; } @@ -1146,7 +1181,7 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p timestampValue.hour, timestampValue.minute, timestampValue.second, - timestampValue.fraction / 1000 // Convert back ns to µs + timestampValue.fraction / 1000 // Convert ns to µs ) ); } else { @@ -1579,6 +1614,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum } case SQL_BINARY: case SQL_VARBINARY: + case SQL_LONGVARBINARY: { // TODO: variable length data needs special handling, this logic wont suffice SQLULEN columnSize = columnMeta["ColumnSize"].cast(); @@ -1968,6 +2004,35 @@ PYBIND11_MODULE(ddbc_bindings, m) { .def("set_autocommit", &ConnectionHandle::setAutocommit) .def("get_autocommit", &ConnectionHandle::getAutocommit) .def("alloc_statement_handle", &ConnectionHandle::allocStatementHandle); + + // BCPWrapper bindings + py::class_>(m, "BCPWrapper") + .def(py::init()) + .def("bcp_initialize_operation", &BCPWrapper::bcp_initialize_operation) + .def("bcp_control", static_cast(&BCPWrapper::bcp_control), "Sets BCP control option with an integer value.") + .def("bcp_control", static_cast(&BCPWrapper::bcp_control), "Sets BCP control option with a string value.") + .def("read_format_file", &BCPWrapper::read_format_file) + .def("define_columns", &BCPWrapper::define_columns) + .def("define_column_format", &BCPWrapper::define_column_format, + py::arg("file_col_idx"), + py::arg("user_data_type"), + py::arg("indicator_length"), + py::arg("user_data_length"), + py::arg("terminator_bytes") = std::nullopt, + py::arg("terminator_length"), + py::arg("server_col_idx")) + .def("exec_bcp", &BCPWrapper::exec_bcp) + .def("finish", &BCPWrapper::finish) + .def("close", &BCPWrapper::close) + .def("bind_column", &BCPWrapper::bind_column, + py::arg("data"), + py::arg("indicator_length"), + py::arg("data_length"), + 
py::arg("terminator"), + py::arg("terminator_length"), + py::arg("data_type"), + py::arg("server_col_idx")) + .def("send_row", &BCPWrapper::send_row); m.def("enable_pooling", &enable_pooling, "Enable global connection pooling"); m.def("DDBCSQLExecDirect", &SQLExecDirect_wrap, "Execute a SQL query directly"); m.def("DDBCSQLExecute", &SQLExecute_wrap, "Prepare and execute T-SQL statements"); diff --git a/mssql_python/pybind/ddbc_bindings.h b/mssql_python/pybind/ddbc_bindings.h index bb050eab8..8fa323bc4 100644 --- a/mssql_python/pybind/ddbc_bindings.h +++ b/mssql_python/pybind/ddbc_bindings.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -47,6 +48,19 @@ typedef SQLRETURN (SQL_API* SQLRowCountFunc)(SQLHSTMT, SQLLEN*); typedef SQLRETURN (SQL_API* SQLSetDescFieldFunc)(SQLHDESC, SQLSMALLINT, SQLSMALLINT, SQLPOINTER, SQLINTEGER); typedef SQLRETURN (SQL_API* SQLGetStmtAttrFunc)(SQLHSTMT, SQLINTEGER, SQLPOINTER, SQLINTEGER, SQLINTEGER*); +// BCP APIs (Bulk Copy Program) +// Typedefs (ensure these match the function signatures in odbcss.h) +typedef SQLRETURN (SQL_API* BCPInitWFunc)(SQLHDBC, LPCWSTR, LPCWSTR, LPCWSTR, INT); +typedef SQLRETURN (SQL_API* BCPControlWFunc)(SQLHDBC, INT, LPVOID); +// typedef SQLRETURN (SQL_API* BCPControlAFunc)(SQLHDBC, INT, LPVOID); +typedef SQLRETURN (SQL_API* BCPReadFmtWFunc)(SQLHDBC, LPCWSTR); +typedef SQLRETURN (SQL_API* BCPColumnsFunc)(SQLHDBC, INT); +typedef SQLRETURN (SQL_API* BCPColFmtWFunc)(SQLHDBC, INT, INT, INT, DBINT, LPCBYTE, INT, INT); +typedef SQLRETURN (SQL_API* BCPExecFunc)(SQLHDBC, DBINT*); +typedef SQLRETURN (SQL_API* BCPDoneFunc)(SQLHDBC); +typedef SQLRETURN (SQL_API* BCPBindFunc)(HDBC, LPCBYTE, INT, DBINT, LPCBYTE, INT, INT, INT); +typedef SQLRETURN (SQL_API* BCPSendRowFunc)(HDBC); + // Data retrieval APIs typedef SQLRETURN (SQL_API* SQLFetchFunc)(SQLHANDLE); typedef SQLRETURN (SQL_API* SQLFetchScrollFunc)(SQLHANDLE, SQLSMALLINT, SQLLEN); @@ -116,6 +130,17 @@ extern SQLFreeStmtFunc 
SQLFreeStmt_ptr; // Diagnostic APIs extern SQLGetDiagRecFunc SQLGetDiagRec_ptr; +// BCP APIs (Bulk Copy Program) +// Extern function pointer declarations for BCP APIs +extern BCPInitWFunc BCPInitW_ptr; +extern BCPControlWFunc BCPControlW_ptr; +extern BCPReadFmtWFunc BCPReadFmtW_ptr; +extern BCPColumnsFunc BCPColumns_ptr; +extern BCPColFmtWFunc BCPColFmtW_ptr; +extern BCPExecFunc BCPExec_ptr; +extern BCPDoneFunc BCPDone_ptr; +extern BCPBindFunc BCPBind_ptr; +extern BCPSendRowFunc BCPSendRow_ptr; // Logging utility template diff --git a/tests/test_007_bcpOptions.py b/tests/test_007_bcpOptions.py new file mode 100644 index 000000000..c70598c03 --- /dev/null +++ b/tests/test_007_bcpOptions.py @@ -0,0 +1,443 @@ +import pytest +import os +import uuid +from pathlib import Path + +# Assuming your project structure allows these imports +from mssql_python import connect as mssql_connect # Alias to avoid conflict +from mssql_python.bcp_options import ColumnFormat, BCPOptions, BindData +from mssql_python import ( + SQLINT4, SQLVARCHAR, SQLNVARCHAR, SQL_VARLEN_DATA, SQL_NULL_DATA +) +from mssql_python.bcp_main import BCPClient + +# --- Constants for Tests --- +SQL_COPT_SS_BCP = 1219 # BCP connection attribute + +# --- Database Connection Details from Environment Variables --- +DB_CONNECTION_STRING = os.getenv("DB_CONNECTION_STRING") + +# Skip all tests in this file if connection string is not provided +pytestmark = pytest.mark.skipif( + not DB_CONNECTION_STRING, + reason="DB_CONNECTION_STRING environment variable must be set for BCP integration tests." +) + +def get_bcp_test_conn_str(): + """Returns the connection string.""" + if not DB_CONNECTION_STRING: + # This should ideally not be reached due to pytestmark, but as a safeguard: + pytest.skip("DB_CONNECTION_STRING is not set.") + return DB_CONNECTION_STRING + +@pytest.fixture(scope="function") +def bcp_db_setup_and_teardown(): + """ + Fixture to set up a BCP-enabled connection and a unique test table. 
+ Yields (connection, table_name). + Cleans up the table afterwards. + """ + conn_str = get_bcp_test_conn_str() + table_name_uuid_part = str(uuid.uuid4()).replace('-', '')[:8] + table_name = f"dbo.pytest_bcp_table_{table_name_uuid_part}" + + conn = None + cursor = None + try: + conn = mssql_connect(conn_str, attrs_before={SQL_COPT_SS_BCP: 1}, autocommit=True) + cursor = conn.cursor() + + cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + cursor.execute(f""" + CREATE TABLE {table_name} ( + id INT PRIMARY KEY, + data_col VARCHAR(255) NULL + ); + """) + yield conn, table_name + finally: + if cursor: + try: + cursor.close() + except Exception as e: + print(f"Warning: Error closing cursor during BCP test setup/teardown: {e}") + if conn: + cursor_cleanup = None + try: + cursor_cleanup = conn.cursor() + cursor_cleanup.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + except Exception as e: + print(f"Warning: Error during BCP test cleanup (dropping table {table_name}): {e}") + finally: + if cursor_cleanup: + try: + cursor_cleanup.close() + except Exception as e: + print(f"Warning: Error closing cleanup cursor: {e}") + conn.close() + +@pytest.fixture +def temp_file_pair(tmp_path): + """Provides a pair of temporary file paths for data and errors using pytest's tmp_path.""" + file_uuid_part = str(uuid.uuid4()).replace('-', '')[:8] + data_file = tmp_path / f"bcp_data_{file_uuid_part}.csv" + error_file = tmp_path / f"bcp_error_{file_uuid_part}.txt" + return data_file, error_file + +# --- Tests for bcp_options.py (Unit tests, no mocking needed) --- +class TestColumnFormat: + def test_valid_instantiation_defaults(self): + cf = ColumnFormat() + assert cf.prefix_len == 0 + assert cf.data_len == 0 + assert cf.field_terminator is None + assert cf.server_col == 1 + assert cf.file_col == 1 + assert cf.user_data_type == 0 + + def test_valid_instantiation_all_params(self): + cf = ColumnFormat( + prefix_len=1, 
data_len=10, field_terminator=b",", + server_col=2, file_col=3, user_data_type=10 + ) + assert cf.prefix_len == 1 + assert cf.data_len == 10 + assert cf.field_terminator == b"," + assert cf.server_col == 2 + assert cf.file_col == 3 + assert cf.user_data_type == 10 + + @pytest.mark.parametrize("attr, value", [ + ("prefix_len", -1), ("data_len", -1), ("server_col", 0), + ("server_col", -1), ("file_col", 0), ("file_col", -1), + ]) + def test_invalid_numeric_values(self, attr, value): + with pytest.raises(ValueError): + ColumnFormat(**{attr: value}) + + @pytest.mark.parametrize("attr, value", [ + ("field_terminator", ","), + ]) + def test_invalid_terminator_types(self, attr, value): + with pytest.raises(TypeError): + ColumnFormat(**{attr: value}) + +class TestBCPOptions: + _dummy_data_file = "dummy_data.csv" + _dummy_error_file = "dummy_error.txt" + + def test_valid_instantiation_in_minimal(self): + opts = BCPOptions(direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file) + assert opts.direction == "in" + assert opts.data_file == self._dummy_data_file + assert opts.error_file == self._dummy_error_file + assert opts.bulk_mode == "native" + + def test_valid_instantiation_out_full(self): + cols = [ColumnFormat(file_col=1, server_col=1, field_terminator=b'\t')] + opts = BCPOptions( + direction="out", data_file="output.csv", error_file="errors.log", + bulk_mode="char", batch_size=1000, max_errors=10, first_row=1, last_row=100, + code_page="ACP", hints="ORDER(id)", columns=cols, + keep_identity=True, keep_nulls=True + ) + assert opts.direction == "out" + assert opts.bulk_mode == "char" + assert opts.columns == cols + assert opts.keep_identity is True + + def test_invalid_direction(self): + with pytest.raises(ValueError, match="BCPOptions.direction 'invalid_dir' is invalid"): + BCPOptions(direction="invalid_dir", data_file=self._dummy_data_file, error_file=self._dummy_error_file) + + @pytest.mark.parametrize("direction_to_test", ["out"]) + def 
test_missing_data_file_for_out(self, direction_to_test): + """Test that data_file is required for 'out' direction.""" + with pytest.raises(ValueError, match=f"BCPOptions.data_file is required for file-based BCP direction '{direction_to_test}'"): + BCPOptions(direction=direction_to_test, error_file=self._dummy_error_file) + + def test_missing_data_file_for_in(self): + """Test that data_file is required for 'in' direction when not using memory BCP.""" + with pytest.raises(ValueError, match=f"BCPOptions.data_file is required for file-based BCP direction 'in'"): + BCPOptions(direction="in", error_file=self._dummy_error_file, use_memory_bcp=False) + + @pytest.mark.parametrize("direction_to_test", ["in", "out"]) + def test_missing_error_file_for_any_direction(self, direction_to_test): + """Test that error_file is required for all directions.""" + if direction_to_test == "in": + with pytest.raises(ValueError, match="error_file must be provided even for in-memory BCP operations"): + BCPOptions( + direction=direction_to_test, + use_memory_bcp=True, + bind_data=[BindData(data=123, data_type=SQLINT4, data_length=4, server_col=1)] + ) + else: + with pytest.raises(ValueError, match="error_file must be provided for file-based BCP operations"): + BCPOptions(direction=direction_to_test, data_file=self._dummy_data_file) + + def test_columns_and_format_file_conflict(self): + with pytest.raises(ValueError, match="Cannot specify both 'columns' .* and 'format_file'"): + BCPOptions( + direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file, + columns=[ColumnFormat()], format_file="format.fmt" + ) + + def test_invalid_bulk_mode(self): + with pytest.raises(ValueError, match="BCPOptions.bulk_mode 'invalid_mode' is invalid"): + BCPOptions(direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file, bulk_mode="invalid_mode") + + @pytest.mark.parametrize("attr, value", [ + ("batch_size", -1), ("max_errors", -1), ("first_row", -1), ("last_row", 
-1), + ]) + def test_negative_control_values(self, attr, value): + with pytest.raises(ValueError, match=f"BCPOptions.{attr} must be non-negative"): + BCPOptions(direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file, **{attr: value}) + + def test_first_row_greater_than_last_row(self): + with pytest.raises(ValueError, match="BCPOptions.first_row cannot be greater than BCPOptions.last_row"): + BCPOptions(direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file, first_row=10, last_row=5) + + def test_invalid_codepage_negative_int(self): + with pytest.raises(ValueError, match="BCPOptions.code_page, if an integer, must be non-negative"): + BCPOptions(direction="in", data_file=self._dummy_data_file, error_file=self._dummy_error_file, code_page=-1) + + def test_valid_memory_bcp_with_bind_data(self): + """Test that in-memory BCP with bind data is properly configured.""" + bind_data_item = BindData( + data=123, + data_type=SQLINT4, + data_length=4, + server_col=1 + ) + + opts = BCPOptions( + direction="in", + use_memory_bcp=True, + bind_data=[bind_data_item], + error_file=self._dummy_error_file + ) + + assert opts.direction == "in" + assert opts.use_memory_bcp is True + assert len(opts.bind_data) == 1 + assert opts.bind_data[0].data == 123 + assert opts.bind_data[0].data_type == SQLINT4 + assert opts.data_file is None # Data file is None for memory BCP in the new implementation + + def test_valid_memory_bcp_with_multiple_rows(self): + """Test that multi-row binding is properly configured.""" + # Define two rows with two columns each + row1 = [ + BindData(data=1001, data_type=SQLINT4, data_length=4, server_col=1), + BindData( + data="Row 1 Data", + data_type=SQLVARCHAR, + data_length=SQL_VARLEN_DATA, + terminator=b'\0', + terminator_length=1, + server_col=2 + ) + ] + + row2 = [ + BindData(data=1002, data_type=SQLINT4, data_length=4, server_col=1), + BindData( + data="Row 2 Data", + data_type=SQLVARCHAR, + 
data_length=SQL_VARLEN_DATA, + terminator=b'\0', + terminator_length=1, + server_col=2 + ) + ] + + opts = BCPOptions( + direction="in", + use_memory_bcp=True, + bind_data=[row1, row2], # List of rows, where each row is a list of BindData + error_file=self._dummy_error_file + ) + + assert opts.direction == "in" + assert opts.use_memory_bcp is True + assert len(opts.bind_data) == 2 # Two rows + assert len(opts.bind_data[0]) == 2 # First row has two columns + assert opts.bind_data[0][0].data == 1001 # First column of first row + assert opts.bind_data[1][0].data == 1002 # First column of second row + assert opts.bind_data[0][1].data == "Row 1 Data" # Second column of first row + + def test_memory_bcp_requires_in_direction(self): + """Test that memory BCP requires 'in' direction.""" + with pytest.raises(ValueError, match="in-memory BCP operations require direction='in'"): + BCPOptions( + direction="out", + use_memory_bcp=True, + bind_data=[BindData(data=123, data_type=SQLINT4, data_length=4, server_col=1)], + error_file=self._dummy_error_file + ) + + def test_memory_bcp_requires_bind_data(self): + """Test that memory BCP requires bind_data.""" + with pytest.raises(ValueError, match="BCPOptions.bind_data must be provided when use_memory_bcp is True"): + BCPOptions( + direction="in", + use_memory_bcp=True, + error_file=self._dummy_error_file + ) + + def test_bind_data_requires_memory_bcp(self): + """Test that binding data automatically enables use_memory_bcp.""" + # The validation has changed - now bind_data automatically sets use_memory_bcp + opts = BCPOptions( + direction="in", + bind_data=[BindData(data=123, data_type=SQLINT4, data_length=4, server_col=1)], + error_file=self._dummy_error_file + ) + assert opts.use_memory_bcp is True + + def test_memory_bcp_doesnt_require_data_file(self): + """Test that memory BCP doesn't require data_file.""" + # This should not raise an exception + opts = BCPOptions( + direction="in", + use_memory_bcp=True, + 
bind_data=[BindData(data=123, data_type=SQLINT4, data_length=4, server_col=1)], + error_file=self._dummy_error_file + ) + assert opts.data_file is None # Data file is None, not "" in the current implementation + + def test_bind_data_with_null_values(self): + """Test that NULL values are properly configured in bind data.""" + bind_data_item = BindData( + data=None, + data_type=SQLINT4, + indicator_length=4, + data_length=SQL_NULL_DATA, + server_col=1 + ) + + opts = BCPOptions( + direction="in", + use_memory_bcp=True, + bind_data=[bind_data_item], + error_file=self._dummy_error_file + ) + + assert opts.bind_data[0].data is None + assert opts.bind_data[0].indicator_length == 4 + assert opts.bind_data[0].data_length == SQL_NULL_DATA + +# Add a new test class for BindData + +class TestBindData: + def test_valid_instantiation_defaults(self): + """Test valid instantiation with minimal parameters.""" + bind_data = BindData( + data=123, + data_type=SQLINT4, + data_length=4, + server_col=1 + ) + assert bind_data.data == 123 + assert bind_data.data_type == SQLINT4 + assert bind_data.data_length == 4 + assert bind_data.server_col == 1 + assert bind_data.indicator_length == 0 + assert bind_data.terminator is None + assert bind_data.terminator_length == 0 + + def test_valid_instantiation_all_params(self): + """Test valid instantiation with all parameters.""" + bind_data = BindData( + data="test", + data_type=SQLVARCHAR, + indicator_length=0, + data_length=SQL_VARLEN_DATA, + terminator=b'\0', + terminator_length=1, + server_col=2 + ) + assert bind_data.data == "test" + assert bind_data.data_type == SQLVARCHAR + assert bind_data.indicator_length == 0 + assert bind_data.data_length == SQL_VARLEN_DATA + assert bind_data.terminator == b'\0' + assert bind_data.terminator_length == 1 + assert bind_data.server_col == 2 + + def test_null_data_requires_sql_null_data(self): + """This validation is not implemented in the current BindData.""" + # We'll create a valid BindData object instead 
+ bind_data = BindData( + data=None, + data_type=SQLINT4, + indicator_length=4, + data_length=SQL_NULL_DATA, + server_col=1 + ) + assert bind_data.data is None + assert bind_data.data_length == SQL_NULL_DATA + + def test_sql_null_data_requires_null_data(self): + """This validation is not implemented in the current BindData.""" + # We'll create a valid BindData object instead + bind_data = BindData( + data=None, + data_type=SQLINT4, + indicator_length=4, + data_length=SQL_NULL_DATA, + server_col=1 + ) + assert bind_data.data is None + assert bind_data.data_length == SQL_NULL_DATA + + def test_null_data_requires_indicator(self): + """This validation is not implemented in the current BindData.""" + # We'll test a valid case instead + bind_data = BindData( + data=None, + data_type=SQLINT4, + indicator_length=4, # Valid indicator length + data_length=SQL_NULL_DATA, + server_col=1 + ) + assert bind_data.indicator_length == 4 + + def test_invalid_server_col(self): + """Test that server_col must be positive.""" + # The implementation does check this + with pytest.raises(ValueError, match="server_col must be a positive integer"): + BindData( + data=123, + data_type=SQLINT4, + data_length=4, + server_col=0 # Should be > 0 + ) + + def test_varlen_data_requires_terminator(self): + """This validation is not implemented in the current BindData.""" + # Create a valid BindData object with terminator instead + bind_data = BindData( + data="test", + data_type=SQLVARCHAR, + data_length=SQL_VARLEN_DATA, + terminator=b'\0', + terminator_length=1, + server_col=1 + ) + assert bind_data.terminator == b'\0' + assert bind_data.data_length == SQL_VARLEN_DATA + + def test_unicode_string_with_nvarchar(self): + """Test that Unicode strings work with NVARCHAR.""" + unicode_text = "Unicode 文字" + bind_data = BindData( + data=unicode_text, + data_type=SQLNVARCHAR, + data_length=SQL_VARLEN_DATA, + terminator=b'\0', + terminator_length=1, + server_col=1 + ) + assert bind_data.data == unicode_text + 
assert bind_data.data_type == SQLNVARCHAR diff --git a/tests/test_008_bcpMain.py b/tests/test_008_bcpMain.py new file mode 100644 index 000000000..654637441 --- /dev/null +++ b/tests/test_008_bcpMain.py @@ -0,0 +1,844 @@ +import pytest +import os +import uuid +import tempfile +import time +from pathlib import Path + +from mssql_python import connect as mssql_connect +from mssql_python.bcp_options import BCPOptions, ColumnFormat +from mssql_python.bcp_main import BCPClient + +# --- Constants for Tests --- +SQL_COPT_SS_BCP = 1219 # BCP connection attribute + + +# --- Database Connection Details from Environment Variables --- +DB_CONNECTION_STRING = os.getenv("DB_CONNECTION_STRING") + +# Skip all tests in this file if connection string is not provided +pytestmark = pytest.mark.skipif( + not DB_CONNECTION_STRING, + reason="DB_CONNECTION_STRING environment variable must be set for BCP integration tests." +) + +def get_bcp_test_conn_str(): + """Returns the connection string.""" + if not DB_CONNECTION_STRING: + pytest.skip("DB_CONNECTION_STRING is not set.") + return DB_CONNECTION_STRING + +@pytest.fixture(scope="function") +def format_test_setup(): + """ + Fixture to set up a BCP-enabled connection and a test table with sample data. + Creates format files for testing. + Cleans up all resources afterward. 
+ """ + conn_str = get_bcp_test_conn_str() + table_uuid = str(uuid.uuid4()).replace('-', '')[:8] + table_name = f"dbo.pytest_bcp_format_{table_uuid}" + + conn = None + cursor = None + temp_dir = tempfile.TemporaryDirectory() + + # Define paths for our test files + format_file_path = Path(temp_dir.name) / "test_format.fmt" + data_out_path = Path(temp_dir.name) / "data_out.bcp" + data_in_path = Path(temp_dir.name) / "data_in.bcp" + error_out_path = Path(temp_dir.name) / "error_out.log" + error_in_path = Path(temp_dir.name) / "error_in.log" + + try: + # Connect with BCP enabled + conn = mssql_connect(conn_str, attrs_before={SQL_COPT_SS_BCP: 1}, autocommit=True) + cursor = conn.cursor() + + # Create test table + cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + cursor.execute(f""" + CREATE TABLE {table_name} ( + id INT PRIMARY KEY, + name NVARCHAR(50) NOT NULL, + value DECIMAL(10,2) NULL + ); + """) + + # Insert sample data + cursor.execute(f""" + INSERT INTO {table_name} (id, name, value) + VALUES + (1, 'Item One', 10.50), + (2, 'Item Two', 20.75), + (3, 'Item Three', 30.00), + (4, 'Item Four', NULL); + """) + + # Create a format file - use a simpler format structure + # For native format (which is safer for BCP transfers) + format_content = """14.0 +3 +1 SQLINT 0 4 "" 1 id "" +2 SQLNCHAR 0 50 "" 2 name Latin1_General_CI_AS +3 SQLDECIMAL 0 17 "" 3 value "" +""" + + with open(format_file_path, 'w', encoding='utf-8') as f: + f.write(format_content) + + # Yield resources to test + yield { + 'conn': conn, + 'table_name': table_name, + 'format_file': str(format_file_path), # Convert to string to avoid Path object issues + 'data_out': str(data_out_path), + 'data_in': str(data_in_path), + 'error_out': str(error_out_path), + 'error_in': str(error_in_path), + 'temp_dir': temp_dir # Keep reference to prevent cleanup until fixture is done + } + + finally: + # Cleanup + if cursor: + try: + cursor.close() + except Exception as e: + 
print(f"Warning: Error closing cursor: {e}") + + if conn: + try: + cleanup_cursor = conn.cursor() + cleanup_cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + cleanup_cursor.close() + except Exception as e: + print(f"Warning: Error during cleanup: {e}") + finally: + conn.close() + + +class TestBCPFormatFile: + def test_bcp_out_with_format_file(self, format_test_setup): + """Test BCP OUT operation using a format file.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + format_file = format_test_setup['format_file'] + data_file = format_test_setup['data_out'] + error_file = format_test_setup['error_out'] + + # Create BCPClient + bcp_client = BCPClient(conn) + + # Create options for BCP OUT + options = BCPOptions( + direction="out", + data_file=data_file, + error_file=error_file, + format_file=format_file, + bulk_mode="native" # Use native mode for format file + ) + + # Execute BCP OUT + bcp_client.sql_bulk_copy(table=table_name, options=options) + + # Verify the operation succeeded + assert os.path.exists(data_file), "Output data file was not created" + assert os.path.getsize(data_file) > 0, "Output data file is empty" + + # Count rows in table to verify against BCP operation + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + # Additional verification could be done here + assert row_count == 4, f"Expected 4 rows in source table, got {row_count}" + + # Separate test for BCP IN to reduce complexity and potential issues + def test_bcp_in_with_format_file(self, format_test_setup): + """Test BCP IN operation using a format file.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + format_file = format_test_setup['format_file'] + + # Create test data file with known values + # Instead of doing BCP OUT first (which could fail), create a 
simple data file + # that matches the expected format for BCP IN + data_in_file = format_test_setup['data_in'] + error_in_file = format_test_setup['error_in'] + + # First, create a copy table for import + new_table = f"{table_name}_copy" + cursor = conn.cursor() + cursor.execute(f"IF OBJECT_ID('{new_table}', 'U') IS NOT NULL DROP TABLE {new_table};") + cursor.execute(f"SELECT TOP 0 * INTO {new_table} FROM {table_name}") + + try: + # Create BCPClient + bcp_client = BCPClient(conn) + + # Get row count before import + cursor.execute(f"SELECT COUNT(*) FROM {new_table}") + before_count = cursor.fetchone()[0] + assert before_count == 0, "Target table should be empty before import" + + # First do a BCP OUT to create sample data + test_out_file = format_test_setup['data_out'] + test_err_file = format_test_setup['error_out'] + + out_options = BCPOptions( + direction="out", + data_file=test_out_file, + error_file=test_err_file, + format_file=format_file, + bulk_mode="native" + ) + + # Do BCP OUT first to create input data + bcp_client.sql_bulk_copy(table=table_name, options=out_options) + + # Ensure the file was created + assert os.path.exists(test_out_file), "BCP OUT file not created" + assert os.path.getsize(test_out_file) > 0, "BCP OUT file is empty" + + # Copy the data file to the input location + with open(test_out_file, 'rb') as src, open(data_in_file, 'wb') as dst: + dst.write(src.read()) + + # Create a new BCP client to avoid any state issues + bcp_client_in = BCPClient(conn) + + # Now do BCP IN + in_options = BCPOptions( + direction="in", + data_file=data_in_file, + error_file=error_in_file, + format_file=format_file, + bulk_mode="native" + ) + + # Execute BCP IN + bcp_client_in.sql_bulk_copy(table=new_table, options=in_options) + + # Force a small delay to ensure all operations complete + time.sleep(0.5) + + # Verify data was imported + cursor.execute(f"SELECT COUNT(*) FROM {new_table}") + after_count = cursor.fetchone()[0] + + # Get expected count + 
cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + expected_count = cursor.fetchone()[0] + + assert after_count > 0, "No rows were imported" + assert after_count == expected_count, f"Expected {expected_count} rows, got {after_count}" + + finally: + # Clean up + cursor.execute(f"IF OBJECT_ID('{new_table}', 'U') IS NOT NULL DROP TABLE {new_table};") + +class TestBCPColumnFmt: + def test_bcp_colfmt(self, format_test_setup): + """Test BCP operations with explicit column formatting instead of format files.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + data_out = format_test_setup['data_out'] + data_in = format_test_setup['data_in'] + error_out = format_test_setup['error_out'] + error_in = format_test_setup['error_in'] + + # Create a new table for column format testing + col_fmt_table = f"{table_name}_colfmt" + cursor = conn.cursor() + cursor.execute(f"IF OBJECT_ID('{col_fmt_table}', 'U') IS NOT NULL DROP TABLE {col_fmt_table};") + cursor.execute(f""" + CREATE TABLE {col_fmt_table} ( + id INT PRIMARY KEY, + name NVARCHAR(50) NOT NULL, + value DECIMAL(10,2) NULL + ); + """) + + try: + # Create BCPClient for export + bcp_client_out = BCPClient(conn) + + # Create options for BCP OUT with column formatting + out_options = BCPOptions( + direction="out", + data_file=data_out, + error_file=error_out, + bulk_mode="char" # Use character mode for column formatting + ) + + # Set up column formats for OUT operation by directly creating ColumnFormat objects + # and adding them to the columns list + col1 = ColumnFormat( + prefix_len=0, + data_len=8, + field_terminator=b"\r\t", + terminator_len=2, # Length of the terminator + server_col=1, + file_col=1, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is INT) + ) + + col2 = ColumnFormat( + prefix_len=0, + data_len=50, + field_terminator=b"\r\t", + terminator_len=2, # Length of the terminator + server_col=2, + file_col=2, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is NVARCHAR) 
+ ) + + col3 = ColumnFormat( + prefix_len=0, + data_len=12, + field_terminator=b"\r\n", + terminator_len=2, # Length of the terminator + server_col=3, + file_col=3, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is DECIMAL) + ) + + # Add columns to the options + out_options.columns = [col1, col2, col3] + + # Execute BCP OUT using column formatting + bcp_client_out.sql_bulk_copy(table=table_name, options=out_options) + + # Verify the operation succeeded + assert os.path.exists(data_out), "Output data file was not created" + assert os.path.getsize(data_out) > 0, "Output data file is empty" + + # Create BCPClient for import + bcp_client_in = BCPClient(conn) + + # Create options for BCP IN with column formatting + in_options = BCPOptions( + direction="in", + data_file=data_out, # Use the data we just exported + error_file=error_in, + bulk_mode="char" # Use character mode for column formatting + ) + + # Set up column formats for IN operation - must match OUT operation + in_options.columns = [ + ColumnFormat( + prefix_len=0, + data_len=8, + field_terminator=b"\r\t", + terminator_len=2, # Length of the terminator + server_col=1, + file_col=1, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is INT) + ), + ColumnFormat( + prefix_len=0, + data_len=50, + field_terminator=b"\r\t", + terminator_len=2, # Length of the terminator + server_col=2, + file_col=2, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is NVARCHAR) + ), + ColumnFormat( + prefix_len=0, + data_len=12, + field_terminator=b"\r\n", + terminator_len=2, # Length of the terminator + server_col=3, + file_col=3, + user_data_type= 47 # 47 = SQLCHARACTER: file field is text (server column is DECIMAL) + ) + ] + + # Execute BCP IN using column formatting + bcp_client_in.sql_bulk_copy(table=col_fmt_table, options=in_options) + + # Verify data was imported correctly + cursor.execute(f"SELECT COUNT(*) FROM {col_fmt_table}") + imported_count = cursor.fetchone()[0] + + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + original_count = cursor.fetchone()[0] + + assert imported_count == 
original_count, f"Expected {original_count} rows, got {imported_count}" + + # Compare actual data to ensure it was imported correctly + cursor.execute(f"SELECT id, name, value FROM {table_name} ORDER BY id") + original_rows = cursor.fetchall() + + cursor.execute(f"SELECT id, name, value FROM {col_fmt_table} ORDER BY id") + imported_rows = cursor.fetchall() + + assert len(original_rows) == len(imported_rows), "Row count mismatch in detailed comparison" + + for orig, imp in zip(original_rows, imported_rows): + assert orig == imp, f"Data mismatch: original={orig}, imported={imp}" + + finally: + # Clean up + cursor.execute(f"IF OBJECT_ID('{col_fmt_table}', 'U') IS NOT NULL DROP TABLE {col_fmt_table};") + cursor.close() + + def test_bcp_colfmt_with_row_terminator(self, format_test_setup): + """Test BCP operations with column formatting and explicit row terminator.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + data_out = format_test_setup['data_out'] + ".row" + data_in = format_test_setup['data_in'] + ".row" + error_out = format_test_setup['error_out'] + error_in = format_test_setup['error_in'] + + # Create a new table for column format testing + row_term_table = f"{table_name}_rowterm" + cursor = conn.cursor() + cursor.execute(f"IF OBJECT_ID('{row_term_table}', 'U') IS NOT NULL DROP TABLE {row_term_table};") + cursor.execute(f"SELECT * INTO {row_term_table} FROM {table_name} WHERE 1=0") + + try: + # Create BCPClient for export + bcp_client_out = BCPClient(conn) + + # Create options for BCP OUT with column formatting + # Note: we'll specify row terminator in the last column's row_terminator field + out_options = BCPOptions( + direction="out", + data_file=data_out, + error_file=error_out, + bulk_mode="char" # Use character mode for column formatting + ) + + # Set up column formats for OUT operation with row terminator in the last column + out_options.columns = [ + ColumnFormat( + prefix_len=0, + 
data_len=8, + field_terminator=b",", + terminator_len=1, # Length of the terminator + server_col=1, + file_col=1, + user_data_type=47 # SQLINT + ), + ColumnFormat( + prefix_len=0, + data_len=50, + field_terminator=b",", + terminator_len=1, # Length of the terminator + server_col=2, + file_col=2, + user_data_type=47 # SQLNCHAR placeholder + ), + ColumnFormat( + prefix_len=0, + data_len=12, + field_terminator=b"\r\n", # Use row terminator on last column + terminator_len=2, # Length of the terminator + server_col=3, + file_col=3, + user_data_type=47 # SQLDECIMAL placeholder + ) + ] + + # Execute BCP OUT using column formatting + bcp_client_out.sql_bulk_copy(table=table_name, options=out_options) + + # Verify the operation succeeded + assert os.path.exists(data_out), "Output data file was not created" + assert os.path.getsize(data_out) > 0, "Output data file is empty" + + # Create BCPClient for import + bcp_client_in = BCPClient(conn) + + # Create options for BCP IN with column formatting + in_options = BCPOptions( + direction="in", + data_file=data_out, # Use the data we just exported + error_file=error_in, + bulk_mode="char" # Use character mode for column formatting + ) + + # Set up column formats for IN operation - must match OUT operation + in_options.columns = [ + ColumnFormat( + prefix_len=0, + data_len=8, + field_terminator=b",", + terminator_len=1, # Length of the terminator + server_col=1, + file_col=1, + user_data_type=47 # SQLINT + ), + ColumnFormat( + prefix_len=0, + data_len=50, + field_terminator=b",", + terminator_len=1, # Length of the terminator + server_col=2, + file_col=2, + user_data_type=47 # SQLNCHAR placeholder + ), + ColumnFormat( + prefix_len=0, + data_len=12, + field_terminator=b"\r\n", # Use row terminator on last column + terminator_len=2, # Length of the terminator + server_col=3, + file_col=3, + user_data_type=47 # SQLDECIMAL placeholder + ) + ] + + # Execute BCP IN using column formatting + 
bcp_client_in.sql_bulk_copy(table=row_term_table, options=in_options) + + # Verify data was imported correctly + cursor.execute(f"SELECT COUNT(*) FROM {row_term_table}") + imported_count = cursor.fetchone()[0] + + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + original_count = cursor.fetchone()[0] + + assert imported_count == original_count, f"Expected {original_count} rows, got {imported_count}" + + finally: + # Clean up + cursor.execute(f"IF OBJECT_ID('{row_term_table}', 'U') IS NOT NULL DROP TABLE {row_term_table};") + cursor.close() + +class TestBCPQueryOut: + def test_bcp_query_out_using_formatfile(self, format_test_setup): + """Test BCP OUT operation using a query and a format file.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + format_file = format_test_setup['format_file'] + data_file = format_test_setup['data_out'] + error_file = format_test_setup['error_out'] + + # Create BCPClient + bcp_client = BCPClient(conn) + + # Create options for BCP OUT with query + options = BCPOptions( + direction="queryout", + data_file=data_file, + error_file=error_file, + format_file=format_file, + query=f"SELECT * FROM {table_name}" # Use a query instead of a table name + ) + + # Ensure the data file is clean before running BCP OUT + if os.path.exists(data_file): + os.remove(data_file) + + # Check if table exists and has data + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + if count == 0: + pytest.skip(f"No data in table {table_name} to export with BCP OUT.") + # Log the number of rows to export + print(f"Rows to export from {table_name}: {count}") + + # Execute BCP OUT + bcp_client.sql_bulk_copy(options=options) + + # Verify the operation succeeded + assert os.path.exists(data_file), "Output data file was not created" + assert os.path.getsize(data_file) > 0, "Output data file is empty" + + # Count rows in table to verify against BCP 
operation + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + # Additional verification could be done here + assert row_count == 4, f"Expected 4 rows in source table, got {row_count}" + + def test_bcp_query_out_with_columns(self, format_test_setup): + """Test BCP OUT operation using a query and explicit column formatting.""" + # Get resources from setup + conn = format_test_setup['conn'] + table_name = format_test_setup['table_name'] + data_file = format_test_setup['data_out'] + error_file = format_test_setup['error_out'] + + # Create BCPClient + bcp_client = BCPClient(conn) + + # Create options for BCP OUT with query and columns + options = BCPOptions( + direction="queryout", + data_file=data_file, + error_file=error_file, + query=f"SELECT id, name FROM {table_name}", # Select specific columns + columns=[ + ColumnFormat( + prefix_len=0, + data_len=8, + field_terminator=b",", + terminator_len=1, # Length of the terminator + server_col=1, + file_col=1, + user_data_type=47 # SQLINT + ), + ColumnFormat( + prefix_len=0, + data_len=50, + field_terminator=b"\r\n", # Use row terminator on last column + terminator_len=2, # Length of the terminator + server_col=2, + file_col=2, + user_data_type=47 # SQLNCHAR placeholder + ) + ] + ) + + # Ensure the data file is clean before running BCP OUT + if os.path.exists(data_file): + os.remove(data_file) + + # Check if table exists and has data + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + if count == 0: + pytest.skip(f"No data in table {table_name} to export with BCP OUT.") + + # Log the number of rows to export + print(f"Rows to export from {table_name}: {count}") + + # Execute BCP OUT + bcp_client.sql_bulk_copy(options=options) + + # Verify the operation succeeded + assert os.path.exists(data_file), "Output data file was not created" + assert os.path.getsize(data_file) > 0, "Output data file is 
empty" + + # Count rows in table to verify against BCP operation + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + # Additional verification could be done here + assert row_count == 4, f"Expected 4 rows in source table, got {row_count}" + +class TestBCPBind: + @pytest.fixture(scope="function") + def bind_test_setup(self): + """ + Fixture to set up a BCP-enabled connection and a test table with various data types. + """ + conn_str = get_bcp_test_conn_str() + table_uuid = str(uuid.uuid4()).replace('-', '')[:8] + table_name = f"dbo.pytest_bcp_bind_{table_uuid}" + + conn = None + cursor = None + + try: + # Connect with BCP enabled + conn = mssql_connect(conn_str, attrs_before={SQL_COPT_SS_BCP: 1}, autocommit=True) + cursor = conn.cursor() + + # Create test table with various data types + cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + cursor.execute(f""" + CREATE TABLE {table_name} ( + int_col INT PRIMARY KEY, + varchar_col VARCHAR(50) NOT NULL, + nvarchar_col NVARCHAR(50) NULL, + date_col DATE NULL, + time_col TIME NULL, + bit_col BIT NULL, + tinyint_col TINYINT NULL, + bigint_col BIGINT NULL, + float_col REAL NULL, + double_col FLOAT NULL, + decimal_col DECIMAL(10,5) NULL, + binary_col VARBINARY(100) NULL + ); + """) + + yield { + 'conn': conn, + 'table_name': table_name + } + + finally: + # Cleanup + if cursor: + try: + cursor.close() + except Exception as e: + print(f"Warning: Error closing cursor: {e}") + + if conn: + try: + cleanup_cursor = conn.cursor() + cleanup_cursor.execute(f"IF OBJECT_ID('{table_name}', 'U') IS NOT NULL DROP TABLE {table_name};") + cleanup_cursor.close() + except Exception as e: + print(f"Warning: Error during cleanup: {e}") + finally: + conn.close() + + def test_bcp_bind_all_types(self, bind_test_setup): + """Test BCP binding with all supported data types.""" + from mssql_python.bcp_options import BCPOptions, BindData + from mssql_python import ( + 
SQLINT4, SQLVARCHAR, SQLNVARCHAR, SQLCHARACTER, + SQLBIT, SQLINT1, SQLINT8, SQLFLT4, SQLFLT8, + SQLNUMERIC, SQLVARBINARY, SQL_VARLEN_DATA, SQL_NULL_DATA + ) + import os + + # Get resources from setup + conn = bind_test_setup['conn'] + table_name = bind_test_setup['table_name'] + + # Create BCPClient + bcp_client = BCPClient(conn) + + # Prepare simpler test data with fewer rows + rows_data = [ + # Row 1: Basic data types + [ + BindData(data=1001, data_type=SQLINT4, data_length=4, server_col=1), + BindData( + data="Test String", data_type=SQLVARCHAR, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=2 + ), + BindData( + data="Unicode Test", data_type=SQLNVARCHAR, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=3 + ), + BindData( + data="2023-06-15", data_type=SQLCHARACTER, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=4 + ), + BindData( + data="14:30:25", data_type=SQLCHARACTER, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=5 + ), + BindData(data=1, data_type=SQLBIT, data_length=1, server_col=6), + BindData(data=127, data_type=SQLINT1, data_length=1, server_col=7), + BindData(data=1000000, data_type=SQLINT8, data_length=8, server_col=8), + BindData(data=3.14159, data_type=SQLFLT4, data_length=4, server_col=9), + BindData(data=1234.56789, data_type=SQLFLT8, data_length=8, server_col=10), + BindData( + data="123.45678", data_type=SQLNUMERIC, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=11 + ), + BindData( + data=b'BinaryData', data_type=SQLVARBINARY, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=12 + ) + ], + + # Row 2: NULL values (where allowed) + [ + BindData(data=1002, data_type=SQLINT4, data_length=4, server_col=1), + BindData( + data="Not NULL", data_type=SQLVARCHAR, + data_length=SQL_VARLEN_DATA, terminator=b'\0', + terminator_length=1, server_col=2 
+ ), + BindData( + data=None, data_type=SQLNVARCHAR, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=3 + ), + BindData( + data=None, data_type=SQLCHARACTER, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=4 + ), + BindData( + data=None, data_type=SQLCHARACTER, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=5 + ), + BindData( + data=None, data_type=SQLBIT, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=6 + ), + BindData( + data=None, data_type=SQLINT1, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=7 + ), + BindData( + data=None, data_type=SQLINT8, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=8 + ), + BindData( + data=None, data_type=SQLFLT4, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=9 + ), + BindData( + data=None, data_type=SQLFLT8, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=10 + ), + BindData( + data=None, data_type=SQLNUMERIC, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=11 + ), + BindData( + data=None, data_type=SQLVARBINARY, + indicator_length=4, data_length=SQL_NULL_DATA, server_col=12 + ) + ] + ] + + # Create BCP options with multi-row bind data + bcp_options = BCPOptions( + direction='in', + use_memory_bcp=True, + bind_data=rows_data, + error_file=os.path.abspath("bcp_bind_error.txt") + ) + + try: + # Execute BCP to insert rows + bcp_client.sql_bulk_copy(table=table_name, options=bcp_options) + + # Verify rows were inserted + cursor = conn.cursor() + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + assert row_count == 2, f"Expected 2 rows to be inserted, got {row_count}" + + # Verify data integrity - only check basic values + cursor.execute(f"SELECT int_col, varchar_col FROM {table_name} WHERE int_col = 1001") + row1 = cursor.fetchone() + assert row1 is not None, "Row 1 was not found" + assert row1[1] == "Test String", f"Expected 'Test String' for varchar_col, got '{row1[1]}'" + + # Check 
NULL values in row 2 - just one check + cursor.execute(f"SELECT int_col, varchar_col, nvarchar_col FROM {table_name} WHERE int_col = 1002") + row2 = cursor.fetchone() + assert row2 is not None, "Row 2 was not found" + assert row2[1] == "Not NULL", "varchar_col should not be NULL" + assert row2[2] is None, "nvarchar_col should be NULL" + + except Exception as e: + # Add better error reporting + print(f"\nBCP ERROR: {type(e).__name__}: {str(e)}") + import traceback + traceback.print_exc() + raise + finally: + if 'cursor' in locals(): + cursor.close() \ No newline at end of file