Dec 19 2020 - Using Azure Data Factory with Azure Databricks for merging CSV files

The Azure Databricks repository is a set of blog posts, an Advent of 2020 present to readers for easier onboarding to Azure Databricks!


Yesterday we created the Data Factory, started using the service, created a linked service, and built our first pipeline.

Today we will look at how to start using Blob storage and Azure Databricks with Azure Data Factory.

This is one of the scenarios where you have multiple CSV files landing in Blob storage (in a particular folder) and you want to:

  • merge the CSV files
  • merge the files with some transformation in between
  • transform the files first and then do the merge
  • copy files from one data lake zone to another, with a transformation in between
  • or anything else...

Regardless of the scenario, let's dive in.

1. Create a linked service for Azure Blob Storage

Yesterday (day 18) we looked at how to create a linked service for Azure Databricks. We will need another linked service, this time for Azure Blob Storage. Navigate to Linked services and create a new one of type Azure Blob Storage.

While configuring it, select your Azure subscription and choose the storage account we created on day 9, which I named dbpystorage. You should have something like this:

On day 9 we also copied a file called Day9MLB_players.csv into the Blob storage (the file is also available in the GitHub repository). You should now have both the Azure Blob Storage and Azure Databricks services linked to Azure Data Factory.

We will now need to create a dataset and a pipeline in ADF.

2. Creating a dataset

To add a new dataset, go to Datasets and select "New dataset". A window will pop up asking for the location of the dataset. Select Azure Blob Storage, because the file is available in this service.

After selecting the storage type, you will be prompted for the file type. Choose the CSV (DelimitedText) type.

After this, specify the path to the file. As I am using only one file, I am specifying its name. Otherwise, if this folder were a landing zone for multiple files (with the same schema), I could use a wildcard, e.g. Day*.csv, and all files following this pattern would be read.
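If you want to test the same wildcard logic directly from Databricks, here is a minimal sketch; the <container_name> placeholder and the header option are assumptions (adjust them to your container and files), and the storage access key must already be configured as shown in the notebook section further below:

%python
# Read every CSV matching the Day*.csv pattern into a single DataFrame (hypothetical container)
merged_df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("wasbs://<container_name>@dbpystorage.blob.core.windows.net/Day*.csv"))

# Quick sanity check: how many rows were picked up across all matched files?
print(merged_df.count())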

Once the dataset is created, we need a pipeline to connect the services.

3. Creating a pipeline

In the Author view in ADF, create a new pipeline. A new canvas will appear for you to start working on the data integration.

Add the "Copy data" element and the "Databricks" Notebook element. The Copy data element needs a source and a sink. It can copy a file from one location to another, merge files into another location, or change the format (e.g. going from CSV to Parquet). I will be merging CSV files into a single CSV.

Select all the properties for Source.

And the same for the Sink. For the copy behaviour, I am selecting "Merge files" to mimic the ETL job.
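ADF handles the "Merge files" behaviour for you, so no code is required here. Purely as an illustration of what it does, a rough Spark equivalent would read all matching files and write them out as one; the paths below are hypothetical and the storage key must already be configured:

%python
# Illustration only: a rough Spark equivalent of the "Merge files" copy behaviour
df = spark.read.option("header", "true").csv("wasbs://<container_name>@dbpystorage.blob.core.windows.net/Day*.csv")

# coalesce(1) forces a single output file in the target folder
(df.coalesce(1)
   .write.mode("overwrite")
   .option("header", "true")
   .csv("wasbs://<container_name>@dbpystorage.blob.core.windows.net/merged/"))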

Once this part is completed, we need to look into the Databricks element:

An Azure Databricks notebook can hold literally anything: data transformations, data merges, analytics, or it can even serve as a transformation element and a connection to other downstream elements. In this case, the Databricks element will only read the data and create a table.

4. Notebook

Before connecting the elements in ADF, we need to give some instructions to the notebook. Head to Azure Databricks and create a new notebook. I have named mine Day19_csv and chosen Python as the language.

Set up the connection to the file (this time using Python; before we used Scala):

%python
# Storage account details (replace the access key with your own)
storage_account_name = "dbpystorage"
storage_account_access_key = "YOUR_ACCOUNT_ACCESS_KEY"

# Path to the CSV file(s); replace <container_name> with your blob container name
file_location = "wasbs://<container_name>@dbpystorage.blob.core.windows.net/"
file_type = "csv"

# Pass the access key to Spark so it can read from the storage account
spark.conf.set("fs.azure.account.key." + storage_account_name + ".blob.core.windows.net", storage_account_access_key)
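To make sure the connection works, you can list the contents of the container; this is just a quick check rather than part of the pipeline, assuming the same file_location as above:

%python
# List the files in the container to confirm Databricks can reach the storage account
display(dbutils.fs.ls(file_location))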

After the initial connection is set, we can load the data and create a temporary SQL view:

%python
# Read the CSV file(s) into a DataFrame, inferring the schema from the header row
df = spark.read.format(file_type).option("header", "true").option("inferSchema", "true").load(file_location)

# Expose the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("Day9data_view")

And the SQL query:

%sql
SELECT * FROM Day9data_view
--Check number of rows
SELECT COUNT(*) FROM Day9data_view

You can add many other data transformation or ETL scripts, or you can use machine learning scripts to do data analysis and predictions. Normally, I would add an analysis of the merged dataset and save or expose the results to other services (via ADF), but to keep the post short, let's leave it as it is.
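For completeness, a minimal sketch of that idea could look like the one below; the aggregation, the table name, and the output path are made up purely for illustration:

%python
# Hypothetical example: summarise the merged data and persist it for downstream use
summary_df = spark.sql("SELECT COUNT(*) AS row_count FROM Day9data_view")

# Save it as a table in the Databricks metastore ...
summary_df.write.mode("overwrite").saveAsTable("day19_summary")

# ... or write it back to Blob storage (replace <container_name> with your container)
summary_df.write.mode("overwrite").parquet("wasbs://<container_name>@dbpystorage.blob.core.windows.net/output/day19_summary")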

5. Connecting the dots

Back in Azure Data Factory, open the Notebook element, select the Azure Databricks linked service and, under Settings, set the path to the notebook we created in the previous step.

You can always browse the workspace to select the correct path.

Once you set the path, you can connect the elements (or activities) together, debug, and publish them all. Once published, you can schedule and trigger the pipeline.

This pipeline can be scheduled, used as part of a bigger ETL process, or extended. You can have each notebook doing part of the ETL and orchestrate the notebooks in ADF, or you can create data flows in ADF and connect Python code instead of notebooks. The possibilities are endless. Even if you want to capture streaming data, you can use ADF and Databricks, only Databricks with Spark, or other services (Event Hubs, Azure Functions, etc.).
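As a small taste of the streaming option, here is a minimal Structured Streaming sketch that picks up new CSV files as they land in a Blob folder; the schema, paths, and query name are hypothetical and only illustrate the idea:

%python
# Hypothetical Structured Streaming example: process CSV files as they arrive
from pyspark.sql.types import StructType, StructField, StringType

# Streaming reads require an explicit schema (simplified here)
schema = StructType([StructField("player", StringType(), True)])

stream_df = (spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv("wasbs://<container_name>@dbpystorage.blob.core.windows.net/landing/"))

# Keep the incoming rows queryable in memory for interactive inspection
query = (stream_df.writeStream
    .format("memory")
    .queryName("incoming_players")
    .start())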

Tomorrow we will look at this orchestration part using two notebooks, with Scala and Python.

The complete set of code and notebooks will be available in the GitHub repository.

Happy Coding and Stay Healthy!