influxdb
hdulay committed Jan 10, 2023
1 parent 4bdf811 commit df8103f
Showing 7 changed files with 160 additions and 0 deletions.
2 changes: 2 additions & 0 deletions influxdb/.gitignore
@@ -0,0 +1,2 @@
package
*.zip
17 changes: 17 additions & 0 deletions influxdb/Makefile
@@ -0,0 +1,17 @@
include .env

test:
	telegraf --config https://us-east-1-1.aws.cloud2.influxdata.com/api/v2/telegrafs/0a8fe1d61a55a000

sample:
	cat sample.json | base64

deploy:
	rm -rf ./package
	pip install --target ./package influxdb_client
	cd package && zip -r ../influx-deployment-package.zip .
	zip influx-deployment-package.zip lambda_function.py
	aws lambda update-function-code --function-name influxdb --zip-file fileb://influx-deployment-package.zip


93 changes: 93 additions & 0 deletions influxdb/README.md
@@ -0,0 +1,93 @@
# InfluxDB and Decodable Integration
This example integrates Decodable and InfluxDB (Influx Data Cloud) via Kinesis and an AWS Lambda function written in Python. We will send Envoy logs produced by the Datagen connector built into Decodable.

## Setup
Set up a Python virtual environment and install the requirements.

```bash
python -m venv .venv
source .venv/bin/activate

pip install -r requirements.txt
```

The requirements include `influxdb_client`, which must be packaged and deployed using the AWS CLI tool. You therefore cannot simply copy and paste the Python code into the AWS web console.

## AWS Deployment and Environment Variables
As mentioned previously, the Lambda function uses InfluxDB modules and needs to be packaged and deployed from the command line. We have created a [Makefile](./Makefile) that includes this command.

```bash
make deploy
```

This Makefile target is defined below. Its output is the file `influx-deployment-package.zip`, which is deployed to your AWS Lambda function.

```bash
rm -rf ./package
pip install --target ./package influxdb_client
cd package && zip -r ../influx-deployment-package.zip .
zip influx-deployment-package.zip lambda_function.py
aws lambda update-function-code --function-name influxdb --zip-file fileb://influx-deployment-package.zip
```

In the AWS console, navigate to Lambda and find the function named `influxdb`. Click the `Configuration` tab and select `Environment variables` on the left. Enter values for these variables:

- `INFLUXDB_BUCKET` - the name of the InfluxDB bucket to write streaming data to
- `INFLUXDB_ORG` - the Influx Data Cloud organization
- `INFLUXDB_TOKEN` - the Influx Data Cloud API token
- `INFLUXDB_URL` - the URL of your Influx Data Cloud instance
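The Lambda function reads these variables at runtime with `os.environ`. A minimal sketch of that lookup (the `load_influx_config` helper name is hypothetical, not part of the deployed code):

```python
import os

# Hypothetical helper mirroring how lambda_function.py reads its settings.
def load_influx_config():
    names = ("INFLUXDB_BUCKET", "INFLUXDB_ORG", "INFLUXDB_TOKEN", "INFLUXDB_URL")
    cfg = {name: os.environ.get(name) for name in names}
    missing = [name for name, value in cfg.items() if not value]
    if missing:
        raise RuntimeError(f"missing environment variables: {missing}")
    return cfg
```

Failing fast like this surfaces a misconfigured function on the first invocation instead of producing silent write errors.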

## Permissions
On the `Configuration` page of the Lambda function, click `Permissions` on the left. Under `Execution role` you will see a role name. Click this role to grant the function permission to read from Kinesis. Click `Add permissions`, select `Create inline policy`, switch to the JSON view, and paste the policy below.

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "*"
}
]
}
```

This grants the Lambda function permission to read from Kinesis. The policy above is broad; in production, scope it down to the actions the function actually needs (`GetRecords`, `GetShardIterator`, `DescribeStream`, `ListShards`, and `ListStreams`) and to the ARN of your stream.

## Setup Trigger
On the `Configuration` page, click `Triggers` on the left, then click `Add trigger` on the right. Select `Kinesis` as the source, enter your Kinesis stream, and click `Add`.

Your function will now be invoked whenever a message/event arrives in the Kinesis stream.

## Setup a Kinesis Sink in Decodable
Create a Datagen connector in Decodable that generates Envoy logs. Have that connector write its data into a `_raw` field in a stream named `envoy_raw`.

Create a Decodable pipeline that parses the logs and writes the output to a stream called `http_events` by pasting in the SQL below.

```sql
-- A new stream will be created for the output of this pipeline.
-- The stream name will match the name used in the 'insert' statement.
insert into http_events
select
_raw,
to_timestamp(fields['_time'], 'yyyy-MM-dd''T''HH:mm:ss''Z''') _time,
fields['method'] req_method,
fields['path'] req_path,
fields['protocol'] req_proto,
fields['flags'] req_flags,
cast(fields['status'] as int) resp_status,
cast(fields['size'] as int) resp_size
from (
select
`value` _raw,
grok(`value`, '\[%{TIMESTAMP_ISO8601:_time}\] "%{DATA:method} %{DATA:path} %{DATA:protocol}" %{INT:status} %{DATA:flags} %{INT:size}') fields
from envoy_raw
)
```
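To sanity-check the parsing logic locally, the Grok pattern can be approximated with a Python regex run against a sample Envoy log line. This is only a rough equivalent for experimentation (`DATA` and `INT` are simplified to `\S+` and `\d+`), not Decodable's Grok implementation:

```python
import re

# Rough regex approximation of the pipeline's Grok pattern.
PATTERN = re.compile(
    r'\[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d+) (?P<flags>\S+) (?P<size>\d+)'
)

raw = ('[2023-01-09T21:11:41Z] "POST /products/2 HTTP/1.1" '
       '500 UH 9414 8115 88 95 "-" "Mozilla/5.0 (Linux; Android 10)"')
fields = PATTERN.search(raw).groupdict()
# fields now holds time, method, path, protocol, status, flags, and size
# as strings; the SQL pipeline casts status and size to int.
```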

Follow the instructions in this [doc](https://docs.decodable.co/docs/connector-reference-kinesis) to create a Kinesis sink connection. Use the same Kinesis stream you specified when creating the trigger in the previous section.

## InfluxDB
You should see a view in InfluxDB similar to the image below.

![alt](./influxdb.png)
Binary file added influxdb/influxdb.png
45 changes: 45 additions & 0 deletions influxdb/lambda_function.py
@@ -0,0 +1,45 @@
import base64
import json
import os

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS


# The execution role needs the GetRecords, GetShardIterator, DescribeStream,
# ListShards, and ListStreams Kinesis actions in IAM.

def lambda_handler(event, context):
    token = os.environ.get("INFLUXDB_TOKEN")
    org = os.environ.get("INFLUXDB_ORG")
    url = os.environ.get("INFLUXDB_URL")
    bucket = os.environ.get("INFLUXDB_BUCKET")

    client = InfluxDBClient(url=url, token=token, org=org)
    write_api = client.write_api(write_options=SYNCHRONOUS)

    for record in event['Records']:
        # Kinesis delivers each record's payload base64-encoded.
        string = base64.decodebytes(record["kinesis"]["data"].encode('utf-8')).decode('utf-8')
        data = json.loads(string)
        print(data)

        point = (
            Point("event")
            .tag("http_event", data['req_path'])
            .field("_raw", data['_raw'])
            .field("_time", data['_time'])
            .field("req_method", data['req_method'])
            .field("req_path", data['req_path'])
            .field("req_proto", data['req_proto'])
            .field("req_flags", data['req_flags'])
            .field("resp_status", data['resp_status'])
            .field("resp_size", data['resp_size'])
        )
        write_api.write(bucket=bucket, org=org, record=point)

    return {
        'statusCode': 200,
        'body': json.dumps('wrote {} records'.format(len(event['Records'])))
    }
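The handler expects each Kinesis record's payload to arrive base64-encoded. A self-contained round trip illustrating the decode step (the event shape below is a minimal stand-in for a real Kinesis event, not the full AWS payload):

```python
import base64
import json

# Simulate what Kinesis does: a JSON payload base64-encoded into the record.
payload = {"req_path": "/products/2", "resp_status": 500}
encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
record = {"kinesis": {"data": encoded}}

# The same decode the handler performs per record.
string = base64.decodebytes(record["kinesis"]["data"].encode("utf-8")).decode("utf-8")
data = json.loads(string)
```

This is also what `make sample` is for: it base64-encodes `sample.json` so the result can be pasted into a Lambda test event.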
1 change: 1 addition & 0 deletions influxdb/requirements.txt
@@ -0,0 +1 @@
influxdb_client
2 changes: 2 additions & 0 deletions influxdb/sample.json
@@ -0,0 +1,2 @@
{"_raw":"[2023-01-09T21:11:41Z] \"POST /products/2 HTTP/1.1\" 500 UH 9414 8115 88 95 \"-\" \"Mozilla/5.0 (Linux; Android 10) \" \"791e8771-d9ab-4f61-9583-a255f5f35f7d\" \"locations\" \"127.0.0.1:8080\"","_time":"2023-01-09 21:11:41","req_flags":"UH","req_method":"POST","req_path":"/products/2","req_proto":"HTTP/1.1","resp_size":9414,"resp_status":500}
