Make sure you have Kafka running locally. If you don't already have a local cluster then you can use the docker compose in this repo.
docker-compose up
The dependencies are set up just using a dumb requirements.txt file.
pip install -r requirements.txt
Then to run the agent(s) use:
faust -A app.demo worker -l info
The intent of this repo is to demonstrate a simple usage of agents, topics, tables, and records in Faust, and to give developers interested in Faust a simple but working toy system to modify and fool around with.
Overall this is a very simple toy pipeline that listens to a stream of purchases and aggregates them into a mapping from the customer to the total amount of money spent. The service also exposes an endpoint that allows a client to make a request to see how much money a given customer has spent (an example request would be a GET request to http://localhost:6066/customer/brian).
It listens to a topic ("purchase_topic") and consumes messages that are json representations of the type PurchaseRecord
. PurchaseRecord
is a subtype of a faust Record
, which is basically a struct or named tuple with some optional niceities (in this example we declare the class using the argument serializer="json"
which takes care of de/serialization for us).
It then invokes handle_purchase
on each PurchaseRecord, which fetches the customer record from the customer_table and adds the purchase amount to the customer's total. Tables can largely be treated like dictionaries but they are persistant across restarts due to Faust storing a changelog of the collection on an autogenerated topic. Documentation is available here.
The purchase_processor then yields the purchase, which is passed to the sink
declared in the agent decorator. The sink in this case is the purchase_note_topic
, which is being listened to by a second agent, the note_processor
. This agent simply prints the customer and the contents of their purchase's note if there is one.
demo_producer has some simple functions for sending PurchaseRecords to the topic that the agent is listening to. In a new terminal window you can open up a new ipython terminal and do the following:
In [3]: from app.demo_producer import *
In [4]: send_purchases(make_random_purchases(10))
and you should see something like the following in the agent output:
[2020-04-02 12:33:01,443] [36672] [WARNING] meher has spent 225 so far.
[2020-04-02 12:33:01,443] [36672] [WARNING] Processing:
[2020-04-02 12:33:01,444] [36672] [WARNING] <PurchaseRecord: msg_id='27325376-03b3-4bc8-988b-bfc1abe61ff3', customer='brian', amount=18, note='Baz'>
[2020-04-02 12:33:01,444] [36672] [WARNING] brian has spent 290 so far.
[2020-04-02 12:33:01,444] [36672] [WARNING] Processing:
[2020-04-02 12:33:01,444] [36672] [WARNING] <PurchaseRecord: msg_id='d41dd1cd-c803-477e-a232-75d7069145cb', customer='brian', amount=5, note='Baz'>
[2020-04-02 12:33:01,445] [36672] [WARNING] brian has spent 295 so far.
[2020-04-02 12:33:01,445] [36672] [WARNING] Processing:
[2020-04-02 12:33:01,446] [36672] [WARNING] <PurchaseRecord: msg_id='314b6a01-305d-4b9d-b622-5b9870803486', customer='meher', amount=2, note='Foo'>
[2020-04-02 12:33:01,446] [36672] [WARNING] meher has spent 227 so far.
[2020-04-02 12:33:02,422] [36672] [WARNING] brian says: Bar.
[2020-04-02 12:33:02,423] [36672] [WARNING] brian says: Foo.
Resources