diff --git a/examples/openbb-apachebeam/README.md b/examples/openbb-apachebeam/README.md index 3e2d1d4ad21e..bd4f8ee24577 100644 --- a/examples/openbb-apachebeam/README.md +++ b/examples/openbb-apachebeam/README.md @@ -1,12 +1,13 @@ # OBB Dataflow Sample +This is a sample of how to invoke OBB fetchers in an Apache Beam pipeline. (GCP Dataflow is built on Apache Beam.) -This is a sample how to invoke OBB fetchers in an Apache Beam pipeline. (GCP dataflow is build on Apache Beam) Pre-requisites -- You need to create a Conda environment (or a virtual env) using requirements.txt in this directory -- The script exercise 3 OBB endpoints, all of which require no credentials -- Run the test from the main directory - python -m unittest .\tests\test_obb_pipeline.py +- You need to create a Conda environment (or a virtual env) using `requirements.txt` in this directory +- The script exercises three OBB endpoints, none of which require credentials +- Run the test from this directory: + cd examples/openbb-apachebeam + python -m unittest tests/test_obb_pipeline.py -The script will run a pipeline consisting of 3 task which will fetch AAPL quote, profile and news -This is just a very basic sample which can be used as building block to create more complex scenarios +The script will run a pipeline consisting of three tasks which will fetch an AAPL quote, profile, and news. 
+This is just a very basic sample which can be used as a building block to create more complex scenarios. diff --git a/examples/openbb-apachebeam/tests/test_obb_pipeline.py b/examples/openbb-apachebeam/tests/test_obb_pipeline.py index 097df4237037..69ab99e90953 100644 --- a/examples/openbb-apachebeam/tests/test_obb_pipeline.py +++ b/examples/openbb-apachebeam/tests/test_obb_pipeline.py @@ -22,26 +22,35 @@ async def fetch_data(self, element: str): def process(self, element: str): return asyncio.run(self.fetch_data(element)) -class MyTestCase(unittest.TestCase): +class MyTestCase(unittest.TestCase): def test_sample_pipeline(self): - credentials = {} # Running OBB endpoints which do not require credentials + credentials = {} # Running OBB endpoints which do not require credentials debug_sink = beam.Map(print) ticker = 'AAPL' with TestPipeline(options=PipelineOptions()) as p: - quote = (p | 'Start Quote' >> beam.Create([ticker]) - | 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher)) - | 'Print quote' >> debug_sink) - - profile = (p | 'Start Profile' >> beam.Create([ticker]) - | 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher)) - | 'Print profile' >> debug_sink) - - news = (p | 'Start News' >> beam.Create([ticker]) - | 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher)) - | 'Print nes' >> debug_sink) + quote = ( + p + | 'Start Quote' >> beam.Create([ticker]) + | 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher)) + | 'Print quote' >> debug_sink + ) + + profile = ( + p + | 'Start Profile' >> beam.Create([ticker]) + | 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher)) + | 'Print profile' >> debug_sink + ) + + news = ( + p + | 'Start News' >> beam.Create([ticker]) + | 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher)) + | 'Print news' >> debug_sink + ) if __name__ == '__main__':