diff --git a/README.md b/README.md index 86bade28c..d2c47ccf7 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,8 @@ These connectors are ready to use out of the box, requiring minimal modification - [rabbitmq](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/rabbitmq) - This example shows how to sync messages from RabbitMQ queues using Connector SDK. It uses the `pika` library to connect to RabbitMQ and fetch messages from specified queues. You need to provide your RabbitMQ connection URL for this example to work. - Redshift - [simple_redshift_connector](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/redshift/simple_redshift_connector) - This example shows how to sync records from Redshift by using Connector SDK. You need to provide your Redshift credentials for this example to work. - - [large_data_volumes](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/redshift/large_data_volume) - This example shows how to sync large data volumes from Redshift by using Connector SDK. You need to provide your Redshift credentials for this example to work. + - [large_data_volumes](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/redshift/large_data_volume) - This example shows how to sync large data volumes from Redshift by using Connector SDK. You need to provide your Redshift credentials for this example to work. +- [rotamaster](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/rotamaster) - This example shows how to sync employee data, team information, and role definitions from the Rotamaster API by using Connector SDK. You need to provide your Rotamaster API key for this example to work. - [s3_csv_validation](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/s3_csv_validation) - This is an example of how to read .CSV file from Amazon S3 and validate the data. You need to provide your AWS S3 credentials for this example to work. - [SAP HANA SQL](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/sap_hana_sql) - This example uses hdbcli to connect to SAP HANA SQL Server for syncing data using Connector SDK. You need to provide your SAP HANA SQL Server credentials for this example to work. - [sensor_tower](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/sensor_tower) - This example shows how to use the Connector SDK to integrate with Sensor Tower and sync market intelligence data for mobile apps of your choice. diff --git a/connectors/rotamaster/README.md b/connectors/rotamaster/README.md new file mode 100644 index 000000000..2980df3d9 --- /dev/null +++ b/connectors/rotamaster/README.md @@ -0,0 +1,130 @@ +# Rotamaster API Connector Example + +## Connector overview +This connector syncs employee, team, and role data from Rotamaster API into your destination. The connector fetches data from three core endpoints: `/api/People` for employee information including personal details and employment status, `/api/Team` for organizational team structure, and `/api/Role` for role definitions and hierarchies. It supports incremental synchronization using timestamp-based cursors and includes comprehensive filtering options for employee status (active, archived, suspended, deleted). 
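+
+For orientation, a single authenticated request to one of these endpoints looks roughly like the following (a minimal sketch using the `requests` library; the connector's `execute_api_request` function wraps the same call with timeouts, retries, and rate-limit handling):
+
+```python
+import requests
+
+# Illustrative values - the connector reads these from configuration.json.
+BASE_URL = "https://data-api.rotamasterweb.co.uk"
+API_KEY = "your-rotamaster-api-key"
+
+response = requests.get(
+    f"{BASE_URL}/api/People",
+    headers={"Authorization": f"Bearer {API_KEY}"},
+    params={"numberOfRecords": 100, "includeActive": True},
+    timeout=30,
+)
+response.raise_for_status()
+print(response.json())
+```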
+
+## Requirements
+- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements)
+- Operating system:
+  - Windows: 10 or later (64-bit only)
+  - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64])
+  - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64)
+
+## Getting started
+Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started.
+
+## Features
+- Syncs employee data, team information, and role definitions from the Rotamaster API
+- Bearer token authentication with comprehensive error handling (refer to the `execute_api_request` function)
+- Record-based pagination with configurable limits (refer to the `get_people_data` function)
+- Memory-efficient streaming that prevents data accumulation for large employee datasets
+- Incremental synchronization using timestamp-based cursors (refer to the `get_time_range` function)
+- Comprehensive error handling with exponential backoff retry logic
+- Configurable employee filtering by status (active, archived, suspended, deleted)
+- Rate-limit handling with automatic retry delays (refer to the `__handle_rate_limit` function)
+
+## Configuration file
+```json
+{
+  "api_key": "",
+  "sync_frequency_hours": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_people_sync": "",
+  "enable_teams_sync": "",
+  "enable_roles_sync": "",
+  "include_active_people": "",
+  "include_archived_people": "",
+  "include_suspended_people": "",
+  "include_deleted_people": "",
+  "enable_debug_logging": ""
+}
+```
+
+### Configuration parameters
+- `api_key` (required): Bearer token for Rotamaster API authentication
+- `sync_frequency_hours` (optional): Hours between sync attempts, defaults to 4
+- `initial_sync_days` (optional): Days of historical data for the initial sync, defaults to 90
+- `max_records_per_page` (optional): Records per API request (1-1000), defaults to 100
+- `request_timeout_seconds` (optional): API request timeout, defaults to 30
+- `retry_attempts` (optional): Retry attempts for failed requests, defaults to 3
+- `enable_people_sync` (optional): Enable employee data sync, defaults to true
+- `enable_teams_sync` (optional): Enable team data sync, defaults to true
+- `enable_roles_sync` (optional): Enable role data sync, defaults to true
+- `include_active_people` (optional): Include active employees, defaults to true
+- `include_archived_people` (optional): Include archived employees, defaults to false
+- `include_suspended_people` (optional): Include suspended employees, defaults to false
+- `include_deleted_people` (optional): Include deleted employees, defaults to false
+- `enable_debug_logging` (optional): Enable verbose debug logging, defaults to false
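+
+A filled-in example with illustrative values (the Connector SDK passes all configuration values as strings, including numbers and booleans):
+
+```json
+{
+  "api_key": "<YOUR_ROTAMASTER_API_KEY>",
+  "sync_frequency_hours": "4",
+  "initial_sync_days": "90",
+  "max_records_per_page": "100",
+  "request_timeout_seconds": "30",
+  "retry_attempts": "3",
+  "enable_people_sync": "true",
+  "enable_teams_sync": "true",
+  "enable_roles_sync": "true",
+  "include_active_people": "true",
+  "include_archived_people": "false",
+  "include_suspended_people": "false",
+  "include_deleted_people": "false",
+  "enable_debug_logging": "false"
+}
+```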
+
+## Requirements file
+This connector does not require any additional packages beyond those provided by the Fivetran environment.
+
+Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`.
+
+## Authentication
+1. Log in to the [Rotamaster Developer Portal](https://data-api.rotamasterweb.co.uk).
+2. Generate an API key from your account settings or contact Rotamaster administrators.
+3. Make a note of the Bearer token provided for API authentication.
+4. Ensure your API key has permissions for the People, Team, and Role endpoints.
+5. Use test credentials for development and production credentials for live syncing.
+
+Note: The connector handles authentication errors automatically with clear error messages. API keys are never logged or exposed in plain text.
+
+## Pagination
+The connector requests records in batches using the `numberOfRecords` parameter (refer to the `get_people_data` function). Responses are processed sequentially, and generator-based streaming yields each record for immediate processing instead of accumulating large employee datasets in memory.
+
+## Data handling
+Employee data is mapped from Rotamaster's API format to normalized database columns (refer to the `__map_people_data` function). Nested objects are flattened, and all timestamps are converted to UTC for consistency.
+
+The connector supports timestamp-based incremental synchronization using the `last_sync_time` state parameter (refer to the `get_time_range` function). The initial sync window is configurable through the `initial_sync_days` setting and defaults to 90 days of historical data.
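+
+For illustration, the cursor logic reduces to the following sketch (a standalone version of the connector's `get_time_range` function; the day count shown is the default):
+
+```python
+from datetime import datetime, timedelta, timezone
+
+def get_time_range(last_sync_time=None, initial_sync_days=90):
+    # Resume from the stored cursor when present; otherwise look back
+    # initial_sync_days to bound the first sync.
+    end_time = datetime.now(timezone.utc).isoformat()
+    if last_sync_time:
+        start_time = last_sync_time
+    else:
+        start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat()
+    return {"start": start_time, "end": end_time}
+
+# First sync: no cursor yet, so the window covers the last 90 days.
+print(get_time_range())
+# Subsequent syncs resume from the checkpointed cursor.
+print(get_time_range(last_sync_time="2024-01-01T00:00:00+00:00"))
+```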
+
+## Error handling
+- 429 Rate Limited: Automatic retry with `Retry-After` header support (refer to the `__handle_rate_limit` function)
+- Timeout handling with configurable retry attempts (refer to the `__handle_request_error` function)
+- Exponential backoff with jitter to prevent many clients from retrying at the same time
+- Parameter validation with descriptive error messages for fixing setup issues
+- Authentication errors with clear guidance for API key configuration
+
+## Tables created
+| Table | Primary Key | Description |
+|-------|-------------|-------------|
+| PEOPLE | `id` | Employee personal information, employment details, and status. |
+| TEAMS | `id` | Team and organizational group information. |
+| ROLES | `id` | Role definitions and organizational hierarchy. |
+
+Column types are automatically inferred by Fivetran. Sample columns include `first_name`, `last_name`, `email`, `employment_start_date`, `is_active`, `name`, and `nominal_code`.
+
+### PEOPLE table columns
+- `id`: Unique employee identifier
+- `external_id`: External employee reference
+- `first_name`: Employee first name
+- `last_name`: Employee last name
+- `email`: Employee email address
+- `date_of_birth`: Employee date of birth
+- `gender`: Employee gender
+- `mobile_number`: Employee mobile phone
+- `landline_number`: Employee landline phone
+- `employment_start_date`: Employment start date
+- `employment_end_date`: Employment end date (if applicable)
+- `is_active`: Active employment status
+- `is_archived`: Archived status
+- `is_suspended`: Suspended status
+- `is_deleted`: Deleted status
+- `sync_timestamp`: Record synchronization timestamp
+
+### TEAMS table columns
+- `id`: Unique team identifier
+- `external_id`: External team reference
+- `name`: Team name
+- `sync_timestamp`: Record synchronization timestamp
+
+### ROLES table columns
+- `id`: Unique role identifier
+- `name`: Role name
+- `nominal_code`: Role nominal code (optional)
+- `sync_timestamp`: Record synchronization timestamp
+
+## Additional considerations
+The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
diff --git a/connectors/rotamaster/configuration.json b/connectors/rotamaster/configuration.json
new file mode 100644
index 000000000..b7a7726cd
--- /dev/null
+++ b/connectors/rotamaster/configuration.json
@@ -0,0 +1,16 @@
+{
+  "api_key": "",
+  "sync_frequency_hours": "",
+  "initial_sync_days": "",
+  "max_records_per_page": "",
+  "request_timeout_seconds": "",
+  "retry_attempts": "",
+  "enable_people_sync": "",
+  "enable_teams_sync": "",
+  "enable_roles_sync": "",
+  "include_active_people": "",
+  "include_archived_people": "",
+  "include_suspended_people": "",
+  "include_deleted_people": "",
+  "enable_debug_logging": ""
+}
\ No newline at end of file
diff --git a/connectors/rotamaster/connector.py b/connectors/rotamaster/connector.py
new file mode 100644
index 000000000..7e169287e
--- /dev/null
+++ b/connectors/rotamaster/connector.py
@@ -0,0 +1,532 @@
+"""Rotamaster API connector for syncing employee, team, and role data.
+This connector demonstrates how to fetch data from the Rotamaster API and upsert it into the destination using memory-efficient streaming patterns.
+See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update)
+and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details.
+"""
+
+# For reading configuration from a JSON file
+import json
+
+# For implementing delays in retry logic and rate limiting
+import time
+
+# For adding jitter to retry delays
+import random
+
+# Import required classes from fivetran_connector_sdk
+from fivetran_connector_sdk import Connector
+
+# For enabling Logs in your connector code
+from fivetran_connector_sdk import Logging as log
+
+# For supporting Data operations like Upsert(), Update(), Delete() and checkpoint()
+from fivetran_connector_sdk import Operations as op
+
+# For making HTTP requests to Rotamaster API
+import requests
+
+# For handling dates and timestamps
+from datetime import datetime, timedelta, timezone
+
+# Base URL for the Rotamaster data API
+__API_ENDPOINT = "https://data-api.rotamasterweb.co.uk"
+
+
+def __get_config_int(configuration, key, default, min_val=None, max_val=None):
+    """
+    Extract and validate integer configuration parameters with range checking.
+    This function safely extracts integer values from configuration and applies validation.
+
+    Args:
+        configuration: Configuration dictionary containing connector settings.
+        key: The configuration key to extract.
+        default: Default value to return if key is missing or invalid.
+        min_val: Minimum allowed value (optional).
+        max_val: Maximum allowed value (optional).
+
+    Returns:
+        int: The validated integer value or default if validation fails.
+    """
+    try:
+        value = int(configuration.get(key, default))
+        if min_val is not None and value < min_val:
+            return default
+        if max_val is not None and value > max_val:
+            return default
+        return value
+    except (ValueError, TypeError):
+        return default
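+
+
+# For example (illustrative call): __get_config_int(configuration, "retry_attempts", 3, 1, 10)
+# returns 3 when the key is missing, non-numeric, or outside the 1-10 range.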
+
+
+def __get_config_str(configuration, key, default=""):
+    """
+    Extract string configuration parameters with type safety.
+    This function safely extracts string values from the configuration dictionary.
+
+    Args:
+        configuration: Configuration dictionary containing connector settings.
+        key: The configuration key to extract.
+        default: Default value to return if key is missing.
+
+    Returns:
+        str: The string value or default if key is missing.
+    """
+    return str(configuration.get(key, default))
+
+
+def __get_config_bool(configuration, key, default=False):
+    """
+    Extract and parse boolean configuration parameters from strings or boolean values.
+    This function handles string representations of boolean values commonly used in JSON configuration.
+
+    Args:
+        configuration: Configuration dictionary containing connector settings.
+        key: The configuration key to extract.
+        default: Default boolean value to return if key is missing.
+
+    Returns:
+        bool: The parsed boolean value or default if key is missing.
+    """
+    value = configuration.get(key, default)
+    if isinstance(value, str):
+        return value.lower() in ("true", "1", "yes", "on")
+    return bool(value)
+
+
+def __calculate_wait_time(attempt, response_headers, base_delay=1, max_delay=60):
+    """
+    Calculate exponential backoff wait time with jitter for retry attempts.
+    This function implements exponential backoff with random jitter to prevent thundering herd problems.
+
+    Args:
+        attempt: Current attempt number (0-based).
+        response_headers: HTTP response headers dictionary that may contain Retry-After.
+        base_delay: Base delay in seconds for exponential backoff.
+        max_delay: Maximum delay cap in seconds.
+
+    Returns:
+        float: Wait time in seconds before next retry attempt.
+    """
+    if "Retry-After" in response_headers:
+        try:
+            # Retry-After is usually a delay in seconds; fall back to exponential
+            # backoff if the header is not an integer (e.g., an HTTP date).
+            return min(int(response_headers["Retry-After"]), max_delay)
+        except ValueError:
+            pass
+
+    # Exponential backoff with jitter
+    wait_time = min(base_delay * (2**attempt), max_delay)
+    jitter = random.uniform(0.1, 0.3) * wait_time
+    return wait_time + jitter
+
+
+def __handle_rate_limit(attempt, response):
+    """
+    Handle HTTP 429 rate limiting responses with appropriate delays.
+    This function logs the rate limit and waits before allowing retry attempts.
+
+    Args:
+        attempt: Current attempt number for logging purposes.
+        response: HTTP response object containing rate limit headers.
+    """
+    wait_time = __calculate_wait_time(attempt, response.headers)
+    log.warning(f"Rate limit hit, waiting {wait_time:.1f} seconds before retry {attempt + 1}")
+    time.sleep(wait_time)
+
+
+def __handle_request_error(attempt, retry_attempts, error, endpoint):
+    """
+    Handle request errors with exponential backoff retry logic.
+    This function manages retry attempts for failed API requests with appropriate delays.
+
+    Args:
+        attempt: Current attempt number (0-based).
+        retry_attempts: Total number of retry attempts allowed.
+        error: The exception that occurred during the request.
+        endpoint: API endpoint that failed for logging purposes.
+
+    Raises:
+        Exception: Re-raises the original error after all retry attempts are exhausted.
+    """
+    if attempt < retry_attempts - 1:
+        wait_time = __calculate_wait_time(attempt, {})
+        log.warning(
+            f"Request failed for {endpoint}: {str(error)}. Retrying in {wait_time:.1f} seconds..."
+        )
+        time.sleep(wait_time)
+    else:
+        log.severe(f"All retry attempts failed for {endpoint}: {str(error)}")
+        raise error
+
+
+def execute_api_request(endpoint, api_key, params=None, configuration=None):
+    """
+    Execute HTTP API requests with comprehensive error handling and retry logic.
+    This function handles authentication, rate limiting, timeouts, and network errors.
+
+    Args:
+        endpoint: API endpoint path to request.
+        api_key: Authentication key for API access.
+        params: Query parameters for the request (optional).
+        configuration: Configuration dictionary for timeout and retry settings.
+ + Returns: + dict: Parsed JSON response from the API. + + Raises: + RuntimeError: If all retry attempts fail or unexpected errors occur. + requests.exceptions.RequestException: For unrecoverable HTTP errors. + """ + url = f"{__API_ENDPOINT}{endpoint}" + headers = {"Authorization": f"Bearer {api_key}"} + + timeout = __get_config_int(configuration, "request_timeout_seconds", 30) + retry_attempts = __get_config_int(configuration, "retry_attempts", 3) + + for attempt in range(retry_attempts): + try: + response = requests.get(url, headers=headers, params=params, timeout=timeout) + + if response.status_code == 429: + __handle_rate_limit(attempt, response) + continue + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + __handle_request_error(attempt, retry_attempts, e, endpoint) + continue + + raise RuntimeError("Unexpected error in API request execution") + + +def get_time_range(last_sync_time=None, configuration=None): + """ + Generate time range for incremental or initial data synchronization. + This function creates start and end timestamps for API queries based on sync state. + + Args: + last_sync_time: Timestamp of last successful sync (optional). + configuration: Configuration dictionary containing sync settings. + + Returns: + dict: Dictionary containing 'start' and 'end' timestamps in ISO format. + """ + end_time = datetime.now(timezone.utc).isoformat() + + if last_sync_time: + start_time = last_sync_time + else: + initial_sync_days = __get_config_int(configuration, "initial_sync_days", 90) + start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat() + + return {"start": start_time, "end": end_time} + + +def __map_people_data(record): + """ + Transform API response record to PEOPLE table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary. + + Returns: + dict: Transformed record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "external_id": record.get("externalId", ""), + "first_name": record.get("firstName", ""), + "last_name": record.get("lastName", ""), + "email": record.get("email", ""), + "date_of_birth": record.get("dateOfBirth", ""), + "gender": record.get("gender", ""), + "mobile_number": record.get("mobileNumber", ""), + "landline_number": record.get("landlineNumber", ""), + "employment_start_date": record.get("employmentStartDate", ""), + "employment_end_date": record.get("employmentEndDate", ""), + "is_active": record.get("isActive", False), + "is_archived": record.get("isArchived", False), + "is_suspended": record.get("isSuspended", False), + "is_deleted": record.get("isDeleted", False), + "sync_timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_teams_data(record): + """ + Transform API response record to TEAMS table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary. + + Returns: + dict: Transformed record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "external_id": record.get("externalId", ""), + "name": record.get("name", ""), + "sync_timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_roles_data(record): + """ + Transform API response record to ROLES table schema format. + This function maps raw API fields to normalized database column names and types. 
+
+    Args:
+        record: Raw API response record dictionary.
+
+    Returns:
+        dict: Transformed record ready for database insertion.
+    """
+    return {
+        "id": record.get("id", ""),
+        "name": record.get("name", ""),
+        "nominal_code": record.get("nominalCode", ""),
+        "sync_timestamp": datetime.now(timezone.utc).isoformat(),
+    }
+
+
+def get_people_data(api_key, last_sync_time=None, configuration=None):
+    """
+    Fetch people data using a memory-efficient streaming approach with pagination.
+    This generator function prevents memory accumulation by yielding individual records.
+
+    Args:
+        api_key: API authentication key for making requests.
+        last_sync_time: Timestamp of the last successful sync, tracked in state (not currently applied as a request filter).
+        configuration: Configuration dictionary containing connector settings.
+
+    Yields:
+        dict: Individual people records mapped to destination schema.
+
+    Raises:
+        RuntimeError: If API requests fail after all retry attempts.
+    """
+    endpoint = "/api/People"
+    max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000)
+    include_active = __get_config_bool(configuration, "include_active_people", True)
+    include_archived = __get_config_bool(configuration, "include_archived_people", False)
+    include_suspended = __get_config_bool(configuration, "include_suspended_people", False)
+    include_deleted = __get_config_bool(configuration, "include_deleted_people", False)
+
+    params = {
+        "numberOfRecords": max_records,
+        "includeActive": include_active,
+        "includeArchived": include_archived,
+        "includeSuspended": include_suspended,
+        "includeDeleted": include_deleted,
+    }
+
+    response = execute_api_request(endpoint, api_key, params, configuration)
+    data = response if isinstance(response, list) else response.get("data", [])
+
+    for record in data:
+        yield __map_people_data(record)
+
+
+def get_teams_data(api_key, configuration=None):
+    """
+    Fetch teams data using a memory-efficient streaming approach.
+    This generator function prevents memory accumulation by yielding individual records.
+
+    Args:
+        api_key: API authentication key for making requests.
+        configuration: Configuration dictionary containing connector settings.
+
+    Yields:
+        dict: Individual team records mapped to destination schema.
+
+    Raises:
+        RuntimeError: If API requests fail after all retry attempts.
+    """
+    endpoint = "/api/Team"
+    max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000)
+
+    params = {
+        "numberOfRecords": max_records,
+    }
+
+    response = execute_api_request(endpoint, api_key, params, configuration)
+    data = response if isinstance(response, list) else response.get("data", [])
+
+    for record in data:
+        yield __map_teams_data(record)
+
+
+def get_roles_data(api_key, configuration=None):
+    """
+    Fetch roles data using a memory-efficient streaming approach.
+    This generator function prevents memory accumulation by yielding individual records.
+
+    Args:
+        api_key: API authentication key for making requests.
+        configuration: Configuration dictionary containing connector settings.
+
+    Yields:
+        dict: Individual role records mapped to destination schema.
+
+    Raises:
+        RuntimeError: If API requests fail after all retry attempts.
+    """
+    endpoint = "/api/Role"
+    max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000)
+
+    params = {
+        "numberOfRecords": max_records,
+    }
+
+    response = execute_api_request(endpoint, api_key, params, configuration)
+    data = response if isinstance(response, list) else response.get("data", [])
+
+    for record in data:
+        yield __map_roles_data(record)
+ """ + endpoint = "/api/Role" + max_records = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000) + + params = { + "numberOfRecords": max_records, + } + + response = execute_api_request(endpoint, api_key, params, configuration) + data = response if isinstance(response, list) else response.get("data", []) + + for record in data: + yield __map_roles_data(record) + + +def schema(configuration: dict): + """ + Define database schema with table names and primary keys for the connector. + This function specifies the destination tables and their primary keys for Fivetran to create. + + Args: + configuration: Configuration dictionary (not used but required by SDK). + + Returns: + list: List of table schema dictionaries with table names and primary keys. + """ + return [ + {"table": "people", "primary_key": ["id"]}, + {"table": "teams", "primary_key": ["id"]}, + {"table": "roles", "primary_key": ["id"]}, + ] + + +def update(configuration: dict, state: dict): + """ + Main synchronization function that fetches and processes data from the Rotamaster API. + This function orchestrates the entire sync process using memory-efficient streaming patterns. + + Args: + configuration: Configuration dictionary containing API credentials and settings. + state: State dictionary containing sync cursors and checkpoints from previous runs. + + Raises: + RuntimeError: If sync fails due to API errors or configuration issues. + """ + log.info("Starting Rotamaster API connector sync") + + # Extract configuration parameters (SDK auto-validates required fields) + api_key = __get_config_str(configuration, "api_key") + max_records_per_page = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000) + enable_people_sync = __get_config_bool(configuration, "enable_people_sync", True) + enable_teams_sync = __get_config_bool(configuration, "enable_teams_sync", True) + enable_roles_sync = __get_config_bool(configuration, "enable_roles_sync", True) + + # Get state for incremental sync + last_sync_time = state.get("last_sync_time") + + try: + # Fetch people data if enabled + if enable_people_sync: + log.info("Fetching people data...") + people_count = 0 + + for record in get_people_data(api_key, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="people", data=record) + people_count += 1 + + # Checkpoint every batch to save progress incrementally + if people_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "sync_timestamp", datetime.now(timezone.utc).isoformat() + ), + "people_processed": people_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + + log.info(f"Completed people sync. 
+
+        # Fetch teams data if enabled
+        if enable_teams_sync:
+            log.info("Fetching teams data...")
+            teams_count = 0
+
+            for record in get_teams_data(api_key, configuration):
+                # The 'upsert' operation is used to insert or update data in the destination table.
+                # The op.upsert method is called with two arguments:
+                # - The first argument is the name of the table to upsert the data into.
+                # - The second argument is a dictionary containing the data to be upserted.
+                op.upsert(table="teams", data=record)
+                teams_count += 1
+
+            log.info(f"Completed teams sync. Processed {teams_count} records.")
+
+        # Fetch roles data if enabled
+        if enable_roles_sync:
+            log.info("Fetching roles data...")
+            roles_count = 0
+
+            for record in get_roles_data(api_key, configuration):
+                # The 'upsert' operation is used to insert or update data in the destination table.
+                # The op.upsert method is called with two arguments:
+                # - The first argument is the name of the table to upsert the data into.
+                # - The second argument is a dictionary containing the data to be upserted.
+                op.upsert(table="roles", data=record)
+                roles_count += 1
+
+            log.info(f"Completed roles sync. Processed {roles_count} records.")
+
+        # Final checkpoint with completion status
+        final_state = {
+            "last_sync_time": datetime.now(timezone.utc).isoformat(),
+            "sync_completed": True,
+        }
+        # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
+        # from the correct position in case of next sync or interruptions.
+        # Learn more about how and where to checkpoint by reading our best practices documentation
+        # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
+        op.checkpoint(final_state)
+
+        total_records = sum(
+            [
+                people_count if enable_people_sync else 0,
+                teams_count if enable_teams_sync else 0,
+                roles_count if enable_roles_sync else 0,
+            ]
+        )
+        log.info(f"Sync completed successfully. Total processed: {total_records} records.")
+
+    except Exception as e:
+        log.severe(f"Sync failed: {str(e)}")
+        raise RuntimeError(f"Failed to sync data: {str(e)}") from e
+
+
+# Create the connector instance
+connector = Connector(update=update, schema=schema)
+
+# Check if the script is being run as the main module.
+# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
+# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
+# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
+if __name__ == "__main__":
+    # Open the configuration.json file and load its contents
+    with open("configuration.json", "r") as f:
+        configuration = json.load(f)
+
+    # Test the connector locally
+    connector.debug(configuration=configuration)