39 changes: 39 additions & 0 deletions connectors/DagWorks/connector.py
@@ -0,0 +1,39 @@
"""
DagWorks connector for Fivetran Connector SDK.
Fetches pipeline metadata and run status from DagWorks API.
"""
Comment on lines +1 to +4
Copilot AI Oct 31, 2025

The module docstring is missing the required references to the technical documentation. Follow the template format: include a description, then add a sentence such as "See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details." See template_example_connector/connector.py lines 1-5.

Copilot generated this review using guidance from repository custom instructions.

import requests
Copilot AI Oct 31, 2025

The requests import is missing an explanatory comment. Every import must have an inline comment explaining its purpose. This should be: import requests # For making HTTP API requests (NOTE: provided by SDK runtime). Additionally, requests should never be added to requirements.txt as it's provided by the SDK runtime.

Copilot generated this review using guidance from repository custom instructions.
from fivetran_connector_sdk import connector, config, state, records, log, schema
Comment on lines +6 to +7
Copilot AI Oct 31, 2025

BLOCKER: The imports are using an incorrect SDK API. The Fivetran Connector SDK does not expose connector, config, state, records, or schema as importable modules with these APIs. The correct SDK v2+ imports should be: from fivetran_connector_sdk import Connector, from fivetran_connector_sdk import Logging as log, and from fivetran_connector_sdk import Operations as op. See the template at template_example_connector/connector.py lines 11-17 for the correct import pattern.

Copilot generated this review using guidance from repository custom instructions.
Copilot AI Oct 31, 2025

Import of 'log' is not used.

Suggested change
- from fivetran_connector_sdk import connector, config, state, records, log, schema
+ from fivetran_connector_sdk import connector, config, state, records, schema

Copilot uses AI. Check for mistakes.

CONFIG = config.Config(
Comment on lines +1 to +9
Copilot AI Oct 31, 2025

The connector is missing the required main block for debugging. At the end of the file, you must include: connector = Connector(update=update, schema=schema) followed by an if __name__ == "__main__": block that loads configuration from a JSON file and calls connector.debug(configuration=configuration). See template_example_connector/connector.py lines 153-166 for the exact pattern and required comments.

Copilot generated this review using guidance from repository custom instructions.
    base_url=config.StringField(description="DagWorks API base URL"),
    api_key=config.SecretField()
)
Comment on lines +9 to +12
Copilot AI Oct 31, 2025

BLOCKER: The SDK does not use a config.Config() API for configuration. Configuration should be defined in a separate configuration.json file with fields like base_url and api_key, which the SDK will auto-validate. Configuration values are then accessed in the update() function via the configuration parameter dictionary (e.g., configuration['base_url']).

Copilot generated this review using guidance from repository custom instructions.
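
A minimal sketch of the dictionary-based access this comment describes, assuming `configuration.json` contains the two keys named above (`base_url` and `api_key`). The helper names `validate_configuration` and `build_request` are hypothetical and are not part of the SDK; the `/runs` path and bearer header are taken from the code under review.

```python
# Hypothetical helpers; only the configuration keys come from the review comment.
REQUIRED_KEYS = ("base_url", "api_key")


def validate_configuration(configuration: dict) -> None:
    """Raise ValueError if a required configuration value is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not configuration.get(key)]
    if missing:
        raise ValueError(
            f"Missing required configuration value(s): {', '.join(missing)}"
        )


def build_request(configuration: dict) -> tuple:
    """Return the runs URL and auth headers built from the configuration dict."""
    validate_configuration(configuration)
    # Values are read by key from the dictionary, not from a Config object.
    url = f"{configuration['base_url'].rstrip('/')}/runs"
    headers = {"Authorization": f"Bearer {configuration['api_key']}"}
    return url, headers
```

Inside `update(configuration, state)`, the result of `build_request(configuration)` could be passed straight to `requests.get`.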

SCHEMA = schema.Schema(
    name="dagworks_runs",
    columns={
        "id": schema.StringColumn(),
        "dag_id": schema.StringColumn(),
        "status": schema.StringColumn(),
        "start_time": schema.StringColumn(),
        "end_time": schema.StringColumn(),
    }
)
Comment on lines +14 to +23
Copilot AI Oct 31, 2025

BLOCKER: The SDK does not use a schema.Schema() API with column type classes. Schema should be defined in a schema() function that returns a list of table dictionaries. For example: return [{"table": "dagworks_runs", "primary_key": ["id"], "columns": {"id": "STRING", "dag_id": "STRING", "status": "STRING", "start_time": "STRING", "end_time": "STRING"}}]. See template_example_connector/connector.py lines 65-82 for the correct pattern.

Copilot generated this review using guidance from repository custom instructions.
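
Written out as a function, the schema suggested in this comment might look like the sketch below. The table name, primary key, and column types are copied from the comment; the `configuration` parameter follows the `schema(configuration: dict)` signature mentioned elsewhere in this review and is unused here.

```python
def schema(configuration: dict) -> list:
    """Return the destination tables as a list of plain dictionaries."""
    return [
        {
            "table": "dagworks_runs",  # Destination table name
            "primary_key": ["id"],     # Column(s) that uniquely identify a run
            "columns": {
                "id": "STRING",
                "dag_id": "STRING",
                "status": "STRING",
                "start_time": "STRING",
                "end_time": "STRING",
            },
        }
    ]
```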

@connector(
    name="DagWorksConnector",
    version="0.1.0",
    config=CONFIG,
    schema=SCHEMA,
)
def run_connector(ctx: state.Context):
Comment on lines +25 to +31
Copilot AI Oct 31, 2025

BLOCKER: The SDK does not use a decorator-based @connector API. The correct pattern is to define an update(configuration: dict, state: dict) function and optionally a schema(configuration: dict) function, then create a connector object using connector = Connector(update=update, schema=schema). The update function should accept configuration and state dictionaries, not a Context object. See template_example_connector/connector.py lines 85-154 for the correct structure.

Copilot generated this review using guidance from repository custom instructions.
    headers = {"Authorization": f"Bearer {ctx.config.api_key}"}
    response = requests.get(f"{ctx.config.base_url}/runs", headers=headers)
Copilot AI Oct 31, 2025

BLOCKER: The code loads all runs into memory at once by calling .json().get("runs", []) on the entire response. For large datasets, this can exhaust available memory. Implement pagination: fetch runs in batches, process each page as soon as it arrives, and checkpoint state after each page.

Copilot generated this review using guidance from repository custom instructions.
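
One way to structure the paging loop is sketched below. It assumes offset/limit pagination, which is an assumption: the review does not say how the DagWorks API paginates, so `fetch_page(offset, page_size)` is a hypothetical callable that the connector would implement around its HTTP request.

```python
def iter_run_pages(fetch_page, page_size=100):
    """Yield one page of runs at a time instead of loading every run at once.

    fetch_page is a hypothetical callable (offset, page_size) -> list of runs.
    """
    offset = 0
    while True:
        runs = fetch_page(offset, page_size)
        if not runs:
            break  # No more data
        yield runs
        if len(runs) < page_size:
            break  # A short page means this was the last one
        offset += len(runs)
```

Because this is a generator, only one page is held in memory at a time, and the caller can upsert and checkpoint between pages.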
    response.raise_for_status()
Comment on lines +32 to +34
Copilot AI Oct 31, 2025

BLOCKER: No retry logic is implemented for the API request. Network calls must include retry logic with exponential backoff to handle transient failures. Wrap the request in a try-except block that catches specific exceptions such as requests.Timeout and requests.ConnectionError, and retry inside a loop over range(__MAX_RETRIES): re-raise on the final attempt (attempt == __MAX_RETRIES - 1), otherwise sleep for min(60, 2 ** attempt) seconds before the next attempt.

Copilot generated this review using guidance from repository custom instructions.
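
The retry pattern described in this comment can be sketched as a small wrapper. The name `call_with_retries` and the injectable `sleep` parameter are hypothetical additions for illustration; the default exception tuple uses Python built-ins so the sketch runs without `requests`, and a connector would pass `(requests.Timeout, requests.ConnectionError)` instead.

```python
import time

MAX_RETRIES = 5  # Assumed value; the review does not specify a retry count


def call_with_retries(func, retryable=(ConnectionError, TimeoutError),
                      max_retries=MAX_RETRIES, sleep=time.sleep):
    """Call func(), retrying retryable errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return func()
        except retryable:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the original error
            # Back off 1s, 2s, 4s, ... capped at 60 seconds
            sleep(min(60, 2 ** attempt))
```

Usage might look like `call_with_retries(lambda: requests.get(url, headers=headers, timeout=30), retryable=(requests.Timeout, requests.ConnectionError))`, followed by `raise_for_status()` on the returned response.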

    for run in response.json().get("runs", []):
        records.write("dagworks_runs", run)
Copilot AI Oct 31, 2025

BLOCKER: The SDK does not use records.write() for data operations. The correct SDK v2+ API is op.upsert(table, data) where op is imported as from fivetran_connector_sdk import Operations as op. Each upsert call should be preceded by a comment: # The 'upsert' operation is used to insert or update data in the destination table. The first argument is the name of the destination table. The second argument is a dictionary containing the record to be upserted.

Copilot generated this review using guidance from repository custom instructions.

    return ctx.update_state({"last_sync": "now"})
Copilot AI Oct 31, 2025

BLOCKER: The SDK does not use ctx.update_state() or return values from the update function. State management is done via op.checkpoint(state) which should be called after processing each page/batch of data. The update function should return nothing (implicitly returns None). Each checkpoint call must be preceded by a comment: # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume from the correct position in case of next sync or interruptions. Learn more about how and where to checkpoint by reading our best practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).

Copilot generated this review using guidance from repository custom instructions.
Copilot AI Oct 31, 2025

The state value "now" is not a valid timestamp. State should hold an ISO 8601 timestamp (e.g., "2024-01-15T10:30:00Z") or another meaningful cursor value. The string literal "now" gives the next sync nothing to resume from, so it cannot serve as a cursor for incremental syncs.

Copilot generated this review using guidance from repository custom instructions.
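
The two comments on this line can be combined into one sketch: upsert each run, track the latest `start_time` as the cursor, and checkpoint after every page. It assumes `start_time` is an ISO 8601 UTC string in a uniform format (so string comparison matches chronological order), which the review does not confirm. `sync_pages` is a hypothetical helper; `upsert` and `checkpoint` are passed in so the sketch runs without the SDK, and in a connector they would be `op.upsert` and `op.checkpoint` as named in the comments above.

```python
def sync_pages(pages, state, upsert, checkpoint):
    """Upsert every run page by page, checkpointing a timestamp cursor per page."""
    cursor = state.get("last_sync")
    for page in pages:
        for run in page:
            # Insert or update the record in the destination table
            upsert("dagworks_runs", run)
            start_time = run.get("start_time")
            # Uniform ISO 8601 UTC strings sort chronologically as plain strings
            if start_time and (cursor is None or start_time > cursor):
                cursor = start_time
        # Save progress after each page so an interrupted sync can resume
        state = {"last_sync": cursor}
        checkpoint(state)
    return state
```

The helper returns the final state only to make it easy to inspect; per the review, the `update` function itself would return nothing.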