diff --git a/README.md b/README.md index 86bade28c..3b24ed5c2 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ These connectors are ready to use out of the box, requiring minimal modification - [ibm_informix_using_ibm_db](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/ibm_informix_using_ibm_db) - This example shows how to connect and sync data from IBM Informix using Connector SDK. This example uses the `ibm_db` library to connect to the Informix database and fetch data. - [influx_db](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/influx_db) - This example shows how to sync data from InfluxDB using Connector SDK. It uses the `influxdb3_python` library to connect to InfluxDB and fetch time-series data from a specified measurement. - [iterate](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/iterate) - This example shows how to sync NPS survey data from the Iterate REST API and load it into your destination using Connector SDK. The connector fetches NPS surveys and their individual responses, providing complete survey analytics data for downstream analysis. +- [jboard](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/jboard) - This example shows how to sync employers, job categories, and alert subscriptions data from Jboard API to your destination warehouse. You need to provide your Jboard API key for this example to work. - [leavedates](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/leavedates) - This example shows how to sync leave report data from LeaveDates API by using the Connector SDK. You need to provide your LeaveDates API token and company ID for this example to work. - [microsoft_excel](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/microsoft_excel) - This example shows how to sync data from Microsoft Excel using Connector SDK. It shows three different ways to sync data from Excel files using `pandas`, `python-calamine` and `openpyxl`. - [microsoft_intune](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/microsoft_intune/) - This example shows how to connect to Microsoft Intune and retrieve all managed devices using the Connector SDK. diff --git a/connectors/jboard/README.md b/connectors/jboard/README.md new file mode 100644 index 000000000..8fbdce31f --- /dev/null +++ b/connectors/jboard/README.md @@ -0,0 +1,104 @@ +# Jboard API Connector Example + +## Connector overview +This connector syncs employers, job categories, and alert subscriptions data from Jboard API to your destination warehouse. The connector fetches employer profiles with their metadata, job categories with hierarchical structure, and user alert subscriptions with search criteria. It supports incremental synchronization using timestamp-based cursors and handles API rate limiting with automatic retry logic. The connector uses memory-efficient streaming to process large datasets without accumulation issues. + +## Requirements +- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): **3.9-3.13** +- Operating system: + - Windows: 10 or later (64-bit only) + - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) + - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) + +## Getting started +Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started. + +## Features +- Syncs employer data, job categories, and alert subscriptions from Jboard API +- Bearer token authentication with secure API key handling (refer to `execute_api_request` function) +- Page-based pagination with automatic page traversal (refer to `get_employers`, `get_categories`, and `get_alert_subscriptions` functions) +- Memory-efficient streaming prevents data accumulation for large datasets using generator patterns +- Incremental synchronization using timestamp-based cursors (refer to `get_time_range` function) +- Comprehensive error handling with exponential backoff retry logic (refer to `__handle_rate_limit` and `__handle_request_error` functions) +- Configurable sync parameters including page size, timeout, and retry attempts +- Cognitive complexity optimization with helper function extraction for maintainability + +## Configuration file +```json +{ + "api_key": "", + "sync_frequency_hours": "", + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "", + "enable_debug_logging": "" +} +``` + +### Configuration parameters +- `api_key`: Your Jboard API authentication key (required) +- `sync_frequency_hours`: How often to sync data in hours (default: 4) +- `initial_sync_days`: Days of historical data to fetch on first sync (default: 90) +- `max_records_per_page`: Maximum records per API request (default: 100, range: 1-1000) +- `request_timeout_seconds`: API request timeout in seconds (default: 30) +- `retry_attempts`: Number of retry attempts for failed requests (default: 3) +- `enable_incremental_sync`: Enable timestamp-based incremental sync (default: true) +- `enable_debug_logging`: Enable detailed debug logs (default: false) + +## Requirements file +This connector does not require any additional packages beyond those provided by the Fivetran environment. + +Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`. + +## Authentication +1. Log in to the [Jboard API Developer Portal](https://app.jboard.io/api/documentation). +2. Navigate to your account settings page to access API credentials. +3. Generate a new API key or copy your existing API key. +4. Make a note of the API key - it will be used as the Bearer token for authentication. +5. Use sandbox credentials for testing, production credentials for live syncing. + +Note: The connector automatically handles Bearer token authentication (API key never expires). Credentials are never logged or exposed in plain text for security. + +## Pagination +Page-based pagination with automatic page traversal (refer to `get_employers`, `get_categories`, and `get_alert_subscriptions` functions). The connector uses `page` and `per_page` parameters to fetch data in configurable chunks. Generator-based processing prevents memory accumulation for large datasets by yielding individual records. Processes pages sequentially while yielding individual records for immediate processing, with pagination metadata used to determine when all data has been fetched. + +## Data handling +Employer, category, and alert subscription data is mapped from Jboard API's format to normalized database columns (refer to the `__map_employer_data`, `__map_category_data`, and `__map_alert_subscription_data` functions). Nested objects like tags arrays are serialized to JSON strings, and all timestamps are converted to UTC format for consistency. + +Supports timestamp-based incremental synchronization using the `last_sync_time` state parameter (refer to the `get_time_range` function). Initial sync can be configured to fetch historical data up to 365 days using the `initial_sync_days` configuration parameter. + +## Error handling +- 429 Rate Limited: Automatic retry with Retry-After header support and exponential backoff (refer to the `__handle_rate_limit` function) +- Timeout handling with configurable retry attempts and exponential backoff with jitter (refer to the `__handle_request_error` function) +- Exponential backoff with jitter prevents multiple clients from making requests at the same time +- HTTP error handling with appropriate retry logic for temporary failures +- Parameter validation with descriptive error messages provides clear guidance for fixing configuration issues + +## Tables created +| Table | Primary Key | Description | +|-------|-------------|-------------| +| EMPLOYERS | `id` | Employer profiles with company information and metadata | +| CATEGORIES | `id` | Job categories with hierarchical structure and settings | +| ALERT_SUBSCRIPTIONS | `id` | User alert subscriptions with search criteria and preferences | + +Column types are automatically inferred by Fivetran. Sample columns include: + +**EMPLOYERS**: `id`, `name`, `description`, `website`, `logo_url`, `featured`, `source`, `created_at`, `updated_at`, `have_posted_jobs`, `have_a_logo`, `sync_timestamp` + +**CATEGORIES**: `id`, `name`, `description`, `parent_id`, `sort_order`, `is_active`, `created_at`, `updated_at`, `sync_timestamp` + +**ALERT_SUBSCRIPTIONS**: `id`, `email`, `query`, `location`, `search_radius`, `remote_work_only`, `category_id`, `job_type`, `tags`, `is_active`, `created_at`, `updated_at`, `sync_timestamp` + +## Additional files + +The connector includes several additional files to support functionality, testing, and deployment: + +- `requirements.txt` – Python dependency specification for Jboard API integration and connector requirements including faker for mock testing. + +- `configuration.json` – Configuration template for API credentials and connector parameters (should be excluded from version control). + + +## Additional considerations +The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. \ No newline at end of file diff --git a/connectors/jboard/configuration.json b/connectors/jboard/configuration.json new file mode 100644 index 000000000..56de5c0e9 --- /dev/null +++ b/connectors/jboard/configuration.json @@ -0,0 +1,10 @@ +{ + "api_key": "", + "sync_frequency_hours": "", + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "", + "enable_debug_logging": "" +} \ No newline at end of file diff --git a/connectors/jboard/connector.py b/connectors/jboard/connector.py new file mode 100644 index 000000000..2f481e9ac --- /dev/null +++ b/connectors/jboard/connector.py @@ -0,0 +1,578 @@ +"""Jboard API connector for syncing employers, categories, and alert subscriptions data. +This connector demonstrates how to fetch data from Jboard API and upsert it into destination using memory-efficient streaming patterns. +See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) +and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details +""" + +# For reading configuration from a JSON file +import json + +# Import required classes from fivetran_connector_sdk +from fivetran_connector_sdk import Connector + +# For enabling Logs in your connector code +from fivetran_connector_sdk import Logging as log + +# For supporting Data operations like Upsert(), Update(), Delete() and checkpoint() +from fivetran_connector_sdk import Operations as op + +# For making HTTP requests to Jboard API +import requests + +# For handling dates and timestamps +from datetime import datetime, timedelta, timezone + +# For implementing delays in retry logic and rate limiting +import time + +# For adding jitter to retry delays +import random + +""" ADD YOUR SOURCE-SPECIFIC IMPORTS HERE +Example: import pandas, boto3, etc. +Add comment for each import to explain its purpose for users to follow. +""" +# Private constants (use __ prefix) +__API_ENDPOINT = "https://app.jboard.io/api" +__API_VERSION = "v1" + + +def __get_config_int(configuration, key, default, min_val=None, max_val=None): + """ + Extract and validate integer configuration parameters with range checking. + This function safely extracts integer values from configuration and applies validation. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing or invalid. + min_val: Minimum allowed value (optional). + max_val: Maximum allowed value (optional). + + Returns: + int: The validated integer value or default if validation fails. + """ + try: + value = int(configuration.get(key, default)) + if min_val is not None and value < min_val: + return default + if max_val is not None and value > max_val: + return default + return value + except (ValueError, TypeError): + return default + + +def __get_config_str(configuration, key, default=""): + """ + Extract string configuration parameters with type safety. + This function safely extracts string values from configuration dictionary. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing. + + Returns: + str: The string value or default if key is missing. + """ + return str(configuration.get(key, default)) + + +def __get_config_bool(configuration, key, default=False): + """ + Extract and parse boolean configuration parameters from strings or boolean values. + This function handles string representations of boolean values commonly used in JSON configuration. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default boolean value to return if key is missing. + + Returns: + bool: The parsed boolean value or default if key is missing. + """ + value = configuration.get(key, default) + if isinstance(value, str): + return value.lower() in ("true", "1", "yes", "on") + return bool(value) + + +def __calculate_wait_time(attempt, response_headers, base_delay=1, max_delay=60): + """ + Calculate exponential backoff wait time with jitter for retry attempts. + This function implements exponential backoff with random jitter to prevent thundering herd problems. + + Args: + attempt: Current attempt number (0-based). + response_headers: HTTP response headers dictionary that may contain Retry-After. + base_delay: Base delay in seconds for exponential backoff. + max_delay: Maximum delay cap in seconds. + + Returns: + float: Wait time in seconds before next retry attempt. + """ + if "Retry-After" in response_headers: + return min(int(response_headers["Retry-After"]), max_delay) + + # Exponential backoff with jitter + wait_time = min(base_delay * (2**attempt), max_delay) + jitter = random.uniform(0.1, 0.3) * wait_time + return wait_time + jitter + + +def __handle_rate_limit(attempt, response): + """ + Handle HTTP 429 rate limiting responses with appropriate delays. + This function logs the rate limit and waits before allowing retry attempts. + + Args: + attempt: Current attempt number for logging purposes. + response: HTTP response object containing rate limit headers. + """ + wait_time = __calculate_wait_time(attempt, response.headers) + log.warning(f"Rate limit hit, waiting {wait_time:.1f} seconds before retry {attempt + 1}") + time.sleep(wait_time) + + +def __handle_request_error(attempt, retry_attempts, error, endpoint): + """ + Handle request errors with exponential backoff retry logic. + This function manages retry attempts for failed API requests with appropriate delays. + + Args: + attempt: Current attempt number (0-based). + retry_attempts: Total number of retry attempts allowed. + error: The exception that occurred during the request. + endpoint: API endpoint that failed for logging purposes. + + Raises: + Exception: Re-raises the original error after all retry attempts are exhausted. + """ + if attempt < retry_attempts - 1: + wait_time = __calculate_wait_time(attempt, {}) + log.warning( + f"Request failed for {endpoint}: {str(error)}. Retrying in {wait_time:.1f} seconds..." + ) + time.sleep(wait_time) + else: + log.severe(f"All retry attempts failed for {endpoint}: {str(error)}") + raise error + + +def execute_api_request(endpoint, api_key, params=None, configuration=None): + """ + Execute HTTP API requests with comprehensive error handling and retry logic. + This function handles authentication, rate limiting, timeouts, and network errors. + + Args: + endpoint: API endpoint path to request. + api_key: Authentication key for API access. + params: Query parameters for the request (optional). + configuration: Configuration dictionary for timeout and retry settings. + + Returns: + dict: Parsed JSON response from the API. + + Raises: + RuntimeError: If all retry attempts fail or unexpected errors occur. + requests.exceptions.RequestException: For unrecoverable HTTP errors. + """ + url = f"{__API_ENDPOINT}{endpoint}" + headers = {"Authorization": f"Bearer {api_key}"} + + timeout = __get_config_int(configuration, "request_timeout_seconds", 30) + retry_attempts = __get_config_int(configuration, "retry_attempts", 3) + + for attempt in range(retry_attempts): + try: + response = requests.get(url, headers=headers, params=params, timeout=timeout) + + if response.status_code == 429: + __handle_rate_limit(attempt, response) + continue + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + __handle_request_error(attempt, retry_attempts, e, endpoint) + continue + + raise RuntimeError("Unexpected error in API request execution") + + +def get_time_range(last_sync_time=None, configuration=None): + """ + Generate time range for incremental or initial data synchronization. + This function creates start and end timestamps for API queries based on sync state. + + Args: + last_sync_time: Timestamp of last successful sync (optional). + configuration: Configuration dictionary containing sync settings. + + Returns: + dict: Dictionary containing 'start' and 'end' timestamps in ISO format. + """ + end_time = datetime.now(timezone.utc).isoformat() + + if last_sync_time: + start_time = last_sync_time + else: + initial_sync_days = __get_config_int(configuration, "initial_sync_days", 90) + start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat() + + return {"start": start_time, "end": end_time} + + +def __map_employer_data(record): + """ + Transform API response record to EMPLOYERS table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary. + + Returns: + dict: Transformed record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "name": record.get("name", ""), + "description": record.get("description", ""), + "website": record.get("website", ""), + "logo_url": record.get("logo_url", ""), + "featured": record.get("featured", False), + "source": record.get("source", ""), + "created_at": record.get("created_at", ""), + "updated_at": record.get("updated_at", ""), + "have_posted_jobs": record.get("have_posted_jobs", False), + "have_a_logo": record.get("have_a_logo", False), + "sync_timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_category_data(record): + """ + Transform API response record to CATEGORIES table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary. + + Returns: + dict: Transformed record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "name": record.get("name", ""), + "description": record.get("description", ""), + "parent_id": record.get("parent_id"), + "sort_order": record.get("sort_order", 0), + "is_active": record.get("is_active", True), + "created_at": record.get("created_at", ""), + "updated_at": record.get("updated_at", ""), + "sync_timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def __map_alert_subscription_data(record): + """ + Transform API response record to ALERT_SUBSCRIPTIONS table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary. + + Returns: + dict: Transformed record ready for database insertion. + """ + return { + "id": record.get("id", ""), + "email": record.get("email", ""), + "query": record.get("query", ""), + "location": record.get("location", ""), + "search_radius": record.get("search_radius", 0), + "remote_work_only": record.get("remote_work_only", False), + "category_id": record.get("category_id"), + "job_type": record.get("job_type", ""), + "tags": json.dumps(record.get("tags", [])) if record.get("tags") else "[]", + "is_active": record.get("is_active", True), + "created_at": record.get("created_at", ""), + "updated_at": record.get("updated_at", ""), + "sync_timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def get_employers(api_key, last_sync_time=None, configuration=None): + """ + Fetch employers data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual employer records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/employers" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + params = { + "per_page": max_records, + "page": 1, + } + + # Add incremental sync filters if last_sync_time provided + if last_sync_time: + params["created_at_from"] = last_sync_time + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, api_key, params, configuration) + + # Handle different response formats + data = response.get("data", response.get("employers", [])) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_employer_data(record) + + # Check pagination metadata + meta = response.get("meta", {}) + current_page = meta.get("current_page", page) + last_page = meta.get("last_page", page) + + if current_page >= last_page or len(data) < max_records: + break + page += 1 + + +def get_categories(api_key, last_sync_time=None, configuration=None): + """ + Fetch categories data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual category records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/categories" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + params = { + "per_page": max_records, + "page": 1, + } + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, api_key, params, configuration) + + # Handle different response formats + data = response.get("data", response.get("categories", [])) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_category_data(record) + + # Check pagination metadata + meta = response.get("meta", {}) + current_page = meta.get("current_page", page) + last_page = meta.get("last_page", page) + + if current_page >= last_page or len(data) < max_records: + break + page += 1 + + +def get_alert_subscriptions(api_key, last_sync_time=None, configuration=None): + """ + Fetch alert subscriptions data using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + api_key: API authentication key for making requests. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual alert subscription records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/alert_subscriptions" + max_records = __get_config_int(configuration, "max_records_per_page", 100) + + params = { + "per_page": max_records, + "page": 1, + } + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, api_key, params, configuration) + + # Handle different response formats + data = response.get("data", response.get("alert_subscriptions", [])) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_alert_subscription_data(record) + + # Check pagination metadata + meta = response.get("meta", {}) + current_page = meta.get("current_page", page) + last_page = meta.get("last_page", page) + + if current_page >= last_page or len(data) < max_records: + break + page += 1 + + +def schema(configuration: dict): + """ + Define database schema with table names and primary keys for the connector. + This function specifies the destination tables and their primary keys for Fivetran to create. + + Args: + configuration: Configuration dictionary (not used but required by SDK). + + Returns: + list: List of table schema dictionaries with table names and primary keys. + """ + return [ + {"table": "employers", "primary_key": ["id"]}, + {"table": "categories", "primary_key": ["id"]}, + {"table": "alert_subscriptions", "primary_key": ["id"]}, + ] + + +def update(configuration: dict, state: dict): + """ + Main synchronization function that fetches and processes data from the Jboard API. + This function orchestrates the entire sync process using memory-efficient streaming patterns. + + Args: + configuration: Configuration dictionary containing API credentials and settings. + state: State dictionary containing sync cursors and checkpoints from previous runs. + + Raises: + RuntimeError: If sync fails due to API errors or configuration issues. + """ + log.info("Starting Jboard API connector sync") + + # Extract configuration parameters (SDK auto-validates required fields) + api_key = __get_config_str(configuration, "api_key") + max_records_per_page = __get_config_int(configuration, "max_records_per_page", 100, 1, 1000) + enable_incremental = __get_config_bool(configuration, "enable_incremental_sync", True) + + # Get state for incremental sync + last_sync_time = state.get("last_sync_time") if enable_incremental else None + + try: + # Fetch employers data using generator with incremental checkpointing + log.info("Fetching employers data...") + employer_count = 0 + for record in get_employers(api_key, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="employers", data=record) + employer_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if employer_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "updated_at", datetime.now(timezone.utc).isoformat() + ), + "employers_processed": employer_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + + # Fetch categories data + log.info("Fetching categories data...") + category_count = 0 + for record in get_categories(api_key, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="categories", data=record) + category_count += 1 + + # Fetch alert subscriptions data + log.info("Fetching alert subscriptions data...") + alert_count = 0 + for record in get_alert_subscriptions(api_key, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="alert_subscriptions", data=record) + alert_count += 1 + + # Final checkpoint with completion status + final_state = { + "last_sync_time": datetime.now(timezone.utc).isoformat(), + "employers_processed": employer_count, + "categories_processed": category_count, + "alert_subscriptions_processed": alert_count, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(final_state) + + log.info( + f"Sync completed successfully. Processed {employer_count} employers, {category_count} categories, {alert_count} alert subscriptions." + ) + + except Exception as e: + log.severe(f"Sync failed: {str(e)}") + raise RuntimeError(f"Failed to sync data: {str(e)}") + + +# Create the Connector instance +connector = Connector(update=update, schema=schema) + +# Check if the script is being run as the main module. +# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. +# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production. +# Please test using the Fivetran debug command prior to finalizing and deploying your connector. +if __name__ == "__main__": + # Open the configuration.json file and load its contents + with open("configuration.json", "r") as f: + configuration = json.load(f) + + # Test the connector locally + connector.debug(configuration=configuration)