diff --git a/README.md b/README.md index 5a491cd3a..57ceeacbf 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ These connectors are ready to use out of the box, requiring minimal modification - [influx_db](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/influx_db) - This example shows how to sync data from InfluxDB using Connector SDK. It uses the `influxdb3_python` library to connect to InfluxDB and fetch time-series data from a specified measurement. - [iterate](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/iterate) - This example shows how to sync NPS survey data from the Iterate REST API and load it into your destination using Connector SDK. The connector fetches NPS surveys and their individual responses, providing complete survey analytics data for downstream analysis. - [leavedates](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/leavedates) - This example shows how to sync leave report data from LeaveDates API by using the Connector SDK. You need to provide your LeaveDates API token and company ID for this example to work. +- [marqeta](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/marqeta) - This example shows how to sync users, businesses, and transactions data from the Marqeta Core API by using the Connector SDK. You need to provide your Marqeta API username and password for this example to work. - [microsoft_excel](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/microsoft_excel) - This example shows how to sync data from Microsoft Excel using Connector SDK. It shows three different ways to sync data from Excel files using `pandas`, `python-calamine` and `openpyxl`. - [microsoft_intune](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/microsoft_intune/) - This example shows how to connect to Microsoft Intune and retrieve all managed devices using the Connector SDK. - [newsapi](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/newsapi) - This is a simple example of how to sync data from NewsAPI using Connector SDK. diff --git a/connectors/marqeta/README.md b/connectors/marqeta/README.md new file mode 100644 index 000000000..f4f6698bc --- /dev/null +++ b/connectors/marqeta/README.md @@ -0,0 +1,92 @@ +# Marqeta API Connector Example + +## Connector overview +This connector syncs users, businesses, and transactions data from the Marqeta Core API platform. Marqeta provides modern card issuing and payment processing infrastructure, enabling companies to build innovative fintech products. The connector fetches comprehensive data including user profiles, business entities, and detailed transaction records with support for incremental synchronization and memory-efficient processing of large datasets. + +## Requirements +- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): +- Operating system: + - Windows: 10 or later (64-bit only) + - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) + - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) + +## Getting started +Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started. 
+
+## Features
+- Syncs user profiles, business entities, and transaction records from the Marqeta Core API
+- HTTP Basic authentication with automatic retry logic (refer to the `execute_api_request` function)
+- Index-based pagination with automatic page traversal (refer to the `get_users`, `get_businesses`, and `get_transactions` functions)
+- Memory-efficient streaming prevents data accumulation for large datasets
+- Incremental synchronization using timestamp-based cursors (refer to the `get_time_range` function)
+- Comprehensive error handling with exponential backoff retry logic
+- Rate limiting support with automatic retries after the appropriate delay
+
+## Configuration file
+```json
+{
+  "username": "",
+  "password": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_incremental_sync": ""
+}
+```
+
+### Configuration parameters
+- `username` (required): Marqeta API username for HTTP Basic authentication
+- `password` (required): Marqeta API password for HTTP Basic authentication
+- `initial_sync_days` (optional): Number of days of history to fetch on the initial sync (1-365, default: 90)
+- `max_records_per_page` (optional): Records per API page (1-500, default: 100)
+- `request_timeout_seconds` (optional): HTTP request timeout in seconds (10-120, default: 30)
+- `retry_attempts` (optional): Number of retry attempts for failed requests (1-5, default: 3)
+- `enable_incremental_sync` (optional): Enable timestamp-based incremental sync (default: true)
+
+## Requirements file
+This connector requires the `faker` package for generating mock test data.
+
+Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`.
+
+## Authentication
+1. Log in to the [Marqeta Developer Portal](https://www.marqeta.com/docs).
+2. Navigate to your application settings to obtain API credentials.
+3. Make a note of the `username` and `password` for HTTP Basic authentication.
+4. Retrieve your application token from the Marqeta Developer Portal.
+5. Use sandbox credentials for testing and production credentials for live syncs.
+
+Note: The connector uses HTTP Basic authentication with automatic retry logic. Credentials are never logged or exposed in plain text.
+
+## Pagination
+The connector uses index-based pagination with automatic page traversal (refer to the `get_users`, `get_businesses`, and `get_transactions` functions). Pages are requested sequentially using the `start_index` and `count` parameters, and each record is yielded as soon as its page is fetched, so large result sets never accumulate in memory.
+
+## Data handling
+User, business, and transaction data are mapped from Marqeta's API format to normalized database columns (refer to the `__map_user_data`, `__map_business_data`, and `__map_transaction_data` functions). Nested objects are flattened to JSON strings, and all timestamps are converted to UTC for consistency.
+
+The connector supports timestamp-based incremental synchronization using the `last_sync_time` state value (refer to the `get_time_range` function). The initial sync can be configured to fetch up to 365 days of historical data, as sketched below.
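+
+A minimal sketch of this windowing logic (a simplified rendering of the `get_time_range` function in `connector.py`; the 90-day default matches the connector code):
+
+```python
+from datetime import datetime, timedelta, timezone
+
+def time_range(last_sync_time=None, initial_sync_days=90):
+    # Resume from the stored cursor when available; otherwise look back
+    # `initial_sync_days` days to build the initial sync window.
+    end = datetime.now(timezone.utc).isoformat()
+    if last_sync_time:
+        start = last_sync_time
+    else:
+        start = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat()
+    return {"start": start, "end": end}
+```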
+
+## Error handling
+- 429 Rate Limited: Automatic retry with `Retry-After` header support (refer to the `__handle_rate_limit` function)
+- Timeout handling with configurable retry attempts (refer to the `__handle_request_error` function)
+- Exponential backoff with jitter prevents many clients from retrying at the same moment (the thundering-herd problem)
+- Parameter validation with descriptive error messages provides clear guidance for fixing setup issues
+- Network connectivity errors are retried automatically
+
+## Tables created
+| Table | Primary Key | Description |
+|-------|-------------|-------------|
+| USERS | `token` | User profiles and personal information |
+| BUSINESSES | `token` | Business entities and company information |
+| TRANSACTIONS | `token` | Transaction records and payment data |
+
+Column types are automatically inferred by Fivetran. Sample columns include:
+
+**USERS**: `token`, `first_name`, `last_name`, `email`, `phone`, `address1`, `city`, `state`, `created_time`, `last_modified_time`, `status`, `metadata`
+
+**BUSINESSES**: `token`, `business_name_legal`, `business_name_dba`, `business_type`, `ein`, `website`, `phone`, `address1`, `city`, `state`, `created_time`, `last_modified_time`, `status`, `metadata`
+
+**TRANSACTIONS**: `token`, `type`, `state`, `user_token`, `business_token`, `card_token`, `amount`, `currency_code`, `network`, `created_time`, `settlement_date`, `merchant`, `response`, `metadata`
+
+## Additional considerations
+The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
\ No newline at end of file
diff --git a/connectors/marqeta/configuration.json b/connectors/marqeta/configuration.json
new file mode 100644
index 000000000..382531845
--- /dev/null
+++ b/connectors/marqeta/configuration.json
@@ -0,0 +1,9 @@
+{
+  "username": "",
+  "password": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_incremental_sync": ""
+}
\ No newline at end of file
diff --git a/connectors/marqeta/connector.py b/connectors/marqeta/connector.py
new file mode 100644
index 000000000..7f941319c
--- /dev/null
+++ b/connectors/marqeta/connector.py
@@ -0,0 +1,651 @@
+"""Marqeta API connector for syncing users, businesses, and transactions data.
+This connector demonstrates how to fetch data from the Marqeta Core API and upsert it into the destination using memory-efficient streaming patterns.
+See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) +and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details +""" + +# For reading configuration from a JSON file +import json + +# Import required classes from fivetran_connector_sdk +from fivetran_connector_sdk import Connector + +# For enabling Logs in your connector code +from fivetran_connector_sdk import Logging as log + +# For supporting Data operations like Upsert(), Update(), Delete() and checkpoint() +from fivetran_connector_sdk import Operations as op + +# For making HTTP requests to Marqeta API +import requests + +# For handling dates and timestamps +from datetime import datetime, timedelta, timezone + +# For implementing delays in retry logic and rate limiting +import time + +# For adding jitter to retry delays +import random + +""" ADD YOUR SOURCE-SPECIFIC IMPORTS HERE +Example: import pandas, boto3, etc. +Add comment for each import to explain its purpose for users to follow. +""" +# Private constants (use __ prefix) +__API_ENDPOINT = "https://api.marqeta.com/v3" + + +def __get_config_int(configuration, key, default, min_val=None, max_val=None): + """ + Extract and validate integer configuration parameters with range checking. + This function safely extracts integer values from configuration and applies validation. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing or invalid. + min_val: Minimum allowed value (optional). + max_val: Maximum allowed value (optional). + + Returns: + int: The validated integer value or default if validation fails. + """ + try: + value = int(configuration.get(key, default)) + if min_val is not None and value < min_val: + return default + if max_val is not None and value > max_val: + return default + return value + except (ValueError, TypeError): + return default + + +def __get_config_str(configuration, key, default=""): + """ + Extract string configuration parameters with type safety. + This function safely extracts string values from configuration dictionary. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing. + + Returns: + str: The string value or default if key is missing. + """ + return str(configuration.get(key, default)) + + +def __get_config_bool(configuration, key, default=False): + """ + Extract and parse boolean configuration parameters from strings or boolean values. + This function handles string representations of boolean values commonly used in JSON configuration. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default boolean value to return if key is missing. + + Returns: + bool: The parsed boolean value or default if key is missing. + """ + value = configuration.get(key, default) + if isinstance(value, str): + return value.lower() in ("true", "1", "yes", "on") + return bool(value) + + +def __calculate_wait_time(attempt, response_headers, base_delay=1, max_delay=60): + """ + Calculate exponential backoff wait time with jitter for retry attempts. + This function implements exponential backoff with random jitter to prevent thundering herd problems. + + Args: + attempt: Current attempt number (0-based). 
+ response_headers: HTTP response headers dictionary that may contain Retry-After. + base_delay: Base delay in seconds for exponential backoff. + max_delay: Maximum delay cap in seconds. + + Returns: + float: Wait time in seconds before next retry attempt. + """ + if "Retry-After" in response_headers: + return min(int(response_headers["Retry-After"]), max_delay) + + # Exponential backoff with jitter + wait_time = min(base_delay * (2**attempt), max_delay) + jitter = random.uniform(0.1, 0.3) * wait_time + return wait_time + jitter + + +def __handle_rate_limit(attempt, response): + """ + Handle HTTP 429 rate limiting responses with appropriate delays. + This function logs the rate limit and waits before allowing retry attempts. + + Args: + attempt: Current attempt number for logging purposes. + response: HTTP response object containing rate limit headers. + """ + wait_time = __calculate_wait_time(attempt, response.headers) + log.warning(f"Rate limit hit, waiting {wait_time:.1f} seconds before retry {attempt + 1}") + time.sleep(wait_time) + + +def __handle_request_error(attempt, retry_attempts, error, endpoint): + """ + Handle request errors with exponential backoff retry logic. + This function manages retry attempts for failed API requests with appropriate delays. + + Args: + attempt: Current attempt number (0-based). + retry_attempts: Total number of retry attempts allowed. + error: The exception that occurred during the request. + endpoint: API endpoint that failed for logging purposes. + + Raises: + Exception: Re-raises the original error after all retry attempts are exhausted. + """ + if attempt < retry_attempts - 1: + wait_time = __calculate_wait_time(attempt, {}) + log.warning( + f"Request failed for {endpoint}: {str(error)}. Retrying in {wait_time:.1f} seconds..." + ) + time.sleep(wait_time) + else: + log.severe(f"All retry attempts failed for {endpoint}: {str(error)}") + raise error + + +def execute_api_request(endpoint, auth_credentials, params=None, configuration=None): + """ + Execute HTTP API requests with comprehensive error handling and retry logic. + This function handles authentication, rate limiting, timeouts, and network errors. + + Args: + endpoint: API endpoint path to request. + auth_credentials: Dictionary containing username and password for Basic auth. + params: Query parameters for the request (optional). + configuration: Configuration dictionary for timeout and retry settings. + + Returns: + dict: Parsed JSON response from the API. + + Raises: + RuntimeError: If all retry attempts fail or unexpected errors occur. + requests.exceptions.RequestException: For unrecoverable HTTP errors. + """ + url = f"{__API_ENDPOINT}{endpoint}" + + timeout = __get_config_int(configuration, "request_timeout_seconds", 30) + retry_attempts = __get_config_int(configuration, "retry_attempts", 3) + + for attempt in range(retry_attempts): + try: + response = requests.get( + url, + auth=(auth_credentials["username"], auth_credentials["password"]), + params=params, + timeout=timeout, + ) + + if response.status_code == 429: + __handle_rate_limit(attempt, response) + continue + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + __handle_request_error(attempt, retry_attempts, e, endpoint) + continue + + raise RuntimeError("Unexpected error in API request execution") + + +def get_time_range(last_sync_time=None, configuration=None): + """ + Generate time range for incremental or initial data synchronization. 
+ This function creates start and end timestamps for API queries based on sync state. + + Args: + last_sync_time: Timestamp of last successful sync (optional). + configuration: Configuration dictionary containing sync settings. + + Returns: + dict: Dictionary containing 'start' and 'end' timestamps in ISO format. + """ + end_time = datetime.now(timezone.utc).isoformat() + + if last_sync_time: + start_time = last_sync_time + else: + initial_sync_days = __get_config_int(configuration, "initial_sync_days", 90) + start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat() + + return {"start": start_time, "end": end_time} + + +def __map_user_data(record): + """ + Transform API response record to users table schema format. + This function maps raw Marqeta user API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for user data. + + Returns: + dict: Transformed user record ready for database insertion. + """ + return { + "token": record.get("token", ""), + "active": record.get("active", True), + "first_name": record.get("first_name", ""), + "last_name": record.get("last_name", ""), + "email": record.get("email", ""), + "phone": record.get("phone", ""), + "gender": record.get("gender", ""), + "birth_date": record.get("birth_date", ""), + "ssn": record.get("ssn", ""), + "address1": record.get("address1", ""), + "address2": record.get("address2", ""), + "city": record.get("city", ""), + "state": record.get("state", ""), + "postal_code": record.get("postal_code", ""), + "country": record.get("country", ""), + "company": record.get("company", ""), + "created_time": record.get("created_time", ""), + "last_modified_time": record.get("last_modified_time", ""), + "metadata": json.dumps(record.get("metadata", {})) if record.get("metadata") else None, + "account_holder_group_token": record.get("account_holder_group_token", ""), + "status": record.get("status", ""), + "uses_parent_account": record.get("uses_parent_account", False), + "parent_token": record.get("parent_token", ""), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_business_data(record): + """ + Transform API response record to businesses table schema format. + This function maps raw Marqeta business API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for business data. + + Returns: + dict: Transformed business record ready for database insertion. 
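+
+    Example (illustrative record, not real Marqeta API data):
+        >>> row = __map_business_data({"token": "biz_001", "business_name_legal": "Acme Holdings LLC"})
+        >>> (row["token"], row["business_name_legal"], row["active"])
+        ('biz_001', 'Acme Holdings LLC', True)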
+ """ + return { + "token": record.get("token", ""), + "active": record.get("active", True), + "business_name_legal": record.get("business_name_legal", ""), + "business_name_dba": record.get("business_name_dba", ""), + "business_type": record.get("business_type", ""), + "incorporation_date": record.get("incorporation_date", ""), + "ein": record.get("ein", ""), + "website": record.get("website", ""), + "phone": record.get("phone", ""), + "address1": record.get("address1", ""), + "address2": record.get("address2", ""), + "city": record.get("city", ""), + "state": record.get("state", ""), + "postal_code": record.get("postal_code", ""), + "country": record.get("country", ""), + "created_time": record.get("created_time", ""), + "last_modified_time": record.get("last_modified_time", ""), + "metadata": json.dumps(record.get("metadata", {})) if record.get("metadata") else None, + "account_holder_group_token": record.get("account_holder_group_token", ""), + "status": record.get("status", ""), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_transaction_data(record): + """ + Transform API response record to transactions table schema format. + This function maps raw Marqeta transaction API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for transaction data. + + Returns: + dict: Transformed transaction record ready for database insertion. + """ + return { + "token": record.get("token", ""), + "type": record.get("type", ""), + "state": record.get("state", ""), + "identifier": record.get("identifier", ""), + "user_token": record.get("user_token", ""), + "business_token": record.get("business_token", ""), + "card_token": record.get("card_token", ""), + "amount": record.get("amount", 0.0), + "currency_code": record.get("currency_code", ""), + "request_amount": record.get("request_amount", 0.0), + "network": record.get("network", ""), + "subnetwork": record.get("subnetwork", ""), + "acquirer_fee_amount": record.get("acquirer_fee_amount", 0.0), + "created_time": record.get("created_time", ""), + "user_transaction_time": record.get("user_transaction_time", ""), + "settlement_date": record.get("settlement_date", ""), + "merchant": json.dumps(record.get("merchant", {})) if record.get("merchant") else None, + "card_acceptor": ( + json.dumps(record.get("card_acceptor", {})) if record.get("card_acceptor") else None + ), + "acquirer": json.dumps(record.get("acquirer", {})) if record.get("acquirer") else None, + "issuer": json.dumps(record.get("issuer", {})) if record.get("issuer") else None, + "response": json.dumps(record.get("response", {})) if record.get("response") else None, + "metadata": json.dumps(record.get("metadata", {})) if record.get("metadata") else None, + "clearing_record_sequence_number": record.get("clearing_record_sequence_number", ""), + "network_reference_id": record.get("network_reference_id", ""), + "approval_code": record.get("approval_code", ""), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def get_users(auth_credentials, params, last_sync_time=None, configuration=None): + """ + Fetch users data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + auth_credentials: Dictionary containing authentication credentials. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). 
+ configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual user records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/users" + max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 500) + + request_params = { + "count": max_records, + "start_index": 0, + } + + if last_sync_time: + request_params["start_date"] = last_sync_time + + start_index = 0 + while True: + request_params["start_index"] = start_index + response = execute_api_request(endpoint, auth_credentials, request_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_user_data(record) + + if len(data) < max_records: + break + start_index += max_records + + +def get_businesses(auth_credentials, params, last_sync_time=None, configuration=None): + """ + Fetch businesses data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + auth_credentials: Dictionary containing authentication credentials. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual business records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/businesses" + max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 500) + + request_params = { + "count": max_records, + "start_index": 0, + } + + if last_sync_time: + request_params["start_date"] = last_sync_time + + start_index = 0 + while True: + request_params["start_index"] = start_index + response = execute_api_request(endpoint, auth_credentials, request_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_business_data(record) + + if len(data) < max_records: + break + start_index += max_records + + +def get_transactions(auth_credentials, params, last_sync_time=None, configuration=None): + """ + Fetch transactions data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + auth_credentials: Dictionary containing authentication credentials. + params: Additional parameters for the API request. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual transaction records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. 
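+
+    Example (illustrative usage; placeholder credentials, skipped at doctest time):
+        >>> creds = {"username": "<api_username>", "password": "<api_password>"}  # doctest: +SKIP
+        >>> for txn in get_transactions(creds, {}, None, {"initial_sync_days": "7"}):  # doctest: +SKIP
+        ...     print(txn["token"])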
+ """ + endpoint = "/transactions" + max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 500) + + time_range = get_time_range(last_sync_time, configuration) + + request_params = { + "count": max_records, + "start_index": 0, + "start_date": time_range["start"], + "end_date": time_range["end"], + } + + start_index = 0 + while True: + request_params["start_index"] = start_index + response = execute_api_request(endpoint, auth_credentials, request_params, configuration) + + data = response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_transaction_data(record) + + if len(data) < max_records: + break + start_index += max_records + + +def schema(configuration: dict): + """ + Define database schema with table names and primary keys for the connector. + This function specifies the destination tables and their primary keys for Fivetran to create. + + Args: + configuration: Configuration dictionary (not used but required by SDK). + + Returns: + list: List of table schema dictionaries with table names and primary keys. + """ + return [ + {"table": "users", "primary_key": ["token"]}, + {"table": "businesses", "primary_key": ["token"]}, + {"table": "transactions", "primary_key": ["token"]}, + ] + + +def update(configuration: dict, state: dict): + """ + Main synchronization function that fetches and processes data from the Marqeta API. + This function orchestrates the entire sync process using memory-efficient streaming patterns. + + Args: + configuration: Configuration dictionary containing API credentials and settings. + state: State dictionary containing sync cursors and checkpoints from previous runs. + + Raises: + RuntimeError: If sync fails due to API errors or configuration issues. + """ + log.info("Starting Marqeta API connector sync") + + # Extract configuration parameters (SDK auto-validates required fields) + username = __get_config_str(configuration, "username") + password = __get_config_str(configuration, "password") + max_records_per_page = __get_config_int(configuration, "max_records_per_page", 100, 1, 500) + enable_incremental = __get_config_bool(configuration, "enable_incremental_sync", True) + + auth_credentials = {"username": username, "password": password} + + # Get state for incremental sync + last_sync_time = state.get("last_sync_time") if enable_incremental else None + + try: + # Fetch users data using generator with incremental checkpointing + log.info("Fetching users data...") + user_count = 0 + for record in get_users(auth_credentials, {}, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="users", data=record) + user_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if user_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "last_modified_time", datetime.now(timezone.utc).isoformat() + ), + "users_processed": user_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. 
+ # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + + log.info(f"Completed users sync. Processed {user_count} users.") + + # Fetch businesses data using generator with incremental checkpointing + log.info("Fetching businesses data...") + business_count = 0 + for record in get_businesses(auth_credentials, {}, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="businesses", data=record) + business_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if business_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "last_modified_time", datetime.now(timezone.utc).isoformat() + ), + "users_processed": user_count, + "businesses_processed": business_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + + log.info(f"Completed businesses sync. Processed {business_count} businesses.") + + # Fetch transactions data using generator with incremental checkpointing + log.info("Fetching transactions data...") + transaction_count = 0 + for record in get_transactions(auth_credentials, {}, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="transactions", data=record) + transaction_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if transaction_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "created_time", datetime.now(timezone.utc).isoformat() + ), + "users_processed": user_count, + "businesses_processed": business_count, + "transactions_processed": transaction_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + + log.info(f"Completed transactions sync. Processed {transaction_count} transactions.") + + # Final checkpoint with completion status + final_state = { + "last_sync_time": datetime.now(timezone.utc).isoformat(), + "users_processed": user_count, + "businesses_processed": business_count, + "transactions_processed": transaction_count, + } + # Save the progress by checkpointing the state. 
This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(final_state) + + log.info( + f"Sync completed successfully. Total: {user_count} users, {business_count} businesses, {transaction_count} transactions." + ) + + except Exception as e: + log.severe(f"Sync failed: {str(e)}") + raise RuntimeError(f"Failed to sync data: {str(e)}") + + +# Creating an instance of the connector +connector = Connector(update=update, schema=schema) + +# Check if the script is being run as the main module. +# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. +# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production. +# Please test using the Fivetran debug command prior to finalizing and deploying your connector. +if __name__ == "__main__": + # Open the configuration.json file and load its contents + with open("configuration.json", "r") as f: + configuration = json.load(f) + + # Test the connector locally + connector.debug(configuration=configuration)
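+
+# Alternatively, you can run the connector locally with the Fivetran CLI that is
+# installed alongside the fivetran_connector_sdk package (command shown as an
+# example; see the SDK documentation for the full set of options):
+#   fivetran debug --configuration configuration.json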