Flow gets stuck when sending logs to UI from some libraries #15762

Open
williamjamir opened this issue Oct 19, 2024 · 8 comments
Labels
bug Something isn't working

Comments

@williamjamir
Contributor

williamjamir commented Oct 19, 2024

Bug summary

I'm having trouble using Prefect with the pymc library. When I send logs to the UI, the flow gets stuck and doesn't finish, so I have to cancel it manually. This problem happens with both Prefect versions 2 and 3.

I've run into similar problems in other situations, such as with Dask and other integrations, but I couldn't pin down the cause. I think solving this issue will also help with other similar, unnoticed problems.

Here's a minimal reproducible example. When it's working properly (without sending logs to the UI), it should run in 5 to 6 seconds.

logging.yaml

version: 1
disable_existing_loggers: False
handlers:
  console:
    level: WARNING
    class: prefect.logging.handlers.PrefectConsoleHandler

  api:
    class: prefect.logging.handlers.APILogHandler

loggers:
    pymc:
        level: INFO
        handlers: [ api ]
        propagate: false

root:
  level: INFO
  handlers: [ console ]
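
A stdlib-only sketch of the routing this config asks for may help when reading it: the `pymc` logger sends records to its own handler and, because of `propagate: false`, never reaches the root logger's console handler. The `ListHandler` class and the `demo_*` logger names are hypothetical stand-ins for illustration; they are not Prefect's handlers.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in handler that stores messages in a list."""

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def route_demo():
    api = ListHandler()                     # plays the role of the "api" handler
    console = ListHandler(logging.WARNING)  # plays the role of the "console" handler

    root = logging.getLogger()
    old_root_level = root.level
    root.setLevel(logging.INFO)
    root.addHandler(console)

    lib = logging.getLogger("demo_pymc")    # stand-in for the "pymc" logger
    lib.setLevel(logging.INFO)
    lib.addHandler(api)
    lib.propagate = False                   # same as "propagate: false"

    other = logging.getLogger("demo_other")  # any logger without its own config
    try:
        lib.info("sampling started")    # goes to api only
        other.info("routine detail")    # dropped: console only accepts WARNING+
        other.warning("something odd")  # reaches console through the root logger
        return api.messages, console.messages
    finally:
        # Undo the changes so the demo leaves global logging state untouched.
        root.removeHandler(console)
        root.setLevel(old_root_level)
        lib.removeHandler(api)
        lib.propagate = True
        lib.setLevel(logging.NOTSET)


if __name__ == "__main__":
    print(route_demo())
```

The point of the sketch is only the routing: with this layout, every `pymc` record at INFO or above is handed to the API handler and nothing else.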

acme.py

from pathlib import Path
from prefect import flow
from pymc_marketing.mmm import MMM, GeometricAdstock, LogisticSaturation
import pandas as pd 

@flow()
def acme():
    if Path("data.csv").exists():
        data = pd.read_csv('data.csv', parse_dates=['date_week'])
    else:
        data_url = 'https://raw.githubusercontent.com/pymc-labs/pymc-marketing/main/data/mmm_example.csv'
        data = pd.read_csv(data_url, parse_dates=['date_week'])
        data.to_csv('data.csv', index=False)  # skip the index so the cached file matches the download
    
    mmm = MMM(
        adstock=GeometricAdstock(l_max=1),
        saturation=LogisticSaturation(),
        date_column='date_week',
        channel_columns=['x1', 'x2'],
        control_columns=['event_1', 'event_2', 't'],
        yearly_seasonality=1,
    )
    X = data.drop('y', axis=1)
    y = data['y']
    mmm.fit(X, y)

if __name__ == '__main__':
    acme()

pip install prefect pymc-marketing
PREFECT_LOGGING_SETTINGS_PATH=<path>/logging.yaml python acme.py

Version info (prefect version output)

Version:             3.0.0
API version:         0.8.4
Python version:      3.11.9
Git commit:          c40d069d
Built:               Tue, Sep 3, 2024 11:13 AM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.9.2

Additional context

I already tried disabling the PyMC progress bar, but the issue persists (the flow still gets stuck):

    mmm.fit(X, y, progressbar=False)

Also, this is the stack trace I get when interrupting the execution on Prefect 3:

python -m acme
Using NumPy C-API based implementation for BLAS functions.
Sampling 4 chains, 1 divergences ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╺━━  92% 0:00:01 / 0:00:39
^C^CCrash detected! Execution was aborted by an interrupt signal.
Finished in state Crashed('Execution was aborted by an interrupt signal.')
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/opt/opensource/dummy_folder/acme.py", line 31, in <module>
    hi()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flows.py", line 1334, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 810, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 688, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/flow_engine.py", line 667, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/opensource/dummy_folder/acme.py", line 27, in hi
    mmm.fit(X, y)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc_marketing/model_builder.py", line 606, in fit
    idata = pm.sample(**sampler_kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/sampling/mcmc.py", line 870, in sample
    return _sample_return(
           ^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/sampling/mcmc.py", line 938, in _sample_return
    idata = pm.to_inference_data(mtrace, **ikwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 520, in to_inference_data
    return InferenceDataConverter(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 208, in __init__
    self.posterior_trace, self.warmup_trace = self.split_trace()
                                              ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/arviz.py", line 253, in split_trace
    trace_posterior = self.trace[self.ntune :]
                      ~~~~~~~~~~^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 362, in __getitem__
    return self._slice(idx)
           ^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 529, in _slice
    new_traces = [trace._slice(slice) for trace in self._straces.values()]
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 529, in <listcomp>
    new_traces = [trace._slice(slice) for trace in self._straces.values()]
                  ^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/ndarray.py", line 168, in _slice
    sliced = NDArray(model=self.model, vars=self.vars)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/ndarray.py", line 44, in __init__
    super().__init__(name, model, vars, test_point)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/backends/base.py", line 158, in __init__
    self.fn = model.compile_fn(vars, inputs=model.value_vars, on_unused_input="ignore")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/model/core.py", line 1648, in compile_fn
    fn = compile_pymc(
         ^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pymc/pytensorf.py", line 1040, in compile_pymc
    pytensor_function = pytensor.function(
                        ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/__init__.py", line 315, in function
    fn = pfunc(
         ^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/pfunc.py", line 465, in pfunc
    return orig_function(
           ^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1750, in orig_function
    m = Maker(
        ^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1523, in __init__
    self.prepare_fgraph(inputs, outputs, found_updates, fgraph, mode, profile)
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/compile/function/types.py", line 1411, in prepare_fgraph
    rewriter_profile = rewriter(fgraph)
                       ^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 125, in __call__
    return self.rewrite(fgraph)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 121, in rewrite
    return self.apply(fgraph, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 291, in apply
    sub_prof = rewriter.apply(fgraph)
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 2427, in apply
    node_rewriter_change = self.process_node(
                           ^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/rewriting/basic.py", line 1965, in process_node
    fgraph.replace_all_validate_remove(  # type: ignore
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 628, in replace_all_validate_remove
    chk = fgraph.replace_all_validate(replacements, reason=reason, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 603, in replace_all_validate
    fgraph.validate()
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/features.py", line 478, in validate_
    ret = fgraph.execute_callbacks("validate")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/william/.pyenv/versions/3.11.9/envs/dummy_env/lib/python3.11/site-packages/pytensor/graph/fg.py", line 712, in execute_callbacks
    fn = getattr(feature, name)
         ^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
make: *** [run] Interrupt: 2
williamjamir added the bug label Oct 19, 2024
@desertaxle
Member

Thanks for the bug report @williamjamir! Have you tried using the PREFECT_EXTRA_LOGGERS setting? That setting is the recommended way to include logs from other libraries (here's where it's referenced in the docs).

If that doesn't work we can dig in more. My hunch is that when creating a custom logging.yaml you need to include some parts of Prefect's default logging.yaml.

@williamjamir
Contributor Author

Hi @desertaxle, thanks for the prompt response!

If you meant PREFECT_LOGGING_EXTRA_LOGGERS, then yes, I have tried that, but unfortunately, it didn’t resolve the issue.

For reference, I used the following command:

PREFECT_LOGGING_EXTRA_LOGGERS='pymc' python -m acme

Regarding the custom logging configuration, the provided code is a minimal reproducible example.
In my actual application, I have integrated structlog, so I need to customize this file.
Instead of get_run_logger, I use structlog.get_logger, and my logs are written to a file in a structured format.

I’d appreciate any guidance on how to properly configure structlog while still incorporating parts of Prefect’s default logging behavior. Any specific instructions or examples would be incredibly helpful.

As for the original issue, I’m happy to assist in any way necessary.
I look forward to your thoughts and suggestions on how we can move forward!

@williamjamir
Contributor Author

Hi @desertaxle,

Is there any way I can help with this investigation? Are there any leads or suggestions on where I should begin to assist in resolving this issue?

At the moment, I have the logs disabled, which is unfortunate, as it would be ideal to fully utilize the UI dashboard.

Thank you!

@williamjamir
Contributor Author

I found the issue that was causing this error.

Under the hood, pymc was explicitly using multiprocessing with the fork start method, which is why the flow got stuck.
I managed to pass some parameters to use spawn or forkserver instead, and the logs are now working just fine.
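
One plausible mechanism, offered as an assumption and not confirmed anywhere in this thread, is that a log handler relying on a background worker thread loses that thread in a forked child, since `fork()` copies only the calling thread. The stdlib-only sketch below shows that general behavior; the function name `threads_seen_by_forked_child` is hypothetical and the worker thread is a stand-in, not Prefect code.

```python
import os
import threading


def threads_seen_by_forked_child():
    """Fork while a background thread is alive; return the child's thread count.

    Returns None on platforms without os.fork (for example Windows).
    """
    if not hasattr(os, "fork"):
        return None

    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()  # stand-in for a log-shipping worker thread

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: report how many Python threads exist here, then exit
        # immediately without running the parent's cleanup code.
        try:
            os.close(read_fd)
            os.write(write_fd, str(threading.active_count()).encode())
        finally:
            os._exit(0)

    # Parent process: read the child's answer, then clean up.
    os.close(write_fd)
    with os.fdopen(read_fd) as pipe:
        child_count = int(pipe.read())
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return child_count


if __name__ == "__main__":
    print("threads alive in parent:", threading.active_count())
    print("threads seen by forked child:", threads_seen_by_forked_child())
```

With "spawn" or "forkserver" the child starts from a fresh interpreter instead of a copy of the parent's memory, so it does not inherit half-initialized state such as held locks or queues whose consumer thread is gone.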

I believe then that this issue is related to some of these tickets:

Do you think we should close this issue? Or could something be added on Prefect's side to warn about this, or to avoid the deadlock altogether?

@ColtAllen

Hey @williamjamir,

I'm one of the developers for pymc-marketing, and we're currently exploring orchestration integrations. Can you share some more context around the spawn and forkserver params you used?

@williamjamir
Contributor Author

Hi @ColtAllen

The issue I was running into was caused by the mp_ctx configuration in PyMC (reference: https://github.com/pymc-devs/pymc/blob/main/pymc/sampling/parallel.py#L400).

On macOS, everything worked smoothly once I set the fit method parameter mp_ctx to either "forkserver" or "spawn."
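
As a minimal sketch of that workaround, the helper below picks a non-fork start method that the current platform supports and packs it into keyword arguments for the sampler. The names `pick_start_method` and `sampler_kwargs` are hypothetical; only the `mp_ctx` parameter and the values "forkserver" and "spawn" come from this comment.

```python
import multiprocessing


def pick_start_method(preferred=("forkserver", "spawn")):
    """Return the first preferred start method this platform supports."""
    available = multiprocessing.get_all_start_methods()
    for name in preferred:
        if name in available:
            return name
    raise RuntimeError(f"none of {preferred} is available; found {available}")


# Keyword arguments to forward to the sampler, for example:
#     mmm.fit(X, y, **sampler_kwargs)
sampler_kwargs = {"mp_ctx": pick_start_method()}

if __name__ == "__main__":
    print(sampler_kwargs)
```

Passing the method per call keeps the change local to the sampling step, whereas a process-wide default affects every library that uses multiprocessing.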

I'm not entirely sure why my Linux server setup was problematic earlier; perhaps "fork" is the default there, or there might be another reason. 🤷‍♂️

To be on the safe side, I also added the following at the start of my pipeline:

with contextlib.suppress(RuntimeError):
    multiprocessing.set_start_method('forkserver')

Since some of our development happens on macOS, I also configured this environment variable:

os.putenv('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')

Please don't hesitate to ping me if I can clarify anything further.

On a different note, you mentioned that you're exploring orchestration integrations, which sounds like an exciting development for the future of the library!

Currently, I am in the process of productionizing a pipeline that uses pymc-marketing. If there’s a roadmap or any ideas for improvements on the PyMC side, I would be happy to assist in any way I can.

Let me know if I can assist in any way!

@ColtAllen

Thanks @williamjamir!

I've created an issue for this in the pymc repo: pymc-devs/pymc#7620

Another one of our developers has run into similar issues using polars, which has a wonderful RuntimeWarning about this that you may find useful:

./lib/python3.11/multiprocessing/popen_fork.py:66: RuntimeWarning: Using fork() can cause Polars to deadlock in the child process.
In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

If you really know what your doing, you can silence this warning with the warning module
or by setting POLARS_ALLOW_FORKING_THREAD=1.

  self.pid = os.fork()

@ColtAllen

Speaking beyond pymc, I've encountered this same issue with flows getting stuck in the UI when using RayTaskRunner.
