Skip to content

Commit

Permalink
s3 to kinesis via lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
hdulay committed Nov 1, 2022
1 parent ba326ed commit f81bfc3
Showing 1 changed file with 37 additions and 26 deletions.
63 changes: 37 additions & 26 deletions s3events/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ This demo shows how to notify an AWS lambda function to send latest contents to
```

## Lambda Permissions

For S3 read
```json
{
"Version": "2012-10-17",
Expand All @@ -60,46 +60,57 @@ This demo shows how to notify an AWS lambda function to send latest contents to
}
```

For Kinesis write
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "*"
}
]
}
```

## Lambda

```python
import json
import urllib.parse
import boto3
import base64, io
import requests

print('Loading function')

s3 = boto3.client('s3')
kinesis = boto3.client('kinesis','us-west-2')


def lambda_handler(event, context):

# Get the object from the event and send to Decodable
for b in event['Records']:
bucket = b['s3']['bucket']['name']
key = urllib.parse.unquote_plus(b['s3']['object']['key'], encoding='utf-8')
# Get the object from the event and show its content type
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8')
try:
response = s3.get_object(Bucket=bucket, Key=key)
body = response['Body'].read()
events = []
message = { "events": events }
for record in body.decode('utf-8').split('\n')[:-1]:
events.append(json.loads(record))

m = json.dumps(message)
print(m)

bearer = ''
account = ''
endpoint = ''

headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f"Bearer {bearer}"
}
kinesis.put_record(
StreamName='s3-kinesis',
Data=record,
PartitionKey="-1")

url = f"https://{account}.api.decodable.co{endpoint}"
return "success"

response = requests.post(url, headers=headers, data=m)
print(f"response: {response.text}")

return response.text
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e

```

## Create a notification
Expand Down

0 comments on commit f81bfc3

Please sign in to comment.