BUG: External Integration Issues caused by multiprocessing.fork #7620

Open
ColtAllen opened this issue Dec 20, 2024 · 2 comments

@ColtAllen

Describe the issue:

I'm one of the developers for pymc-marketing, and the way pymc currently handles multiprocessing is creating problems in data pipelines for some users:

Polars

RuntimeWarning

./lib/python3.11/multiprocessing/popen_fork.py:66: RuntimeWarning: Using fork() can cause Polars to deadlock in the child process.
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

If you really know what your doing, you can silence this warning with the warning module
or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()

Prefect Orchestration

PrefectHQ/prefect#15762

The issue I was running into was caused by the mp_ctx configuration in PyMC (reference: https://github.com/pymc-devs/pymc/blob/main/pymc/sampling/parallel.py#L400).

Reproducible code example:

from pathlib import Path

import pandas as pd
from prefect import flow
from pymc_marketing.mmm import MMM, GeometricAdstock, LogisticSaturation


@flow()
def acme():
    # Use the cached copy if present; otherwise download and cache the example data.
    if Path('data.csv').exists():
        data = pd.read_csv('data.csv', parse_dates=['date_week'])
    else:
        data_url = 'https://raw.githubusercontent.com/pymc-labs/pymc-marketing/main/data/mmm_example.csv'
        data = pd.read_csv(data_url, parse_dates=['date_week'])
        # index=False keeps a spurious index column out of the cached file.
        data.to_csv('data.csv', index=False)

    mmm = MMM(
        adstock=GeometricAdstock(l_max=1),
        saturation=LogisticSaturation(),
        date_column='date_week',
        channel_columns=['x1', 'x2'],
        control_columns=['event_1', 'event_2', 't'],
        yearly_seasonality=1,
    )
    X = data.drop('y', axis=1)
    y = data['y']
    # fit() ends up calling pm.sample (see the traceback below).
    mmm.fit(X, y)


if __name__ == '__main__':
    acme()

Error message:

python -m acme
Using NumPy C-API based implementation for BLAS functions.
Sampling 4 chains, 1 divergences ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╺━━  92% 0:00:01 / 0:00:39
^C^CCrash detected! Execution was aborted by an interrupt signal.
Finished in state Crashed('Execution was aborted by an interrupt signal.')
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/opt/opensource/dummy_folder/acme.py", line 31, in <module>
    hi()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flows.py", line 1334, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 810, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 688, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 667, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/opensource/dummy_folder/acme.py", line 27, in hi
    mmm.fit(X, y)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc_marketing/model_builder.py", line 606, in fit
    idata = pm.sample(**sampler_kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/sampling/mcmc.py", line 870, in sample
    return _sample_return(
           ^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/sampling/mcmc.py", line 938, in _sample_return
    idata = pm.to_inference_data(mtrace, **ikwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 520, in to_inference_data
    return InferenceDataConverter(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 208, in __init__
    self.posterior_trace, self.warmup_trace = self.split_trace()
                                              ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 253, in split_trace
    trace_posterior = self.trace[self.ntune :]
                      ~~~~~~~~~~^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 362, in __getitem__
    return self._slice(idx)
           ^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 529, in _slice
    new_traces = [trace._slice(slice) for trace in self._straces.values()]
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 529, in <listcomp>
    new_traces = [trace._slice(slice) for trace in self._straces.values()]
                  ^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/ndarray.py", line 168, in _slice
    sliced = NDArray(model=self.model, vars=self.vars)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/ndarray.py", line 44, in __init__
    super().__init__(name, model, vars, test_point)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 158, in __init__
    self.fn = model.compile_fn(vars, inputs=model.value_vars, on_unused_input="ignore")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/model/core.py", line 1648, in compile_fn
    fn = compile_pymc(
         ^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/pytensorf.py", line 1040, in compile_pymc
    pytensor_function = pytensor.function(
                        ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/__init__.py", line 315, in function
    fn = pfunc(
         ^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/pfunc.py", line 465, in pfunc
    return orig_function(
           ^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1750, in orig_function
    m = Maker(
        ^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1523, in __init__
    self.prepare_fgraph(inputs, outputs, found_updates, fgraph, mode, profile)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1411, in prepare_fgraph
    rewriter_profile = rewriter(fgraph)
                       ^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 125, in __call__
    return self.rewrite(fgraph)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 121, in rewrite
    return self.apply(fgraph, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 291, in apply
    sub_prof = rewriter.apply(fgraph)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 2427, in apply
    node_rewriter_change = self.process_node(
                           ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 1965, in process_node
    fgraph.replace_all_validate_remove(  # type: ignore
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 628, in replace_all_validate_remove
    chk = fgraph.replace_all_validate(replacements, reason=reason, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 603, in replace_all_validate
    fgraph.validate()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 478, in validate_
    ret = fgraph.execute_callbacks("validate")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/fg.py", line 712, in execute_callbacks
    fn = getattr(feature, name)
         ^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
make: *** [run] Interrupt: 2

PyMC version information:

The code example is taken from PrefectHQ/prefect#15762 (comment), which was written with respect to Prefect. These are the assumed dependencies:

pymc-marketing==0.10.0
python==3.11.9
pymc>=5.13.0,<5.16.0

Context for the issue:

This could be a blocker for industry adoption of pymc in more areas than we know. polars is a popular alternative to pandas, and prefect is an increasingly popular orchestration engine. Users may well be hitting the same problem with other orchestration services such as airflow, and anywhere else multiprocessing is used.

@ricardoV94
Member

ricardoV94 commented Dec 20, 2024

pm.sample accepts an mp_ctx keyword argument that defines the multiprocessing strategy to use.
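
As a rough illustration of that suggestion, here is a minimal sketch that builds an explicit "spawn" context with the standard library and attaches it to a dict of sampler keyword arguments. The helper name sampler_kwargs_with_ctx and the chains/cores values are hypothetical, chosen only for this example; the pm.sample call itself is left as a comment because it needs a model context.

```python
import multiprocessing as mp


def sampler_kwargs_with_ctx(start_method="spawn", **sampler_kwargs):
    """Return sampler kwargs with an explicit multiprocessing context attached.

    Hypothetical helper for illustration; it is not part of PyMC.
    """
    if start_method not in mp.get_all_start_methods():
        raise ValueError(f"start method {start_method!r} is not available here")
    # get_context returns a context object bound to the named start method.
    return {**sampler_kwargs, "mp_ctx": mp.get_context(start_method)}


kwargs = sampler_kwargs_with_ctx("spawn", chains=4, cores=4)

# Inside a model context one would then call, for example:
#     idata = pm.sample(**kwargs)
```

With "spawn", worker processes start from a fresh interpreter instead of inheriting the parent's threads and locks, which is the behaviour the Polars warning above recommends.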

@ColtAllen
Author

> pm.sample accepts an mp_ctx keyword argument that defines the multiprocessing strategy to use.

The Prefect issue arose on both macOS and Linux. For the latter, maybe the fix is as simple as adding a conditional hereabouts to make spawn the new default?

forkserver may be a better option per the multiprocessing docs, but has some nuances for Linux.
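
The kind of conditional being floated here could look roughly like the sketch below. It is an assumption about one possible shape of such a default, not PyMC's actual logic; the function name default_start_method and its parameters are hypothetical.

```python
import multiprocessing as mp
import sys


def default_start_method(platform=sys.platform, available=None):
    """Pick a non-fork start method: forkserver on Linux if offered, else spawn.

    Hypothetical sketch; the parameters exist so the choice can be exercised
    for platforms other than the one running the code.
    """
    if available is None:
        available = mp.get_all_start_methods()
    if platform.startswith("linux") and "forkserver" in available:
        return "forkserver"
    # "spawn" is available on every platform supported by multiprocessing.
    return "spawn"


method = default_start_method()
ctx = mp.get_context(method)
```

Both spawn and forkserver require that whatever is sent to the workers can be pickled, and that the entry point is protected by an `if __name__ == "__main__":` guard, so switching the default away from fork can surface errors in user code that fork tolerated.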
