"""
AWS Lambda function to process JSON data stored in an S3 bucket,
transform it into a Pandas DataFrame, normalize the DataFrame, and write
the resulting DataFrame to a Parquet file stored in another S3 bucket,
optionally registering it as a table in the AWS Glue catalog.
"""

import logging
import os
import urllib.parse

import awswrangler as wr
import pandas as pd

# Module-level logger: Lambda routes records emitted via `logging` to
# CloudWatch with level and stack-trace information, unlike bare print().
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Required configuration. os.environ[...] (not .get) is deliberate: a
# missing variable fails fast at cold start instead of mid-invocation.
os_input_s3_cleansed_layer = os.environ['s3_cleansed_layer']
os_input_glue_catalog_db_name = os.environ['glue_catalog_db_name']
os_input_glue_catalog_table_name = os.environ['glue_catalog_table_name']
os_input_write_data_operation = os.environ['write_data_operation']


def lambda_handler(event, context):
    """
    Process JSON data stored in an S3 bucket, transform it into a Pandas
    DataFrame, normalize it, and write the result to a Parquet dataset in
    another S3 bucket, registering it as a table in the AWS Glue catalog.

    Parameters:
    - event (dict): S3 event notification that triggered this function;
      only ``event['Records'][0]['s3']`` (bucket name and object key) is read.
    - context (LambdaContext): Runtime information about the invocation
      (unused, but required by the Lambda handler signature).

    Returns:
    - dict: The response from the awswrangler ``s3.to_parquet`` write
      operation (paths written and partition values).

    Raises:
    - Exception: Re-raised after logging if reading the source object,
      normalizing it, or writing the Parquet output fails, so that Lambda's
      own retry/error handling is triggered.

    Notes:
    - Expects JSON data with an ``items`` column in the source object.
    - Requires the environment variables s3_cleansed_layer,
      glue_catalog_db_name, glue_catalog_table_name and
      write_data_operation to be set before invocation.
    """
    # Identify the uploaded object from the S3 event. Keys arrive
    # URL-encoded (e.g. spaces as '+'), so decode before building the URI.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    try:
        # Read the raw JSON object into a DataFrame.
        df_raw = wr.s3.read_json(f's3://{bucket}/{key}')

        # Flatten the nested 'items' records into top-level columns.
        df_step_1 = pd.json_normalize(df_raw['items'])

        # Write as a partition-aware Parquet dataset and register/update
        # the corresponding Glue catalog table.
        wr_response = wr.s3.to_parquet(
            df=df_step_1,
            path=os_input_s3_cleansed_layer,
            dataset=True,
            database=os_input_glue_catalog_db_name,
            table=os_input_glue_catalog_table_name,
            mode=os_input_write_data_operation
        )

        return wr_response

    except Exception:
        # logger.exception records the full traceback. The message covers
        # the whole pipeline (read, transform, write), not just the read —
        # the previous wording blamed object retrieval for write failures.
        logger.exception(
            'Error processing object %s from bucket %s. Make sure the object and '
            'target resources exist and the bucket is in the same region as this function.',
            key, bucket
        )
        raise