Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions examples/openbb-apachebeam/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# OBB Dataflow Sample

This is a sample on how to invoke OBB fetchers in an Apache Beam pipeline. (GCP Dataflow is built on Apache Beam)

This is a sample how to invoke OBB fetchers in an Apache Beam pipeline. (GCP dataflow is build on Apache Beam)
Pre-requisites
- You need to create a Conda environment (or a virtual env) using requirements.txt in this directory
- The script exercise 3 OBB endpoints, all of which require no credentials
- Run the test from the main directory
python -m unittest .\tests\test_obb_pipeline.py
- You need to create a Conda environment (or a virtual env) using `requirements.txt` in this directory
- The script exercises three OBB endpoints, all of which require no credentials
- Run the test from this directory:
cd examples/openbb-apachebeam
python -m unittest tests/test_obb_pipeline.py

The script will run a pipeline consisting of 3 task which will fetch AAPL quote, profile and news
This is just a very basic sample which can be used as building block to create more complex scenarios
The script will run a pipeline consisting of three tasks which will fetch an AAPL quote, profile, and news.
This is just a very basic sample which can be used as a building block to create more complex scenarios
35 changes: 22 additions & 13 deletions examples/openbb-apachebeam/tests/test_obb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,35 @@ async def fetch_data(self, element: str):
def process(self, element: str):
return asyncio.run(self.fetch_data(element))

class MyTestCase(unittest.TestCase):

class MyTestCase(unittest.TestCase):

def test_sample_pipeline(self):
credentials = {} # Running OBB endpoints which do not require credentials
credentials = {} # Running OBB endpoints which do not require credentials
debug_sink = beam.Map(print)
ticker = 'AAPL'

with TestPipeline(options=PipelineOptions()) as p:
quote = (p | 'Start Quote' >> beam.Create([ticker])
| 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher))
| 'Print quote' >> debug_sink)

profile = (p | 'Start Profile' >> beam.Create([ticker])
| 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher))
| 'Print profile' >> debug_sink)

news = (p | 'Start News' >> beam.Create([ticker])
| 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher))
| 'Print nes' >> debug_sink)
quote = (
p
| 'Start Quote' >> beam.Create([ticker])
| 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher))
| 'Print quote' >> debug_sink
)

profile = (
p
| 'Start Profile' >> beam.Create([ticker])
| 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher))
| 'Print profile' >> debug_sink
)

news = (
p
| 'Start News' >> beam.Create([ticker])
| 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher))
| 'Print news' >> debug_sink
)


if __name__ == '__main__':
Expand Down