Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest data from new commits in IMGTHLA repository #102

Open
wants to merge 460 commits into
base: main
Choose a base branch
from

Conversation

chrisammon3000
Copy link
Contributor

@chrisammon3000 chrisammon3000 commented Feb 12, 2024

Description

  • Updates to gfe-db are triggered by new commits instead of just new branches (quarterly releases)
  • Execution state is tracked for each commit and release version in the IMGTHLA repo
  • Pipeline execution requests are idempotent when executions are in progress
  • Errors during pipeline executions are handled
  • Concurrency for database loading is constrained to one release version at a time to avoid fatal collisions
  • Formatted notifications of pipeline executions are sent by email
  • Pydantic models are implemented for automatic validation

New Commits

The CheckSourceUpdate Lambda function monitors the IMGTHLA source repository for new commits daily. If one or more new commits are found, the service resolves the release version number and creates new rows in the Execution State table (DynamoDB) and processes only the most recent commit for any given release. The commit's execution status is updated throughout pipeline execution using an enum class:

class ExecutionStatus(str, Enum):
    """
    ExecutionStatus is synced using the Step Functions DynamoDB integration:
    NOT_PROCESSED: never processed (set by CheckSourceUpdate) ✅
    SKIPPED: never processed (set by CheckSourceUpdate) ✅
    PENDING: state machine execution started (set by CheckSourceUpdate) ✅
    BUILD_IN_PROGRESS: build started (set by State Machine) ✅
    BUILD_SUCCESS: build succeeded (set by State Machine) ✅
    LOAD_IN_PROGRESS: load started (set by State Machine) ✅
    LOAD_SUCCESS: load succeeded (set by State Machine) ✅
    LOAD_FAILED: load failed (set by State Machine) ✅
    LOAD_INVALID: load invalid from query results (set by State Machine) ✅
    LOAD_SKIPPED: load skipped (set by State Machine) ✅
    BUILD_FAILED: build failed (set by State Machine) ✅
    EXECUTION_FAILED: build or load failed (set by State Machine) ✅
    ABORTED: build or load aborted (set by UpdateExecutionState) ✅
    """
    NOT_PROCESSED = "NOT_PROCESSED"
    SKIPPED = "SKIPPED"
    PENDING = "PENDING"
    BUILD_IN_PROGRESS = "BUILD_IN_PROGRESS"
    BUILD_SUCCESS = "BUILD_SUCCESS"
    BUILD_FAILED = "BUILD_FAILED"
    LOAD_IN_PROGRESS = "LOAD_IN_PROGRESS"
    LOAD_COMPLETE = "LOAD_COMPLETE"
    LOAD_SUCCESS = "LOAD_SUCCESS"
    LOAD_FAILED = "LOAD_FAILED"
    LOAD_INVALID = "LOAD_INVALID"
    LOAD_SKIPPED = "LOAD_SKIPPED"
    EXECUTION_FAILED = "EXECUTION_FAILED"
    ABORTED = "ABORTED"

Execution State

A DynamoDB table is deployed to store state for pipeline executions. For new deployments, the repository's state is built using the GitHub REST API and loaded into the table.

Screenshot 2024-02-11 at 10 58 09 PM Screenshot 2024-02-11 at 11 00 04 PM Screenshot 2024-02-11 at 11 02 45 PM

Pipeline Request Idempotency

Idempotency is acheived by using SQS FIFO queues, where the group ID is the unique deployment ID (${STAGE}-${APP_NAME}) and the deduplication ID is the release version. This means that when messages are in the queue, duplicate messages are not processed and that releases are loaded in chronological order.

Error Handling

Errors that occur during pipeline execution are caught and the state table entry is updated to reflect failure or aborted executions.

Database Concurrency Management

Database concurrency is maintained by a state machine called the Load Concurrency Manager (LCM). The LCM runs continuously when the main pipeline is running (by monitoring a CloudWatch Alarm) and handles the pre- and post-execution backups. This is to ensure that the database is not overloaded or shut-down during the loading process. All requests for loading data to Neo4j pass through a FIFO queue to avoid duplication and maintain the order of release versions. The consumer of the FIFO queue (Message Received?) will only receive one message at a time, and will not be invoked again until loading has succeeded or failed. Once the queue is empty and all releases have been loaded, the LCM will stop running.

Screenshot 2024-02-11 at 11 07 47 PM

Success/Failure Notifications

Notifications are sent by email including execution outcomes, validation results and error information in the event of failure.

Screenshot 2024-02-11 at 11 05 42 PM

Pydantic Models

Pydantic is a Python framework for ensuring data integrity. Every object within the pipeline now uses a Pydantic class for automatic schema and type validation. This prevents the state table from having corrupt or missing fields when reading and writing records.

Infrastructure Changes

  • New Lambda functions
    • FormatResults - Formats notification messages
    • InvokeLoadConcurrencyManager - Triggers the LCM when the Update Pipeline state machine has executions in progress
    • LcmReceiveMessage - Checks the GfeDbLoadQueue for messages
    • UpdateExecutionState - Handle aborted state machines and updates the state table
  • Lambda Layers
    • GfeDbModelsLayer - Contains the logic and methods for building execution state from GitHub API calls as well as models for data handling and validation
  • SQS Queues
    • GfeDbProcessingQueue - Queues releases for processing
    • GfeDbLoadQueue - Queues releases for loading once they are built
  • DynamoDB table
    • GfeDbExecutionStateTable - Stores state for each commit, release and execution combination
  • State Machines
    • LoadConcurrencyManager - Runs continuously during release processing and limits concurrency of loading to 1 release at a time

Known Issues

  • Some of the earlier releases are missing because 1) their commits are not on the default branch (Latest), and 2) because of inconsistencies in the versioning and availability of metadata (fix in progress)
  • Commits should only be processed if they include a change to hla.dat or msf/ since these assets contain the source data (fix in progress)

Next Steps

  • Address the known issues
  • Merge CSV builds before loading to Neo4j to speed up loading
  • Update the documentation
  • Write tests

@chrisammon3000 chrisammon3000 changed the title Ingest data from new commits to IMGTHLA repository Ingest data from new commits in IMGTHLA repository Feb 12, 2024
@chrisammon3000
Copy link
Contributor Author

This is behind the most recent PR for Amazon Linux 2, but I'll update once it's caught up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Upgrade the pipeline trigger logic to run on updates to previously processed release branches
1 participant