AirBnB CDC Ingestion Pipeline

This project implements a data ingestion and transformation pipeline for AirBnB using Azure Data Factory (ADF), Synapse Analytics, CosmosDB, and other Azure technologies. It ensures efficient data processing and automated updates to maintain an up-to-date data warehouse.

Below is the architecture diagram showcasing how data flows through the different components:

AirBnB CDC Pipeline Architecture


Table of Contents

  • Tech Stack
  • SQL Files for Table Creation
  • Pipeline Features
  • LoadCustomerDim Pipeline
  • Booking_Dim_Fact_Transformation Pipeline
  • Outputs
  • Conclusion


Tech Stack

  • Azure Data Factory (ADF): Orchestration of pipelines for data movement and transformations.
  • Azure Data Lake Storage (ADLS): Storage for raw and archived data.
  • Azure Synapse Analytics: Data warehouse for analytical queries.
  • Azure Cosmos DB for NoSQL: Source of change data for booking events.
  • Python: Custom data generation scripts.
  • T-SQL: Database and transformation logic.

SQL Files for Table Creation

The SQL files for creating the tables in the Synapse SQL Pool can be found in the following locations (a hedged sketch of one such definition follows the list):

  1. Customer Dimension Table:

  2. Bookings Dimension Table:

  3. Booking Customer Aggregation Fact Table:
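
For orientation, here is a minimal sketch of what the customer dimension definition might look like in a Synapse dedicated SQL pool. The column names and distribution options below are assumptions; the authoritative definitions are in the SQL files listed above.

```sql
-- Hypothetical sketch of the customer dimension table (column names assumed;
-- see the repository's SQL files for the actual definition).
CREATE TABLE airbnb.customer_dim
(
    customer_id  INT          NOT NULL,
    first_name   VARCHAR(100),
    last_name    VARCHAR(100),
    email        VARCHAR(200),
    country      VARCHAR(100)
)
WITH
(
    DISTRIBUTION = HASH(customer_id),   -- typical choice for a dedicated SQL pool
    CLUSTERED COLUMNSTORE INDEX
);
```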


Pipeline Features

AirBnB CDC Pipeline

  1. Hourly SCD-1 Updates:

    • Reads customer data from ADLS every hour.
    • Performs Slowly Changing Dimension Type 1 (SCD-1) updates on the customer_dim table in Synapse Analytics, ensuring that the customer data is always up-to-date (a T-SQL sketch of this upsert follows the list below).
  2. Change Data Capture (CDC):

    • Captures incremental booking events from CosmosDB using change feeds.
    • Processes these events in Azure Data Factory (ADF), performs necessary transformations, and upserts the resulting data into Synapse.
  3. Automated Workflows:

    • Configures triggers and dependencies in ADF to automate the entire process, ensuring seamless and continuous data flow.
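
To make the SCD-1 behaviour concrete, below is a minimal T-SQL sketch of the upsert semantics applied to airbnb.customer_dim. The staging table name and the non-key columns are assumptions, and in this project the upsert is configured inside ADF rather than hand-written as SQL.

```sql
-- Sketch of SCD Type 1 semantics: overwrite attributes in place, insert new keys.
-- stg.customer_raw and the non-key columns are hypothetical names.
MERGE airbnb.customer_dim AS target
USING stg.customer_raw AS source
    ON target.customer_id = source.customer_id
WHEN MATCHED THEN
    UPDATE SET
        target.email   = source.email,      -- latest values overwrite history
        target.country = source.country
WHEN NOT MATCHED BY TARGET THEN
    INSERT (customer_id, email, country)
    VALUES (source.customer_id, source.email, source.country);
```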

AirBnB CDC Pipeline Configuration File

You can find the JSON configuration file for this pipeline here:
AirBnBCDCPipeline.json


LoadCustomerDim Pipeline

The LoadCustomerDim pipeline implements Change Data Capture (CDC) to process customer data. It retrieves raw customer files from Azure Data Lake Storage (ADLS), applies transformations, and updates the airbnb.customer_dim table in Azure Synapse Analytics. The pipeline also manages raw data file archiving and cleanup to maintain an organized data lake.

LoadCustomerDimPipeline


Pipeline Activities

LoadCustomerDimPipeline

1. Get Metadata of Files

Activity Name: Get MetadataOfEachFileInCustomerRawDataContainer

  • Retrieves metadata for each file in the customer_raw_data folder on ADLS.
  • This metadata is used to identify the files to be processed in the subsequent activity.
  • All the input CSV files are available in the datasets folder here - Customer Data

2. Process Each File

Activity Name: ForEachFileInCustomerRawContainer

AirBnB Storage Container Folders

This activity iterates over each raw customer file identified in the previous step and performs the following sub-steps:

  1. Copy File Data to Synapse SQL Pool

    • Purpose: Transfers raw customer data to the airbnb.customer_dim table in Azure Synapse Analytics.
    • Method: Performs an Upsert operation using the customer_id as the primary key.
    • Format: Source files are in DelimitedText format.
  2. Archive Processed Files

    • Purpose: Moves successfully processed files from the customer_raw_data folder to the customer_archive folder.
    • Benefit: Ensures raw files are preserved for future reference while maintaining a clean source folder.
  3. Delete Raw Files

    • Purpose: Deletes raw files from the customer_raw_data folder after successful archiving.
    • Benefit: Prevents duplicate processing and optimizes storage space in the data lake.

LoadCustomerDim Pipeline Configuration File

You can find the JSON configuration file for this pipeline here:
LoadCustomerDimData.json


Booking_Dim_Fact_Transformation Pipeline

The Booking_Dim_Fact_Transformation pipeline performs several data transformations to update the airbnb.bookings_dim table in Azure Synapse. It uses data from CosmosDB and the Synapse SQL Pool to enrich and update booking data.

BookingsDimFactTransformationPipeline


Pipeline Activities

1. Data Flow Activity

This activity uses Azure Data Factory Data Flow to transform the raw booking data. It performs several transformation and enrichment steps before the data is written to the airbnb.bookings_dim table in Azure Synapse.

BookingDimDataflow

This Mapping Data Flow processes the booking records from Azure Cosmos DB and writes the transformed data to the Bookings Dimension Table in Azure Synapse SQL Pool.

1. Sources
  • CosmosDBBookingDataset: A dataset representing the source of the booking data in Cosmos DB.

    CosmoDBBookingData

  • The Python script generates booking data for CosmosDB, simulating real-time insertion with a delay between each record. The script sets up the CosmosDB client, creates the database and container, generates data using Faker, and inserts each booking record. Reference to the script file - cosmodb_generate_mock_data.py

    CosmoDBPythonGenerateBookingData

  • SynapseSQLPoolBookingsDimTable: The dataset representing the destination table in Azure Synapse SQL Pool, where the transformed data is written.

2. Transformations

ETL_BookingDimDataDataflowRun

  • DataQualityCheck:

    • Validates the incoming data and splits records into BadRecords and AcceptedRecords based on whether the check_out_date is later than the check_in_date.
  • DerivedColumn:

    • Creates new columns for the records (approximated in the T-SQL sketch at the end of this data flow section), such as:
      • stay_duration: Calculated as the difference between check_out_date and check_in_date.
      • booking_year and booking_month: Extracted from the booking_date.
      • full_address: A concatenation of the city and country.
      • Other required fields such as booking_id, amount, and currency.
  • JoinWithSynapse:

    • Performs a lookup to join incoming records with existing data in the Synapse bookings_dim Table based on the booking_id. This is to identify new or updated records.
  • SetInsertAndUpdateAlterRowPolicies:

    • Determines whether the record should be inserted or updated based on the existence of booking_id in the Synapse Booking Dim Table.
  • CastColumns:

    • Casts the data to the appropriate types for the destination table in Synapse.
  • SpecificColumns:

    • Filters out unnecessary columns, ensuring that only required columns are passed along for further processing.
  • FinalColumns:

    • Finalizes the columns to match the destination schema before the data is written to the Synapse SQL Pool.
3. Sinks
  • SinkSynapseBookingDimTable: This is the final sink where processed booking data is inserted into the Synapse SQL Pool table.

    The JSON configuration file for this dataflow can be found here:
    ETL_BookingDimData.json
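
The Data Flow expressions themselves live in the ETL_BookingDimData.json definition; purely as a hypothetical illustration, the quality check and derived columns described above correspond roughly to the following T-SQL (the real implementation is an ADF Mapping Data Flow, not SQL, and the staging source name is assumed):

```sql
-- Hypothetical T-SQL approximation of the Data Flow logic (column and table
-- names assumed; the authoritative version is the Mapping Data Flow itself).
SELECT
    booking_id,
    amount,
    currency,
    DATEDIFF(day, check_in_date, check_out_date) AS stay_duration,
    YEAR(booking_date)                           AS booking_year,
    MONTH(booking_date)                          AS booking_month,
    CONCAT(city, ', ', country)                  AS full_address
FROM stg.booking_raw                             -- hypothetical staging source
WHERE check_out_date > check_in_date;            -- DataQualityCheck: reject bad records
```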


2. Stored Procedure Activity

After the data flow activity completes successfully, the AggregateBookingFactData activity calls a stored procedure to aggregate the transformed booking data. This stored procedure updates the airbnb.booking_customer_aggregation_fact table, which contains aggregated data at the customer level.

Stored Procedure Execution
  • Stored Procedure Name: [airbnb].[sp_aggregate_booking_data]
  • Linked Service: Azure Synapse Analytics
  • Dependency: Depends on the successful completion of the previous data flow activity.
  • Timeout and Retry: 12-hour timeout with 0 retries on failure.
Stored Procedure Logic
  1. Truncates the airbnb.booking_customer_aggregation_fact table to remove outdated data.
  2. Aggregates booking data from the airbnb.bookings_dim table by joining with the airbnb.customer_dim table. It calculates:
    • total_bookings: The count of bookings per country.
    • total_amount: The sum of booking amounts for each country.
    • last_booking_date: The latest booking date per country.
  3. Inserts the aggregated results back into the airbnb.booking_customer_aggregation_fact table (a rough sketch of this logic follows the list).
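
Purely as a hedged sketch of the truncate-and-reload logic described above (column and join names assumed), the procedure might look roughly like this; the authoritative version is the SQL script linked below:

```sql
-- Sketch only: see sp_aggregate_booking_data in the repository for the real logic.
CREATE PROCEDURE airbnb.sp_aggregate_booking_data
AS
BEGIN
    -- Step 1: clear outdated aggregates.
    TRUNCATE TABLE airbnb.booking_customer_aggregation_fact;

    -- Steps 2-3: aggregate bookings joined to customers and reload the fact table.
    INSERT INTO airbnb.booking_customer_aggregation_fact
        (country, total_bookings, total_amount, last_booking_date)
    SELECT
        c.country,
        COUNT(b.booking_id) AS total_bookings,
        SUM(b.amount)       AS total_amount,
        MAX(b.booking_date) AS last_booking_date
    FROM airbnb.bookings_dim b
    JOIN airbnb.customer_dim c
        ON b.customer_id = c.customer_id   -- hypothetical join key
    GROUP BY c.country;
END;
```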

The SQL script for the stored procedure can be found here:
sp_aggregate_booking_data.sql.sql


Booking_Dim_Fact_Transformation Pipeline Configuration File

You can find the JSON configuration file for this pipeline here:
BookingDimFactTransformation.json


Outputs

The following tables are created and populated in Azure Synapse SQL Pool as part of the ETL process (a sample query over these tables is sketched after the list):

  1. Customer Dimension Table:

    • This table contains data related to customers, such as their demographics and history.

    CustomerDimTable

  2. Bookings Dimension Table:

    • This table contains the transformed booking data.

    BookingsDimTable

  3. Booking Customer Aggregation Fact Table:

    • This table stores aggregated booking data and insights.

    BookingCustomerAggregationFactTable
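
As an illustration of how these outputs might be consumed for reporting, a query against the aggregation fact table could look like the following (column names assumed, consistent with the aggregation sketch above):

```sql
-- Hypothetical reporting query over the aggregated fact table.
SELECT TOP 10
    country,
    total_bookings,
    total_amount,
    last_booking_date
FROM airbnb.booking_customer_aggregation_fact
ORDER BY total_amount DESC;   -- top booking markets by revenue
```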


Conclusion

The AirBnB CDC Ingestion Pipeline offers a robust, automated solution for managing and processing large volumes of data related to AirBnB bookings. By leveraging Azure services such as Data Factory, Synapse Analytics, and CosmosDB, the pipeline ensures data is processed efficiently, providing analytical and reporting insights into booking and customer behavior.
