Skip to content

Commit

Permalink
apache pinot
Browse files Browse the repository at this point in the history
  • Loading branch information
hdulay committed Jun 27, 2022
1 parent 40dc785 commit 30191e6
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 25 deletions.
26 changes: 13 additions & 13 deletions change-streams/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ datagen:
--prop data.type=envoy \
--prop delay=100

postgres:
@decodable connection create \
--name demo_day_postgres \
--description "demo day postgres" \
--connector postgres \
--type sink \
--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="demo_day_change_stream")|.id ' ) \
--field status_code="string primary key" \
--field count=bigint \
--prop url="jdbc:postgresql://$(HOST):5432/postgres?currentSchema=public" \
--prop table-name=status_code_counts \
--prop username=$(USER) \
--prop password="$(PWD)"
# postgres:
# @decodable connection create \
# --name demo_day_postgres \
# --description "demo day postgres" \
# --connector postgres \
# --type sink \
# --stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="demo_day_change_stream")|.id ' ) \
# --field status_code="string primary key" \
# --field count=bigint \
# --prop url="jdbc:postgresql://$(HOST):5432/postgres?currentSchema=public" \
# --prop table-name=status_code_counts \
# --prop username=$(USER) \
# --prop password="$(PWD)"


# Build the change-stream demo: envoy source, change stream and data generator.
# The `postgres` connection target is currently commented out in this Makefile,
# so it must not be listed as a prerequisite; otherwise make aborts with
# "No rule to make target 'postgres', needed by 'pipeline'".
pipeline: envoy change_stream datagen
Expand Down
114 changes: 114 additions & 0 deletions osquery/schemas/suspicious.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
{
"type": "object",
"required": [],
"properties": {
"name": {
"type": "string"
},
"hostIdentifier": {
"type": "string"
},
"calendarTime": {
"type": "string"
},
"unixTime": {
"type": "number"
},
"epoch": {
"type": "string"
},
"counter": {
"type": "number"
},
"numerics": {
"type": "string"
},
"columns": {
"type": "object",
"required": [],
"properties": {
"cmdline": {
"type": "string"
},
"cwd": {
"type": "string"
},
"disk_bytes_read": {
"type": "string"
},
"disk_bytes_written": {
"type": "string"
},
"egid": {
"type": "string"
},
"euid": {
"type": "string"
},
"gid": {
"type": "string"
},
"name": {
"type": "string"
},
"nice": {
"type": "string"
},
"on_disk": {
"type": "string"
},
"parent": {
"type": "string"
},
"path": {
"type": "string"
},
"pgroup": {
"type": "string"
},
"pid": {
"type": "string"
},
"resident_size": {
"type": "string"
},
"root": {
"type": "string"
},
"sgid": {
"type": "string"
},
"start_time": {
"type": "string"
},
"state": {
"type": "string"
},
"suid": {
"type": "string"
},
"system_time": {
"type": "string"
},
"threads": {
"type": "string"
},
"total_size": {
"type": "string"
},
"uid": {
"type": "string"
},
"user_time": {
"type": "string"
},
"wired_size": {
"type": "string"
}
}
},
"action": {
"type": "string"
}
}
}
121 changes: 111 additions & 10 deletions pinot/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@

include .env

EKS_CLUSTER_NAME=hubert-superset

# Create the Kafka topics used by this demo in the Confluent cluster identified
# by $(CLUSTER_ID): `osquery-pinot` and `suspicious_processes`.
# NOTE(review): CLUSTER_ID is presumably supplied by the included .env - confirm.
topic:
confluent kafka topic --cluster $(CLUSTER_ID) create osquery-pinot
confluent kafka topic --cluster $(CLUSTER_ID) create suspicious_processes

tconfig:
pinot.table.config:
@mkdir -p tables
@jinja \
-D BOOTSTRAP $(BOOTSTRAP) \
Expand All @@ -18,17 +21,56 @@ tconfig:
templates/osquery-table.json.j2 \
> tables/osquery-table.json

table: tconfig
# Register the osquery table with Pinot via `pinot-admin AddTable`.
# Depends on pinot.table.config, which renders tables/osquery-table.json from
# its Jinja template. The schema is read from schemas/osquery-schema.json and
# the request is sent to the controller at $(CONTROLLER_HOST):$(CONTROLLER_PORT).
pinot.table: pinot.table.config
pinot-admin AddTable \
-tableConfigFile tables/osquery-table.json \
-schemaFile schemas/osquery-schema.json \
-controllerHost $(CONTROLLER_HOST) \
-controllerPort $(CONTROLLER_PORT) \
-exec

sink:
# Create the Decodable stream `suspicious_osquery_cleansed` with the typed
# fields listed below. The leading `@` stops make from echoing the command.
# The field list is repeated verbatim in the suspicious_osquery_cleansed_sink
# target; keep the two in sync when adding or retyping a field.
# NOTE(review): sql/suspicious.cleanse.sql inserts into this stream - confirm
# its select list produces the same column names and types as declared here.
suspicious_osquery_cleansed:
@decodable stream create \
--name suspicious_osquery_cleansed \
--description "osquery cleansed processes" \
--field name=string \
--field hostIdentifier=string \
--field calendarTime=string \
--field unixTime=bigint \
--field epoch=bigint \
--field counter=int \
--field numerics=boolean \
--field cmdline=STRING \
--field cwd=STRING \
--field disk_bytes_read=BIGINT \
--field disk_bytes_written=BIGINT \
--field egid=INT \
--field euid=INT \
--field gid=INT \
--field process_name=STRING \
--field nice=INT \
--field on_disk=INT \
--field parent=BIGINT \
--field path=STRING \
--field pgroup=BIGINT \
--field pid=BIGINT \
--field resident_size=BIGINT \
--field root=STRING \
--field sgid=INT \
--field start_time=BIGINT \
--field state=INT \
--field suid=INT \
--field system_time=BIGINT \
--field threads=INT \
--field total_size=BIGINT \
--field uid=INT \
--field user_time=BIGINT \
--field wired_size=INT \
--field action=string

osquery_cleansed_sink:
@decodable conn create \
--name osquery_pinot_sink \
--name osquery_cleansed_sink \
--connector confluent-cloud \
--type sink \
--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="cleansed_processes")|.id ' ) \
Expand Down Expand Up @@ -73,8 +115,63 @@ sink:
--prop cluster.api.key=$(CONFLUENT_KEY) \
--prop cluster.api.secret=$(CONFLUENT_SECRET)

# Create a Confluent Cloud sink connection named
# `suspicious_osquery_cleansed_sink` that writes the Decodable stream
# `suspicious_osquery_cleansed` as JSON to the Kafka topic `suspicious_processes`.
#
# The stream id is looked up with `decodable stream list | jq` through
# $(shell ...), which make evaluates when it expands this recipe, so the
# stream must already exist (see the suspicious_osquery_cleansed target);
# otherwise --stream-id receives an empty value.
# The --field list mirrors the one used to create the stream.
# NOTE(review): BOOTSTRAP, CLUSTER_ID, CONFLUENT_KEY and CONFLUENT_SECRET are
# presumably defined in the included .env - confirm.
suspicious_osquery_cleansed_sink:
@decodable conn create \
--name suspicious_osquery_cleansed_sink \
--connector confluent-cloud \
--type sink \
--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed")|.id ' ) \
--field name=string \
--field hostIdentifier=string \
--field calendarTime=string \
--field unixTime=bigint \
--field epoch=bigint \
--field counter=int \
--field numerics=boolean \
--field cmdline=STRING \
--field cwd=STRING \
--field disk_bytes_read=BIGINT \
--field disk_bytes_written=BIGINT \
--field egid=INT \
--field euid=INT \
--field gid=INT \
--field process_name=STRING \
--field nice=INT \
--field on_disk=INT \
--field parent=BIGINT \
--field path=STRING \
--field pgroup=BIGINT \
--field pid=BIGINT \
--field resident_size=BIGINT \
--field root=STRING \
--field sgid=INT \
--field start_time=BIGINT \
--field state=INT \
--field suid=INT \
--field system_time=BIGINT \
--field threads=INT \
--field total_size=BIGINT \
--field uid=INT \
--field user_time=BIGINT \
--field wired_size=INT \
--field action=string \
--prop cluster.api.endpoint=https://$(BOOTSTRAP) \
--prop cluster.id=$(CLUSTER_ID) \
--prop topic=suspicious_processes \
--prop format=json \
--prop cluster.api.key=$(CONFLUENT_KEY) \
--prop cluster.api.secret=$(CONFLUENT_SECRET)

# Create the Decodable pipeline `suspicious_osquery_cleansed_pipeline`.
# The SQL is redirected from sql/suspicious.cleanse.sql on stdin; the bare `-`
# argument presumably tells the CLI to read the statement from stdin - confirm.
# NOTE(review): the description text says "osquery_cleanse" although this is
# the suspicious-processes pipeline - confirm whether that is intended.
suspicious_osquery_cleansed_pipeline:
@decodable pipeline create \
--name suspicious_osquery_cleansed_pipeline \
--description "demo day osquery_cleanse" \
- < sql/suspicious.cleanse.sql

active:
decodable connection activate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_pinot_sink")|.id ' )
decodable connection activate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_cleansed_sink")|.id ' )
decodable connection activate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_sink")|.id ' )
decodable pl activate --force $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_pipeline")|.id ' )

# Convenience target: create the Confluent Cloud sink connection and the Pinot
# table. The prerequisites use the current target names - `sink` and `table`
# were renamed to `osquery_cleansed_sink` and `pinot.table`, and the old names
# no longer have rules, which made `make flow` fail with "No rule to make
# target 'sink'".
flow: osquery_cleansed_sink pinot.table

Expand All @@ -83,10 +180,14 @@ drop:
-tableName osquery \
-state drop \
-controllerHost $(CONTROLLER_HOST) \
-controllerPort $(CONTROLLER_PORT) \

-controllerPort $(CONTROLLER_PORT)

clean:
decodable conn deactivate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_pinot_sink")|.id ' )
decodable conn delete $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_pinot_sink")|.id ' )

decodable conn deactivate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_cleansed_sink")|.id ' )
decodable conn delete $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="osquery_cleansed_sink")|.id ' )

decodable conn deactivate --force $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_sink")|.id ' )
decodable conn delete $(shell decodable connection list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_sink")|.id ' )

decodable pl deactivate $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_pipeline")|.id ' )
decodable pl delete $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="suspicious_osquery_cleansed_pipeline")|.id ' )
6 changes: 4 additions & 2 deletions pinot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ flowchart TD;
pc-->opp[[osquery_processes]]-->SQL:osquery_filter_noise-->fp[[filtered_processes]]
fp-->SQL:osquery_cleanse-->db[(Apache Pinot)]
fp-->SQL:suspicious_processes-->a>Alert]
fp-->SQL:osquery_cleanse-->db[(Apache Pinot)]-->as[/Apache Superset\]
fp-->sp[SQL:suspicious_processes]-->a>Alert]
sp-->db
```
Expand Down
39 changes: 39 additions & 0 deletions pinot/sql/suspicious.cleanse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- Flatten and type the raw suspicious_processes osquery events and write them
-- to the suspicious_osquery_cleansed stream.
--
-- The output stream is created explicitly by the `suspicious_osquery_cleansed`
-- target in pinot/Makefile, so the column names and types produced here are
-- kept identical to the --field list declared there (and repeated in the
-- suspicious_osquery_cleansed_sink connection). In particular the osquery
-- query name is emitted as `name`, not under an alias, because that is the
-- field name the stream and sink declare.
insert into suspicious_osquery_cleansed
select
    -- top-level osquery envelope
    `name`,
    hostIdentifier,
    calendarTime,
    cast(unixTime as bigint) as unixTime,
    cast(epoch as bigint) as epoch,
    cast(counter as int) as counter,
    cast(numerics as boolean) as numerics,
    -- per-process values pulled out of the `columns` field
    columns['cmdline'] as cmdline,
    columns['cwd'] as cwd,
    cast(columns['disk_bytes_read'] as bigint) as disk_bytes_read,
    cast(columns['disk_bytes_written'] as bigint) as disk_bytes_written,
    cast(columns['egid'] as int) as egid,
    cast(columns['euid'] as int) as euid,
    cast(columns['gid'] as int) as gid,
    -- renamed so it does not collide with the top-level query `name`
    columns['name'] as process_name,
    cast(columns['nice'] as int) as nice,
    cast(columns['on_disk'] as int) as on_disk,
    cast(columns['parent'] as bigint) as parent,
    columns['path'] as path,
    cast(columns['pgroup'] as bigint) as pgroup,
    cast(columns['pid'] as bigint) as pid,
    cast(columns['resident_size'] as bigint) as resident_size,
    columns['root'] as root,
    cast(columns['sgid'] as int) as sgid,
    cast(columns['start_time'] as bigint) as start_time,
    -- NOTE(review): osquery may report process state as a letter code; if so
    -- this cast cannot yield a number. Confirm before relying on `state`.
    cast(columns['state'] as int) as state,
    cast(columns['suid'] as int) as suid,
    cast(columns['system_time'] as bigint) as `system_time`,
    cast(columns['threads'] as int) as threads,
    cast(columns['total_size'] as bigint) as total_size,
    cast(columns['uid'] as int) as uid,
    cast(columns['user_time'] as bigint) as user_time,
    cast(columns['wired_size'] as int) as wired_size,
    action
from suspicious_processes

0 comments on commit 30191e6

Please sign in to comment.