From cb654519638ef57e845d5d52d392a8b310ca4787 Mon Sep 17 00:00:00 2001
From: Hubert Dulay
Date: Thu, 29 Sep 2022 12:44:56 -0400
Subject: [PATCH] confluent

Add a confluent/ example driven by a Makefile that wraps the `decodable`
and `confluent` CLIs:

* streams: clickstream_append, clickstream_change,
  clickstream_users_last_click and users
* connections: a confluent-cloud source (Avro, schema registry), a
  postgres-cdc source for the users table and a postgres sink for
  clickstream_users_last_click
* pipelines: clickstream_change_pl and clickstream_users_last_click_pl,
  whose SQL is read from the two .sql files added here
* clickstream.json: DatagenSource connector config (CLICKSTREAM
  quickstart, AVRO output) writing to the clickstream_avro topic

Connection settings (ORG, API_ENDPOINT, CLUSTER_ID, TOPIC, CONFLUENT_KEY,
CONFLUENT_SECRET, SR_*, DATABASE, SCHEMA, PGHOST, USER, PASSWORD) are read
from an included .env file.

mtls/Makefile: drop the `1d` and `30d` ROW fields from create.stream.

Review fixes folded into confluent/Makefile:

* create.all depended on the misspelled `create.streas`; it now depends
  on create.streams.
* `streams` listed `clickstream_users`, which no rule creates, so
  delete.streams never removed clickstream_users_last_click.
* add the missing space before the line-continuation backslash after
  cluster.api.secret and drop the dangling backslash that ended the
  clickstream_source recipe.
* declare every target .PHONY (none of them produces a file); these
  lines replace the blank lines that used to trail the file.
---
Notes (ignored by git am):
  The blob id on the confluent/Makefile index line predates the review
  fixes above. Line counts in the hunk headers and diffstat are unchanged.

 confluent/Makefile                         | 164 +++++++++++++++++++++
 confluent/clickstream.json                 |  14 ++
 confluent/clickstream_change.sql           |  14 ++
 confluent/clickstream_users_last_click.sql |  17 +++
 mtls/Makefile                              |   4 +-
 5 files changed, 210 insertions(+), 3 deletions(-)
 create mode 100644 confluent/Makefile
 create mode 100644 confluent/clickstream.json
 create mode 100644 confluent/clickstream_change.sql
 create mode 100644 confluent/clickstream_users_last_click.sql

diff --git a/confluent/Makefile b/confluent/Makefile
new file mode 100644
index 0000000..1857d22
--- /dev/null
+++ b/confluent/Makefile
@@ -0,0 +1,164 @@
+include .env
+SHELL := bash
+
+P='current_'
+streams := clickstream_users_last_click users clickstream_change clickstream_append
+
+init:
+	confluent login --organization-id ${ORG}
+
+# STREAMS: one rule per `decodable stream create`; delete.streams removes every name in $(streams)
+clickstream.append:
+	-decodable stream create \
+		--name clickstream_append \
+		--field ip="STRING NOT NULL" \
+		--field userid="INT NOT NULL" \
+		--field remote_user="STRING NOT NULL" \
+		--field time="STRING NOT NULL" \
+		--field _time="BIGINT NOT NULL" \
+		--field request="STRING NOT NULL" \
+		--field status="STRING NOT NULL" \
+		--field bytes="STRING NOT NULL" \
+		--field referrer="STRING NOT NULL" \
+		--field agent="STRING NOT NULL"
+
+clickstream.change:
+	-decodable stream create \
+		--name clickstream_change \
+		--field id="STRING primary key" \
+		--field ip="STRING" \
+		--field userid="INT NOT NULL" \
+		--field remote_user="STRING NOT NULL" \
+		--field time=string \
+		--field _time=timestamp \
+		--field request="STRING NOT NULL" \
+		--field status="STRING NOT NULL" \
+		--field bytes="STRING NOT NULL" \
+		--field referrer="STRING NOT NULL" \
+		--field agent="STRING NOT NULL"
+
+clickstream_users_last_click:
+	-decodable stream create \
+		--name clickstream_users_last_click \
+		--field userid="int primary key" \
+		--field first_name=varchar \
+		--field last_name=varchar \
+		--field phone=varchar \
+		--field ip="STRING NOT NULL" \
+		--field remote_user="STRING NOT NULL" \
+		--field time="STRING NOT NULL" \
+		--field _time=timestamp \
+		--field request="STRING NOT NULL" \
+		--field status="STRING NOT NULL" \
+		--field bytes="STRING NOT NULL" \
+		--field referrer="STRING NOT NULL" \
+		--field agent="STRING NOT NULL"
+
+users.change:
+	-decodable stream create \
+		--name users \
+		--field userid="int primary key" \
+		--field first_name=varchar \
+		--field last_name=varchar \
+		--field phone=varchar
+create.streams: users.change clickstream_users_last_click clickstream.append clickstream.change
+delete.streams:
+	$(eval ids := $(foreach s, $(streams), $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="$s")|.id ' )))
+	$(foreach i, $(ids), decodable stream delete $i;)
+
+
+# CONNECTIONS: each rule looks up its stream id by name via `decodable stream list` + jq
+clickstream_source:
+	decodable connection create \
+		--name clickstream_source \
+		--connector confluent-cloud \
+		--type source \
+		--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="clickstream_append")|.id ' ) \
+		--prop cluster.api.endpoint="$(API_ENDPOINT)" \
+		--prop cluster.id=$(CLUSTER_ID) \
+		--prop topic=$(TOPIC) \
+		--prop format=avro \
+		--prop cluster.api.key=$(CONFLUENT_KEY) \
+		--prop cluster.api.secret=$(CONFLUENT_SECRET) \
+		--prop confluent-registry.url=$(SR_ENDPOINT) \
+		--prop confluent-registry.api-key=$(SR_KEY) \
+		--prop confluent-registry.api-secret=$(SR_SECRET)
+
+users_source:
+	decodable conn create \
+		--name users_source \
+		--connector postgres-cdc \
+		--type source \
+		--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="users")|.id ' ) \
+		--prop database-name=${DATABASE} \
+		--prop decoding.plugin.name=pgoutput \
+		--prop hostname=${PGHOST} \
+		--prop port=5432 \
+		--prop schema-name=${SCHEMA} \
+		--prop table-name=users \
+		--prop username=${USER} \
+		--prop password=$(PASSWORD)
+
+clickstream_user_sink:
+	decodable conn create \
+		--name clickstream_user_sink \
+		--connector postgres \
+		--type sink \
+		--stream-id $(shell decodable stream list -o json | jq -sr '.[] |select(.name=="clickstream_users_last_click")|.id ' ) \
+		--prop port=5432 \
+		--prop hostname=${PGHOST} \
+		--prop database-name=postgres \
+		--prop schema-name=public \
+		--prop password=$(PASSWORD) \
+		--prop table-name=clickstream_users_last_click \
+		--prop username=$(USER)
+create.conns: clickstream_source clickstream_user_sink users_source
+activate.conns:
+	$(eval conn := clickstream_source clickstream_user_sink users_source)
+	$(eval ids := $(foreach c, $(conn), $(shell decodable conn list -o json | jq -sr '.[] |select(.name=="$c")|.id ' )))
+	$(foreach i, $(ids), decodable conn activate $i;)
+deactivate.conns:
+	$(eval conn := clickstream_source clickstream_user_sink users_source)
+	$(eval ids := $(foreach c, $(conn), $(shell decodable conn list -o json | jq -sr '.[] |select(.name=="$c")|.id ' )))
+	$(foreach i, $(ids), decodable conn deactivate $i;)
+delete.conns: deactivate.conns
+	$(eval conn := clickstream_source clickstream_user_sink users_source)
+	$(eval ids := $(foreach c, $(conn), $(shell decodable conn list -o json | jq -sr '.[] |select(.name=="$c")|.id ' )))
+	$(foreach i, $(ids), decodable conn delete $i;)
+
+
+# PIPELINES: the SQL for each pipeline is fed to `decodable pipeline create` on stdin
+clickstream_change_pl:
+	@decodable pipeline create \
+		--name clickstream_change_pl \
+		- < clickstream_change.sql
+
+clickstream_users_last_click_pl:
+	@decodable pipeline create \
+		--name clickstream_users_last_click_pl \
+		- < clickstream_users_last_click.sql
+
+create.pls: clickstream_change_pl clickstream_users_last_click_pl
+activate.pls:
+	$(eval pls := clickstream_change_pl clickstream_users_last_click_pl )
+	$(eval ids := $(foreach i, $(pls), $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="$i")|.id ' )))
+	$(foreach i, $(ids), decodable pl activate $i;)
+deactivate.pls:
+	$(eval pls := clickstream_change_pl clickstream_users_last_click_pl )
+	$(eval ids := $(foreach i, $(pls), $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="$i")|.id ' )))
+	$(foreach i, $(ids), decodable pl deactivate $i;)
+delete.pls:
+	$(eval pls := clickstream_change_pl clickstream_users_last_click_pl )
+	$(eval ids := $(foreach i, $(pls), $(shell decodable pl list -o json | jq -sr '.[] |select(.name=="$i")|.id ' )))
+	$(foreach i, $(ids), decodable pl delete $i;)
+
+
+create.all: create.streams create.pls create.conns
+deactivate.all: deactivate.conns deactivate.pls
+activate.all: activate.conns activate.pls
+destroy.all: delete.conns delete.pls delete.streams
+
+.PHONY: init create.all activate.all deactivate.all destroy.all
+.PHONY: clickstream.append clickstream.change clickstream_users_last_click users.change create.streams delete.streams
+.PHONY: clickstream_source users_source clickstream_user_sink create.conns activate.conns deactivate.conns delete.conns
+.PHONY: clickstream_change_pl clickstream_users_last_click_pl create.pls activate.pls deactivate.pls delete.pls
diff --git a/confluent/clickstream.json b/confluent/clickstream.json
new file mode 100644
index 0000000..46583ae
--- /dev/null
+++ b/confluent/clickstream.json
@@ -0,0 +1,14 @@
+{
+  "name": "Clickstream_Avro",
+  "config": {
+    "connector.class": "DatagenSource",
+    "name": "Clickstream_Avro",
+    "kafka.auth.mode": "KAFKA_API_KEY",
+    "kafka.api.key": "****************",
+    "kafka.api.secret": "****************************************************************",
+    "kafka.topic": "clickstream_avro",
+    "output.data.format": "AVRO",
+    "quickstart": "CLICKSTREAM",
+    "tasks.max": "1"
+  }
+}
\ No newline at end of file
diff --git a/confluent/clickstream_change.sql b/confluent/clickstream_change.sql
new file mode 100644
index 0000000..1869750
--- /dev/null
+++ b/confluent/clickstream_change.sql
@@ -0,0 +1,14 @@
+insert into clickstream_change
+select
+    UUID() as id,
+    ip,
+    userid,
+    remote_user,
+    `time`,
+    NOW() as _time,
+    request,
+    status,
+    `bytes`,
+    referrer,
+    agent
+from clickstream_append
diff --git a/confluent/clickstream_users_last_click.sql b/confluent/clickstream_users_last_click.sql
new file mode 100644
index 0000000..e691d37
--- /dev/null
+++ b/confluent/clickstream_users_last_click.sql
@@ -0,0 +1,17 @@
+insert into clickstream_users_last_click
+select
+    u.userid as userid,
+    u.first_name as first_name,
+    u.last_name as last_name,
+    u.phone as phone,
+    c.ip as ip,
+    c.remote_user as remote_user,
+    c.`time` as `time`,
+    c._time as _time,
+    c.request as request,
+    c.status as status,
+    c.`bytes` as `bytes`,
+    c.referrer as referrer,
+    c.agent as agent
+from clickstream_change c
+join users u on c.userid=u.userid
diff --git a/mtls/Makefile b/mtls/Makefile
index a0fc508..109642e 100644
--- a/mtls/Makefile
+++ b/mtls/Makefile
@@ -128,9 +128,7 @@ create.stream:
 		--field rank=STRING \
 		--field rank_delta=STRING \
 		--field high=STRING \
-		--field high_timestamp=STRING \
-		--field 1d="ROW" \
-		--field 30d="ROW"
+		--field high_timestamp=STRING
 
 deactivate.conn.mtls:
 	-@decodable conn deactivate $(shell decodable conn list -o json | jq -sr '.[] |select(.name=="kafka_mtls_source")|.id ' )