Skip to content

Commit 9062cc5

Browse files
authored
Merge pull request #48 from rh-marketingops/current-2023
Current 2023 + install fixes
2 parents 3006374 + 2041718 commit 9062cc5

15 files changed

+356
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Prerequisites:
2+
3+
- Python <=3.9
4+
- Kafka cluster to connect to (or navigate to ./kafka and do `docker-compose up -d` to run Confluent's Control Center locally)
5+
6+
7+
1. `bash setup.sh` for venv and fluvii install
8+
2. `bash create_topics.sh`
9+
3. `bash run_consumer.sh`
10+
4. `bash run_producer.sh`
11+
12+
Watch the magic =)
13+
14+
`Ctrl + C` to kill apps, `docker-compose down` to kill kafka fully if using confluent
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
### Connection
2+
# Kafka Consumer/Producer
3+
FLUVII_PRODUCER_URLS=localhost:9092
4+
FLUVII_CONSUMER_URLS=localhost:9092
5+
# Schema Registry
6+
FLUVII_SCHEMA_REGISTRY_URL=http://localhost:8081
7+
8+
### This sets the consumer group name in FluviiApps
9+
FLUVII_APP_NAME='account-changes-processor'
10+
11+
FLUVII_LOGLEVEL=INFO
12+
FLUVII_CONSUMER_AUTO_OFFSET_RESET=earliest
13+
# Repo-relative path so the demo works outside the original author's home directory
FLUVII_SQLITE_TABLE_DIRECTORY=./consumer/tables
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from fluvii import FluviiTableAppFactory
2+
3+
4+
def load_my_heavy_ml_model():
    """Build and return the (toy) fraud-detection model.

    The returned callable takes a balance-change amount and returns the
    string 'FRAUD!' for changes below -2000, or None (falsy) otherwise.
    """
    def heavy_ml_model(balance_change):
        # Anything more negative than -2000 is flagged as fraud;
        # everything else falls through to None.
        return 'FRAUD!' if balance_change < -2000 else None
    return heavy_ml_model
9+
10+
11+
# Avro record schema for messages produced to the notification topic.
# Both fields are strings with empty-string defaults.
balance_update_schema = {
    "name": "AccountBalanceUpdate",
    "type": "record",
    "fields": [
        {"name": field_name, "type": "string", "default": ""}
        for field_name in ("account_id", "update_message")
    ],
}
19+
20+
21+
def my_app_logic(transaction, fraud_detection_model):
    """Process one account balance-change request message.

    Reads the account's current balance from the state table (defaulting
    to 10000 for unseen accounts), applies the change unless it is flagged
    as fraudulent or would overdraw the account, persists the new balance,
    and produces a notification describing the outcome.

    Args:
        transaction: fluvii transaction for the current message; provides
            value(), read_table_entry(), update_table_entry(), produce().
        fraud_detection_model: callable taking the change amount and
            returning a truthy value when the change looks fraudulent.
    """
    record = transaction.value()
    change_amount = float(record['amount'])
    print(f"update of {change_amount} for AID {record['account_id']}")

    if change_amount == 0:
        print('System test; ignore')
        return  # No more processing, will still commit message

    # Table lookup is keyed on the message key; new accounts start at 10000.
    account_balance = float((transaction.read_table_entry() or {'balance': 10000})['balance'])
    if fraud_detection_model(change_amount):
        new_balance = account_balance
        # Fixed: the original message contained a stray "at" ("of X at has been rejected").
        msg = f"FRAUDULENT ACTIVITY DETECTED: purchase attempt of {change_amount} has been rejected and balance remains {new_balance}"
    else:
        if (new_balance := account_balance + change_amount) >= 0:
            # Store the updated balance for later...must be a valid json object (a dict works)
            transaction.update_table_entry({'balance': new_balance})
            msg = f"{'Purchase' if change_amount < 0 else 'Deposit'} has succeeded and new balance is {new_balance}"
        else:
            new_balance = account_balance
            msg = f"WARNING: Not enough funds; purchase has been rejected and balance remains {new_balance}"
    transaction.produce({"value": {"account_id": record["account_id"], "update_message": msg}})
    print(f"AcctID {record['account_id']}: {msg}")
43+
44+
45+
def fluvii_table_app():
    """Build the Fluvii table app that processes balance-change requests.

    Consumes from 'account_update_requests', produces notifications to
    'account_notifications' using balance_update_schema, and passes the
    fraud model to my_app_logic as an extra positional argument.
    """
    return FluviiTableAppFactory(
        my_app_logic,
        ['account_update_requests'],
        produce_topic_schema_dict={'account_notifications': balance_update_schema},
        app_function_arglist=[load_my_heavy_ml_model()]
    )


# Guard the entry point so importing this module does not start the app.
if __name__ == '__main__':
    fluvii_table_app().run()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Activate the demo virtualenv so the `fluvii` CLI is on PATH.
source venv/bin/activate

# Auto-export every variable sourced from the creds file.
set -a
source ./kafka/creds.env
set +a

# Create the demo topics from the JSON spec, then list them to confirm.
# Fixed: use a repo-relative path (the original hardcoded the author's
# home directory) and feed the file via redirection instead of `cat |`.
fluvii topics create < ./kafka/topics.json \
  && sleep 2 \
  && fluvii topics list | grep account
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Activate the demo virtualenv so the `fluvii` CLI is on PATH.
source venv/bin/activate

# Auto-export every variable sourced from the creds file.
set -a
source ./kafka/creds.env
set +a

# Dump the notification topic to a file, then report where it went.
# Fixed: the original ended the consume command with a dangling `\`
# continuation before a blank line; chain the echo with && instead.
fluvii topics consume --topic-offset-dict '{"account_notifications": {}}' --output-filepath ./consumer/msgs.json \
  && echo "dumped to ./consumer/msgs.json"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
### Connection
2+
FLUVII_ADMIN_URLS=localhost:9092
3+
4+
# for consuming/producing
5+
FLUVII_CONSUMER_URLS=localhost:9092
6+
FLUVII_PRODUCER_URLS=localhost:9092
7+
FLUVII_SCHEMA_REGISTRY_URL=http://localhost:8081
8+
FLUVII_APP_NAME=toolbox
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
---
2+
version: '2'
3+
services:
4+
5+
broker:
6+
image: confluentinc/cp-kafka:7.5.0
7+
hostname: broker
8+
container_name: broker
9+
ports:
10+
- "9092:9092"
11+
- "9101:9101"
12+
environment:
13+
KAFKA_NODE_ID: 1
14+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
15+
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
16+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
17+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
18+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
19+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
20+
KAFKA_JMX_PORT: 9101
21+
KAFKA_JMX_HOSTNAME: localhost
22+
KAFKA_PROCESS_ROLES: 'broker,controller'
23+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
24+
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
25+
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
26+
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
27+
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
28+
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
29+
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
30+
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
31+
32+
schema-registry:
33+
image: confluentinc/cp-schema-registry:7.5.0
34+
hostname: schema-registry
35+
container_name: schema-registry
36+
depends_on:
37+
- broker
38+
ports:
39+
- "8081:8081"
40+
environment:
41+
SCHEMA_REGISTRY_HOST_NAME: schema-registry
42+
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
43+
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
44+
45+
connect:
46+
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
47+
hostname: connect
48+
container_name: connect
49+
depends_on:
50+
- broker
51+
- schema-registry
52+
ports:
53+
- "8083:8083"
54+
environment:
55+
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
56+
CONNECT_REST_ADVERTISED_HOST_NAME: connect
57+
CONNECT_GROUP_ID: compose-connect-group
58+
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
59+
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
60+
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
61+
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
62+
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
63+
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
64+
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
65+
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
66+
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
67+
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
68+
# CLASSPATH required due to CC-2422
69+
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
70+
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
71+
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
72+
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
73+
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
74+
75+
control-center:
76+
image: confluentinc/cp-enterprise-control-center:7.5.0
77+
hostname: control-center
78+
container_name: control-center
79+
depends_on:
80+
- broker
81+
- schema-registry
82+
- connect
83+
- ksqldb-server
84+
ports:
85+
- "9021:9021"
86+
environment:
87+
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
88+
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
89+
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
90+
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
91+
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
92+
CONTROL_CENTER_REPLICATION_FACTOR: 1
93+
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
94+
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
95+
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
96+
PORT: 9021
97+
98+
ksqldb-server:
99+
image: confluentinc/cp-ksqldb-server:7.5.0
100+
hostname: ksqldb-server
101+
container_name: ksqldb-server
102+
depends_on:
103+
- broker
104+
- connect
105+
ports:
106+
- "8088:8088"
107+
environment:
108+
KSQL_CONFIG_DIR: "/etc/ksql"
109+
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
110+
KSQL_HOST_NAME: ksqldb-server
111+
KSQL_LISTENERS: "http://0.0.0.0:8088"
112+
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
113+
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
114+
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
115+
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
116+
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
117+
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
118+
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
119+
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
120+
121+
ksqldb-cli:
122+
image: confluentinc/cp-ksqldb-cli:7.5.0
123+
container_name: ksqldb-cli
124+
depends_on:
125+
- broker
126+
- connect
127+
- ksqldb-server
128+
entrypoint: /bin/sh
129+
tty: true
130+
131+
ksql-datagen:
132+
image: confluentinc/ksqldb-examples:7.5.0
133+
hostname: ksql-datagen
134+
container_name: ksql-datagen
135+
depends_on:
136+
- ksqldb-server
137+
- broker
138+
- schema-registry
139+
- connect
140+
command: "bash -c 'echo Waiting for Kafka to be ready... && \
141+
cub kafka-ready -b broker:29092 1 40 && \
142+
echo Waiting for Confluent Schema Registry to be ready... && \
143+
cub sr-ready schema-registry 8081 40 && \
144+
echo Waiting a few seconds for topic creation to finish... && \
145+
sleep 11 && \
146+
tail -f /dev/null'"
147+
environment:
148+
KSQL_CONFIG_DIR: "/etc/ksql"
149+
STREAMS_BOOTSTRAP_SERVERS: broker:29092
150+
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
151+
STREAMS_SCHEMA_REGISTRY_PORT: 8081
152+
153+
rest-proxy:
154+
image: confluentinc/cp-kafka-rest:7.5.0
155+
depends_on:
156+
- broker
157+
- schema-registry
158+
ports:
159+
- 8082:8082
160+
hostname: rest-proxy
161+
container_name: rest-proxy
162+
environment:
163+
KAFKA_REST_HOST_NAME: rest-proxy
164+
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
165+
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
166+
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"account_update_requests": {
3+
"partitions": 3,
4+
"replication.factor": 1
5+
},
6+
"account_notifications": {
7+
"partitions": 3,
8+
"replication.factor": 1
9+
},
10+
"account-changes-processor__changelog": {
11+
"partitions": 3,
12+
"replication.factor": 1,
13+
"cleanup.policy": "compact"
14+
}
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
### Connection
2+
# Kafka Consumer/Producer
3+
FLUVII_PRODUCER_URLS=localhost:9092
4+
FLUVII_CONSUMER_URLS=localhost:9092
5+
# Schema Registry
6+
FLUVII_SCHEMA_REGISTRY_URL=http://localhost:8081
7+
8+
### This sets the consumer group name in FluviiApps
9+
FLUVII_APP_NAME='balance-changes-request-maker'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from fluvii.components.producer import ProducerFactory
2+
from time import sleep, time
3+
from random import randint, random
4+
5+
# Field name -> Avro type for balance-change request records.
_REQUEST_FIELDS = [
    ("account_id", "string"),
    ("timestamp", "float"),
    ("amount", "int"),
]

# Avro record schema for messages sent to the request topic.
# String fields default to "", numeric fields to 0.
purchase_schema = {
    "name": "AccountBalanceChangeRequest",
    "type": "record",
    "fields": [
        {"name": fname, "type": ftype, "default": "" if ftype == "string" else 0}
        for fname, ftype in _REQUEST_FIELDS
    ],
}
14+
15+
produce_topic = "account_update_requests"
producer = ProducerFactory(topic_schema_dict={produce_topic: purchase_schema})

# A zero-amount message acts as a system test; the consumer ignores it.
producer.produce(
    {"account_id": "TEST", "timestamp": time(), "amount": 0},
    key="TEST",
    topic=produce_topic,
)

try:
    # Emit up to 100000 random balance-change requests, one every 0-2 s,
    # spread across account IDs A0..A10.
    for _ in range(100000):
        account = f"A{randint(0, 10)}"
        payload = {
            "account_id": account,
            "timestamp": time(),
            "amount": randint(-2500, 500),
        }
        producer.produce(payload, key=account, topic=produce_topic)
        sleep(random() * 2)
finally:
    # Always flush/close the producer, even on Ctrl+C.
    producer.close()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Run the demo consumer app inside the project venv.
source venv/bin/activate

# Load connection settings, auto-exporting every sourced variable.
set -a; source consumer/creds.env; set +a

python consumer/fluvii_consumer_ex.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Run the demo producer app inside the project venv.
source venv/bin/activate

# Load connection settings, auto-exporting every sourced variable.
set -a; source producer/creds.env; set +a

python producer/fluvii_producer_ex.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Create a local virtualenv and install fluvii into it
# (README notes Python <= 3.9 is required).
python -m venv venv
./venv/bin/pip install fluvii

requirements.txt

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#
2-
# This file is autogenerated by pip-compile with python 3.8
3-
# To update, run:
2+
# This file is autogenerated by pip-compile with Python 3.9
3+
# by the following command:
44
#
55
# pip-compile --extra=dev
66
#
@@ -191,7 +191,6 @@ typing-extensions==4.4.0
191191
# via
192192
# black
193193
# pydantic
194-
# rich
195194
urllib3==1.26.12
196195
# via
197196
# requests

0 commit comments

Comments
 (0)