Skip to content

Latest commit

 

History

History
65 lines (43 loc) · 3.63 KB

README.md

File metadata and controls

65 lines (43 loc) · 3.63 KB

Gettting Started with Faust

Make sure you have Kafka running locally. If you don't already have a local cluster then you can use the docker compose in this repo.

docker-compose up

The dependencies are set up just using a dumb requirements.txt file.

pip install -r requirements.txt

Then to run the agent(s) use:

faust -A app.demo worker -l info

How to use this Repo

The intent of this repo is to demonstrate a simple usage of agents, topics, tables, and records in Faust, and to give developers interested in Faust a simple but working toy system to modify and fool around with.

What the Demo Agents Do

Overall this is a very simple toy pipeline that listens to a stream of purchases and aggregates them into a mapping from the customer to the total amount of money spent. The service also exposes an endpoint that allows a client to make a request to see how much money a given customer has spent (an example request would be a GET request to http://localhost:6066/customer/brian).

It listens to a topic ("purchase_topic") and consumes messages that are json representations of the type PurchaseRecord. PurchaseRecord is a subtype of a faust Record, which is basically a struct or named tuple with some optional niceities (in this example we declare the class using the argument serializer="json" which takes care of de/serialization for us).

It then invokes handle_purchase on each PurchaseRecord, which fetches the customer record from the customer_table and adds the purchase amount to the customer's total. Tables can largely be treated like dictionaries but they are persistant across restarts due to Faust storing a changelog of the collection on an autogenerated topic. Documentation is available here.

The purchase_processor then yields the purchase, which is passed to the sink declared in the agent decorator. The sink in this case is the purchase_note_topic, which is being listened to by a second agent, the note_processor. This agent simply prints the customer and the contents of their purchase's note if there is one.

Sending Messages to the Agent

demo_producer has some simple functions for sending PurchaseRecords to the topic that the agent is listening to. In a new terminal window you can open up a new ipython terminal and do the following:

In [3]: from app.demo_producer import *
In [4]: send_purchases(make_random_purchases(10))

and you should see something like the following in the agent output:

[2020-04-02 12:33:01,443] [36672] [WARNING] meher has spent 225 so far. 
[2020-04-02 12:33:01,443] [36672] [WARNING] Processing: 
[2020-04-02 12:33:01,444] [36672] [WARNING] <PurchaseRecord: msg_id='27325376-03b3-4bc8-988b-bfc1abe61ff3', customer='brian', amount=18, note='Baz'> 
[2020-04-02 12:33:01,444] [36672] [WARNING] brian has spent 290 so far. 
[2020-04-02 12:33:01,444] [36672] [WARNING] Processing: 
[2020-04-02 12:33:01,444] [36672] [WARNING] <PurchaseRecord: msg_id='d41dd1cd-c803-477e-a232-75d7069145cb', customer='brian', amount=5, note='Baz'> 
[2020-04-02 12:33:01,445] [36672] [WARNING] brian has spent 295 so far. 
[2020-04-02 12:33:01,445] [36672] [WARNING] Processing: 
[2020-04-02 12:33:01,446] [36672] [WARNING] <PurchaseRecord: msg_id='314b6a01-305d-4b9d-b622-5b9870803486', customer='meher', amount=2, note='Foo'> 
[2020-04-02 12:33:01,446] [36672] [WARNING] meher has spent 227 so far. 
[2020-04-02 12:33:02,422] [36672] [WARNING] brian says: Bar. 
[2020-04-02 12:33:02,423] [36672] [WARNING] brian says: Foo.

Resources

Faust Documentation