
[BUG] duckdb read_csv_auto kwarg columns clashes with load_df #376

Open
rdmolony opened this issue Oct 19, 2022 · 6 comments

Comments

@rdmolony
Collaborator

rdmolony commented Oct 19, 2022

Minimal Code To Reproduce

import os
import tempfile
import textwrap
import typing

import duckdb
from fugue import DataFrame
from fugue import ExecutionEngine
from fugue import FugueWorkflow
from fugue_duckdb import DuckExecutionEngine
from fugue_sql import fsql
import pandas as pd


def create_temporary_file(
    _content: bytes, suffix: str, prefix: str = "fugue_example_"
) -> str:
    text_file = tempfile.NamedTemporaryFile(
        suffix=suffix, prefix=prefix, delete=False
    )
    text_file.write(_content)
    text_file.close()  # close explicitly so the content is flushed to disk
    return text_file.name


def read_header(filepath: str) -> typing.List[str]:
    # the second line of the file looks like "columns: a,b,c"; read it as a
    # header row and strip the "columns: " prefix from the first name
    row_1 = pd.read_csv(filepath, skiprows=1, nrows=0).columns
    header = [row_1[0].replace("columns: ", ""), *row_1[1:]]
    return header


# Let's say I have a non-standard `csv` ...
content = textwrap.dedent("""\
    date: 2022-10-17
    columns: a,b,c
    1,2,3
    1,2,3"""
).encode("utf-8")


# ... I can read it no problem using the `duckdb` execution engine directly ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")

os.unlink(csv_filepath)


# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()

try:
    dag.run(engine="duck")
except Exception as e:
    print(e)

os.unlink(csv_filepath)

Describe the bug
It seems that, when parsing the SQL, the `columns` kwarg intended for duckdb's `read_csv_auto` clashes with the `columns` argument of the Fugue LOAD task ...

def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
    data = self.get_dict(ctx, "fmt", "path", "params", "columns")
    __modified_exception__ = self.to_runtime_error(ctx)  # noqa
    return self.workflow.load(
        path=data["path"],
        fmt=data.get("fmt", ""),
        columns=data.get("columns"),
        **data.get("params", {}),
    )

... `columns` ends up inside `params` rather than as `columns`, so `workflow.load` receives it twice: once as the explicit `columns=` keyword (here `None`) and again via `**params`.
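
To illustrate the clash in plain Python (this only mimics the call shape; it is not fugue's actual signature):

def load(path, fmt="", columns=None, **params):
    ...

params = {"skip": 2, "columns": ["a", "b", "c"]}
# TypeError: load() got multiple values for keyword argument 'columns'
load("x.csv", columns=None, **params)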

Expected behavior
fsql should parse `LOAD ...` so that `engine.load_df` receives the same arguments as when it is called directly

Environment (please complete the following information):

  • Backend: duckdb
  • Backend version: duckdb 0.5.1
  • Python version: 3.10.6
  • OS: linux (WSL2)

Mentioned in fugue-project/tutorials#170
Example adapted from fugue-project/tutorials#178

@rdmolony
Collaborator Author

The easiest thing might be to do something like ...

def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
    data = self.get_dict(ctx, "fmt", "path", "params", "columns")
    __modified_exception__ = self.to_runtime_error(ctx)  # noqa

    params = data.get("params", {})
    try:
        columns = data["columns"]
    except KeyError:
        # fall back to a `columns` passed inside the params block
        columns = params.pop("columns", "")

    return self.workflow.load(
        path=data["path"],
        fmt=data.get("fmt", ""),
        columns=columns,
        **params,
    )

... though I'm not too familiar with testing this under fugue_sql. It looks like test_workflow_parse.test_load is the right place for this sort of test, since this is a LOAD-related SQL bug.
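
As a rough sketch (I haven't checked fugue's existing test helpers, so this just goes end-to-end through fsql with the duck engine; the names here are made up), a regression test could look something like:

import os
import tempfile

from fugue_sql import fsql


def test_load_with_columns_in_params():
    # a file with two metadata lines before the data, as in the reproduction above
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
        f.write("date: 2022-10-17\ncolumns: a,b,c\n1,2,3\n1,2,3\n")
        path = f.name
    # with the fix, `columns` inside the params block should reach load_df
    # instead of clashing with the explicit `columns=` keyword
    fsql(f"LOAD '{path}' (skip=2, columns=['a','b','c']) PRINT").run("duck")
    os.unlink(path)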

@goodwanghan
Collaborator

goodwanghan commented Oct 25, 2022

OK, I see there are a couple of issues here. Look at this code:

def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")

It may work, but only by luck. The better way is:

def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return engine.load_df(filepath, skip=2, columns=headers)

This way you don't instantiate an engine yourself, and the same creator can work with different engines. The workflow part stays the same.

@kvnkho
Collaborator

kvnkho commented Oct 25, 2022

I think he had to instantiate the engine because there was a bit of inconsistent behavior between the Pandas and DuckDB engines when reading multi-header CSVs. In this issue in the tutorials repo, he wrote the following code to make things consistent:

def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
    headers = read_header(filepath)
    if isinstance(engine, NativeExecutionEngine):
        # load_df uses pandas.read_csv
        df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
    elif isinstance(engine, DuckExecutionEngine):
        # load_df uses duckdb read_csv_auto
        df = engine.load_df(filepath, infer_schema=True, skip=4, columns=headers)
    elif isinstance(engine, DaskExecutionEngine):
        # load_df uses dask.dataframe.read_csv
        df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
    else:
        supported_engines = {NativeExecutionEngine, DuckExecutionEngine, DaskExecutionEngine}   
        raise ValueError(f"Engine {engine} is not supported, must be one of {supported_engines}")
    return df

Native engine and Duck engine have different values for skip to make it work.

I think the takeaway here is that using the engine method is deferred, and we won't design (for now) to accommodate such multi-header files until we see them more often? Making a uniform interface for all the CSV kwargs is quite a bit of effort, so I suggest we just rename this issue and keep it open.

@goodwanghan
Collaborator

This code has multiple issues:

# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()

First, I think your first solution is better: CSV needs heavy customization, and using a creator to wrap the complexity is the better way.

Second, you could do something like:

schema = ",".join([x+":str" for x in headers])
fsql(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")

But you can already see it's tedious and not so intuitive, and actually it still can't work. This is because:

Third, fsql creates a FugueWorkflow, so it's not supposed to be used inside another FugueWorkflow.

So if you really want a programmatic solution, what you could do may look like this:

from fugue_sql import FugueSQLWorkflow

schema = ",".join([x+":str" for x in headers])

dag = FugueSQLWorkflow()
df = dag(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")
df.show()

FugueSQLWorkflow is the object that fsql creates.

But this interface is not supposed to be used by end users, and actually we are going to merge FugueSQLWorkflow into FugueWorkflow very soon. So I don't recommend using it directly.

@goodwanghan
Collaborator

goodwanghan commented Oct 25, 2022

> I think the takeaway here is that using the engine method is deferred, and we won't design (for now) to accommodate such multi-header files until we see them more often? Making a uniform interface for all the CSV kwargs is quite a bit of effort, so I suggest we just rename this issue and keep it open.

Reading CSV is extremely hard to unify; by unifying it, we would actually just create a new rule that may or may not be reasonable. And as you can see, none of the CSV reading functions from the different backends are perfect, and they are all drastically different.

On the other hand, unifying CSV reading makes people want to stay with CSV, which is what we don't want to see. We want people to move away from CSV as early as possible.

So it is hard to justify the effort to further improve the CSV features; at least for now we can't prioritize it, and users can create Creators to read their special CSVs.
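
Here is a minimal sketch of such a creator (just an illustration, reusing read_header and csv_filepath from the reproduction at the top of the issue; pandas does the non-standard parsing and the returned DataFrame can be consumed by any engine):

def read_special_csv(filepath: str) -> pd.DataFrame:
    # illustration only: skip the two metadata lines and supply our own column names
    headers = read_header(filepath)
    return pd.read_csv(filepath, skiprows=2, header=None, names=headers)


dag = FugueWorkflow()
dag.create(read_special_csv, params={"filepath": csv_filepath}).show()
dag.run(engine="duck")  # or any other engine; the creator itself is engine-agnostic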

By the way, I think having if-else on engines inside a custom function is not good practice. Remember, Fugue should keep the coupling very loose, but this code is doing the opposite.

Instead, if you know you will only use DuckDB, you can do this:

from duckdb import DuckDBPyRelation, DuckDBPyConnection

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    headers = read_header(filepath)
    return engine.from_csv_auto(...)

This way, your creator is totally independent from Fugue, though it will only work with the duckdb backend. The DuckDB backend can recognize (convert) DuckDBPyConnection as an ExecutionEngine, and DuckDBPyRelation as a DataFrame. See

def _register_annotation_converters() -> None:
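
To fill out the sketch a bit (just an illustration; I'm not sure which keyword arguments from_csv_auto accepts in this duckdb version, so this variant goes through a plain SQL read_csv_auto query on the connection, passing skip and a columns name-to-type struct):

from duckdb import DuckDBPyConnection, DuckDBPyRelation


def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    # illustration only; read_header is the helper from the reproduction above
    headers = read_header(filepath)
    # read everything as VARCHAR here for simplicity; adjust types as needed
    columns = ", ".join(f"'{h}': 'VARCHAR'" for h in headers)
    return engine.query(
        f"SELECT * FROM read_csv_auto('{filepath}', skip=2, columns={{{columns}}})"
    )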

@rdmolony
Collaborator Author

Thanks @goodwanghan & @kvnkho

I'm happy for my particular job to be duckdb-only, so

from duckdb import DuckDBPyRelation, DuckDBPyConnection

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    headers = read_header(filepath)
    return engine.from_csv_auto(...)

is a good fit :)
