
FoCVS - Apache Beam failing to run with InteractiveRunner (instead of DirectRunner / DataflowRunner) #34

@NumesSanguis

Description


To better understand what is going on in the Future-Customer-Value-Segments (FoCVS) solution, and to do additional exploration of the data at intermediate steps, I'm trying to run fcvs_pipeline_csv.py step by step in a Jupyter Notebook with the InteractiveRunner. Unfortunately, I've been running into issues that don't appear when the runner is set to DirectRunner.

Notebook showing attempt so far

.ipynb hosted on Colab (originally GCP, see below):
https://colab.research.google.com/drive/1sbKkh2goXQU1NhYOQ8MWH_vT1GTvqVgw?usp=sharing

InteractiveRunner error

If you remove the with statement so the pipeline can be executed step by step, replacing this line:

with beam.Pipeline(options=options) as pipeline:

with:

# --runner=InteractiveRunner
# ...

pipeline = beam.Pipeline(options=options)

everything goes well until this step in the pipeline:

limits_dates = (
    min_max_dates
    | beam.FlatMap(c.limit_dates_boundaries, pvalue.AsSingleton(options))
)

which will throw the following error:

ValueError: No producer for ref_PCollection_PCollection_25
Full error traceback:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-20-5ba7514f8494> in <module>
      4 )
      5 
----> 6 ib.show(limits_dates)

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
    243   def run_within_progress_indicator(*args, **kwargs):
    244     with ProgressIndicator('Processing...', 'Done.'):
--> 245       return func(*args, **kwargs)
    246 
    247   return run_within_progress_indicator

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py in show(include_window_info, visualize_data, n, duration, *pcolls)
    473   recording_manager = ie.current_env().get_recording_manager(
    474       user_pipeline, create_if_absent=True)
--> 475   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
    476 
    477   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
    453           category=DeprecationWarning)
    454       pf.PipelineFragment(list(uncomputed_pcolls),
--> 455                           self.user_pipeline.options).run()
    456       result = ie.current_env().pipeline_result(self.user_pipeline)
    457     else:

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in run(self, display_pipeline_graph, use_cache, blocking)
    111       self._runner_pipeline.runner._force_compute = not use_cache
    112       self._runner_pipeline.runner._blocking = blocking
--> 113       return self.deduce_fragment().run()
    114     finally:
    115       self._runner_pipeline.runner._skip_display = preserved_skip_display

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    562         finally:
    563           shutil.rmtree(tmpdir)
--> 564       return self.runner.run_pipeline(self, self._options)
    565     finally:
    566       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
    166 
    167     pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
--> 168         pipeline_instrument.instrumented_pipeline_proto(),
    169         self._underlying_runner,
    170         options)

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_instrument.py in instrumented_pipeline_proto(self)
    124       # instrumented pipeline run cares.
    125       return pf.PipelineFragment(
--> 126           list(targets)).deduce_fragment().to_runner_api()
    127     return self._pipeline.to_runner_api()
    128 

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in deduce_fragment(self)
     98         self._runner_pipeline.to_runner_api(),
     99         self._runner_pipeline.runner,
--> 100         self._options)
    101     ie.current_env().add_derived_pipeline(self._runner_pipeline, fragment)
    102     return fragment

~/apache-beam-2.31.0/lib/python3.7/site-packages/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context)
    932       pcollection.pipeline = p
    933       if not pcollection.producer:
--> 934         raise ValueError('No producer for %s' % id)
    935 
    936     # Inject PBegin input where necessary.

ValueError: No producer for ref_PCollection_PCollection_25
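For context on why the two forms behave differently: beam.Pipeline is a context manager whose __exit__ calls run() (and waits for the result), so the with form executes the whole pipeline at once, while the explicit form leaves execution to whoever calls run() later (here, the interactive machinery behind ib.show). A minimal, Beam-free sketch of that pattern (the MiniPipeline class below is hypothetical, for illustration only):

```python
class MiniPipeline:
    """Toy stand-in for beam.Pipeline: a context manager that
    runs itself on exit, mirroring Pipeline.__exit__ -> run()."""

    def __init__(self):
        self.steps = []
        self.ran = False

    def apply(self, fn):
        # Record a step instead of executing it immediately,
        # the way Beam builds a DAG before running it.
        self.steps.append(fn)
        return self

    def run(self):
        # Execute all recorded steps in order.
        value = None
        for fn in self.steps:
            value = fn(value)
        self.ran = True
        return value

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Like beam.Pipeline, run automatically when no error occurred.
        if exc_type is None:
            self.run()


# The `with` form runs everything on exit:
with MiniPipeline() as p:
    p.apply(lambda _: 1).apply(lambda x: x + 1)
assert p.ran

# The explicit form defers execution until run() is called:
p2 = MiniPipeline()
p2.apply(lambda _: 1)
assert not p2.ran
p2.run()
assert p2.ran
```

This is only meant to show the deferred-execution difference between the two forms; the actual error above happens inside the InteractiveRunner's pipeline-fragment deduction, not in run() itself.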

Questions

  1. Would it be possible to fix this error (and probably more after it), so that the full pipeline can be run in a Jupyter Notebook with InteractiveRunner?
  2. How did you develop this pipeline? The DAG generated from it has many connected parts; it is hard to imagine it was written in one go rather than by interactively testing each part.
  3. If question 1 is not possible, how would you suggest learning about the internals of this solution and experimenting with the intermediate data transforms?

[Image: FoCVS pipeline graph]

Execution environment

  • GCP Dataflow Notebook with Kernel: apache-beam-2.31.0 (last updated 2021.08.30)
  • Python 3.7
