add dagWorks connector example #441
base: main
Conversation
🧹 Python Code Quality Check
📎 Download full report from workflow artifacts.
📌 Only Python files changed in this PR were checked.
This comment is auto-updated with every commit.
Pull Request Overview
This PR introduces a new DagWorks connector that fetches pipeline metadata and run status from the DagWorks API. However, the implementation uses an incorrect API pattern that is incompatible with the Fivetran Connector SDK.
Key Changes:
- Adds a new connector at connectors/DagWorks/connector.py using decorator-based configuration
- Implements a single table schema for pipeline runs
- Fetches data from a DagWorks API endpoint
| import requests | ||
| from fivetran_connector_sdk import connector, config, state, records, log, schema |
Copilot
AI
Oct 31, 2025
BLOCKER: The imports are using an incorrect SDK API. The Fivetran Connector SDK does not expose connector, config, state, records, or schema as importable modules with these APIs. The correct SDK v2+ imports should be: from fivetran_connector_sdk import Connector, from fivetran_connector_sdk import Logging as log, and from fivetran_connector_sdk import Operations as op. See the template at template_example_connector/connector.py lines 11-17 for the correct import pattern.
| CONFIG = config.Config( | ||
| base_url=config.StringField(description="DagWorks API base URL"), | ||
| api_key=config.SecretField() | ||
| ) |
Copilot
AI
Oct 31, 2025
BLOCKER: The SDK does not use a config.Config() API for configuration. Configuration should be defined in a separate configuration.json file with fields like base_url and api_key, which the SDK will auto-validate. Configuration values are then accessed in the update() function via the configuration parameter dictionary (e.g., configuration['base_url']).
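A minimal sketch of how the configuration dictionary described in this comment could be consumed, assuming the two keys named above (base_url and api_key). The helper names validate_configuration and build_request are hypothetical and not part of the SDK, and the JSON string stands in for the contents of configuration.json with placeholder values.

```python
import json

# Keys named in the review comment; assumed here to be the only required ones.
REQUIRED_KEYS = ("base_url", "api_key")

# Stand-in for the contents of configuration.json (placeholder values only).
EXAMPLE_CONFIGURATION_JSON = (
    '{"base_url": "https://api.example.com", "api_key": "<YOUR_API_KEY>"}'
)


def validate_configuration(configuration: dict) -> None:
    """Raise ValueError if a required key is missing or empty (hypothetical helper)."""
    missing = [key for key in REQUIRED_KEYS if not configuration.get(key)]
    if missing:
        raise ValueError(
            f"Missing required configuration value(s): {', '.join(missing)}"
        )


def build_request(configuration: dict) -> tuple:
    """Return the runs URL and auth headers built from the configuration dictionary."""
    validate_configuration(configuration)
    url = f"{configuration['base_url'].rstrip('/')}/runs"
    headers = {"Authorization": f"Bearer {configuration['api_key']}"}
    return url, headers


if __name__ == "__main__":
    configuration = json.loads(EXAMPLE_CONFIGURATION_JSON)
    print(build_request(configuration))
```

Inside update(), the same dictionary arrives as the configuration parameter, so build_request(configuration) would replace the ctx.config.base_url and ctx.config.api_key lookups in the submitted code.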
| SCHEMA = schema.Schema( | ||
| name="dagworks_runs", | ||
| columns={ | ||
| "id": schema.StringColumn(), | ||
| "dag_id": schema.StringColumn(), | ||
| "status": schema.StringColumn(), | ||
| "start_time": schema.StringColumn(), | ||
| "end_time": schema.StringColumn(), | ||
| } | ||
| ) |
Copilot
AI
Oct 31, 2025
BLOCKER: The SDK does not use a schema.Schema() API with column type classes. Schema should be defined in a schema() function that returns a list of table dictionaries. For example: return [{"table": "dagworks_runs", "primary_key": ["id"], "columns": {"id": "STRING", "dag_id": "STRING", "status": "STRING", "start_time": "STRING", "end_time": "STRING"}}]. See template_example_connector/connector.py lines 65-82 for the correct pattern.
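The schema function suggested in this comment, written out as a sketch. The table name, primary key, and column types are taken from the comment itself; the configuration parameter follows the schema(configuration: dict) signature quoted elsewhere in this review.

```python
def schema(configuration: dict) -> list:
    """Describe the single table this connector delivers.

    The configuration argument is unused here but kept so the signature
    matches the schema(configuration: dict) form quoted in the review.
    """
    return [
        {
            "table": "dagworks_runs",   # destination table name
            "primary_key": ["id"],      # column(s) that uniquely identify a run
            "columns": {
                "id": "STRING",
                "dag_id": "STRING",
                "status": "STRING",
                "start_time": "STRING",
                "end_time": "STRING",
            },
        }
    ]


if __name__ == "__main__":
    for table in schema({}):
        print(table["table"], sorted(table["columns"]))
```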
| @connector( | ||
| name="DagWorksConnector", | ||
| version="0.1.0", | ||
| config=CONFIG, | ||
| schema=SCHEMA, | ||
| ) | ||
| def run_connector(ctx: state.Context): |
Copilot
AI
Oct 31, 2025
BLOCKER: The SDK does not use a decorator-based @connector API. The correct pattern is to define an update(configuration: dict, state: dict) function and optionally a schema(configuration: dict) function, then create a connector object using connector = Connector(update=update, schema=schema). The update function should accept configuration and state dictionaries, not a Context object. See template_example_connector/connector.py lines 85-154 for the correct structure.
| headers = {"Authorization": f"Bearer {ctx.config.api_key}"} | ||
| response = requests.get(f"{ctx.config.base_url}/runs", headers=headers) | ||
| response.raise_for_status() |
Copilot
AI
Oct 31, 2025
BLOCKER: No retry logic is implemented for the API request. Network calls must include retry logic with exponential backoff to handle transient failures. Wrap the request in a try-except block catching specific exceptions like requests.Timeout, requests.ConnectionError, and implement retries with a pattern like: for attempt in range(__MAX_RETRIES): try: ... except (...) as e: if attempt == __MAX_RETRIES - 1: raise; sleep_time = min(60, 2 ** attempt); time.sleep(sleep_time).
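One way the retry pattern in this comment could be written out, as a sketch using only the standard library. The helper name call_with_retries, the MAX_RETRIES value of 3, and the injectable sleep parameter are assumptions made for illustration; the 60-second cap and the 2 ** attempt delay come from the comment.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

MAX_RETRIES = 3            # assumed value; the review only names __MAX_RETRIES
MAX_BACKOFF_SECONDS = 60   # cap taken from the review comment


def call_with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_retries: int = MAX_RETRIES,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the given exceptions with exponential backoff.

    Waits 1s, 2s, 4s, ... (capped at MAX_BACKOFF_SECONDS) between attempts and
    re-raises the last exception once max_retries attempts have failed.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the original error
            sleep(min(MAX_BACKOFF_SECONDS, 2 ** attempt))
    raise ValueError("max_retries must be at least 1")
```

With requests, the exception types have to be passed explicitly, because requests.ConnectionError and requests.Timeout are not subclasses of the built-in ConnectionError and TimeoutError. A call would look like call_with_retries(lambda: requests.get(url, headers=headers, timeout=30), retry_on=(requests.Timeout, requests.ConnectionError)), where the 30-second timeout is an assumed value.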
| """ | ||
| DagWorks connector for Fivetran Connector SDK. | ||
| Fetches pipeline metadata and run status from DagWorks API. | ||
| """ |
Copilot
AI
Oct 31, 2025
The module docstring is missing the required reference to technical documentation. It should follow the template format: Include a description and add references like See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details. See template_example_connector/connector.py lines 1-5.
| Fetches pipeline metadata and run status from DagWorks API. | ||
| """ | ||
|
|
||
| import requests |
Copilot
AI
Oct 31, 2025
The requests import is missing an explanatory comment. Every import must have an inline comment explaining its purpose. This should be: import requests # For making HTTP API requests (NOTE: provided by SDK runtime). Additionally, requests should never be added to requirements.txt as it's provided by the SDK runtime.
| for run in response.json().get("runs", []): | ||
| records.write("dagworks_runs", run) | ||
|
|
||
| return ctx.update_state({"last_sync": "now"}) |
Copilot
AI
Oct 31, 2025
The state value "now" is not a valid timestamp format. State should use ISO 8601 format timestamps (e.g., "2024-01-15T10:30:00Z") or other meaningful cursor values. Using "now" as a string literal is not actionable for incremental syncs and doesn't provide proper cursor-based pagination.
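A small sketch of producing and reading back an ISO 8601 cursor of the form shown in this comment. The helper names are hypothetical, and using the sync start time as the cursor is an assumption; a field from the API response (for example the latest start_time seen) may be a better cursor if the API supports filtering on it.

```python
from datetime import datetime, timezone

ISO_8601_UTC = "%Y-%m-%dT%H:%M:%SZ"  # e.g. 2024-01-15T10:30:00Z


def to_iso8601_utc(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 UTC string."""
    if moment.tzinfo is None:
        raise ValueError("Expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime(ISO_8601_UTC)


def parse_cursor(value: str) -> datetime:
    """Parse a cursor written by to_iso8601_utc back into an aware datetime."""
    return datetime.strptime(value, ISO_8601_UTC).replace(tzinfo=timezone.utc)


def build_state(sync_started_at: datetime) -> dict:
    """Build the state dictionary to save at the end of a sync (hypothetical helper)."""
    return {"last_sync": to_iso8601_utc(sync_started_at)}


if __name__ == "__main__":
    print(build_state(datetime.now(timezone.utc)))
```

On the next sync, parse_cursor(state["last_sync"]) gives a datetime that can be compared against run timestamps or passed to the API as a filter, which the literal string "now" cannot.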
| """ | ||
| DagWorks connector for Fivetran Connector SDK. | ||
| Fetches pipeline metadata and run status from DagWorks API. | ||
| """ | ||
|
|
||
| import requests | ||
| from fivetran_connector_sdk import connector, config, state, records, log, schema | ||
|
|
||
| CONFIG = config.Config( |
Copilot
AI
Oct 31, 2025
The connector is missing the required main block for debugging. At the end of the file, you must include: connector = Connector(update=update, schema=schema) followed by an if __name__ == "__main__": block that loads configuration from a JSON file and calls connector.debug(configuration=configuration). See template_example_connector/connector.py lines 153-166 for the exact pattern and required comments.
| """ | ||
|
|
||
| import requests | ||
| from fivetran_connector_sdk import connector, config, state, records, log, schema |
Copilot
AI
Oct 31, 2025
Import of 'log' is not used.
Suggested change:
- from fivetran_connector_sdk import connector, config, state, records, log, schema
+ from fivetran_connector_sdk import connector, config, state, records, schema
Jira ticket
Closes
<ADD TICKET LINK HERE, EACH PR MUST BE LINKED TO A JIRA TICKET>
Description of Change
<MENTION A SHORT DESCRIPTION OF YOUR CHANGES HERE>
Testing
<MENTION ABOUT YOUR TESTING DETAILS HERE, ATTACH SCREENSHOTS IF NEEDED (WITHOUT PII)>
Checklist
Some tips and links to help validate your PR:
fivetran debug command.